1
0
Fork 0

Merging upstream version 0.26.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-17 21:27:57 +01:00
parent 7185f44b62
commit 180f99b04d
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
44 changed files with 610 additions and 505 deletions

View file

@ -22,7 +22,7 @@
#include <cstdio>
#include <queue>
#include <pthread.h>
#include <stdint.h> // for lzlib.h
#include <stdint.h> // for lzlib.h
#include <unistd.h>
#include <sys/stat.h>
#include <ftw.h>
@ -60,7 +60,7 @@ public:
~Slot_tally() { xdestroy_cond( &slot_av ); xdestroy_mutex( &mutex ); }
bool all_free() { return ( num_free == num_slots ); }
bool all_free() { return num_free == num_slots; }
void get_slot() // wait for a free slot
{
@ -94,8 +94,8 @@ struct Ipacket // filename, file size and headers
struct Opacket // compressed data to be written to the archive
{
const uint8_t * const data; // data == 0 means end of lzip member
const int size; // number of bytes in data (if any)
const uint8_t * data; // data == 0 means end of lzip member
int size; // number of bytes in data (if any)
Opacket() : data( 0 ), size( 0 ) {}
Opacket( uint8_t * const d, const int s ) : data( d ), size( s ) {}
@ -110,11 +110,11 @@ public:
unsigned ocheck_counter;
unsigned owait_counter;
private:
int receive_worker_id; // worker queue currently receiving packets
int deliver_worker_id; // worker queue currently delivering packets
int receive_id; // worker queue currently receiving packets
int deliver_id; // worker queue currently delivering packets
Slot_tally slot_tally; // limits the number of input packets
std::vector< std::queue< const Ipacket * > > ipacket_queues;
std::vector< std::queue< const Opacket * > > opacket_queues;
std::vector< std::queue< Opacket > > opacket_queues;
int num_working; // number of workers still running
const int num_workers; // number of workers
const unsigned out_slots; // max output packets per queue
@ -132,11 +132,10 @@ public:
Packet_courier( const int workers, const int in_slots, const int oslots )
: icheck_counter( 0 ), iwait_counter( 0 ),
ocheck_counter( 0 ), owait_counter( 0 ),
receive_worker_id( 0 ), deliver_worker_id( 0 ),
slot_tally( in_slots ), ipacket_queues( workers ),
opacket_queues( workers ), num_working( workers ),
num_workers( workers ), out_slots( oslots ), slot_av( workers ),
eof( false )
receive_id( 0 ), deliver_id( 0 ), slot_tally( in_slots ),
ipacket_queues( workers ), opacket_queues( workers ),
num_working( workers ), num_workers( workers ),
out_slots( oslots ), slot_av( workers ), eof( false )
{
xinit_mutex( &imutex ); xinit_cond( &iav_or_eof );
xinit_mutex( &omutex ); xinit_cond( &oav_or_exit );
@ -157,9 +156,9 @@ public:
if( !ipacket->filename.empty() )
slot_tally.get_slot(); // wait for a free slot
xlock( &imutex );
ipacket_queues[receive_worker_id].push( ipacket );
if( ipacket->filename.empty() && ++receive_worker_id >= num_workers )
receive_worker_id = 0;
ipacket_queues[receive_id].push( ipacket );
if( ipacket->filename.empty() && ++receive_id >= num_workers )
receive_id = 0;
xbroadcast( &iav_or_eof );
xunlock( &imutex );
}
@ -194,44 +193,41 @@ public:
}
// collect an opacket from a worker
void collect_packet( const Opacket * const opacket, const int worker_id )
void collect_packet( const Opacket & opacket, const int worker_id )
{
xlock( &omutex );
if( opacket->data )
if( opacket.data )
{
while( opacket_queues[worker_id].size() >= out_slots )
xwait( &slot_av[worker_id], &omutex );
}
opacket_queues[worker_id].push( opacket );
if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit );
if( worker_id == deliver_id ) xsignal( &oav_or_exit );
xunlock( &omutex );
}
/* Deliver an opacket to muxer.
If opacket data == 0, move to next queue and wait again. */
const Opacket * deliver_packet()
/* Deliver opackets to muxer.
If opacket.data == 0, skip opacket and move to next queue. */
void deliver_packets( std::vector< Opacket > & opacket_vector )
{
const Opacket * opacket = 0;
opacket_vector.clear();
xlock( &omutex );
++ocheck_counter;
while( true )
{
while( opacket_queues[deliver_worker_id].empty() && num_working > 0 )
do {
while( opacket_queues[deliver_id].empty() && num_working > 0 )
{ ++owait_counter; xwait( &oav_or_exit, &omutex ); }
while( !opacket_queues[deliver_id].empty() )
{
++owait_counter;
xwait( &oav_or_exit, &omutex );
Opacket opacket = opacket_queues[deliver_id].front();
opacket_queues[deliver_id].pop();
if( opacket_queues[deliver_id].size() + 1 == out_slots )
xsignal( &slot_av[deliver_id] );
if( opacket.data ) opacket_vector.push_back( opacket );
else if( ++deliver_id >= num_workers ) deliver_id = 0;
}
if( opacket_queues[deliver_worker_id].empty() ) break;
opacket = opacket_queues[deliver_worker_id].front();
opacket_queues[deliver_worker_id].pop();
if( opacket_queues[deliver_worker_id].size() + 1 == out_slots )
xsignal( &slot_av[deliver_worker_id] );
if( opacket->data ) break;
if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0;
delete opacket; opacket = 0;
}
while( opacket_vector.empty() && num_working > 0 );
xunlock( &omutex );
return opacket;
}
void finish() // grouper has no more packets to send
@ -371,7 +367,7 @@ void loop_encode( const uint8_t * const ibuf, const int isize,
{
if( opos > max_packet_size )
internal_error( "opacket size exceeded in worker." );
courier.collect_packet( new Opacket( obuf, opos ), worker_id );
courier.collect_packet( Opacket( obuf, opos ), worker_id );
opos = 0; obuf = new( std::nothrow ) uint8_t[max_packet_size];
if( !obuf ) { show_error( mem_msg2 ); exit_fail_mt(); }
if( LZ_compress_finished( encoder ) == 1 )
@ -421,7 +417,7 @@ extern "C" void * cworker( void * arg )
{
if( !flushed ) // this lzip member is not empty
loop_encode( 0, 0, data, opos, courier, encoder, worker_id, true );
courier.collect_packet( new Opacket, worker_id ); // end of member token
courier.collect_packet( Opacket(), worker_id ); // end of member token
flushed = true; delete ipacket; continue;
}
@ -501,15 +497,19 @@ extern "C" void * cworker( void * arg )
*/
void muxer( Packet_courier & courier, const int outfd )
{
std::vector< Opacket > opacket_vector;
while( true )
{
const Opacket * const opacket = courier.deliver_packet();
if( !opacket ) break; // queue is empty. all workers exited
courier.deliver_packets( opacket_vector );
if( opacket_vector.empty() ) break; // queue is empty. all workers exited
if( !writeblock_wrapper( outfd, opacket->data, opacket->size ) )
exit_fail_mt();
delete[] opacket->data;
delete opacket;
for( unsigned i = 0; i < opacket_vector.size(); ++i )
{
Opacket & opacket = opacket_vector[i];
if( !writeblock_wrapper( outfd, opacket.data, opacket.size ) )
exit_fail_mt();
delete[] opacket.data;
}
}
}