1
0
Fork 0

Merging upstream version 1.7.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-24 04:15:24 +01:00
parent 8bc0325467
commit f6869e4fd3
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
20 changed files with 841 additions and 444 deletions

View file

@ -1,6 +1,6 @@
/* Plzip - Parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009-2017 Antonio Diaz Diaz.
Copyright (C) 2009-2018 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -159,8 +159,9 @@ struct Packet // data block with a serial number
uint8_t * data;
int size; // number of bytes in data (if any)
unsigned id; // serial number assigned as received
Packet( uint8_t * const d, const int s, const unsigned i )
: data( d ), size( s ), id( i ) {}
Packet() : data( 0 ), size( 0 ), id( 0 ) {}
void init( uint8_t * const d, const int s, const unsigned i )
{ data = d; size = s; id = i; }
};
@ -173,10 +174,11 @@ public:
unsigned owait_counter;
private:
unsigned receive_id; // id assigned to next packet received
unsigned distrib_id; // id of next packet to be distributed
unsigned deliver_id; // id of next packet to be delivered
Slot_tally slot_tally; // limits the number of input packets
std::queue< Packet * > packet_queue;
std::vector< const Packet * > circular_buffer;
std::vector< Packet > circular_ibuffer;
std::vector< const Packet * > circular_obuffer;
int num_working; // number of workers still running
const int num_slots; // max packets in circulation
pthread_mutex_t imutex;
@ -192,8 +194,9 @@ public:
Packet_courier( const int workers, const int slots )
: icheck_counter( 0 ), iwait_counter( 0 ),
ocheck_counter( 0 ), owait_counter( 0 ),
receive_id( 0 ), deliver_id( 0 ),
slot_tally( slots ), circular_buffer( slots, (Packet *) 0 ),
receive_id( 0 ), distrib_id( 0 ), deliver_id( 0 ),
slot_tally( slots ), circular_ibuffer( slots ),
circular_obuffer( slots, (Packet *) 0 ),
num_working( workers ), num_slots( slots ), eof( false )
{
xinit_mutex( &imutex ); xinit_cond( &iav_or_eof );
@ -206,13 +209,13 @@ public:
xdestroy_cond( &iav_or_eof ); xdestroy_mutex( &imutex );
}
// make a packet with data received from splitter
// fill a packet with data received from splitter
void receive_packet( uint8_t * const data, const int size )
{
Packet * const ipacket = new Packet( data, size, receive_id++ );
slot_tally.get_slot(); // wait for a free slot
xlock( &imutex );
packet_queue.push( ipacket );
circular_ibuffer[receive_id % num_slots].init( data, size, receive_id );
++receive_id;
xsignal( &iav_or_eof );
xunlock( &imutex );
}
@ -223,18 +226,15 @@ public:
Packet * ipacket = 0;
xlock( &imutex );
++icheck_counter;
while( packet_queue.empty() && !eof )
while( receive_id == distrib_id && !eof ) // no packets to distribute
{
++iwait_counter;
xwait( &iav_or_eof, &imutex );
}
if( !packet_queue.empty() )
{
ipacket = packet_queue.front();
packet_queue.pop();
}
if( receive_id != distrib_id )
{ ipacket = &circular_ibuffer[distrib_id % num_slots]; ++distrib_id; }
xunlock( &imutex );
if( !ipacket )
if( !ipacket ) // EOF
{
// notify muxer when last worker exits
xlock( &omutex );
@ -250,10 +250,10 @@ public:
const int i = opacket->id % num_slots;
xlock( &omutex );
// id collision shouldn't happen
if( circular_buffer[i] != 0 )
if( circular_obuffer[i] != 0 )
internal_error( "id collision in collect_packet." );
// merge packet into circular buffer
circular_buffer[i] = opacket;
circular_obuffer[i] = opacket;
if( opacket->id == deliver_id ) xsignal( &oav_or_exit );
xunlock( &omutex );
}
@ -264,7 +264,7 @@ public:
xlock( &omutex );
++ocheck_counter;
int i = deliver_id % num_slots;
while( circular_buffer[i] == 0 && num_working > 0 )
while( circular_obuffer[i] == 0 && num_working > 0 )
{
++owait_counter;
xwait( &oav_or_exit, &omutex );
@ -272,18 +272,19 @@ public:
packet_vector.clear();
while( true )
{
const Packet * const opacket = circular_buffer[i];
const Packet * const opacket = circular_obuffer[i];
if( !opacket ) break;
packet_vector.push_back( opacket );
circular_buffer[i] = 0;
circular_obuffer[i] = 0;
++deliver_id;
i = deliver_id % num_slots;
}
xunlock( &omutex );
if( packet_vector.size() ) // return slots to the tally
slot_tally.leave_slots( packet_vector.size() );
}
void return_empty_packet() // return a slot to the tally
{ slot_tally.leave_slot(); }
void finish() // splitter has no more packets to send
{
xlock( &imutex );
@ -294,10 +295,10 @@ public:
bool finished() // all packets delivered to muxer
{
if( !slot_tally.all_free() || !eof || !packet_queue.empty() ||
if( !slot_tally.all_free() || !eof || receive_id != distrib_id ||
num_working != 0 ) return false;
for( int i = 0; i < num_slots; ++i )
if( circular_buffer[i] != 0 ) return false;
if( circular_obuffer[i] != 0 ) return false;
return true;
}
};
@ -369,26 +370,32 @@ extern "C" void * cworker( void * arg )
const int dictionary_size = tmp.dictionary_size;
const int match_len_limit = tmp.match_len_limit;
const int offset = tmp.offset;
LZ_Encoder * encoder = 0;
while( true )
{
Packet * const packet = courier.distribute_packet();
if( !packet ) break; // no more packets to process
const bool fast = dictionary_size == 65535 && match_len_limit == 16;
const int dict_size = fast ? dictionary_size :
std::max( std::min( dictionary_size, packet->size ),
LZ_min_dictionary_size() );
LZ_Encoder * const encoder =
LZ_compress_open( dict_size, match_len_limit, LLONG_MAX );
if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
if( !encoder )
{
if( !encoder || LZ_compress_errno( encoder ) == LZ_mem_error )
pp( mem_msg );
else
internal_error( "invalid argument to encoder." );
cleanup_and_fail();
const bool fast = dictionary_size == 65535 && match_len_limit == 16;
const int dict_size = fast ? dictionary_size :
std::max( std::min( dictionary_size, packet->size ),
LZ_min_dictionary_size() );
encoder = LZ_compress_open( dict_size, match_len_limit, LLONG_MAX );
if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
{
if( !encoder || LZ_compress_errno( encoder ) == LZ_mem_error )
pp( mem_msg );
else
internal_error( "invalid argument to encoder." );
cleanup_and_fail();
}
}
else
if( LZ_compress_restart_member( encoder, LLONG_MAX ) < 0 )
{ pp( "LZ_compress_restart_member failed." ); cleanup_and_fail(); }
int written = 0;
int new_pos = 0;
@ -422,13 +429,12 @@ extern "C" void * cworker( void * arg )
if( LZ_compress_finished( encoder ) == 1 ) break;
}
if( LZ_compress_close( encoder ) < 0 )
{ pp( "LZ_compress_close failed." ); cleanup_and_fail(); }
if( packet->size > 0 ) show_progress( packet->size );
packet->size = new_pos;
courier.collect_packet( packet );
}
if( encoder && LZ_compress_close( encoder ) < 0 )
{ pp( "LZ_compress_close failed." ); cleanup_and_fail(); }
return 0;
}
@ -452,7 +458,7 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
if( wr != opacket->size )
{ pp(); show_error( "Write error", errno ); cleanup_and_fail(); }
delete[] opacket->data;
delete opacket;
courier.return_empty_packet();
}
}
}
@ -462,7 +468,8 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
// init the courier, then start the splitter and the workers and
// call the muxer.
int compress( const int data_size, const int dictionary_size,
int compress( const unsigned long long cfile_size,
const int data_size, const int dictionary_size,
const int match_len_limit, const int num_workers,
const int infd, const int outfd,
const Pretty_print & pp, const int debug_level )
@ -486,6 +493,8 @@ int compress( const int data_size, const int dictionary_size,
int errcode = pthread_create( &splitter_thread, 0, csplitter, &splitter_arg );
if( errcode )
{ show_error( "Can't create splitter thread", errcode ); cleanup_and_fail(); }
if( verbosity >= 1 ) pp();
show_progress( 0, cfile_size, &pp ); // init
Worker_arg worker_arg;
worker_arg.courier = &courier;
@ -522,11 +531,11 @@ int compress( const int data_size, const int dictionary_size,
if( in_size == 0 || out_size == 0 )
std::fputs( " no data compressed.\n", stderr );
else
std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
"%5.2f%% saved, %llu in, %llu out.\n",
std::fprintf( stderr, "%6.3f:1, %5.2f%% ratio, %5.2f%% saved, "
"%llu in, %llu out.\n",
(double)in_size / out_size,
( 8.0 * out_size ) / in_size,
100.0 * ( 1.0 - ( (double)out_size / in_size ) ),
( 100.0 * out_size ) / in_size,
100.0 - ( ( 100.0 * out_size ) / in_size ),
in_size, out_size );
}