1
0
Fork 0

Merging upstream version 1.0~rc1.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-24 03:57:48 +01:00
parent dde76b5e14
commit 7210c549d7
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
24 changed files with 1438 additions and 882 deletions

View file

@ -1,6 +1,6 @@
/* Plzip - A parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -25,387 +25,164 @@
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <queue>
#include <string>
#include <vector>
#include <inttypes.h>
#include <pthread.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/stat.h>
#include <lzlib.h>
#include "plzip.h"
#include "lzip.h"
#include "file_index.h"
// Positional read of up to 'size' bytes from 'fd' into 'buf', starting at
// file offset 'pos'. Retries after EINTR/EAGAIN and after short reads.
// Returns the number of bytes actually read.
// A short count with (errno == 0) means EOF; a short count with errno set
// means the read failed.
//
int preadblock( const int fd, uint8_t * const buf, const int size,
                const long long pos )
  {
  int done = 0;				// bytes read so far
  errno = 0;
  for( ; done < size; errno = 0 )
    {
    const int n = pread( fd, buf + done, size - done, pos + done );
    if( n > 0 ) { done += n; continue; }
    if( n == 0 ) break;					// EOF
    if( errno != EINTR && errno != EAGAIN ) break;	// real error, keep errno
    }
  return done;
  }
// Positional write of 'size' bytes from 'buf' to 'fd', starting at file
// offset 'pos'. Retries after EINTR/EAGAIN and after short writes.
// Returns the number of bytes actually written.
// Any count smaller than 'size' is an error; errno then holds the cause.
//
int pwriteblock( const int fd, const uint8_t * const buf, const int size,
                 const long long pos )
  {
  int written = 0;			// bytes written so far
  errno = 0;
  for( ; written < size; errno = 0 )
    {
    const int n = pwrite( fd, buf + written, size - written, pos + written );
    if( n > 0 ) { written += n; continue; }
    // a zero-byte write or an interrupted call is simply retried
    const bool retry = ( n == 0 || errno == EINTR || errno == EAGAIN );
    if( !retry ) break;			// real error, keep errno
    }
  return written;
  }
// Reports a failed LZ_decompress_read on behalf of worker 'worker_id'.
// Calls 'pp' and, unless verbosity is negative, prints the decoder's error
// string to stderr.
// Returns 2 if the decoder reports a header error, an unexpected EOF, or a
// data error; returns 1 for any other error code.
//
int decompress_read_error( struct LZ_Decoder * const decoder,
                           const Pretty_print & pp, const int worker_id )
  {
  const LZ_Errno errcode = LZ_decompress_errno( decoder );
  pp();
  if( verbosity >= 0 )
    std::fprintf( stderr, "LZ_decompress_read error in worker %d: %s.\n",
                  worker_id, LZ_strerror( errcode ) );
  switch( errcode )
    {
    case LZ_header_error:
    case LZ_unexpected_eof:
    case LZ_data_error: return 2;
    default: return 1;
    }
  }
namespace {
// Size in bytes (1 << 20) used by dsplitter for its read buffer and by
// dworker for its output data buffer.
enum { max_packet_size = 1 << 20 };
// Running totals: in_size grows in Packet_courier::receive_packet for every
// data packet received, out_size grows in muxer for every packet delivered.
long long in_size = 0;
long long out_size = 0;
// Unit of work passed through Packet_courier. A packet whose data pointer
// is 0 carries no bytes and acts as an "end of member" token.
struct Packet // data block
{
uint8_t * data; // data == 0 means end of member
int size; // number of bytes in data (if any)
};
// Hands packets from the splitter to the workers (input side) and from the
// workers to the muxer (output side). There is one input queue and one
// output queue per worker. The input queues and 'eof' are accessed under
// 'imutex'; the output queues, 'num_working', 'num_free' and
// 'deliver_worker_id' are accessed under 'omutex'.
// Slot_tally and the xinit/xdestroy/xlock/xunlock/xwait/xsignal/xbroadcast
// helpers are declared outside this listing.
class Packet_courier // moves packets around
{
public:
// Statistics counters; decompress() prints them when (debug_level & 1).
unsigned long icheck_counter; // times a worker checked its input queue
unsigned long iwait_counter; // times a worker had to wait for input
unsigned long ocheck_counter; // times the muxer checked an output queue
unsigned long owait_counter; // times the muxer had to wait for output
private:
int receive_worker_id; // worker queue currently receiving packets
int deliver_worker_id; // worker queue currently delivering packets
Slot_tally slot_tally; // limits the number of input packets
std::vector< std::queue< Packet * > > ipacket_queues;
std::vector< std::queue< Packet * > > opacket_queues;
int num_working; // number of workers still running
const int num_workers; // number of workers
int num_free; // remaining free output slots
pthread_mutex_t imutex;
pthread_cond_t iav_or_eof; // input packet available or splitter done
pthread_mutex_t omutex;
pthread_cond_t oav_or_exit; // output packet available or all workers exited
pthread_cond_t slot_av; // free output slot available
bool eof; // splitter done
Packet_courier( const Packet_courier & ); // declared as private
void operator=( const Packet_courier & ); // declared as private
public:
// Creates one input and one output queue per worker. 'slots' sizes the
// input slot tally; the output side starts with 8 * slots free slots.
Packet_courier( const int workers, const int slots )
: icheck_counter( 0 ), iwait_counter( 0 ),
ocheck_counter( 0 ), owait_counter( 0 ),
receive_worker_id( 0 ), deliver_worker_id( 0 ),
slot_tally( slots ), ipacket_queues( workers ),
opacket_queues( workers ), num_working( workers ),
num_workers( workers ), num_free( 8 * slots ), eof( false )
{
xinit( &imutex ); xinit( &iav_or_eof );
xinit( &omutex ); xinit( &oav_or_exit ); xinit( &slot_av );
}
// Destroys the synchronization objects in reverse order of creation.
~Packet_courier()
{
xdestroy( &slot_av ); xdestroy( &oav_or_exit ); xdestroy( &omutex );
xdestroy( &iav_or_eof ); xdestroy( &imutex );
}
// Read-only access to the input slot tally (used for debug statistics).
const Slot_tally & tally() const { return slot_tally; }
// make a packet with data received from splitter
// if data == 0, move to next queue
// The packet is appended to the queue of worker 'receive_worker_id'; an
// end-of-member token (data == 0) then advances 'receive_worker_id'
// round-robin, so each member goes to a single worker.
// NOTE(review): in_size and receive_worker_id are updated without holding
// imutex; presumably only the splitter thread calls this -- confirm.
void receive_packet( uint8_t * const data, const int size )
{
Packet * ipacket = new Packet;
ipacket->data = data;
ipacket->size = size;
if( data != 0 )
{ in_size += size; slot_tally.get_slot(); } // wait for a free slot
xlock( &imutex );
ipacket_queues[receive_worker_id].push( ipacket );
xbroadcast( &iav_or_eof );
xunlock( &imutex );
if( data == 0 && ++receive_worker_id >= num_workers )
receive_worker_id = 0;
}
// distribute a packet to a worker
// Blocks until the queue of 'worker_id' has a packet or the splitter has
// called finish(). Returns the packet, or 0 when the queue is empty and
// the splitter is done; in that case the worker is counted as exited.
Packet * distribute_packet( const int worker_id )
{
Packet * ipacket = 0;
xlock( &imutex );
++icheck_counter;
while( ipacket_queues[worker_id].empty() && !eof )
{
++iwait_counter;
xwait( &iav_or_eof, &imutex );
++icheck_counter;
}
if( !ipacket_queues[worker_id].empty() )
{
ipacket = ipacket_queues[worker_id].front();
ipacket_queues[worker_id].pop();
}
xunlock( &imutex );
if( ipacket != 0 )
// only data packets took a slot in receive_packet, so only they free one
{ if( ipacket->data != 0 ) slot_tally.leave_slot(); }
else
{
// notify muxer when last worker exits
xlock( &omutex );
if( --num_working == 0 ) xsignal( &oav_or_exit );
xunlock( &omutex );
}
return ipacket;
}
// collect a packet from a worker
// A data packet from a worker other than the one currently being delivered
// waits while no free output slot remains; the worker being delivered is
// never held back. End-of-member tokens are queued without taking a slot.
void collect_packet( Packet * const opacket, const int worker_id )
{
xlock( &omutex );
if( opacket->data != 0 )
{
while( worker_id != deliver_worker_id && num_free <= 0 )
xwait( &slot_av, &omutex );
--num_free;
}
opacket_queues[worker_id].push( opacket );
// the muxer only waits on the queue of deliver_worker_id
if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit );
xunlock( &omutex );
}
// deliver a packet to muxer
// if packet data == 0, move to next queue and wait again
// Returns the next data packet in delivery order, or 0 when the current
// queue is empty and no worker is running any more.
Packet * deliver_packet()
{
Packet * opacket = 0;
xlock( &omutex );
++ocheck_counter;
while( true )
{
while( opacket_queues[deliver_worker_id].empty() && num_working > 0 )
{
++owait_counter;
xwait( &oav_or_exit, &omutex );
++ocheck_counter;
}
if( opacket_queues[deliver_worker_id].empty() ) break;
opacket = opacket_queues[deliver_worker_id].front();
opacket_queues[deliver_worker_id].pop();
if( opacket->data != 0 )
{
// a slot was just freed; wake a waiter only on the 0 -> 1 transition
if( ++num_free == 1 ) xsignal( &slot_av );
break;
}
// end-of-member token: switch to the next worker's queue and drop the token
if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0;
xbroadcast( &slot_av ); // restart deliver_worker_id thread
delete opacket; opacket = 0;
}
xunlock( &omutex );
return opacket;
}
// Marks the input side as finished and wakes every worker waiting for input.
void finish() // splitter has no more packets to send
{
xlock( &imutex );
eof = true;
xbroadcast( &iav_or_eof );
xunlock( &imutex );
}
// True when every slot is free, the splitter is done, no worker is running
// and all queues are empty.
// NOTE(review): reads shared state without locking; decompress() calls it
// after its pthread_join calls -- confirm no other caller exists.
bool finished() // all packets delivered to muxer
{
if( !slot_tally.all_free() || !eof || num_working != 0 ) return false;
for( int i = 0; i < num_workers; ++i )
if( !ipacket_queues[i].empty() ) return false;
for( int i = 0; i < num_workers; ++i )
if( !opacket_queues[i].empty() ) return false;
return true;
}
};
// Searches buffer[pos .. pos+size) for the 4-byte string "LZIP".
// The byte at the last position of the current 4-byte window decides how
// far the window can safely advance:
//   'I' -> 1, 'Z' -> 2, 'L' -> 3, anything else (including 'P') -> 4.
// Returns the index of the first match, or 'pos+size' if there is none.
//
int find_magic( const uint8_t * const buffer, const int pos, const int size )
  {
  const int last = pos + size - 4;	// last index where a match can start
  int i = pos;
  while( i <= last )
    {
    int skip = 4;
    switch( buffer[i+3] )
      {
      case 'P': if( buffer[i] == 'L' && buffer[i+1] == 'Z' &&
                    buffer[i+2] == 'I' )
                  return i;			// magic string found
                break;
      case 'I': skip = 1; break;
      case 'Z': skip = 2; break;
      case 'L': skip = 3; break;
      default: break;
      }
    i += skip;
    }
  return pos + size;
  }
// Arguments handed to the dsplitter thread through its void * parameter.
struct Splitter_arg
{
Packet_courier * courier; // receives the packets produced by dsplitter
const Pretty_print * pp; // called by dsplitter before it reports an error
int infd; // descriptor dsplitter reads the compressed input from
};
// split data from input file into chunks and pass them to
// courier for packaging and distribution to workers.
// Thread entry point; 'arg' must point to a Splitter_arg. Always returns 0;
// errors are reported through 'pp'/show_error followed by fatal().
//
// Buffer layout: base_buffer = [ tsize bytes | buffer_size bytes | hsize bytes ]
// with 'buffer' pointing just past the first tsize bytes. Between reads the
// last (tsize + hsize) bytes of base_buffer are copied to its start, so the
// bytes preceding a header that falls near the beginning of the next block
// remain addressable at negative offsets from 'buffer'.
extern "C" void * dsplitter( void * arg )
{
const Splitter_arg & tmp = *(Splitter_arg *)arg;
Packet_courier & courier = *tmp.courier;
const Pretty_print & pp = *tmp.pp;
const int infd = tmp.infd;
const int hsize = 6; // header size
const int tsize = 20; // trailer size
const int buffer_size = max_packet_size;
const int base_buffer_size = tsize + buffer_size + hsize;
uint8_t * const base_buffer = new( std::nothrow ) uint8_t[base_buffer_size];
if( base_buffer == 0 ) { pp( "Not enough memory" ); fatal(); }
uint8_t * const buffer = base_buffer + tsize;
// 'size' excludes the hsize look-ahead bytes read past the block
int size = readblock( infd, buffer, buffer_size + hsize ) - hsize;
bool at_stream_end = ( size < buffer_size );
if( size != buffer_size && errno )
{ pp(); show_error( "Read error", errno ); fatal(); }
// the input must be longer than a trailer and begin with the magic string
if( size <= tsize || find_magic( buffer, 0, 4 ) != 0 )
{ pp( "Bad magic number (file not in lzip format)" ); fatal(); }
// bytes of the current member already sent in packets from earlier blocks
long long partial_member_size = 0;
while( true )
{
int pos = 0; // start of the data in this block not yet sent
for( int newpos = 1; newpos <= size; ++newpos )
{
newpos = find_magic( buffer, newpos, size + 4 - newpos );
if( newpos <= size )
{
// read the 8 bytes that precede the candidate header as a
// little-endian integer (indexed from base_buffer because newpos - i
// may point before 'buffer')
long long member_size = 0;
for( int i = 1; i <= 8; ++i )
{ member_size <<= 8; member_size += base_buffer[tsize+newpos-i]; }
// accept the candidate only if that value equals the number of bytes
// seen since the start of the current member
if( partial_member_size + newpos - pos == member_size )
{ // header found
uint8_t * const data = new( std::nothrow ) uint8_t[newpos - pos];
if( data == 0 ) { pp( "Not enough memory" ); fatal(); }
std::memcpy( data, buffer + pos, newpos - pos );
courier.receive_packet( data, newpos - pos );
courier.receive_packet( 0, 0 ); // end of member token
partial_member_size = 0;
pos = newpos;
}
}
}
if( at_stream_end )
{
// send everything that is left, including the look-ahead bytes
uint8_t * data = new( std::nothrow ) uint8_t[size + hsize - pos];
if( data == 0 ) { pp( "Not enough memory" ); fatal(); }
std::memcpy( data, buffer + pos, size + hsize - pos );
courier.receive_packet( data, size + hsize - pos );
courier.receive_packet( 0, 0 ); // end of member token
break;
}
if( pos < buffer_size )
{
// the member continues in the next block: send what we have so far
partial_member_size += buffer_size - pos;
uint8_t * data = new( std::nothrow ) uint8_t[buffer_size - pos];
if( data == 0 ) { pp( "Not enough memory" ); fatal(); }
std::memcpy( data, buffer + pos, buffer_size - pos );
courier.receive_packet( data, buffer_size - pos );
}
// carry the last tsize bytes of the block plus the hsize look-ahead bytes
// over to the start of base_buffer, then refill the rest
std::memcpy( base_buffer, base_buffer + buffer_size, tsize + hsize );
size = readblock( infd, buffer + hsize, buffer_size );
at_stream_end = ( size < buffer_size );
if( size != buffer_size && errno )
{ pp(); show_error( "Read error", errno ); fatal(); }
}
delete[] base_buffer;
courier.finish(); // no more packets to send
return 0;
}
// Arguments handed to a dworker thread through its void * parameter.
// NOTE(review): this listing appears to merge the fields of two revisions
// of the struct (courier-based and file-index-based) -- confirm which
// fields the target revision really has.
struct Worker_arg
{
Packet_courier * courier; // courier the worker exchanges packets with
const File_index * file_index; // gives position and size of each member and of its data
const Pretty_print * pp; // called by dworker before it reports an error
int worker_id; // index of this worker; also the first member it handles
int num_workers; // stride between the members handled by one worker
int infd; // descriptor dworker reads compressed members from
int outfd; // descriptor dworker writes decompressed data to; not written if < 0
};
// consume packets from courier, decompress their contents, and
// give the produced packets to courier.
// read members from file, decompress their contents, and
// write the produced data to file.
extern "C" void * dworker( void * arg )
{
const Worker_arg & tmp = *(Worker_arg *)arg;
Packet_courier & courier = *tmp.courier;
const File_index & file_index = *tmp.file_index;
const Pretty_print & pp = *tmp.pp;
const int worker_id = tmp.worker_id;
const int new_data_size = max_packet_size;
const int num_workers = tmp.num_workers;
const int infd = tmp.infd;
const int outfd = tmp.outfd;
const int buffer_size = 65536;
uint8_t * new_data = new( std::nothrow ) uint8_t[new_data_size];
uint8_t * const ibuffer = new( std::nothrow ) uint8_t[buffer_size];
uint8_t * const obuffer = new( std::nothrow ) uint8_t[buffer_size];
LZ_Decoder * const decoder = LZ_decompress_open();
if( !new_data || !decoder || LZ_decompress_errno( decoder ) != LZ_ok )
if( !ibuffer || !obuffer || !decoder ||
LZ_decompress_errno( decoder ) != LZ_ok )
{ pp( "Not enough memory" ); fatal(); }
int new_pos = 0;
while( true )
for( int i = worker_id; i < file_index.members(); i += num_workers )
{
const Packet * const ipacket = courier.distribute_packet( worker_id );
if( ipacket == 0 ) break; // no more packets to process
if( ipacket->data == 0 ) LZ_decompress_finish( decoder );
long long data_pos = file_index.dblock( i ).pos();
long long data_rest = file_index.dblock( i ).size();
long long member_pos = file_index.mblock( i ).pos();
long long member_rest = file_index.mblock( i ).size();
int written = 0;
while( true )
while( member_rest > 0 )
{
if( LZ_decompress_write_size( decoder ) > 0 && written < ipacket->size )
while( LZ_decompress_write_size( decoder ) > 0 )
{
const int wr = LZ_decompress_write( decoder, ipacket->data + written,
ipacket->size - written );
if( wr < 0 ) internal_error( "library error (LZ_decompress_write)" );
written += wr;
if( written > ipacket->size )
internal_error( "ipacket size exceeded in worker" );
}
while( true ) // read and pack decompressed data
{
const int rd = LZ_decompress_read( decoder, new_data + new_pos,
new_data_size - new_pos );
if( rd < 0 )
const int size = std::min( LZ_decompress_write_size( decoder ),
(int)std::min( (long long)buffer_size, member_rest ) );
if( size > 0 )
{
pp();
if( verbosity >= 0 )
std::fprintf( stderr, "LZ_decompress_read error in worker %d: %s.\n",
worker_id, LZ_strerror( LZ_decompress_errno( decoder ) ) );
fatal();
if( preadblock( infd, ibuffer, size, member_pos ) != size )
{ pp(); show_error( "Read error", errno ); fatal(); }
member_pos += size;
member_rest -= size;
if( LZ_decompress_write( decoder, ibuffer, size ) != size )
internal_error( "library error (LZ_decompress_write)" );
}
new_pos += rd;
if( new_pos > new_data_size )
internal_error( "opacket size exceeded in worker" );
if( new_pos == new_data_size || LZ_decompress_finished( decoder ) == 1 )
if( member_rest <= 0 ) { LZ_decompress_finish( decoder ); break; }
}
while( true ) // write decompressed data to file
{
const int rd = LZ_decompress_read( decoder, obuffer, buffer_size );
if( rd < 0 )
fatal( decompress_read_error( decoder, pp, worker_id ) );
if( rd > 0 && outfd >= 0 )
{
if( new_pos > 0 ) // make data packet
const int wr = pwriteblock( outfd, obuffer, rd, data_pos );
if( wr != rd )
{
Packet * opacket = new Packet;
opacket->data = new_data;
opacket->size = new_pos;
courier.collect_packet( opacket, worker_id );
new_pos = 0;
new_data = new( std::nothrow ) uint8_t[new_data_size];
if( new_data == 0 ) { pp( "Not enough memory" ); fatal(); }
}
if( LZ_decompress_finished( decoder ) == 1 )
{
LZ_decompress_reset( decoder ); // prepare for new ipacket
Packet * opacket = new Packet; // end of member token
opacket->data = 0;
opacket->size = 0;
courier.collect_packet( opacket, worker_id );
break;
pp();
if( verbosity >= 0 )
std::fprintf( stderr, "Write error in worker %d: %s\n",
worker_id, std::strerror( errno ) );
fatal();
}
}
if( rd > 0 )
{
data_pos += rd;
data_rest -= rd;
}
if( LZ_decompress_finished( decoder ) == 1 )
{
if( data_rest != 0 )
internal_error( "final data_rest != 0" );
LZ_decompress_reset( decoder ); // prepare for new member
break;
}
if( rd == 0 ) break;
}
if( ipacket->data == 0 ) { delete ipacket; break; }
if( written == ipacket->size )
{ delete[] ipacket->data; delete ipacket; break; }
}
}
delete[] new_data;
delete[] obuffer; delete[] ibuffer;
if( LZ_decompress_member_position( decoder ) != 0 )
{ pp( "Error, some data remains in decoder" ); fatal(); }
if( LZ_decompress_close( decoder ) < 0 )
@ -413,112 +190,76 @@ extern "C" void * dworker( void * arg )
return 0;
}
// Drains the courier: takes the decompressed packets in delivery order,
// adds their sizes to out_size and, if 'outfd' is non-negative, writes them
// to 'outfd'. Returns when the courier has no more packets to deliver
// (deliver_packet returns 0). A short write is reported and is fatal.
void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
  {
  for( Packet * opacket = courier.deliver_packet(); opacket != 0;
       opacket = courier.deliver_packet() )
    {
    out_size += opacket->size;
    if( outfd >= 0 &&
        writeblock( outfd, opacket->data, opacket->size ) != opacket->size )
      { pp(); show_error( "Write error", errno ); fatal(); }
    delete[] opacket->data;		// the muxer owns delivered packets
    delete opacket;
    }
  }
} // end namespace
// NOTE(review): this listing appears to interleave two revisions of
// decompress() without diff markers (two signatures, duplicated statements,
// both courier-based and file-index-based code). The comments below describe
// individual statements only; confirm which lines belong to the target
// revision before relying on them.
// init the courier, then start the splitter and the workers and
// call the muxer.
int decompress( const int num_workers, const int infd, const int outfd,
// start the workers and wait for them to finish.
int decompress( int num_workers, const int infd, const int outfd,
const Pretty_print & pp, const int debug_level,
const bool testing )
const bool testing, const bool infd_isreg )
{
// number of input slots: slots_per_worker per worker, capped at INT_MAX
const int slots_per_worker = 2;
const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ?
num_workers * slots_per_worker : INT_MAX );
in_size = 0;
out_size = 0;
Packet_courier courier( num_workers, num_slots );
// input flagged as not regular by the caller is handed to dec_stream
if( !infd_isreg )
return dec_stream( num_workers, infd, outfd, pp, debug_level, testing );
Splitter_arg splitter_arg;
splitter_arg.courier = &courier;
splitter_arg.pp = &pp;
splitter_arg.infd = infd;
// build the member index; on retval 1 rewind the input and use dec_stream,
// on any other nonzero retval report the index error and return it
const File_index file_index( infd );
if( file_index.retval() == 1 )
{
lseek( infd, 0, SEEK_SET );
return dec_stream( num_workers, infd, outfd, pp, debug_level, testing );
}
if( file_index.retval() != 0 )
{ show_error( file_index.error().c_str() ); return file_index.retval(); }
pthread_t splitter_thread;
int errcode = pthread_create( &splitter_thread, 0, dsplitter, &splitter_arg );
if( errcode )
{ show_error( "Can't create splitter thread", errcode ); fatal(); }
// never start more workers than there are members
if( num_workers > file_index.members() )
num_workers = file_index.members();
// an output that is not a regular file, or that fails the lseek check,
// is handled by dec_stdout instead of the workers below
if( outfd >= 0 )
{
struct stat st;
if( fstat( outfd, &st ) != 0 || !S_ISREG( st.st_mode ) ||
lseek( outfd, 0, SEEK_CUR ) < 0 )
return dec_stdout( num_workers, infd, outfd, pp, debug_level, file_index );
}
Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers];
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
if( worker_args == 0 || worker_threads == 0 )
if( !worker_args || !worker_threads )
{ pp( "Not enough memory" ); fatal(); }
// fill in the arguments of each worker and start it
for( int i = 0; i < num_workers; ++i )
{
worker_args[i].courier = &courier;
worker_args[i].file_index = &file_index;
worker_args[i].pp = &pp;
worker_args[i].worker_id = i;
errcode = pthread_create( &worker_threads[i], 0, dworker, &worker_args[i] );
worker_args[i].num_workers = num_workers;
worker_args[i].infd = infd;
worker_args[i].outfd = outfd;
const int errcode =
pthread_create( &worker_threads[i], 0, dworker, &worker_args[i] );
if( errcode )
{ show_error( "Can't create worker threads", errcode ); fatal(); }
}
muxer( courier, pp, outfd );
// wait for the workers, last one first
for( int i = num_workers - 1; i >= 0; --i )
{
errcode = pthread_join( worker_threads[i], 0 );
const int errcode = pthread_join( worker_threads[i], 0 );
if( errcode )
{ show_error( "Can't join worker threads", errcode ); fatal(); }
}
delete[] worker_threads; worker_threads = 0;
delete[] worker_args; worker_args = 0;
delete[] worker_threads;
delete[] worker_args;
errcode = pthread_join( splitter_thread, 0 );
if( errcode )
{ show_error( "Can't join splitter thread", errcode ); fatal(); }
if( verbosity >= 3 && out_size > 0 && in_size > 0 )
// sizes taken from the index: end of file and end of decompressed data
const unsigned long long in_size = file_index.file_end();
const unsigned long long out_size = file_index.data_end();
// prints: compression ratio (out/in), compressed bits per decompressed
// byte, and percentage of space saved
if( verbosity >= 2 && out_size > 0 && in_size > 0 )
std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, %5.2f%% saved. ",
(double)out_size / in_size,
( 8.0 * in_size ) / out_size,
100.0 * ( 1.0 - ( (double)in_size / out_size ) ) );
if( verbosity >= 2 )
std::fprintf( stderr, "decompressed size %9lld, size %9lld. ",
if( verbosity >= 3 )
std::fprintf( stderr, "decompressed size %9llu, size %9llu. ",
out_size, in_size );
if( verbosity >= 1 )
{ if( testing ) std::fprintf( stderr, "ok\n" );
else std::fprintf( stderr, "done\n" ); }
if( verbosity >= 1 ) std::fprintf( stderr, testing ? "ok\n" : "done\n" );
// bit 0 of debug_level enables the courier wait/check statistics
if( debug_level & 1 )
std::fprintf( stderr,
"splitter tried to send a packet %8lu times\n"
"splitter had to wait %8lu times\n"
"any worker tried to consume from splitter %8lu times\n"
"any worker had to wait %8lu times\n"
"muxer tried to consume from workers %8lu times\n"
"muxer had to wait %8lu times\n",
courier.tally().check_counter,
courier.tally().wait_counter,
courier.icheck_counter,
courier.iwait_counter,
courier.ocheck_counter,
courier.owait_counter );
// sanity check: every packet must have been delivered by now
if( !courier.finished() ) internal_error( "courier not finished" );
return 0;
}