1
0
Fork 0

Merging upstream version 1.2~pre1.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-24 04:03:21 +01:00
parent 55c26d29ff
commit c229ba10a7
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
20 changed files with 280 additions and 243 deletions

View file

@ -1,6 +1,6 @@
/* Plzip - Parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -65,35 +65,36 @@ private:
std::vector< std::queue< Packet * > > opacket_queues;
int num_working; // number of workers still running
const int num_workers; // number of workers
const int num_slots; // max output packets in circulation
int num_free; // remaining free output slots
const unsigned out_slots; // max output packets per queue
pthread_mutex_t imutex;
pthread_cond_t iav_or_eof; // input packet available or splitter done
pthread_mutex_t omutex;
pthread_cond_t oav_or_exit; // output packet available or all workers exited
pthread_cond_t slot_av; // free output slot available
bool eof; // splitter done
std::vector< pthread_cond_t > slot_av; // output slot available
bool eof; // splitter done
Packet_courier( const Packet_courier & ); // declared as private
void operator=( const Packet_courier & ); // declared as private
public:
Packet_courier( const int workers, const int slots )
Packet_courier( const int workers, const int in_slots, const int oslots )
: icheck_counter( 0 ), iwait_counter( 0 ),
ocheck_counter( 0 ), owait_counter( 0 ),
receive_worker_id( 0 ), deliver_worker_id( 0 ),
slot_tally( slots ), ipacket_queues( workers ),
slot_tally( in_slots ), ipacket_queues( workers ),
opacket_queues( workers ), num_working( workers ),
num_workers( workers ), num_slots( 8 * slots ), num_free( num_slots ),
num_workers( workers ), out_slots( oslots ), slot_av( workers ),
eof( false )
{
xinit( &imutex ); xinit( &iav_or_eof );
xinit( &omutex ); xinit( &oav_or_exit ); xinit( &slot_av );
xinit( &omutex ); xinit( &oav_or_exit );
for( unsigned i = 0; i < slot_av.size(); ++i ) xinit( &slot_av[i] );
}
~Packet_courier()
{
xdestroy( &slot_av ); xdestroy( &oav_or_exit ); xdestroy( &omutex );
for( unsigned i = 0; i < slot_av.size(); ++i ) xdestroy( &slot_av[i] );
xdestroy( &oav_or_exit ); xdestroy( &omutex );
xdestroy( &iav_or_eof ); xdestroy( &imutex );
}
@ -149,9 +150,8 @@ public:
xlock( &omutex );
if( opacket->data )
{
while( worker_id != deliver_worker_id && num_free <= 0 )
xwait( &slot_av, &omutex );
--num_free;
while( opacket_queues[worker_id].size() >= out_slots )
xwait( &slot_av[worker_id], &omutex );
}
opacket_queues[worker_id].push( opacket );
if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit );
@ -175,13 +175,10 @@ public:
if( opacket_queues[deliver_worker_id].empty() ) break;
opacket = opacket_queues[deliver_worker_id].front();
opacket_queues[deliver_worker_id].pop();
if( opacket->data )
{
if( ++num_free == 1 ) xsignal( &slot_av );
break;
}
if( opacket_queues[deliver_worker_id].size() + 1 == out_slots )
xsignal( &slot_av[deliver_worker_id] );
if( opacket->data ) break;
if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0;
xbroadcast( &slot_av ); // restart deliver_worker_id thread
delete opacket; opacket = 0;
}
xunlock( &omutex );
@ -198,8 +195,7 @@ public:
bool finished() // all packets delivered to muxer
{
if( !slot_tally.all_free() ||
num_free != num_slots || !eof || num_working != 0 ) return false;
if( !slot_tally.all_free() || !eof || num_working != 0 ) return false;
for( int i = 0; i < num_workers; ++i )
if( !ipacket_queues[i].empty() ) return false;
for( int i = 0; i < num_workers; ++i )
@ -408,7 +404,7 @@ extern "C" void * dworker_s( void * arg )
if( trailing_garbage_found ||
LZ_decompress_finished( decoder ) == 1 )
{
LZ_decompress_reset( decoder ); // prepare for new ipacket
LZ_decompress_reset( decoder ); // prepare for new member
Packet * opacket = new Packet; // end of member token
opacket->data = 0;
opacket->size = 0;
@ -464,12 +460,13 @@ int dec_stream( const int num_workers, const int infd, const int outfd,
const Pretty_print & pp, const int debug_level,
const bool testing )
{
const int slots_per_worker = 2;
const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ?
num_workers * slots_per_worker : INT_MAX );
const int in_slots_per_worker = 2;
const int out_slots = 32;
const int in_slots = ( INT_MAX / num_workers >= in_slots_per_worker ) ?
num_workers * in_slots_per_worker : INT_MAX;
in_size = 0;
out_size = 0;
Packet_courier courier( num_workers, num_slots );
Packet_courier courier( num_workers, in_slots, out_slots );
Splitter_arg splitter_arg;
splitter_arg.courier = &courier;