rambrain
managedFileSwap.cpp
Go to the documentation of this file.
1 /* rambrain - a dynamical physical memory extender
2  * Copyright (C) 2015 M. Imgrund, A. Arth
3  * mimgrund (at) mpifr-bonn.mpg.de
4  * arth (at) usm.uni-muenchen.de
5  *
6  * This program is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "managedFileSwap.h"
21 #include "common.h"
22 #include <unistd.h>
23 #include <string.h>
24 #include <sys/signal.h>
25 #include <sys/stat.h>
26 #include "exceptions.h"
27 #include "managedMemory.h"
28 #include <libaio.h>
29 #include <signal.h>
30 #include <sys/types.h>
31 #include <fcntl.h>
32 #include <sys/ioctl.h>
33 #include <mm_malloc.h>
34 #include <sys/statvfs.h>
35 #include <iostream>
36 #include <limits>
37 #ifndef OpenMP_NOT_FOUND
38 #include <omp.h>
39 #endif
40 namespace rambrain
41 {
42 
43 //#define DBG_AIO
// Constructs a file-backed swap of (at least) size bytes.
// oneFile is the size of a single swap file; 0 lets the constructor choose size / 16,
// clamped to [mib, 4 * gig]. The file size is then rounded up to a multiple of
// memoryAlignment and as many files as needed to cover size are used, so swapSize
// may end up larger than the requested size.
// The constructor opens the swap files (openSwapFiles()), registers every file as one
// free pageFileLocation, sets up the libaio context and starts the io_submit worker
// threads plus one io arrive thread.
// Throws memoryException if the swap files cannot be created, io_setup() fails or a
// submit worker thread cannot be started.
// NOTE(review): resources acquired before one of these throws (filemask copy, open swap
// files, aio_eventarr, the aio context, already started workers) are not released here,
// and the destructor does not run for a partially constructed object.
44 managedFileSwap::managedFileSwap ( global_bytesize size, const char *filemask, global_bytesize oneFile, bool enableDMA ) : managedSwap ( size ), pageSize ( sysconf ( _SC_PAGE_SIZE ) )
45 {
46  setDMA ( enableDMA );
47  if ( oneFile == 0 ) { // Layout this on your own:
48 
49  global_bytesize myg = size / 16;
50  oneFile = min ( 4 * gig, myg );
51  oneFile = max ( mib, oneFile );
52  }
53 
// Round the file size up to a multiple of memoryAlignment (set by setDMA() above).
54  oneFile += oneFile % memoryAlignment == 0 ? 0 : memoryAlignment - ( oneFile % memoryAlignment );
55  pageFileSize = oneFile;
// Use enough files to cover size; a remainder costs one extra full file.
56  if ( size % oneFile != 0 ) {
57  pageFileNumber = size / oneFile + 1;
58  } else {
59  pageFileNumber = size / oneFile;
60  }
61 
62  //initialize inherited members:
63  swapSize = pageFileNumber * oneFile;
64  swapUsed = 0;
66 
67  //copy filemask:
// NOTE(review): the malloc result is not checked before strcpy.
68  this->filemask = ( char * ) malloc ( sizeof ( char ) * ( strlen ( filemask ) + 1 ) );
69  strcpy ( ( char * ) this->filemask, filemask );
70 
71 
72  swapFiles = NULL;
73  if ( !openSwapFiles() ) {
74  throw memoryException ( "Could not create swap files" );
75  }
76 
77  //Initialize swapmalloc:
// Every swap file starts as one free region, keyed by its global offset (file index * pageFileSize).
78  for ( unsigned int n = 0; n < pageFileNumber; n++ ) {
79  pageFileLocation *pfloc = new pageFileLocation ( n, 0, pageFileSize );
80  free_space[n * pageFileSize] = pfloc;
81  all_space[n * pageFileSize] = pfloc;
82  }
83 
// Static pointer read by sigStat(), which is installed as SIGUSR2 handler when SWAPSTATS is defined.
84  instance = this;
85 #ifdef SWAPSTATS
86  signal ( SIGUSR2, managedFileSwap::sigStat );
87 #endif
88 
// Event buffer that checkForAIO() hands to io_getevents().
// NOTE(review): the malloc result is not checked before memset.
89  aio_eventarr = ( struct io_event * ) malloc ( sizeof ( struct io_event ) * aio_max_transactions );
90  memset ( aio_eventarr, 0, sizeof ( struct io_event ) *aio_max_transactions );
91  int ioSetupErr = io_setup ( aio_max_transactions, &aio_context );
92  if ( 0 != ioSetupErr ) {
93  throw ( memoryException ( "Could not initialize aio!" ) );
94  }
95  memset ( &aio_template, 0, sizeof ( aio_template ) );
96  aio_template.aio_reqprio = 0;
97 
// NOTE(review): omp_get_max_threads() / 2 is 0 when only one thread is available.
// Without any io_submit worker, nothing in this file dequeues the requests pushed by
// my_io_submit(), so every scheduled transfer would wait forever. Consider enforcing
// at least one worker. When OpenMP_NOT_FOUND is defined, the value comes from elsewhere
// (not visible in this file).
98 #ifndef OpenMP_NOT_FOUND
99  io_submit_num_threads = omp_get_max_threads() / 2; //chosen by fair dice roll...
100 #endif
101 
102 
103 
104  io_submit_threads = ( pthread_t * ) malloc ( sizeof ( pthread_t ) * io_submit_num_threads );
105  for ( unsigned int n = 0; n < io_submit_num_threads; ++n )
106  if ( pthread_create ( io_submit_threads + n, NULL, &io_submit_worker, this ) ) {
107  throw memoryException ( "Could not create worker threads for aio" );
108  }
109 
// NOTE(review): unlike the submit workers above, a failure to create the arrive thread is not checked.
110  pthread_create ( &io_arrive_thread, NULL, &io_arrrive_worker, this );
111 }
112 
114 {
115  close();
116 }
117 
118 
120 {
121  if ( swapFiles ) {
122  for ( unsigned int n = 0; n < pageFileNumber; ++n ) {
123  ::close ( swapFiles[n].fileno );
124  }
125  free ( swapFiles );
126  }
127 
128 
129  for ( unsigned int n = 0; n < pageFileNumber; ++n ) {
130  char fname[1024];
131  snprintf ( fname, 1024, filemask, getpid(), n );
132  unlink ( fname );
133  }
134 
135 }
136 
137 void managedFileSwap::setDMA ( bool arg1 )
138 {
139  enableDMA = arg1;
140  memoryAlignment = arg1 ? 512 : 1; //Dynamical detection is tricky if not impossible to solve in general
141 }
142 
144 {
145  if ( !closed ) {
146  free ( aio_eventarr );
147  closeSwapFiles();
148  free ( ( void * ) filemask );
149  if ( all_space.size() > 0 ) {
150  std::map<global_offset, pageFileLocation *>::iterator it = all_space.begin();
151  do {
152  delete it->second;
153  } while ( ++it != all_space.end() );
154  }
155  //Kill worker threads by issing suicidal command:
156  for ( unsigned int n = 0; n < io_submit_num_threads; ++n ) {
157  my_io_submit ( NULL );
158  }
159  io_arrive_work = false;
160  for ( unsigned int n = 0; n < io_submit_num_threads; ++n ) {
161  pthread_join ( io_submit_threads[n], NULL );
162  }
163  pthread_join ( io_arrive_thread, NULL );
164  free ( io_submit_threads );
165  io_destroy ( aio_context );
166  }
167 
168  closed = true;
169 }
170 
171 bool managedFileSwap::openSwapFileRange ( unsigned int start, unsigned int stop )
172 {
173  for ( unsigned int n = start; n < stop; ++n ) {
174  char fname[1024];
175  snprintf ( fname, 1024, filemask, getpid(), n );
176  swapFiles[n].fileno = open ( fname, O_RDWR | O_TRUNC | O_CREAT | ( enableDMA ? O_DIRECT : 0 << 0 ), S_IRUSR | S_IWUSR );
177  if ( swapFiles[n].fileno == -1 ) {
178  if ( errno == EINVAL && n == 0 && enableDMA ) { //Probably happens because we have O_DIRECT set even though file system does not support this...
179  warnmsg ( "Could not open first swapfile. Probably DMA is not supported on underlying filesystem. Trying again without dma" )
180  setDMA ( false );
181  return false;
182  }
183  errmsgf ( "Encountered error code %d when opening file %s\n", errno, fname );
184  throw memoryException ( "Could not open swap file." );
185  return false;
186  }
187  swapFiles[n].currentSize = 0;
188  }
189  return true;
190 }
191 
193 {
194  if ( swapFiles ) {
195  throw memoryException ( "Swap files already opened. Close first" );
196  return false;
197  }
198  swapFiles = ( struct swapFileDesc * ) malloc ( sizeof ( struct swapFileDesc ) * pageFileNumber );
199 
200  if ( !openSwapFileRange ( 0, pageFileNumber ) ) // DMA may fail on first / we recover in openSwapFileRange
201  if ( !openSwapFileRange ( 0, pageFileNumber ) ) { // if we did not recover, fail...
202  return false;
203  }
204  return true;
205 }
206 
208 {
209 
210  global_bytesize extendby = 0;
211  global_bytesize freeondisk = 0;
212  global_bytesize needed = min_size + ( min_size % pageFileSize == 0 ? 0 : ( pageFileSize - min_size % pageFileSize ) );
213  switch ( policy ) {
214  case swapPolicy::fixed:
215  return false;
216  break;
218  //Check for still free space at swapfile location:
219  freeondisk = getFreeDiskSpace();
220  if ( freeondisk < needed ) {
221  return false;
222  }
223 
224  warnmsgf ( "Extending possible swap space by %lu MB ( %lu MB left on hdd)\n", pageFileSize / mib, freeondisk / mib );
225  extendby = needed;
226  break;
228  freeondisk = getFreeDiskSpace();
229  while ( ( extendby < needed || extendby > freeondisk ) ) {
230 
231  warnmsgf ( "I am out of swap.\n\tI can increase in steps of ~%luMB\n\tWe need at least %lu steps, possible steps until disk is full: %lu \n\t please type in an integer number and press enter", pageFileSize / mib, needed / pageFileSize, freeondisk / pageFileSize );
232  do {
233  std::cin >> extendby;
234  if ( !std::cin.fail() ) {
235  break;
236  }
237  errmsg ( "I don't feel like this being an integer." );
238  std::cin.clear();
239  std::cin.ignore ( std::numeric_limits<streamsize>::max(), '\n' );
240  } while ( true );
241 
242  //again check disk free space as this may have changed due to user interaction:
243  freeondisk = getFreeDiskSpace();
244  extendby *= pageFileSize;
245  if ( extendby > freeondisk ) {
246  errmsg ( "You want to assign more disk space than you have." );
247  }
248  if ( extendby < needed ) {
249  errmsg ( "You want to assign less than we need." );
250  }
251  }
252  break;
253 
254  }
255  //Extend by extendby:
256  if ( !extendSwap ( extendby ) ) {
257  return false;
258  }
259  return true;
260 }
261 
263 {
264  size += size % pageFileSize == 0 ? 0 : pageFileSize - ( size % pageFileSize );
265  if ( size > getFreeDiskSpace() ) {
266  return false;
267  }
268  unsigned int oldpn = pageFileNumber;
269  pageFileNumber += size / pageFileSize;
270  struct swapFileDesc *oldList = swapFiles;
271  swapFiles = ( struct swapFileDesc * ) malloc ( sizeof ( struct swapFileDesc ) * pageFileNumber );
272  memcpy ( swapFiles, oldList, sizeof ( swapFileDesc ) *oldpn );
273  if ( !openSwapFileRange ( oldpn, pageFileNumber ) ) {
274  return false;
275  }
276  for ( unsigned int n = oldpn; n < pageFileNumber; n++ ) {
277  pageFileLocation *pfloc = new pageFileLocation ( n, 0, pageFileSize );
278  free_space[n * pageFileSize] = pfloc;
279  all_space[n * pageFileSize] = pfloc;
280  }
281 
282  swapFree += size;
283  swapSize += size;
284  free ( oldList );
285  return true;
286 }
287 
288 
290 {
291  string directory ( filemask );
292  std::size_t found = directory.find_last_of ( "/" );
293  if ( found == directory.npos ) {
294  directory = ".";
295  } else {
296  directory = directory.substr ( 0, found );
297  }
298  struct statvfs stats;
299  statvfs ( directory.c_str(), &stats );
300  global_bytesize bytesfree = stats.f_bfree * stats.f_bsize;
301  return bytesfree;
302 }
303 
304 
305 
// Finds swap space for an object of size bytes belonging to chunk.
// First fit: free_space is a std::map keyed by global offset, so chunks are tried in offset order.
// If no single free region is large enough, consecutive free regions are chained:
// all pieces but the last are PAGE_PART and linked via glob_off_next.glob_off_next; the
// last piece is PAGE_END and stores chunk in glob_off_next.chunk.
// Returns the first piece. Returns NULL if there is no free space or the chain could not be
// completed (already allocated pieces are then released via pffree()).
// Throws memoryException("Out of swap space") if even the summed free space is too small.
307 {
314  pageFileLocation *found = NULL;
315  std::map<global_offset, pageFileLocation *>::iterator it;
316  if ( free_space.size() == 0 ) {
317  return NULL;
318  }
319  for ( it = free_space.begin(); it != free_space.end(); ++it ) {
// NOTE(review): a NULL entry is only reported; it->second is dereferenced right below anyway.
320  if ( !it->second ) {
321  printf ( "Tuuuuut!\n" );
322  }
323  if ( it->second->size >= size ) {
324  found = it->second;
325  break;
326  }
327  }
328  pageFileLocation *res = NULL;
329  pageFileLocation *former = NULL;
330  if ( found ) {
// NOTE(review): allocInFree() may return NULL (padded size does not fit); that is not checked here.
331  res = allocInFree ( found, size );
332  res->status = PAGE_END;//Don't forget to set the status of the allocated memory.
333  res->glob_off_next.chunk = chunk;
334  } else { //We need to write out the data in parts.
335 
336 
337  //check for enough space:
338  global_bytesize total_space = 0;
339  it = free_space.begin();
// Sum free regions in offset order; every piece before the last is trimmed down to memoryAlignment.
340  do {
341  total_space -= total_space % memoryAlignment; // We have to pad splitted chunks.
342  total_space += it->second->size;
343  } while ( total_space < size && ++it != free_space.end() );
344  if ( total_space < size ) {
345  //Try to free cached elements:
// Counts the missing bz bytes as gained when cleanupCachedElements() reports success.
346  global_bytesize bz = size - total_space;
347  if ( cleanupCachedElements ( bz ) ) {
348  total_space += bz;
349  }
350  }
351 
352 
353  if ( total_space >= size ) { //We can concat enough free chunks to satisfy memory requirements
354  it = free_space.begin();
355  while ( true ) {
356  global_bytesize avail_space = it->second->size;
357  global_bytesize alloc_here = min ( avail_space, size );
// NOTE(review): if avail_space < memoryAlignment this trims alloc_here to 0.
358  if ( size > alloc_here ) {
359  alloc_here -= alloc_here % memoryAlignment;
360  }
// NOTE(review): if `it` is the last entry of free_space, it2 == free_space.end() and
// it2->first dereferences the end iterator (undefined behavior). Check it2 != end() first.
361  auto it2 = it;
362  ++it2;
363  global_offset nextFreeOffset = it2->first;
// allocInFree() erases the consumed entry from free_space, invalidating `it`; the next region
// is therefore looked up again by its remembered offset further below.
364  pageFileLocation *neu = allocInFree ( it->second, alloc_here );
365 
366  size -= alloc_here;
367  neu->status = ( size == 0 ? PAGE_END : PAGE_PART );
368 
369  if ( !res ) {
370  res = neu;
371  }
372  if ( former ) {
373  former->glob_off_next.glob_off_next = neu;
374  }
375  if ( size == 0 ) {
376  neu->glob_off_next.chunk = chunk;
377  break;
378  }
379  former = neu;
380  it = free_space.find ( nextFreeOffset );
381  if ( it == free_space.end() ) {
// Chain cannot be continued: terminate it so pffree() below can walk and release it.
382  neu->status = PAGE_END;
383  break;
384  }
385  };
386  if ( size != 0 ) {
387  pffree ( res );
388  return NULL;
389  }
390  } else {
391  throw memoryException ( "Out of swap space" );
392 
393  }
394 
395  }
396  return res;
397 }
398 
400 {
401  //Hook out the block of free space:
402  global_offset formerfree_off = determineGlobalOffset ( *freeChunk );
403  free_space.erase ( formerfree_off );
404 
405  //We want to allocate a new chunk or use the chunk at hand.
406  global_bytesize padded_size = ( size / memoryAlignment + ( size % memoryAlignment == 0 ? 0 : 1 ) ) * memoryAlignment;
407  if ( padded_size != size ) {
408  if ( padded_size > freeChunk->size ) {
409  return NULL;
410  }
411  swapFree -= padded_size - size;
412  }
413 
414  if ( freeChunk->size - padded_size < sizeof ( pageFileLocation ) ) { //Memory to manage free space exceeds free space (actually more than)
415  //Thus, use the free chunk for your data.
416  swapFree -= freeChunk->size - padded_size; //Account for not mallocable overhead, rest is done by claimUse
417  freeChunk->size = size;
418  return freeChunk;
419  } else {
420  pageFileLocation *neu = new pageFileLocation ( *freeChunk );
421  freeChunk->offset += padded_size;
422  freeChunk->size -= padded_size;
423  freeChunk->glob_off_next.glob_off_next = NULL;
424  global_offset newfreeloc = determineGlobalOffset ( *freeChunk );
425  free_space[newfreeloc] = freeChunk;
426  neu->size = size;
427  all_space[newfreeloc] = freeChunk; //inserts.
428  all_space[formerfree_off] = neu;//overwrites.
429  return neu;
430  }
431 
432 }
433 
435 {
436 
437  bool endIsReached = false;
438  do { //We possibly need multiple frees.
440  endIsReached = ( pagePtr->status == PAGE_END );
441  global_offset goff = determineGlobalOffset ( *pagePtr );
442  auto it = all_space.find ( goff );
443 
444 
445  //Delete possible pending aio_requests
446  //Check whether we're about to be deleted
447 
448  if ( pagePtr->aio_ptr ) { //Pending aio
449  while ( pagePtr->aio_lock != 0 )
450  if ( !checkForAIO() ) {
451  pthread_cond_wait ( &managedMemory::swappingCond, &managedMemory::defaultManager->stateChangeMutex );
452  };
453  }
454 
455 
456 
457 
458  //Check if we have free space before us to merge with:
459  if ( pagePtr->offset != 0 && it != all_space.begin() )
460  if ( ( --it )->second->status == PAGE_FREE ) {
461  //Merge previous free space with this chunk
462  pageFileLocation *prev = it->second;
463  prev->size += pagePtr->size;
464  delete pagePtr;
465  all_space.erase ( goff );
466  pagePtr = prev;
467  }
468  goff = determineGlobalOffset ( *pagePtr );
469  it = all_space.find ( goff );
470  ++it;
471  //Check if we have unusable space after us to reuse:
472  global_offset nextoff = ( it == all_space.end() ? swapSize : determineGlobalOffset ( * ( it->second ) ) );
473  global_bytesize size = nextoff - goff;
474  if ( pagePtr->size != size ) {
475  swapFree += size - pagePtr->size;
476  pagePtr->size = size;
477  };
478 
479  //Check if we have free space after us to merge with;
480 
481  if ( it != all_space.end() && it->second->offset != 0 && it->second->status == PAGE_FREE ) {
482  //We may merge the pageFileLocation after ourselve:
483  global_offset gofffree = determineGlobalOffset ( * ( it->second ) );
484  pagePtr->size += it->second->size;
485  //The second one may go completely:
486  delete it->second;
487  free_space.erase ( gofffree );
488  all_space.erase ( gofffree );
489 
490  }
491 
492  //We are left with our free chunk, lets mark it free (possibly redundant.)
493  pagePtr->status = PAGE_FREE;
494  free_space[goff] = pagePtr;
495  pagePtr = next;
496 
497  } while ( !endIsReached );
498 
499 
500 }
501 
502 
503 //Actual interface:
505 {
506  if ( chunk->swapBuf ) { //Must not be swapped, as read-only access should lead to keeping the swapped out locs for the moment.
507  pageFileLocation *loc = ( pageFileLocation * ) chunk->swapBuf;
508  pffree ( loc );
509  chunk->swapBuf = NULL;
510  if ( !chunk->locPtr ) { //Check if this was a cached element...
511  claimUsageof ( chunk->size, false, false );
512  }
513  }
514 }
515 
517 {
518 #ifdef DBG_AIO
519  printf ( "swapping in chunk %lu\n", chunk->id );
520 #endif
521  void *buf = _mm_malloc ( chunk->size, memoryAlignment );
522  if ( !chunk->swapBuf || chunk->status == MEM_SWAPIN ) {
523  return 0;
524  }
525  if ( buf ) {
526  if ( chunk->status & MEM_ALLOCATED || chunk->status == MEM_SWAPIN ) {
527  return 0; //chunk is available or will become available
528  }
529  chunk->locPtr = buf;
530  claimUsageof ( chunk->size, true, true );
531  chunk->status = MEM_SWAPIN;
532  copyMem ( chunk->locPtr, * ( ( pageFileLocation * ) chunk->swapBuf ) );
533  return chunk->size;
534  } else {
535  return 0;
536  }
537 }
538 
539 global_bytesize managedFileSwap::swapIn ( managedMemoryChunk **chunklist, unsigned int nchunks )
540 {
541  global_bytesize n_swapped = 0;
542  for ( unsigned int n = 0; n < nchunks; ++n ) {
543  n_swapped += swapIn ( chunklist[n] ) ;
544  }
545  return n_swapped;
546 }
547 
549 {
550 #ifdef DBG_AIO
551  printf ( "swapping out chunk %lu\n", chunk->id );
552 #endif
553  if ( chunk->size > swapFree ) {
554  return 0;
555  }
556  if ( chunk->status == MEM_SWAPPED || chunk->status == MEM_SWAPOUT ) {
557  return chunk->size; //chunk is or will be swapped
558  }
559  if ( chunk->swapBuf ) { //We already have a position to store to! (happens when read-only was triggered)
560  //Nothing to do here, we have read the element and swapOut is trivial from our point of view
561 
562 #ifdef DBG_AIO
563  printf ( "chunks is cached, we have no need to schedule: %lu\n", chunk->id );
564 #endif
565  //We may just mark the chunk as swapped out.
566  _mm_free ( chunk->locPtr );
567  chunk->locPtr = NULL;
568  chunk->status = MEM_SWAPPED;
569  claimUsageof ( chunk->size, true, false );//Double booking :-)
570  claimUsageof ( chunk->size, false, true );
572  return chunk->size;
573 
574  } else {
575 #ifdef DBG_AIO
576  printf ( "chunks will be scheduled: %lu\n", chunk->id );
577 #endif
578  pageFileLocation *newAlloced = pfmalloc ( chunk->size, chunk );
579  if ( newAlloced ) {
580  chunk->swapBuf = newAlloced;
581  claimUsageof ( chunk->size, false, true );
583  chunk->status = MEM_SWAPOUT;
584  copyMem ( *newAlloced, chunk->locPtr );
585  return chunk->size;
586  } else {
587  return 0;
588  }
589  }
590  return 0;
591 
592 }
593 
594 global_bytesize managedFileSwap::swapOut ( managedMemoryChunk **chunklist, unsigned int nchunks )
595 {
596  global_bytesize n_swapped = 0;
597  for ( unsigned int n = 0; n < nchunks; ++n ) {
598  n_swapped += swapOut ( chunklist[n] );
599  }
600  return n_swapped;
601 }
602 
603 
605 {
606  return ref.file * pageFileSize + ref.offset;
607 }
609 {
610  unsigned int file = g_offset / pageFileSize;
611  pageFileLocation pfLoc ( file, g_offset - file * pageFileSize, length, PAGE_UNKNOWN_STATE );
612  return pfLoc;
613 }
614 
615 
616 
617 void managedFileSwap::scheduleCopy ( pageFileLocation &ref, void *ramBuf, int *tracker, bool reverse )
618 {
619 
620  //We possibly need to resize swap file:
621  global_bytesize neededSize = ref.size + ref.offset;
622  if ( neededSize > swapFiles[ref.file].currentSize ) { // We need to resize swapFileDesc
624  neededSize = neededSize % ( resizeStep ) == 0 ? neededSize : resizeStep * ( neededSize / resizeStep + 1 );
625 
626  int errcode;
627  if ( 0 != ( errcode = ftruncate ( swapFiles[ref.file].fileno, neededSize ) ) ) {
628  errmsgf ( "Could not resize swap file with error code %d", errcode );
629  throw memoryException ( "Could not resize swap file" );
630  };
631  swapFiles[ref.file].currentSize = neededSize;
632  }
633  ++ ( *tracker );
635  ref.aio_ptr = new struct aiotracker;
636 
637  ref.aio_lock = 1;
638 
639  struct iocb *aio = & ( ref.aio_ptr->aio );
640 
641  ref.aio_ptr->tracker = tracker;
642 #ifdef DBG_AIO
643  reverse ? printf ( "scheduling read\n" ) : printf ( "scheduling write\n" );
644 #endif
645 
646  global_bytesize length = ref.size + ( ref.size % memoryAlignment == 0 ? 0 : memoryAlignment - ref.size % memoryAlignment );
647  int fd = swapFiles[ref.file].fileno;
648  reverse ? io_prep_pread ( aio, fd, ramBuf, length, ref.offset ) : io_prep_pwrite ( aio, fd, ramBuf, length, ref.offset );
649 
650  pendingAios[aio] = &ref;
651  my_io_submit ( aio );
652 
653 }
654 
656 {
657  managedFileSwap *dhis = ( managedFileSwap * ) ptr;
658  do {
659  rambrain_pthread_mutex_lock ( & ( dhis->io_submit_lock ) );
660  while ( dhis->io_submit_requests.size() == 0 ) {
661  pthread_cond_wait ( & ( dhis->io_submit_cond ), & ( dhis->io_submit_lock ) );
662  }
663  struct iocb *aio = dhis->io_submit_requests.front();
664 
665  dhis->io_submit_requests.pop();
666  rambrain_pthread_mutex_unlock ( & ( dhis->io_submit_lock ) );
667  if ( aio == 0 ) {
668  break;
669  }
670  int retcode = -EAGAIN;
671  while ( 1 != ( retcode = io_submit ( dhis->aio_context, 1, &aio ) ) ) {
672  if ( retcode != -EAGAIN ) {
673  throw memoryException ( "Could not enqueue request" );
674  }
675  usleep ( 10 );
676  }
677  } while ( true );
678 
679  return NULL;
680 }
681 
683 {
684  managedFileSwap *dhis = ( managedFileSwap * ) ptr;
685  while ( dhis->io_arrive_work ) {
686  if ( dhis->totalSwapActionsQueued > 0 ) {
687  pthread_mutex_lock ( &managedMemory::stateChangeMutex );
688  dhis->checkForAIO();
689  pthread_mutex_unlock ( &managedMemory::stateChangeMutex );
690  }
691  usleep ( 1000 );
692  }
693  return NULL;
694 
695 }
696 
697 
698 
699 void managedFileSwap::my_io_submit ( struct iocb *aio )
700 {
701  rambrain_pthread_mutex_lock ( &io_submit_lock );
702  io_submit_requests.push ( aio );
703  pthread_cond_signal ( &io_submit_cond );
704  rambrain_pthread_mutex_unlock ( &io_submit_lock );
705 }
706 
707 
709 {
710 #ifdef DBG_AIO
711  printf ( "got a call\n" );
712 #endif
713  while ( ref->status != PAGE_END ) {
714  ref = ref->glob_off_next.glob_off_next;
715  }
716  managedMemoryChunk *chunk = ref->glob_off_next.chunk;
717 #ifdef DBG_AIO
718  printf ( "Working on chunk %lu\n", chunk->id );
719 #endif
720  switch ( chunk->status ) {
721  case MEM_SWAPIN:
722 #ifdef DBG_AIO
723  printf ( "Accounting for a swapin of chunk %lu\n", chunk->id );
724 #endif
725  if ( lock ) {
726  rambrain_pthread_mutex_lock ( &managedMemory::stateChangeMutex );
727  }
728  //if we have a user for this object, protect it from being swapped out again
729  chunk->status = chunk->useCnt == 0 ? MEM_ALLOCATED : MEM_ALLOCATED_INUSE_READ;
730  claimUsageof ( chunk->size, false, false );
732 #ifdef SWAPSTATS
734 #endif
735  if ( lock ) {
736  rambrain_pthread_mutex_unlock ( &managedMemory::stateChangeMutex );
737  }
738  break;
739  case MEM_SWAPOUT:
740 #ifdef DBG_AIO
741  printf ( "Accounting for a swapout\n" );
742 #endif
743  if ( lock ) {
744  rambrain_pthread_mutex_lock ( &managedMemory::stateChangeMutex );
745  }
746  _mm_free ( chunk->locPtr );
747  chunk->locPtr = NULL; // not strictly required.
748  chunk->status = MEM_SWAPPED;
749  claimUsageof ( chunk->size, true, false );
750 #ifdef SWAPSTATS
752 #endif
755  if ( lock ) {
756  rambrain_pthread_mutex_unlock ( &managedMemory::stateChangeMutex );
757  }
758  break;
759  default:
760  throw memoryException ( "AIO Synchronization broken!" );
761  break;
762  }
763 
764 }
765 
766 
768 {
769  //This may be called by different threads. We only want one waiting for aio-arrivals, the others may continue.
770  if ( 0 != pthread_mutex_trylock ( &aioWaiterLock ) ) {
771  return false;
772  }
773 
774  //We're the only thread here. Check if there's still pending requests to wait for:
775  if ( totalSwapActionsQueued == 0 ) { //We do not need to wait as nothing is coming.
776  rambrain_pthread_mutex_unlock ( &aioWaiterLock );
777  return true; //Do not wait in caller for something to arrive.
778  }
779 
780 #ifdef DBG_AIO
781  printf ( "checkForAIO called, we'll wait for an event, got %d in queue\n", totalSwapActionsQueued );
782 #endif
783  //As there's at least one pending transaction, we may wait blocking indefinitely:
784 
785  int no_arrived;
786 tryagain:
787  no_arrived = io_getevents ( aio_context, 0, aio_max_transactions, aio_eventarr, NULL );
788  struct timespec timeout = {0, 100000};
789  if ( no_arrived == 0 ) {
790  rambrain_pthread_mutex_unlock ( & ( managedMemory::stateChangeMutex ) );
791  no_arrived = io_getevents ( aio_context, 1, aio_max_transactions, aio_eventarr, &timeout );
792  rambrain_pthread_mutex_lock ( & ( managedMemory::stateChangeMutex ) );
793  }
794 
795  if ( no_arrived < 0 ) {
796  if ( no_arrived == -EINTR ) { //We've been interrupted by a system call
797  goto tryagain;
798  }
799  rambrain_pthread_mutex_unlock ( &aioWaiterLock );
800  printf ( "We got an error back: %d\n", -no_arrived );
801  throw memoryException ( "AIO Error" );
802 
803  }
804 #ifdef DBG_AIO
805  printf ( "we got %d events\n", no_arrived );
806 #endif
807  for ( int n = 0; n < no_arrived; ++n ) {
808  //Try to find mapping:
809 #ifdef DBG_AIO
810  printf ( "Processing event %d \n", n );
811 #endif
812  auto found = pendingAios.find ( aio_eventarr[n].obj );
813  if ( found != pendingAios.end() ) {
814  pageFileLocation *ref = found->second;
815  pendingAios.erase ( found );
816  //Deal with element:
817  asyncIoArrived ( ref, aio_eventarr + n );
818  }
819 
820  }
821 
822 
823  rambrain_pthread_mutex_unlock ( &aioWaiterLock );
824  return true;
825 
826 };
827 
829 {
830 
831 #ifdef DBG_AIO
832  printf ( "aio: async io arrived, chunk of size %lu\n", ref->size );
833 #endif
834 
835  //Check whether we're about to be deleted, if so, don't touch the element
836  int *tracker = ref->aio_ptr->tracker;
837  //Check if aio was completed:
838 
839 
840  int err = event->res2; //Seems to be that a value of zero here indicates success.
841  if ( err == 0 && event->res == ref->size + ( ref->size % memoryAlignment == 0 ? 0 : memoryAlignment - ref->size % memoryAlignment ) ) { //This part arrived successfully
842  delete ref->aio_ptr;
843  ref->aio_ptr = NULL;
844  ref->aio_lock = 0;
845 
846  int lastval = ( *tracker )--;
847  if ( lastval == 1 ) {
848  completeTransactionOn ( ref , false );
849  delete tracker;
850  }
851 
852  --totalSwapActionsQueued; //Do this at the very last line, as completeTransactionOn() has to be done beforehands.
853 
854  } else {
855 
856  errmsgf ( "We have trouble in chunk %lu, %d ; aio_size %lu, size %lu, transfer size %lu", ref->glob_off_next.chunk->id, err, event->res, ref->size, event->obj->u.c.nbytes );
857  errmsgf ( "file-align %lu, err %d, sizeWritten = %lu", ref->offset % memoryAlignment, err, event->res );
858 
859  throw ( memoryException ( "unknown aio error" ) );
860  }
861 }
862 
863 
864 
865 void managedFileSwap::copyMem ( pageFileLocation &ref, void *ramBuf , bool reverse )
866 {
867  pageFileLocation *cur = &ref;
868  char *cramBuf = ( char * ) ramBuf;
869  global_bytesize offset = 0;
870  int *tracker = new int ( 1 );
872  while ( true ) { //Sift through all pageChunks that have to be read
873  scheduleCopy ( *cur, ( void * ) ( cramBuf + offset ), tracker, reverse );
874  if ( cur->status == PAGE_END ) {//I have completely written this pageChunk.
875  break;
876  }
877  offset += cur->size;
878  cur = cur->glob_off_next.glob_off_next;
879  };
880  unsigned int trval = ( *tracker )--;
882  if ( trval == 1 ) {
883  completeTransactionOn ( cur , false ); //We already call having aquired the lock and know that nothing fatal happens
884  delete tracker;
885  }
886 }
887 
889 
890 void managedFileSwap::sigStat ( int signum )
891 {
892  global_bytesize total_space = instance->swapSize;
894  global_bytesize free_space2 = 0;
895  global_bytesize fractured = 0;
896  global_bytesize partend = 0;
897  auto it = instance->free_space.begin();
898  do {
899  free_space += it->second->size;
900  } while ( ++it != instance->free_space.end() );
901 
902  it = instance->all_space.begin();
903  do {
904  switch ( it->second->status ) {
905  case PAGE_END:
906  partend += it->second->size;
907  break;
908  case PAGE_FREE:
909  free_space2 += it->second->size;
910  break;
911  case PAGE_PART:
912  fractured += it->second->size;
913  break;
914  default:
915  break;
916  }
917  } while ( ++it != instance->all_space.end() );
918 
919  printf ( "%ld\t%ld\t%ld\t%e\t%e\t%s\n", free_space, partend, fractured, ( ( double ) free_space ) / ( partend + fractured + free_space ), ( ( ( double ) ( total_space ) - ( partend + fractured + free_space ) ) / ( total_space ) ), ( free_space == instance->swapFree ? "sane" : "insane" ) );
920 
921 
922 }
923 
925 {
926  //It would be way nicer to do this entirely within the knowledge of managedFileSwap,
927  //However, this is not performant. We will search for chunks that are allocated but have still cached swap space.
928 
929  global_bytesize cleanedUp = 0;
930  auto it = managedMemory::defaultManager->memChunks.begin();
931  while ( ( minimum_size == 0 || cleanedUp < minimum_size ) && it != managedMemory::defaultManager->memChunks.end() ) {
932  managedMemoryChunk *chunk = it->second;
933  if ( chunk->status & MEM_ALLOCATED && chunk->swapBuf != NULL ) { // We may safely delete the pageFileLocation
934  cleanedUp += chunk->size;
935  pffree ( ( pageFileLocation * ) chunk->swapBuf );
936  chunk->swapBuf = NULL;
937  }
938  ++it;
939 
940  }
941  if ( cleanedUp > minimum_size ) {
942  return true;
943  }
944  return false;
945 }
946 
948 {
949  if ( chunk.swapBuf ) {
950  pffree ( ( pageFileLocation * ) chunk.swapBuf );
951  chunk.swapBuf = NULL;
952  }
953 }
954 
955 
956 }
957 
958 
Exception for errors with the memory.
Definition: exceptions.h:87
void scheduleCopy(rambrain::pageFileLocation &ref, void *ramBuf, int *tracker, bool reverse=false)
Schedules an elementary pageFileLocation chunk for copying (in or out)
void completeTransactionOn(rambrain::pageFileLocation *ref, bool lock=true)
called to finish a transaction when all pending aio on a managedMemoryChunk has completed ...
memoryID id
an ID to identify the object in scheduler or elsewhere
static void signalSwappingCond()
signals that a swapping action has completed and memory limits have changed
void * locPtr
pointer to the actual data in RAM
virtual void invalidateCacheFor(managedMemoryChunk &chunk)
tells managedFileSwap that the chunk under consideration might have been changed by user and needs to...
bool cleanupCachedElements(rambrain::global_bytesize minimum_size=0)
throws out cached elements still in ram but also resident on disk. This makes space in situations of ...
void * swapBuf
a place to store additional swapping information
static pthread_cond_t swappingCond
std::unordered_map< struct iocb *, pageFileLocation * > pendingAios
Class that serves as a backend to managedMemory to actually write/read managedMemoryChunks to/from hard...
Definition: managedSwap.h:35
bool openSwapFileRange(unsigned int start, unsigned int stop)
opens certain range of swap files according to settings
managedMemoryChunk * chunk
pthread_mutex_t io_submit_lock
static managedMemory * defaultManager
virtual bool extendSwap(global_bytesize size)
extend swap by size number of bytes
void claimUsageof(global_bytesize bytes, bool rambytes, bool used)
account for memory usage change
Definition: managedSwap.cpp:35
global_bytesize swapUsed
Definition: managedSwap.h:147
An implementation of managedSwap that is capable of kernel asynchronousIO.
void claimTobefreed(global_bytesize bytes, bool tobefreed)
account for future availability of bytes
uint64_t global_bytesize
Definition: common.h:65
unsigned int totalSwapActionsQueued
Definition: managedSwap.h:149
pageFileLocation * allocInFree(pageFileLocation *freeChunk, global_bytesize size)
Helper function for pfmalloc.
virtual void close()
Close the swap if not already closed.
pageFileLocation determinePFLoc(global_offset g_offset, global_bytesize length) const
generates a pageFileLocation object from a global offset and a data length. This maps our "virtual" address space to physical locations in a specific file
void asyncIoArrived(rambrain::pageFileLocation *ref, struct io_event *aio)
deals with a single asynchronous IO event completion
pthread_mutex_t aioWaiterLock
struct aiotracker * aio_ptr
void copyMem(rambrain::pageFileLocation &ref, void *ramBuf, bool reverse=false)
Schedules copying on level of whole managedMemoryChunks and calls scheduleCopy on the assigned parts...
void closeSwapFiles()
closes swap files
bool openSwapFiles()
opens swap files according to settings
pageFileLocation * pfmalloc(rambrain::global_bytesize size, rambrain::managedMemoryChunk *chunk)
Tries to find space in the swapFiles to write out an object of size size and returns first pageFileLo...
global_bytesize currentSize
const global_bytesize gig
Definition: common.h:69
static pthread_mutex_t stateChangeMutex
std::queue< struct iocb * > io_submit_requests
pageFileLocation * glob_off_next
union glob_off_union glob_off_next
manages all managed Chunks of raw memory
tracks page file allocations; objects are preferably written contiguously to the page file...
std::map< memoryID, managedMemoryChunk * > memChunks
global_offset determineGlobalOffset(const pageFileLocation &ref) const
maps from a physical location to a "virtual" address
virtual global_bytesize swapOut(managedMemoryChunk **chunklist, unsigned int nchunks)
Trigger swap out of the chunks pointed to by chunklist.
struct swapFileDesc * swapFiles
struct io_event * aio_eventarr
global_bytesize swapFree
Definition: managedSwap.h:148
void my_io_submit(struct iocb *aio)
static void sigStat(int signum)
returns some statistics. Typically, we will be sensitive to SIGUSR2 if compiled with -DSWAPSTATS=on ...
managedFileSwap(global_bytesize size, const char *filemask, global_bytesize oneFile=0, bool enableDMA=false)
global_bytesize swap_in_bytes
virtual bool checkForAIO()
gives this class the chance to treat incoming aio events
global_bytesize getFreeDiskSpace()
returns free disk space at file system location specified by filemask
static managedFileSwap * instance
structure to handle swap files
std::map< global_offset, pageFileLocation * > free_space
virtual bool extendSwapByPolicy(global_bytesize min_size)
extend swap by policy
global_bytesize size
Size of actual object in bytes.
virtual void swapDelete(managedMemoryChunk *chunk)
Mark chunk as deleted.
unsigned short useCnt
Number of adhereTo objects currently using this chunk; also a possible place to lock the object against changes.
static void * io_submit_worker(void *ptr)
unsigned int io_submit_num_threads
static void * io_arrrive_worker(void *ptr)
global_bytesize swap_out_bytes
datastructure for handling asynchronous events
void pffree(pageFileLocation *pagePtr)
global_bytesize swapSize
Definition: managedSwap.h:146
global_bytesize pageFileSize
std::map< global_offset, pageFileLocation * > all_space
uint64_t global_offset
const global_bytesize mib
Definition: common.h:68
virtual global_bytesize swapIn(managedMemoryChunk **chunklist, unsigned int nchunks)
Trigger swap in of the chunks pointed to by chunklist.