1
0
Fork 0

Merging upstream version 0.8.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-24 03:44:03 +01:00
parent 78a0eaf2b7
commit 211f2dec81
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
15 changed files with 391 additions and 272 deletions

View file

@ -1,6 +1,6 @@
/* Plzip - A parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -46,63 +46,63 @@
#endif
void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex )
void xinit( pthread_mutex_t * const mutex )
{
int errcode = pthread_cond_init( cond, 0 );
const int errcode = pthread_mutex_init( mutex, 0 );
if( errcode ) { show_error( "pthread_mutex_init", errcode ); fatal(); }
}
void xinit( pthread_cond_t * const cond )
{
const int errcode = pthread_cond_init( cond, 0 );
if( errcode ) { show_error( "pthread_cond_init", errcode ); fatal(); }
if( mutex )
{
errcode = pthread_mutex_init( mutex, 0 );
if( errcode ) { show_error( "pthread_mutex_init", errcode ); fatal(); }
}
}
void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex )
void xdestroy( pthread_mutex_t * const mutex )
{
int errcode = pthread_cond_destroy( cond );
const int errcode = pthread_mutex_destroy( mutex );
if( errcode ) { show_error( "pthread_mutex_destroy", errcode ); fatal(); }
}
void xdestroy( pthread_cond_t * const cond )
{
const int errcode = pthread_cond_destroy( cond );
if( errcode ) { show_error( "pthread_cond_destroy", errcode ); fatal(); }
if( mutex )
{
errcode = pthread_mutex_destroy( mutex );
if( errcode ) { show_error( "pthread_mutex_destroy", errcode ); fatal(); }
}
}
void xlock( pthread_mutex_t * mutex )
void xlock( pthread_mutex_t * const mutex )
{
int errcode = pthread_mutex_lock( mutex );
const int errcode = pthread_mutex_lock( mutex );
if( errcode ) { show_error( "pthread_mutex_lock", errcode ); fatal(); }
}
void xunlock( pthread_mutex_t * mutex )
void xunlock( pthread_mutex_t * const mutex )
{
int errcode = pthread_mutex_unlock( mutex );
const int errcode = pthread_mutex_unlock( mutex );
if( errcode ) { show_error( "pthread_mutex_unlock", errcode ); fatal(); }
}
void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex )
void xwait( pthread_cond_t * const cond, pthread_mutex_t * const mutex )
{
int errcode = pthread_cond_wait( cond, mutex );
const int errcode = pthread_cond_wait( cond, mutex );
if( errcode ) { show_error( "pthread_cond_wait", errcode ); fatal(); }
}
void xsignal( pthread_cond_t * cond )
void xsignal( pthread_cond_t * const cond )
{
int errcode = pthread_cond_signal( cond );
const int errcode = pthread_cond_signal( cond );
if( errcode ) { show_error( "pthread_cond_signal", errcode ); fatal(); }
}
void xbroadcast( pthread_cond_t * cond )
void xbroadcast( pthread_cond_t * const cond )
{
int errcode = pthread_cond_broadcast( cond );
const int errcode = pthread_cond_broadcast( cond );
if( errcode ) { show_error( "pthread_cond_broadcast", errcode ); fatal(); }
}
@ -142,17 +142,26 @@ private:
pthread_cond_t oav_or_exit; // output packet available or all workers exited
bool eof; // splitter done
Packet_courier( const Packet_courier & ); // declared as private
void operator=( const Packet_courier & ); // declared as private
public:
Packet_courier( const int num_workers, const int slots )
Packet_courier( const int workers, const int slots )
: icheck_counter( 0 ), iwait_counter( 0 ),
ocheck_counter( 0 ), owait_counter( 0 ),
receive_id( 0 ), deliver_id( 0 ),
slot_tally( slots ), circular_buffer( slots, (Packet *) 0 ),
num_working( num_workers ), num_slots( slots ), eof( false )
{ xinit( &iav_or_eof, &imutex ); xinit( &oav_or_exit, &omutex ); }
num_working( workers ), num_slots( slots ), eof( false )
{
xinit( &imutex ); xinit( &iav_or_eof );
xinit( &omutex ); xinit( &oav_or_exit );
}
~Packet_courier()
{ xdestroy( &iav_or_eof, &imutex ); xdestroy( &oav_or_exit, &omutex ); }
{
xdestroy( &oav_or_exit ); xdestroy( &omutex );
xdestroy( &iav_or_eof ); xdestroy( &imutex );
}
const Slot_tally & tally() const { return slot_tally; }
@ -271,7 +280,7 @@ extern "C" void * csplitter( void * arg )
for( bool first_post = true; ; first_post = false )
{
uint8_t * data = new( std::nothrow ) uint8_t[data_size];
uint8_t * const data = new( std::nothrow ) uint8_t[data_size];
if( data == 0 ) { pp( "Not enough memory" ); fatal(); }
const int size = readblock( infd, data, data_size );
if( size != data_size && errno )
@ -281,14 +290,15 @@ extern "C" void * csplitter( void * arg )
{
in_size += size;
courier.receive_packet( data, size );
if( size < data_size ) break; // EOF
}
else
{
delete[] data;
courier.finish(); // no more packets to send
break;
}
}
courier.finish(); // no more packets to send
return 0;
}
@ -314,11 +324,11 @@ extern "C" void * cworker( void * arg )
while( true )
{
Packet * packet = courier.distribute_packet();
Packet * const packet = courier.distribute_packet();
if( packet == 0 ) break; // no more packets to process
const int compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 );
uint8_t * const new_data = new( std::nothrow ) uint8_t[compr_size];
const int max_compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 );
uint8_t * const new_data = new( std::nothrow ) uint8_t[max_compr_size];
if( new_data == 0 ) { pp( "Not enough memory" ); fatal(); }
const int dict_size = std::max( LZ_min_dictionary_size(),
std::min( dictionary_size, packet->size ) );
@ -346,10 +356,11 @@ extern "C" void * cworker( void * arg )
if( wr < 0 ) internal_error( "library error (LZ_compress_write)" );
written += wr;
}
if( written >= packet->size ) LZ_compress_finish( encoder );
if( written >= packet->size )
{ delete[] packet->data; LZ_compress_finish( encoder ); }
}
const int rd = LZ_compress_read( encoder, new_data + new_size,
compr_size - new_size );
max_compr_size - new_size );
if( rd < 0 )
{
pp();
@ -359,7 +370,7 @@ extern "C" void * cworker( void * arg )
fatal();
}
new_size += rd;
if( new_size > compr_size )
if( new_size > max_compr_size )
internal_error( "packet size exceeded in worker" );
if( LZ_compress_finished( encoder ) == 1 ) break;
}
@ -367,7 +378,6 @@ extern "C" void * cworker( void * arg )
if( LZ_compress_close( encoder ) < 0 )
{ pp( "LZ_compress_close failed" ); fatal(); }
delete[] packet->data;
packet->data = new_data;
packet->size = new_size;
courier.collect_packet( packet );
@ -405,12 +415,15 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
// call the muxer.
int compress( const int data_size, const int dictionary_size,
const int match_len_limit, const int num_workers,
const int num_slots, const int infd, const int outfd,
const int infd, const int outfd,
const Pretty_print & pp, const int debug_level )
{
const int slots_per_worker = 2;
const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ?
num_workers * slots_per_worker : INT_MAX );
in_size = 0;
out_size = 0;
Packet_courier courier( num_workers, num_slots );
Packet_courier courier( num_workers, num_slots - 1 );
Splitter_arg splitter_arg;
splitter_arg.courier = &courier;
@ -434,7 +447,7 @@ int compress( const int data_size, const int dictionary_size,
{ pp( "Not enough memory" ); fatal(); }
for( int i = 0; i < num_workers; ++i )
{
errcode = pthread_create( &worker_threads[i], 0, cworker, &worker_arg );
errcode = pthread_create( worker_threads + i, 0, cworker, &worker_arg );
if( errcode )
{ show_error( "Can't create worker threads", errcode ); fatal(); }
}
@ -456,7 +469,7 @@ int compress( const int data_size, const int dictionary_size,
if( verbosity >= 1 )
{
if( in_size <= 0 || out_size <= 0 )
std::fprintf( stderr, "no data compressed.\n" );
std::fprintf( stderr, " no data compressed.\n" );
else
std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
"%5.2f%% saved, %lld in, %lld out.\n",