Merging upstream version 1.12~rc1.
Signed-off-by: Daniel Baumann <daniel@debian.org>
parent 4ddb634c25
commit cd6a248630
24 changed files with 874 additions and 719 deletions
compress.cc (83 lines changed)
@@ -112,7 +112,6 @@ void xlock( pthread_mutex_t * const mutex )
     { show_error( "pthread_mutex_lock", errcode ); cleanup_and_fail(); }
   }
 
-
 void xunlock( pthread_mutex_t * const mutex )
   {
   const int errcode = pthread_mutex_unlock( mutex );
@@ -158,7 +157,7 @@ struct Packet            // data block with a serial number
   int size;                     // number of bytes in data (if any)
   unsigned id;                  // serial number assigned as received
   Packet() : data( 0 ), size( 0 ), id( 0 ) {}
-  void init( uint8_t * const d, const int s, const unsigned i )
+  void assign( uint8_t * const d, const int s, const unsigned i )
     { data = d; size = s; id = i; }
   };
 
@@ -176,7 +175,7 @@ private:
   unsigned deliver_id;          // id of next packet to be delivered
   Slot_tally slot_tally;        // limits the number of input packets
   std::vector< Packet > circular_ibuffer;
-  std::vector< const Packet * > circular_obuffer;
+  std::vector< const Packet * > circular_obuffer;       // pointers to ibuffer
   int num_working;              // number of workers still running
   const int num_slots;          // max packets in circulation
   pthread_mutex_t imutex;
@@ -212,7 +211,7 @@ public:
     {
     slot_tally.get_slot();              // wait for a free slot
     xlock( &imutex );
-    circular_ibuffer[receive_id % num_slots].init( data, size, receive_id );
+    circular_ibuffer[receive_id % num_slots].assign( data, size, receive_id );
     ++receive_id;
     xsignal( &iav_or_eof );
     xunlock( &imutex );
@@ -221,7 +220,6 @@ public:
   // distribute a packet to a worker
   Packet * distribute_packet()
     {
-    Packet * ipacket = 0;
     xlock( &imutex );
     ++icheck_counter;
     while( receive_id == distrib_id && !eof )  // no packets to distribute
@@ -230,15 +228,13 @@ public:
       xwait( &iav_or_eof, &imutex );
       }
     if( receive_id != distrib_id )
-      { ipacket = &circular_ibuffer[distrib_id % num_slots]; ++distrib_id; }
+      { Packet * ipacket = &circular_ibuffer[distrib_id % num_slots];
+        ++distrib_id; xunlock( &imutex ); return ipacket; }
     xunlock( &imutex );
-    if( !ipacket )                      // EOF
-      {
-      xlock( &omutex );                 // notify muxer when last worker exits
-      if( --num_working == 0 ) xsignal( &oav_or_exit );
-      xunlock( &omutex );
-      }
-    return ipacket;
+    xlock( &omutex );                   // notify muxer when last worker exits
+    if( --num_working == 0 ) xsignal( &oav_or_exit );
+    xunlock( &omutex );
+    return 0;                           // EOF
     }
 
   // collect a packet from a worker
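Note (not part of the commit): the rewritten distribute_packet() decides under imutex whether a packet is available and returns it at once, and only falls through to the EOF path, where it notifies the muxer, once the input queue is empty and eof is set. For readers unfamiliar with the idiom, here is a minimal standalone sketch of the same pthread mutex/condition-variable consumer pattern; Queue, take() and the int payload are invented for this illustration.

#include <pthread.h>
#include <queue>

struct Queue
  {
  std::queue< int > items;              // hypothetical payload
  bool eof;
  pthread_mutex_t mutex;
  pthread_cond_t av_or_eof;             // signalled when an item arrives or eof is set

  Queue() : eof( false )
    { pthread_mutex_init( &mutex, 0 ); pthread_cond_init( &av_or_eof, 0 ); }

  // return true and fill 'out' if an item was taken, false on end of input
  bool take( int & out )
    {
    pthread_mutex_lock( &mutex );
    while( items.empty() && !eof )      // loop guards against spurious wakeups
      pthread_cond_wait( &av_or_eof, &mutex );
    if( !items.empty() )
      { out = items.front(); items.pop();
        pthread_mutex_unlock( &mutex ); return true; }
    pthread_mutex_unlock( &mutex );
    return false;                       // eof reached and queue drained
    }
  };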
@@ -307,30 +303,38 @@ public:
 
 struct Worker_arg
   {
-  Packet_courier * courier;
-  const Pretty_print * pp;
-  int dictionary_size;
-  int match_len_limit;
-  int offset;
+  Packet_courier & courier;
+  const Pretty_print & pp;
+  const int dictionary_size;
+  const int match_len_limit;
+  const int offset;
+  Worker_arg( Packet_courier & co, const Pretty_print & pp_, const int dis,
+              const int mll, const int off )
+    : courier( co ), pp( pp_ ), dictionary_size( dis ),
+      match_len_limit( mll ), offset( off ) {}
   };
 
 struct Splitter_arg
   {
-  struct Worker_arg worker_arg;
-  pthread_t * worker_threads;
-  int infd;
-  int data_size;
+  Worker_arg worker_arg;
+  pthread_t * const worker_threads;
+  const int data_size;
+  const int infd;
   int num_workers;              // returned by splitter to main thread
+  Splitter_arg( Packet_courier & co, const Pretty_print & pp_, const int dis,
+                const int mll, const int off, pthread_t * wt, const int das,
+                const int ifd, const int nw )
+    : worker_arg( co, pp_, dis, mll, off ), worker_threads( wt ),
+      data_size( das ), infd( ifd ), num_workers( nw ) {}
   };
 
 
-/* Get packets from courier, replace their contents, and return them to
-   courier. */
+// get packets from courier, replace their contents, and return them to courier
 extern "C" void * cworker( void * arg )
   {
   const Worker_arg & tmp = *(const Worker_arg *)arg;
-  Packet_courier & courier = *tmp.courier;
-  const Pretty_print & pp = *tmp.pp;
+  Packet_courier & courier = tmp.courier;
+  const Pretty_print & pp = tmp.pp;
   const int dictionary_size = tmp.dictionary_size;
   const int match_len_limit = tmp.match_len_limit;
   const int offset = tmp.offset;
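Note (not part of the commit): the hunk above turns the pointer and plain-int members of Worker_arg and Splitter_arg into references and const ints, and adds mem-initializer constructors. The two changes go together: reference and const members cannot be default-constructed and assigned afterwards, which is also why the field-by-field setup of splitter_arg in compress() is replaced by a constructor call at the end of this diff. A minimal illustration of the language rule; Courier and the member names are invented.

struct Courier;                         // stand-in for Packet_courier

struct Arg_with_pointers                // old style: default-construct, then assign fields
  {
  Courier * courier;                    // can be left null by mistake
  int offset;
  };

struct Arg_with_references              // new style: must be fully built at construction
  {
  Courier & courier;                    // a reference member has no default state
  const int offset;                     // a const member cannot be assigned later
  Arg_with_references( Courier & co, const int off )
    : courier( co ), offset( off ) {}
  };

// Arg_with_references a;               // error: no default constructor
// a.offset = 5;                        // error: assignment of read-only member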
@@ -407,8 +411,8 @@ extern "C" void * cworker( void * arg )
 extern "C" void * csplitter( void * arg )
   {
   Splitter_arg & tmp = *(Splitter_arg *)arg;
-  Packet_courier & courier = *tmp.worker_arg.courier;
-  const Pretty_print & pp = *tmp.worker_arg.pp;
+  Packet_courier & courier = tmp.worker_arg.courier;
+  const Pretty_print & pp = tmp.worker_arg.pp;
   pthread_t * const worker_threads = tmp.worker_threads;
   const int offset = tmp.worker_arg.offset;
   const int infd = tmp.infd;
@@ -436,11 +440,7 @@ extern "C" void * csplitter( void * arg )
         }
       if( size < data_size ) break;             // EOF
       }
-    else
-      {
-      delete[] data;
-      break;
-      }
+    else { delete[] data; break; }
     }
   courier.finish( tmp.num_workers - i );        // no more packets to send
   tmp.num_workers = i;
@@ -465,7 +465,7 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
     out_size += opacket->size;
 
     if( writeblock( outfd, opacket->data, opacket->size ) != opacket->size )
-      { pp(); show_error( "Write error", errno ); cleanup_and_fail(); }
+      { pp(); show_error( write_error_msg, errno ); cleanup_and_fail(); }
     delete[] opacket->data;
     courier.return_empty_packet();
     }
@@ -475,8 +475,7 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
 } // end namespace
 
 
-/* Init the courier, then start the splitter and the workers and call the
-   muxer. */
+// init the courier, then start the splitter and the workers and call the muxer
 int compress( const unsigned long long cfile_size,
               const int data_size, const int dictionary_size,
               const int match_len_limit, const int num_workers,
@@ -496,16 +495,8 @@ int compress( const unsigned long long cfile_size,
   pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
   if( !worker_threads ) { pp( mem_msg ); return 1; }
 
-  Splitter_arg splitter_arg;
-  splitter_arg.worker_arg.courier = &courier;
-  splitter_arg.worker_arg.pp = &pp;
-  splitter_arg.worker_arg.dictionary_size = dictionary_size;
-  splitter_arg.worker_arg.match_len_limit = match_len_limit;
-  splitter_arg.worker_arg.offset = offset;
-  splitter_arg.worker_threads = worker_threads;
-  splitter_arg.infd = infd;
-  splitter_arg.data_size = data_size;
-  splitter_arg.num_workers = num_workers;
+  Splitter_arg splitter_arg( courier, pp, dictionary_size, match_len_limit,
+                             offset, worker_threads, data_size, infd, num_workers );
 
   pthread_t splitter_thread;
   int errcode = pthread_create( &splitter_thread, 0, csplitter, &splitter_arg );
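Note (not part of the commit): pthread_create() above hands csplitter the address of splitter_arg, and the thread function casts the void pointer back to Splitter_arg &, just as cworker and csplitter do earlier in this diff. A minimal, self-contained sketch of that pattern; Thread_arg, worker and the printed message are invented for this illustration.

#include <pthread.h>
#include <cstdio>

struct Thread_arg { int value; };

extern "C" void * worker( void * arg )
  {
  const Thread_arg & tmp = *(const Thread_arg *)arg;    // unpack the argument
  std::printf( "worker got %d\n", tmp.value );
  return 0;
  }

int main()
  {
  Thread_arg targ = { 42 };
  pthread_t thread;
  // targ must stay alive until the thread has finished using it
  if( pthread_create( &thread, 0, worker, &targ ) != 0 ) return 1;
  pthread_join( thread, 0 );
  return 0;
  }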