Merging upstream version 1.3~pre1.
Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
parent
f04d94e9dd
commit
e4e17ab53e
17 changed files with 387 additions and 259 deletions
|
@ -47,6 +47,8 @@ struct Packet // data block
|
|||
{
|
||||
uint8_t * data; // data == 0 means end of member
|
||||
int size; // number of bytes in data (if any)
|
||||
explicit Packet( uint8_t * const d = 0, const int s = 0 )
|
||||
: data( d ), size( s ) {}
|
||||
};
|
||||
|
||||
|
||||
|
@ -102,9 +104,7 @@ public:
|
|||
// if data == 0, move to next queue
|
||||
void receive_packet( uint8_t * const data, const int size )
|
||||
{
|
||||
Packet * ipacket = new Packet;
|
||||
ipacket->data = data;
|
||||
ipacket->size = size;
|
||||
Packet * const ipacket = new Packet( data, size );
|
||||
if( data )
|
||||
{ in_size += size; slot_tally.get_slot(); } // wait for a free slot
|
||||
xlock( &imutex );
|
||||
|
@ -185,6 +185,13 @@ public:
|
|||
return opacket;
|
||||
}
|
||||
|
||||
void add_out_size( const unsigned long long partial_out_size )
|
||||
{
|
||||
xlock( &omutex );
|
||||
out_size += partial_out_size;
|
||||
xunlock( &omutex );
|
||||
}
|
||||
|
||||
void finish() // splitter has no more packets to send
|
||||
{
|
||||
xlock( &imutex );
|
||||
|
@ -269,6 +276,7 @@ extern "C" void * dsplitter_s( void * arg )
|
|||
header.version() ); }
|
||||
cleanup_and_fail( 2 );
|
||||
}
|
||||
show_header( header.dictionary_size() );
|
||||
|
||||
unsigned long long partial_member_size = 0;
|
||||
while( true )
|
||||
|
@ -337,22 +345,25 @@ struct Worker_arg
|
|||
Packet_courier * courier;
|
||||
const Pretty_print * pp;
|
||||
int worker_id;
|
||||
bool testing;
|
||||
};
|
||||
|
||||
|
||||
// consume packets from courier, decompress their contents, and
|
||||
// give the produced packets to courier.
|
||||
// consume packets from courier, decompress their contents and,
|
||||
// if not testing, give the produced packets to courier.
|
||||
extern "C" void * dworker_s( void * arg )
|
||||
{
|
||||
const Worker_arg & tmp = *(Worker_arg *)arg;
|
||||
Packet_courier & courier = *tmp.courier;
|
||||
const Pretty_print & pp = *tmp.pp;
|
||||
const int worker_id = tmp.worker_id;
|
||||
const bool testing = tmp.testing;
|
||||
|
||||
uint8_t * new_data = new( std::nothrow ) uint8_t[max_packet_size];
|
||||
LZ_Decoder * const decoder = LZ_decompress_open();
|
||||
if( !new_data || !decoder || LZ_decompress_errno( decoder ) != LZ_ok )
|
||||
{ pp( "Not enough memory." ); cleanup_and_fail(); }
|
||||
unsigned long long partial_out_size = 0;
|
||||
int new_pos = 0;
|
||||
bool trailing_garbage_found = false;
|
||||
|
||||
|
@ -391,24 +402,21 @@ extern "C" void * dworker_s( void * arg )
|
|||
if( new_pos == max_packet_size || trailing_garbage_found ||
|
||||
LZ_decompress_finished( decoder ) == 1 )
|
||||
{
|
||||
if( new_pos > 0 ) // make data packet
|
||||
if( !testing && new_pos > 0 ) // make data packet
|
||||
{
|
||||
Packet * opacket = new Packet;
|
||||
opacket->data = new_data;
|
||||
opacket->size = new_pos;
|
||||
Packet * const opacket = new Packet( new_data, new_pos );
|
||||
courier.collect_packet( opacket, worker_id );
|
||||
new_pos = 0;
|
||||
new_data = new( std::nothrow ) uint8_t[max_packet_size];
|
||||
if( !new_data ) { pp( "Not enough memory." ); cleanup_and_fail(); }
|
||||
}
|
||||
partial_out_size += new_pos;
|
||||
new_pos = 0;
|
||||
if( trailing_garbage_found ||
|
||||
LZ_decompress_finished( decoder ) == 1 )
|
||||
{
|
||||
if( !testing ) // end of member token
|
||||
courier.collect_packet( new Packet, worker_id );
|
||||
LZ_decompress_reset( decoder ); // prepare for new member
|
||||
Packet * opacket = new Packet; // end of member token
|
||||
opacket->data = 0;
|
||||
opacket->size = 0;
|
||||
courier.collect_packet( opacket, worker_id );
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -421,6 +429,7 @@ extern "C" void * dworker_s( void * arg )
|
|||
}
|
||||
|
||||
delete[] new_data;
|
||||
courier.add_out_size( partial_out_size );
|
||||
if( LZ_decompress_member_position( decoder ) != 0 )
|
||||
{ pp( "Error, some data remains in decoder." ); cleanup_and_fail(); }
|
||||
if( LZ_decompress_close( decoder ) < 0 )
|
||||
|
@ -435,17 +444,12 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
|
|||
{
|
||||
while( true )
|
||||
{
|
||||
Packet * opacket = courier.deliver_packet();
|
||||
Packet * const opacket = courier.deliver_packet();
|
||||
if( !opacket ) break; // queue is empty. all workers exited
|
||||
|
||||
out_size += opacket->size;
|
||||
|
||||
if( outfd >= 0 )
|
||||
{
|
||||
const int wr = writeblock( outfd, opacket->data, opacket->size );
|
||||
if( wr != opacket->size )
|
||||
{ pp(); show_error( "Write error", errno ); cleanup_and_fail(); }
|
||||
}
|
||||
const int wr = writeblock( outfd, opacket->data, opacket->size );
|
||||
if( wr != opacket->size )
|
||||
{ pp(); show_error( "Write error", errno ); cleanup_and_fail(); }
|
||||
delete[] opacket->data;
|
||||
delete opacket;
|
||||
}
|
||||
|
@ -454,11 +458,10 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
|
|||
} // end namespace
|
||||
|
||||
|
||||
// init the courier, then start the splitter and the workers and
|
||||
// call the muxer.
|
||||
// init the courier, then start the splitter and the workers and,
|
||||
// if not testing, call the muxer.
|
||||
int dec_stream( const int num_workers, const int infd, const int outfd,
|
||||
const Pretty_print & pp, const int debug_level,
|
||||
const bool testing )
|
||||
const Pretty_print & pp, const int debug_level )
|
||||
{
|
||||
const int in_slots_per_worker = 2;
|
||||
const int out_slots = 32;
|
||||
|
@ -487,12 +490,13 @@ int dec_stream( const int num_workers, const int infd, const int outfd,
|
|||
worker_args[i].courier = &courier;
|
||||
worker_args[i].pp = &pp;
|
||||
worker_args[i].worker_id = i;
|
||||
worker_args[i].testing = ( outfd < 0 );
|
||||
errcode = pthread_create( &worker_threads[i], 0, dworker_s, &worker_args[i] );
|
||||
if( errcode )
|
||||
{ show_error( "Can't create worker threads", errcode ); cleanup_and_fail(); }
|
||||
}
|
||||
|
||||
muxer( courier, pp, outfd );
|
||||
if( outfd >= 0 ) muxer( courier, pp, outfd );
|
||||
|
||||
for( int i = num_workers - 1; i >= 0; --i )
|
||||
{
|
||||
|
@ -512,11 +516,11 @@ int dec_stream( const int num_workers, const int infd, const int outfd,
|
|||
(double)out_size / in_size,
|
||||
( 8.0 * in_size ) / out_size,
|
||||
100.0 * ( 1.0 - ( (double)in_size / out_size ) ) );
|
||||
if( verbosity >= 3 )
|
||||
if( verbosity >= 4 )
|
||||
std::fprintf( stderr, "decompressed size %9llu, size %9llu. ",
|
||||
out_size, in_size );
|
||||
|
||||
if( verbosity >= 1 ) std::fprintf( stderr, testing ? "ok\n" : "done\n" );
|
||||
if( verbosity >= 1 ) std::fprintf( stderr, (outfd < 0) ? "ok\n" : "done\n" );
|
||||
|
||||
if( debug_level & 1 )
|
||||
std::fprintf( stderr,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue