Bdb packages | Design docs | Source docs | Guidelines | Recent releases

Search | Site Map .

Main Page   Modules   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

/BdbTransfer/BdbPTChecksum.cc

Go to the documentation of this file.
00001 //------------------------------------------------------------------------------
00002 // File and Version Information:
00003 //      $Id: BdbPTChecksum.cc,v 1.8 1999/07/27 00:03:53 svarovsk Exp $
00004 //
00005 // Description:
00006 //      fast checksumming tool.
00007 //      uses multiple threads and various buffer sizes.
00008 //      optimal number of threads and size of buffer may vary on
00009 //      different systems. run tests to determine them.
00010 //
00011 // Environment:
00012 //      Software developed for the BaBar Detector at the SLAC B-Factory
00013 //
00014 // Author:
00015 //      Gennadi S. Svarovski (svarovsk@slac.stanford.edu)
00016 //          original author
00017 //
00018 // Copyright Information:
00019 //      Copyright (C) 1999      Stanford Linear Accelerator Center
00020 //
00021 //------------------------------------------------------------------------------
00022 
00023 #include "BaBar/BaBar.hh"
00024 #include <unistd.h>
00025 #include <sys/types.h>
00026 #include <sys/stat.h>
00027 #include <fcntl.h>
00028 #include <stdio.h>
00029 #include <stdlib.h>
00030 #include <assert.h>
00031 
00032 #include "BdbTransfer/BdbPTosSpecific.h"
00033 #include "BdbTransfer/BdbPThread.h"
00034 #include "BdbTransfer/BdbPTMutex.h"
00035 #include "BdbTransfer/BdbPTChecksumSum.h"
00036 #include "BdbTransfer/BdbPTVerbose.h"
00037 
00038 static char cvsid[]="$Id: BdbPTChecksum.cc,v 1.8 1999/07/27 00:03:53 svarovsk Exp $";
00039 
00040 // these 2 variables are used by threads and main function
00041 // array to hold checksums of all blocks
00042 static BdbPTChecksumSum *checksums;
00043 // needed for arithmetics
00044 static unsigned extrashift = 0;
00045 
00046 // class that does the job
00047 class BdbPTChecksum : public BdbPThread {
00048     private:
00049         // arguments
00050         int num, total, started;
00051         int fd;
00052         const char *filename;
00053         ssize_t bufsize;
00054 
00055         // attribute for threads creation
00056         BdbPThreadAttr Attr;
00057 
00058     private:
00059         // mutex to synchronize access to file
00060         static BdbPTMutex& fileMutex();
00061 
00062     public:
00063         // default constructor
00064         BdbPTChecksum( );
00065         // seeting arguments for call
00066         int start( int num, int total, int started, int fd, 
00067                    const char* filename, ssize_t bufsize);
00068         
00069     private:
00070         virtual void* run( void );
00071 };
00072 
00073 // mutex to synchronize access to file between threads
00074 BdbPTMutex& BdbPTChecksum::fileMutex() {
00075     static BdbPTMutex mutex;
00076     return mutex;
00077 }
00078 
00079 BdbPTChecksum::BdbPTChecksum( )
00080     : num( 0 ), total( 0 ), started( 0 ), fd( 0 ), filename( NULL ), bufsize( 0 )
00081 {
00082     // make system schedule threads as separate processes
00083     Attr.setscope( PTHREAD_SCOPE_SYSTEM );
00084 }
00085 
00086 int BdbPTChecksum::start( int _num, int _total, int _started, int _fd,
00087                           const char* _filename, ssize_t _bufsize) {
00088     num      = _num;
00089     total    = _total;
00090     started  = _started;
00091     fd       = _fd;
00092     filename = _filename;
00093     bufsize  = _bufsize;
00094 
00095     return BdbPThread::start( Attr );
00096 }
00097 
00098 void* BdbPTChecksum::run( void ) {
00099     off_t offset;
00100     ssize_t len;
00101     unsigned i, blocknum;
00102     char *buffer;
00103     
00104     buffer = new char [bufsize];
00105     assert( buffer != NULL );
00106 
00107     // main loop across all blocks for this thread
00108     for( blocknum = num; blocknum < total; blocknum += started ) {
00109         offset = (off_t)bufsize * blocknum;
00110 
00111         VERBOSEMSG( 3, ("running ( thread=%d, block=%d, file=%s, size=%d )\n", 
00112                         num, blocknum, filename, bufsize) );
00113 
00114         // sum calculation
00115         fileMutex().lock();
00116         lseek( fd, offset, SEEK_SET );
00117         len = read( fd, buffer, bufsize );
00118         fileMutex().unlock();
00119 
00120         if( len < 0 ) {
00121             perror( "pread" );
00122             ::exit(1);
00123         }
00124 
00125         checksums[blocknum] = BdbPTChecksumSum::calculate( buffer, len, &extrashift );
00126         VERBOSEMSG( 2, ( "blockdone ( block=%d, checksum=%08X )\n", 
00127                          blocknum, checksums[blocknum].value() ) );
00128     }    
00129 
00130     VERBOSEMSG( 3, ( "finished ( thread=%d )\n", num ) );
00131 
00132     delete[] buffer;
00133 
00134     return NULL;
00135 }    
00136 
// Print the command-line synopsis on standard output and terminate the
// program with exit status 1.
void usage( void ) {
    fputs( "Usage: BdbPTChecksum [-v] [-t <threadnum>] [-b <buffersize>] <filename>\n",
           stdout );
    exit( 1 );
}
00141 
00142 int main( int argc, char *argv[] ) {
00143     char options[] = "vt:b:h?";
00144     int o;
00145     char *filename;
00146     unsigned bufsize = 1024*1024, threadnum = 2, threadstart;
00147     unsigned extrablock;
00148     int fd;
00149     unsigned long count, i, blocks, extrablocksize;
00150     struct stat stat;
00151     
00152     if( argc < 2 ) usage();
00153 
00154     // command-line options processing
00155     while( ( o = getopt( argc, argv, options ) ) != EOF ) {
00156         switch ( o ) {
00157             case 'v':
00158                 VERBOSEINCREASE();
00159                 break;
00160             case 't':
00161                 threadnum = atoi( optarg );
00162                 break;
00163             case 'b':
00164                 bufsize   = atoi( optarg );
00165                 break;
00166             default:
00167                 usage();
00168         }
00169     }
00170 
00171     // must have at least one argument - filename
00172     if( optind >= argc ) usage();
00173     
00174     filename = argv[optind];
00175 
00176     if( threadnum < 1 ) {
00177         printf( "illegal thread number %d. must be at least 1\n", threadnum );
00178         exit(1);
00179     }
00180 
00181     // requirement for checksumming algorithm
00182     if( bufsize % 128 != 0 || bufsize < 1 ) {
00183         printf("buffersize must be more than 1 and multiply of 128\n");
00184         exit(1);
00185     }
00186 
00187     VERBOSEMSG( 2, ( "Called with ( threadnum=%d, file=%s, size=%d )\n", 
00188                      threadnum, filename, bufsize ) );
00189 
00190     if( ( fd = open( filename, O_RDONLY ) ) == -1 ) {
00191         perror( filename );
00192         exit(1);
00193     }
00194 
00195     if( fstat( fd, &stat ) == -1 ) {
00196         perror( "fstat" );
00197         exit(1);
00198     }
00199 
00200     // need to know how many full blocks in file 
00201     // and if there is a non-full block at the end
00202     extrablock = 0;
00203     blocks = stat.st_size / bufsize;
00204     extrablocksize = stat.st_size % bufsize;
00205     if( extrablocksize != 0 )
00206         extrablock = 1;
00207 
00208     checksums = new BdbPTChecksumSum [ blocks+extrablock ];
00209     assert( checksums != NULL );
00210 
00211     // start minimum of requested thread number and number of blocks threads
00212     threadstart = ( threadnum < blocks+extrablock ) ? threadnum : blocks+extrablock;
00213     VERBOSEMSG( 2, ( "need to run %ld blocks, %d threads. will run %d at a time\n", 
00214                      blocks + extrablock, threadnum, threadstart ) );
00215 
00216     BdbPTChecksum* Threads = new BdbPTChecksum [threadstart];
00217 
00218     // loop to start all threads
00219     for( i = 0; i < threadstart; i++ )
00220         Threads[i].start( i, blocks+extrablock, threadstart, fd, filename, bufsize);
00221 
00222     // loop to wait all them finish
00223     for( i = 0; i < threadstart; i++ )
00224         Threads[i].join(NULL);
00225 
00226     delete[] Threads;
00227 
00228     // all done with threads. now need to join all subsums together
00229 
00230     // adding full block sums together
00231     // no shift needed since blocksize is multiple of 128
00232     BdbPTChecksumSum sum;
00233     for( i = 0; i < blocks; i++ ) {
00234         sum += checksums[i];
00235         VERBOSEMSG( 3, ( "adding block %d checksum %08X (result: %08X)\n", 
00236                          i, checksums[i].value(), sum.value() ) );
00237     }
00238     
00239     // adding checksum for last (not full) block - shift required
00240     // thread that processed last block 
00241     // was supposed to initialize 'extrashift' variable
00242     if( extrablock ) {
00243         // non-trivial assumption on how many shifts we need :)
00244         int shiftnumber = (extrablocksize/4+extrashift) % 32;
00245         VERBOSEMSG( 3, ( "shifting %08X %d times", sum.value(), shiftnumber ) );
00246 
00247         sum.rotate( shiftnumber );
00248         VERBOSEMSG( 3, ( " (result %08X)\n", sum.value() ) );
00249 
00250         // adding last checksum at last
00251         sum += checksums[blocks];
00252         VERBOSEMSG( 3, ( "adding block %d checksum %08X (result: %08X)\n",
00253                     blocks, checksums[blocks].value(), sum.value() ) );
00254     }
00255 
00256     delete[] checksums;
00257 
00258     printf( "%ld+%d blocks of file %s read\n", 
00259             blocks, extrablock, filename );
00260 
00261     printf( "%08X\n", sum.value() );
00262 
00263     return 0;
00264 }

 


BaBar Public Site | SLAC | News | Links | Who's Who | Contact Us

Page Owner: Jacek Becla
Last Update: October 04, 2002