1
0
Fork 0

Merging upstream version 0.6.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-24 03:27:29 +01:00
parent 237ee44a6a
commit b724aa0729
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
11 changed files with 235 additions and 138 deletions

View file

@ -28,8 +28,8 @@
#include <queue>
#include <string>
#include <vector>
#include <inttypes.h>
#include <pthread.h>
#include <stdint.h>
#include <unistd.h>
#include <lzlib.h>
@ -111,6 +111,7 @@ public:
{
++iwait_counter;
xwait( &iav_or_eof, &imutex );
++icheck_counter;
}
if( !ipacket_queues[worker_id].empty() )
{
@ -153,6 +154,7 @@ public:
{
++owait_counter;
xwait( &oav_or_exit, &omutex );
++ocheck_counter;
}
if( opacket_queues[deliver_worker_id].empty() ) break;
opacket = opacket_queues[deliver_worker_id].front();
@ -201,7 +203,7 @@ struct Splitter_arg
// split data from input file into chunks and pass them to
// courier for packaging and distribution to workers.
void * splitter( void * arg )
extern "C" void * dsplitter( void * arg )
{
const Splitter_arg & tmp = *(Splitter_arg *)arg;
Packet_courier & courier = *tmp.courier;
@ -286,7 +288,7 @@ struct Worker_arg
// consume packets from courier, decompress their contents, and
// give the produced packets to courier.
void * worker( void * arg )
extern "C" void * dworker( void * arg )
{
const Worker_arg & tmp = *(Worker_arg *)arg;
Packet_courier & courier = *tmp.courier;
@ -414,7 +416,9 @@ int decompress( const int num_workers, const int num_slots,
splitter_arg.packet_size = packet_size;
pthread_t splitter_thread;
xcreate( &splitter_thread, splitter, &splitter_arg );
int errcode = pthread_create( &splitter_thread, 0, dsplitter, &splitter_arg );
if( errcode )
{ show_error( "can't create splitter thread", errcode ); fatal(); }
Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers];
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
@ -426,17 +430,25 @@ int decompress( const int num_workers, const int num_slots,
worker_args[i].pp = &pp;
worker_args[i].worker_id = i;
worker_args[i].packet_size = packet_size;
xcreate( &worker_threads[i], worker, &worker_args[i] );
errcode = pthread_create( &worker_threads[i], 0, dworker, &worker_args[i] );
if( errcode )
{ show_error( "can't create worker threads", errcode ); fatal(); }
}
muxer( courier, pp, outfd );
for( int i = num_workers - 1; i >= 0; --i )
xjoin( worker_threads[i] );
{
errcode = pthread_join( worker_threads[i], 0 );
if( errcode )
{ show_error( "can't join worker threads", errcode ); fatal(); }
}
delete[] worker_threads; worker_threads = 0;
delete[] worker_args; worker_args = 0;
xjoin( splitter_thread );
errcode = pthread_join( splitter_thread, 0 );
if( errcode )
{ show_error( "can't join splitter thread", errcode ); fatal(); }
if( verbosity >= 2 )
std::fprintf( stderr, "decompressed size %9lld, size %9lld. ",