1
0
Fork 0

Adding upstream version 0.12.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-17 21:12:27 +01:00
parent 6bd0c00498
commit 454dd91a77
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
18 changed files with 427 additions and 212 deletions

View file

@ -45,6 +45,42 @@ Packet_courier * courierp = 0; // local vars needed by add_member
unsigned long long partial_data_size = 0; // size of current block
class Slot_tally
{
const int num_slots; // total slots
int num_free; // remaining free slots
pthread_mutex_t mutex;
pthread_cond_t slot_av; // slot available
Slot_tally( const Slot_tally & ); // declared as private
void operator=( const Slot_tally & ); // declared as private
public:
explicit Slot_tally( const int slots )
: num_slots( slots ), num_free( slots )
{ xinit_mutex( &mutex ); xinit_cond( &slot_av ); }
~Slot_tally() { xdestroy_cond( &slot_av ); xdestroy_mutex( &mutex ); }
bool all_free() { return ( num_free == num_slots ); }
void get_slot() // wait for a free slot
{
xlock( &mutex );
while( num_free <= 0 ) xwait( &slot_av, &mutex );
--num_free;
xunlock( &mutex );
}
void leave_slot() // return a slot to the tally
{
xlock( &mutex );
if( ++num_free == 1 ) xsignal( &slot_av ); // num_free was 0
xunlock( &mutex );
}
};
struct Ipacket // filename, file size and headers
{
const unsigned long long file_size;
@ -458,7 +494,7 @@ extern "C" void * cworker( void * arg )
/* Get from courier the processed and sorted packets, and write
their contents to the output archive. */
bool muxer( Packet_courier & courier, const char * const archive_name,
void muxer( Packet_courier & courier, const char * const archive_name,
const int outfd )
{
while( true )
@ -466,13 +502,12 @@ bool muxer( Packet_courier & courier, const char * const archive_name,
const Opacket * const opacket = courier.deliver_packet();
if( !opacket ) break; // queue is empty. all workers exited
const int wr = writeblock( outfd, opacket->data, opacket->size );
if( wr != opacket->size )
{ show_file_error( archive_name, "Write error", errno ); return false; }
if( writeblock( outfd, opacket->data, opacket->size ) != opacket->size )
{ show_file_error( archive_name, "Write error", errno );
cleanup_and_fail(); }
delete[] opacket->data;
delete opacket;
}
return true;
}
} // end namespace
@ -488,6 +523,8 @@ int encode_lz( const char * const archive_name, const Arg_parser & parser,
num_workers * in_slots : INT_MAX;
const int out_slots = 64;
/* If an error happens after any threads have been started, exit must be
called before courier goes out of scope. */
Packet_courier courier( num_workers, total_in_slots, out_slots );
courierp = &courier; // needed by add_member
@ -498,11 +535,12 @@ int encode_lz( const char * const archive_name, const Arg_parser & parser,
pthread_t grouper_thread;
int errcode = pthread_create( &grouper_thread, 0, grouper, &grouper_arg );
if( errcode )
{ show_error( "Can't create grouper thread", errcode ); return 1; }
{ show_error( "Can't create grouper thread", errcode ); cleanup_and_fail(); }
Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers];
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
if( !worker_args || !worker_threads ) { show_error( mem_msg ); return 1; }
if( !worker_args || !worker_threads )
{ show_error( mem_msg ); cleanup_and_fail(); }
for( int i = 0; i < num_workers; ++i )
{
worker_args[i].courier = &courier;
@ -511,23 +549,23 @@ int encode_lz( const char * const archive_name, const Arg_parser & parser,
worker_args[i].worker_id = i;
errcode = pthread_create( &worker_threads[i], 0, cworker, &worker_args[i] );
if( errcode )
{ show_error( "Can't create worker threads", errcode ); return 1; }
{ show_error( "Can't create worker threads", errcode ); cleanup_and_fail(); }
}
if( !muxer( courier, archive_name, outfd ) ) return 1;
muxer( courier, archive_name, outfd );
for( int i = num_workers - 1; i >= 0; --i )
{
errcode = pthread_join( worker_threads[i], 0 );
if( errcode )
{ show_error( "Can't join worker threads", errcode ); return 1; }
{ show_error( "Can't join worker threads", errcode ); cleanup_and_fail(); }
}
delete[] worker_threads;
delete[] worker_args;
errcode = pthread_join( grouper_thread, 0 );
if( errcode )
{ show_error( "Can't join grouper thread", errcode ); return 1; }
{ show_error( "Can't join grouper thread", errcode ); cleanup_and_fail(); }
// write End-Of-Archive records
int retval = 0;