
Adding upstream version 0.5.

Signed-off-by: Daniel Baumann <daniel@debian.org>
Daniel Baumann 2025-02-24 03:26:46 +01:00
parent 393d6b555b
commit 94544fa4c2
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
12 changed files with 587 additions and 225 deletions

ChangeLog

@ -1,3 +1,8 @@
2010-02-10 Antonio Diaz Diaz <ant_diaz@teleline.es>
* Version 0.5 released.
* Parallel decompression has been implemented.
2010-01-31 Antonio Diaz Diaz <ant_diaz@teleline.es>
* Version 0.4 released.

NEWS

@ -1,6 +1,3 @@
Changes in version 0.4:
Changes in version 0.5:
The option "--version" now shows the version of lzlib being used.
A code reorganization has been made. The new class "Packet_courier" now
coordinates data movement and synchronization among threads.
Parallel decompression has been implemented.

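The Packet_courier mentioned above appears twice in this commit: once in the compression path and once in the new parallel decompression path (see the compress.cc and decompress.cc diffs below). At its core it is a producer/consumer queue protected by a pthread mutex and condition variable, with a Slot_tally bounding the number of packets in flight. The stand-alone sketch below only illustrates that coordination pattern; it is not code from this commit, and all names in it (Toy_courier, consumer) are made up.

// Minimal producer/consumer sketch of the coordination idea behind
// Packet_courier.  Illustrative only; not code from this commit.
#include <cstdio>
#include <queue>
#include <pthread.h>

struct Toy_courier
  {
  std::queue< int > packets;
  pthread_mutex_t mutex;
  pthread_cond_t not_empty;
  bool eof;                                 // producer is done

  Toy_courier() : eof( false )
    { pthread_mutex_init( &mutex, 0 ); pthread_cond_init( &not_empty, 0 ); }
  ~Toy_courier()
    { pthread_cond_destroy( &not_empty ); pthread_mutex_destroy( &mutex ); }

  void push( const int value )              // producer side
    {
    pthread_mutex_lock( &mutex );
    packets.push( value );
    pthread_cond_signal( &not_empty );
    pthread_mutex_unlock( &mutex );
    }

  bool pop( int & value )                   // consumer side
    {
    pthread_mutex_lock( &mutex );
    while( packets.empty() && !eof )        // wait for data or end of input
      pthread_cond_wait( &not_empty, &mutex );
    const bool ok = !packets.empty();
    if( ok ) { value = packets.front(); packets.pop(); }
    pthread_mutex_unlock( &mutex );
    return ok;
    }

  void finish()                             // no more data will arrive
    {
    pthread_mutex_lock( &mutex );
    eof = true;
    pthread_cond_broadcast( &not_empty );
    pthread_mutex_unlock( &mutex );
    }
  };

void * consumer( void * arg )
  {
  Toy_courier & courier = *(Toy_courier *)arg;
  int value;
  while( courier.pop( value ) ) std::printf( "consumed %d\n", value );
  return 0;
  }

int main()
  {
  Toy_courier courier;
  pthread_t thread;
  pthread_create( &thread, 0, consumer, &courier );
  for( int i = 0; i < 10; ++i ) courier.push( i );   // producer role
  courier.finish();
  pthread_join( thread, 0 );
  return 0;
  }

The real class goes further: in compression it keeps finished packets ordered by serial number in a circular buffer, and in decompression it keeps one input and one output queue per worker, so that the muxer can always write the data out in the original order.
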
README

@ -1,18 +1,15 @@
Description
Plzip is a massively parallel (multithreaded) data compressor compatible
with the lzip file format. The files produced by plzip are fully
compatible with lzip-1.4 or newer. Plzip is intended for faster
compression/decompression of big files on multiprocessor machines. On
files big enough, plzip can use hundreds of processors. Currently only
compression is performed in parallel. Parallel decompression is planned
to be implemented later.
Plzip is a massively parallel (multi-threaded), lossless data compressor
based on the LZMA algorithm, with very safe integrity checking and a
user interface similar to the one of gzip or bzip2. Plzip uses the lzip
file format; the files produced by plzip are fully compatible with
lzip-1.4 or newer.
Lzip is a lossless data compressor based on the LZMA algorithm, with
very safe integrity checking and a user interface similar to the one of
gzip or bzip2. Lzip decompresses almost as fast as gzip and compresses
better than bzip2, which makes it well suited for software distribution
and data archiving.
Plzip is intended for faster compression/decompression of big files on
multiprocessor machines, which makes it specially well suited for
distribution of big software files and large scale data archiving. On
files big enough, plzip can use hundreds of processors.
Copyright (C) 2009, 2010 Antonio Diaz Diaz.

compress.cc

@ -19,7 +19,6 @@
#define _FILE_OFFSET_BITS 64
#include <algorithm>
#include <cassert>
#include <cerrno>
#include <climits>
#include <csignal>
@ -49,70 +48,70 @@
void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex )
{
int ret = pthread_mutex_init( mutex, 0 );
if( ret != 0 ) { show_error( "pthread_mutex_init", ret ); fatal(); }
int errcode = pthread_mutex_init( mutex, 0 );
if( errcode ) { show_error( "pthread_mutex_init", errcode ); fatal(); }
ret = pthread_cond_init( cond, 0 );
if( ret != 0 ) { show_error( "pthread_cond_init", ret ); fatal(); }
errcode = pthread_cond_init( cond, 0 );
if( errcode ) { show_error( "pthread_cond_init", errcode ); fatal(); }
}
void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex )
{
int ret = pthread_cond_destroy( cond );
if( ret != 0 ) { show_error( "pthread_cond_destroy", ret ); fatal(); }
int errcode = pthread_cond_destroy( cond );
if( errcode ) { show_error( "pthread_cond_destroy", errcode ); fatal(); }
ret = pthread_mutex_destroy( mutex );
if( ret != 0 ) { show_error( "pthread_mutex_destroy", ret ); fatal(); }
errcode = pthread_mutex_destroy( mutex );
if( errcode ) { show_error( "pthread_mutex_destroy", errcode ); fatal(); }
}
void xlock( pthread_mutex_t * mutex )
{
int ret = pthread_mutex_lock( mutex );
if( ret != 0 ) { show_error( "pthread_mutex_lock", ret ); fatal(); }
int errcode = pthread_mutex_lock( mutex );
if( errcode ) { show_error( "pthread_mutex_lock", errcode ); fatal(); }
}
void xunlock( pthread_mutex_t * mutex )
{
int ret = pthread_mutex_unlock( mutex );
if( ret != 0 ) { show_error( "pthread_mutex_unlock", ret ); fatal(); }
int errcode = pthread_mutex_unlock( mutex );
if( errcode ) { show_error( "pthread_mutex_unlock", errcode ); fatal(); }
}
void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex )
{
int ret = pthread_cond_wait( cond, mutex );
if( ret != 0 ) { show_error( "pthread_cond_wait", ret ); fatal(); }
int errcode = pthread_cond_wait( cond, mutex );
if( errcode ) { show_error( "pthread_cond_wait", errcode ); fatal(); }
}
void xsignal( pthread_cond_t * cond )
{
int ret = pthread_cond_signal( cond );
if( ret != 0 ) { show_error( "pthread_cond_signal", ret ); fatal(); }
int errcode = pthread_cond_signal( cond );
if( errcode ) { show_error( "pthread_cond_signal", errcode ); fatal(); }
}
void xbroadcast( pthread_cond_t * cond )
{
int ret = pthread_cond_broadcast( cond );
if( ret != 0 ) { show_error( "pthread_cond_broadcast", ret ); fatal(); }
int errcode = pthread_cond_broadcast( cond );
if( errcode ) { show_error( "pthread_cond_broadcast", errcode ); fatal(); }
}
void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg )
{
int ret = pthread_create( thread, 0, routine, arg );
if( ret != 0 ) { show_error( "pthread_create", ret ); fatal(); }
int errcode = pthread_create( thread, 0, routine, arg );
if( errcode ) { show_error( "pthread_create", errcode ); fatal(); }
}
void xjoin( pthread_t thread )
{
int ret = pthread_join( thread, 0 );
if( ret != 0 ) { show_error( "pthread_join", ret ); fatal(); }
int errcode = pthread_join( thread, 0 );
if( errcode ) { show_error( "pthread_join", errcode ); fatal(); }
}
@ -125,8 +124,8 @@ long long out_size = 0;
struct Packet // data block with a serial number
{
unsigned long long id; // serial number assigned as received
int size; // # of bytes in data
uint8_t * data;
int size; // number of bytes in data (if any)
};
@ -168,8 +167,8 @@ public:
{
Packet * ipacket = new Packet;
ipacket->id = receive_id++;
ipacket->size = size;
ipacket->data = data;
ipacket->size = size;
slot_tally.get_slot(); // wait for a free slot
xlock( &imutex );
packet_queue.push( ipacket );
@ -210,7 +209,8 @@ public:
{
xlock( &omutex );
// id collision shouldn't happen
assert( circular_buffer[opacket->id%num_slots] == 0 );
if( circular_buffer[opacket->id%num_slots] != 0 )
internal_error( "id collision in collect_packet" );
// Merge packet into circular buffer
circular_buffer[opacket->id%num_slots] = opacket;
if( opacket->id == deliver_id ) xsignal( &oav_or_exit );
@ -260,6 +260,7 @@ public:
struct Splitter_arg
{
Packet_courier * courier;
const Pretty_print * pp;
int infd;
int data_size;
};
@ -271,15 +272,17 @@ void * splitter( void * arg )
{
const Splitter_arg & tmp = *(Splitter_arg *)arg;
Packet_courier & courier = *tmp.courier;
const Pretty_print & pp = *tmp.pp;
const int infd = tmp.infd;
const int data_size = tmp.data_size;
for( bool first_post = true; ; first_post = false )
{
uint8_t * data = new( std::nothrow ) uint8_t[data_size];
if( data == 0 ) { show_error( "not enough memory" ); fatal(); }
if( data == 0 ) { pp( "not enough memory" ); fatal(); }
const int size = readblock( infd, data, data_size );
if( size != data_size && errno ) { show_error( "read", errno ); fatal(); }
if( size != data_size && errno )
{ pp(); show_error( "read error", errno ); fatal(); }
if( size > 0 || first_post ) // first packet can be empty
{
@ -299,9 +302,10 @@ void * splitter( void * arg )
struct Worker_arg
{
Packet_courier * courier;
const Pretty_print * pp;
int dictionary_size;
int match_len_limit;
Packet_courier * courier;
};
@ -310,9 +314,10 @@ struct Worker_arg
void * worker( void * arg )
{
const Worker_arg & tmp = *(Worker_arg *)arg;
Packet_courier & courier = *tmp.courier;
const Pretty_print & pp = *tmp.pp;
const int dictionary_size = tmp.dictionary_size;
const int match_len_limit = tmp.match_len_limit;
Packet_courier & courier = *tmp.courier;
while( true )
{
@ -321,13 +326,19 @@ void * worker( void * arg )
const int compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 );
uint8_t * const new_data = new( std::nothrow ) uint8_t[compr_size];
if( new_data == 0 ) { show_error( "not enough memory" ); fatal(); }
if( new_data == 0 ) { pp( "not enough memory" ); fatal(); }
const int dict_size = std::max( LZ_min_dictionary_size(),
std::min( dictionary_size, packet->size ) );
LZ_Encoder * const encoder =
LZ_compress_open( dict_size, match_len_limit, LLONG_MAX );
if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
{ show_error( "LZ_compress_open failed." ); fatal(); }
{
if( !encoder || LZ_compress_errno( encoder ) == LZ_mem_error )
pp( "not enough memory. Try a smaller dictionary size" );
else
internal_error( "invalid argument to encoder" );
fatal();
}
int written = 0;
int new_size = 0;
@ -339,25 +350,33 @@ void * worker( void * arg )
{
const int wr = LZ_compress_write( encoder, packet->data + written,
packet->size - written );
if( wr < 0 ) { show_error( "LZ_compress_write failed." ); fatal(); }
if( wr < 0 ) internal_error( "library error (LZ_compress_write)" );
written += wr;
}
if( written >= packet->size ) LZ_compress_finish( encoder );
}
const int rd = LZ_compress_read( encoder, new_data + new_size,
compr_size - new_size );
if( rd < 0 ) { show_error( "LZ_compress_read failed." ); fatal(); }
if( rd < 0 )
{
pp();
if( verbosity >= 0 )
std::fprintf( stderr, "LZ_compress_read error: %s.\n",
LZ_strerror( LZ_compress_errno( encoder ) ) );
fatal();
}
new_size += rd;
assert( new_size <= compr_size );
if( new_size > compr_size )
internal_error( "packet size exceeded in worker" );
if( LZ_compress_finished( encoder ) == 1 ) break;
}
if( LZ_compress_close( encoder ) < 0 )
{ show_error( "LZ_compress_close failed." ); fatal(); }
{ pp( "LZ_compress_close failed" ); fatal(); }
delete[] packet->data;
packet->size = new_size;
packet->data = new_data;
packet->size = new_size;
courier.collect_packet( packet );
}
return 0;
@ -366,7 +385,7 @@ void * worker( void * arg )
// get from courier the processed and sorted packets, and write
// their contents to the output file.
void muxer( Packet_courier & courier, const int outfd )
void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
{
while( true )
{
@ -379,7 +398,7 @@ void muxer( Packet_courier & courier, const int outfd )
{
const int wr = writeblock( outfd, opacket->data, opacket->size );
if( wr != opacket->size )
{ show_error( "write", errno ); fatal(); }
{ pp(); show_error( "write error", errno ); fatal(); }
}
delete[] opacket->data;
delete opacket;
@ -394,7 +413,7 @@ void muxer( Packet_courier & courier, const int outfd )
int compress( const int data_size, const int dictionary_size,
const int match_len_limit, const int num_workers,
const int num_slots, const int infd, const int outfd,
const int debug_level )
const Pretty_print & pp, const int debug_level )
{
in_size = 0;
out_size = 0;
@ -402,6 +421,7 @@ int compress( const int data_size, const int dictionary_size,
Splitter_arg splitter_arg;
splitter_arg.courier = &courier;
splitter_arg.pp = &pp;
splitter_arg.infd = infd;
splitter_arg.data_size = data_size;
@ -409,20 +429,21 @@ int compress( const int data_size, const int dictionary_size,
xcreate( &splitter_thread, splitter, &splitter_arg );
Worker_arg worker_arg;
worker_arg.courier = &courier;
worker_arg.pp = &pp;
worker_arg.dictionary_size = dictionary_size;
worker_arg.match_len_limit = match_len_limit;
worker_arg.courier = &courier;
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
if( worker_threads == 0 )
{ show_error( "not enough memory" ); fatal(); }
{ pp( "not enough memory" ); fatal(); }
for( int i = 0; i < num_workers; ++i )
xcreate( &worker_threads[i], worker, &worker_arg );
muxer( courier, outfd );
muxer( courier, pp, outfd );
for( int i = num_workers - 1; i >= 0; --i )
xjoin(worker_threads[i]);
xjoin( worker_threads[i] );
delete[] worker_threads; worker_threads = 0;
xjoin( splitter_thread );
@ -455,6 +476,6 @@ int compress( const int data_size, const int dictionary_size,
courier.ocheck_counter,
courier.owait_counter );
assert( courier.finished() );
if( !courier.finished() ) internal_error( "courier not finished" );
return 0;
}

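For reference, the lzlib calls driven by each worker above (LZ_compress_open, LZ_compress_write, LZ_compress_finish, LZ_compress_read, LZ_compress_finished, LZ_compress_close) follow lzlib's usual buffer-to-buffer sequence. The following single-threaded sketch compresses one in-memory block with that same sequence; it is only an illustration, not part of this commit, and its error handling is kept to a minimum.

// Single-threaded sketch of the lzlib call sequence used by each worker.
#include <cstdio>
#include <cstring>
#include <climits>
#include <stdint.h>
#include <lzlib.h>

int main()
  {
  const char * const text = "hello, hello, hello, hello, hello\n";
  const int in_size = std::strlen( text );
  // worst-case output size, as used by the worker: 42 + size + size/8
  const int out_capacity = 42 + in_size + ( ( in_size + 7 ) / 8 );
  uint8_t * const out = new uint8_t[out_capacity];

  LZ_Encoder * const encoder =
    LZ_compress_open( LZ_min_dictionary_size(), 16, LLONG_MAX );
  if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
    { std::fprintf( stderr, "LZ_compress_open failed\n" ); return 1; }

  int written = 0, out_size = 0;
  while( true )
    {
    if( written < in_size )                 // feed as much input as accepted
      {
      const int wr = LZ_compress_write( encoder, (const uint8_t *)text + written,
                                        in_size - written );
      if( wr < 0 ) { std::fprintf( stderr, "write failed\n" ); return 1; }
      written += wr;
      }
    if( written >= in_size ) LZ_compress_finish( encoder );
    const int rd = LZ_compress_read( encoder, out + out_size,
                                     out_capacity - out_size );
    if( rd < 0 ) { std::fprintf( stderr, "read failed\n" ); return 1; }
    out_size += rd;
    if( LZ_compress_finished( encoder ) == 1 ) break;
    }
  LZ_compress_close( encoder );

  std::printf( "%d bytes in, %d bytes out\n", in_size, out_size );
  delete[] out;
  return 0;
  }

Each worker applies this loop to one packet at a time, writing into a buffer sized with the same worst-case formula shown in the diff, and then hands the finished packet back to the courier.
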
configure

@ -5,12 +5,12 @@
# This configure script is free software: you have unlimited permission
# to copy, distribute and modify it.
#
# Date of this version: 2010-01-31
# Date of this version: 2010-02-10
args=
no_create=
pkgname=plzip
pkgversion=0.4
pkgversion=0.5
progname=plzip
srctrigger=plzip.h

decompress.cc

@ -38,86 +38,429 @@
namespace {
int do_decompress( LZ_Decoder * const decoder, const int infd, const int outfd,
const Pretty_print & pp, const bool testing )
long long in_size = 0;
long long out_size = 0;
struct Packet // data block
{
const int in_buffer_size = 65536, out_buffer_size = 8 * in_buffer_size;
uint8_t in_buffer[in_buffer_size], out_buffer[out_buffer_size];
uint8_t * data; // data == 0 means end of member
int size; // number of bytes in data (if any)
};
class Packet_courier // moves packets around
{
public:
unsigned long icheck_counter;
unsigned long iwait_counter;
unsigned long ocheck_counter;
unsigned long owait_counter;
private:
int receive_worker_id; // worker queue currently receiving packets
int deliver_worker_id; // worker queue currently delivering packets
Slot_tally slot_tally;
std::vector< std::queue< Packet * > > ipacket_queues;
std::vector< std::queue< Packet * > > opacket_queues;
int num_working; // Number of workers still running
const int num_workers; // Number of workers
const int num_slots; // max packets in circulation
pthread_mutex_t imutex;
pthread_cond_t iav_or_eof; // input packet available or splitter done
pthread_mutex_t omutex;
pthread_cond_t oav_or_exit; // output packet available or all workers exited
bool eof; // splitter done
public:
Packet_courier( const int workers, const int slots )
: icheck_counter( 0 ), iwait_counter( 0 ),
ocheck_counter( 0 ), owait_counter( 0 ),
receive_worker_id( 0 ), deliver_worker_id( 0 ),
slot_tally( slots ), ipacket_queues( workers ),
opacket_queues( workers ), num_working( workers ),
num_workers( workers ), num_slots( slots ), eof( false )
{ xinit( &iav_or_eof, &imutex ); xinit( &oav_or_exit, &omutex ); }
~Packet_courier()
{ xdestroy( &iav_or_eof, &imutex ); xdestroy( &oav_or_exit, &omutex ); }
// make a packet with data received from splitter
// if data == 0, move to next queue
void receive_packet( uint8_t * const data, const int size )
{
Packet * ipacket = new Packet;
ipacket->data = data;
ipacket->size = size;
if( data != 0 )
{ in_size += size; slot_tally.get_slot(); } // wait for a free slot
xlock( &imutex );
ipacket_queues[receive_worker_id].push( ipacket );
xbroadcast( &iav_or_eof );
xunlock( &imutex );
if( data == 0 && ++receive_worker_id >= num_workers )
receive_worker_id = 0;
}
// distribute a packet to a worker
Packet * distribute_packet( const int worker_id )
{
Packet * ipacket = 0;
xlock( &imutex );
++icheck_counter;
while( ipacket_queues[worker_id].empty() && !eof )
{
++iwait_counter;
xwait( &iav_or_eof, &imutex );
}
if( !ipacket_queues[worker_id].empty() )
{
ipacket = ipacket_queues[worker_id].front();
ipacket_queues[worker_id].pop();
}
xunlock( &imutex );
if( ipacket != 0 )
{ if( ipacket->data != 0 ) slot_tally.leave_slot(); }
else
{
// Notify muxer when last worker exits
xlock( &omutex );
if( --num_working == 0 )
xsignal( &oav_or_exit );
xunlock( &omutex );
}
return ipacket;
}
// collect a packet from a worker
void collect_packet( Packet * const opacket, const int worker_id )
{
xlock( &omutex );
opacket_queues[worker_id].push( opacket );
if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit );
xunlock( &omutex );
}
// deliver a packet to muxer
// if packet data == 0, move to next queue and wait again
Packet * deliver_packet()
{
Packet * opacket = 0;
xlock( &omutex );
++ocheck_counter;
while( true )
{
while( opacket_queues[deliver_worker_id].empty() && num_working > 0 )
{
++owait_counter;
xwait( &oav_or_exit, &omutex );
}
if( opacket_queues[deliver_worker_id].empty() ) break;
opacket = opacket_queues[deliver_worker_id].front();
opacket_queues[deliver_worker_id].pop();
if( opacket->data != 0 ) break;
else
{
if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0;
delete opacket; opacket = 0;
}
}
xunlock( &omutex );
return opacket;
}
void finish() // splitter has no more packets to send
{
xlock( &imutex );
eof = true;
xbroadcast( &iav_or_eof );
xunlock( &imutex );
}
bool finished() // all packets delivered to muxer
{
if( !slot_tally.all_free() || !eof || num_working != 0 ) return false;
for( int i = 0; i < num_workers; ++i )
if( !ipacket_queues[i].empty() ) return false;
for( int i = 0; i < num_workers; ++i )
if( !opacket_queues[i].empty() ) return false;
return true;
}
const Slot_tally & tally() const { return slot_tally; }
};
struct Splitter_arg
{
Packet_courier * courier;
const Pretty_print * pp;
int infd;
int packet_size;
};
// split data from input file into chunks and pass them to
// courier for packaging and distribution to workers.
void * splitter( void * arg )
{
const Splitter_arg & tmp = *(Splitter_arg *)arg;
Packet_courier & courier = *tmp.courier;
const Pretty_print & pp = *tmp.pp;
const int infd = tmp.infd;
const int hsize = 6; // header size
const int tsize = 20; // trailer size
const int buffer_size = tmp.packet_size;
const int base_buffer_size = tsize + buffer_size + hsize;
uint8_t * const base_buffer = new( std::nothrow ) uint8_t[base_buffer_size];
if( base_buffer == 0 ) { pp( "not enough memory" ); fatal(); }
uint8_t * const buffer = base_buffer + tsize;
int size = readblock( infd, buffer, buffer_size + hsize ) - hsize;
bool at_stream_end = ( size < buffer_size );
if( size != buffer_size && errno )
{ pp(); show_error( "read error", errno ); fatal(); }
if( size <= tsize || buffer[0] != 'L' || buffer[1] != 'Z' ||
buffer[2] != 'I' || buffer[3] != 'P' )
{ pp( "bad magic number (file not in lzip format)" ); fatal(); }
long long partial_member_size = 0;
while( true )
{
int pos = 0;
for( int newpos = 1; newpos <= size; ++newpos )
if( buffer[newpos] == 'L' && buffer[newpos+1] == 'Z' &&
buffer[newpos+2] == 'I' && buffer[newpos+3] == 'P' )
{
long long member_size = 0;
for( int i = 1; i <= 8; ++i )
{ member_size <<= 8; member_size += base_buffer[tsize+newpos-i]; }
if( partial_member_size + newpos - pos == member_size )
{ // header found
uint8_t * data = new( std::nothrow ) uint8_t[newpos - pos];
if( data == 0 ) { pp( "not enough memory" ); fatal(); }
std::memcpy( data, buffer + pos, newpos - pos );
courier.receive_packet( data, newpos - pos );
courier.receive_packet( 0, 0 ); // end of member token
partial_member_size = 0;
pos = newpos;
}
}
if( at_stream_end )
{
uint8_t * data = new( std::nothrow ) uint8_t[size + hsize - pos];
if( data == 0 ) { pp( "not enough memory" ); fatal(); }
std::memcpy( data, buffer + pos, size + hsize - pos );
courier.receive_packet( data, size + hsize - pos );
courier.receive_packet( 0, 0 ); // end of member token
break;
}
if( pos < buffer_size )
{
partial_member_size += buffer_size - pos;
uint8_t * data = new( std::nothrow ) uint8_t[buffer_size - pos];
if( data == 0 ) { pp( "not enough memory" ); fatal(); }
std::memcpy( data, buffer + pos, buffer_size - pos );
courier.receive_packet( data, buffer_size - pos );
}
std::memcpy( base_buffer, base_buffer + buffer_size, tsize + hsize );
size = readblock( infd, buffer + hsize, buffer_size );
at_stream_end = ( size < buffer_size );
if( size != buffer_size && errno )
{ pp(); show_error( "read error", errno ); fatal(); }
}
delete[] base_buffer;
courier.finish(); // no more packets to send
return 0;
}
struct Worker_arg
{
Packet_courier * courier;
const Pretty_print * pp;
int worker_id;
int packet_size;
};
// consume packets from courier, decompress their contents, and
// give the produced packets to courier.
void * worker( void * arg )
{
const Worker_arg & tmp = *(Worker_arg *)arg;
Packet_courier & courier = *tmp.courier;
const Pretty_print & pp = *tmp.pp;
const int worker_id = tmp.worker_id;
const int new_data_size = tmp.packet_size;
uint8_t * new_data = new( std::nothrow ) uint8_t[new_data_size];
LZ_Decoder * const decoder = LZ_decompress_open();
if( !new_data || !decoder || LZ_decompress_errno( decoder ) != LZ_ok )
{ pp( "not enough memory" ); fatal(); }
int new_pos = 0;
while( true )
{
int in_size = std::min( LZ_decompress_write_size( decoder ), in_buffer_size );
if( in_size > 0 )
Packet * ipacket = courier.distribute_packet( worker_id );
if( ipacket == 0 ) break; // no more packets to process
if( ipacket->data == 0 ) LZ_decompress_finish( decoder );
int written = 0;
while( true )
{
const int max_in_size = in_size;
in_size = readblock( infd, in_buffer, max_in_size );
if( in_size != max_in_size && errno )
{ pp(); show_error( "read error", errno ); return 1; }
if( in_size == 0 ) LZ_decompress_finish( decoder );
else if( in_size != LZ_decompress_write( decoder, in_buffer, in_size ) )
internal_error( "library error (LZ_decompress_write)" );
if( LZ_decompress_write_size( decoder ) > 0 && written < ipacket->size )
{
const int wr = LZ_decompress_write( decoder, ipacket->data + written,
ipacket->size - written );
if( wr < 0 ) internal_error( "library error (LZ_decompress_write)" );
written += wr;
if( written > ipacket->size )
internal_error( "ipacket size exceeded in worker" );
}
int out_size = LZ_decompress_read( decoder, out_buffer, out_buffer_size );
// std::fprintf( stderr, "%5d in_size, %6d out_size.\n", in_size, out_size );
if( out_size < 0 )
while( true ) // read and pack decompressed data
{
const LZ_Errno lz_errno = LZ_decompress_errno( decoder );
if( lz_errno == LZ_header_error )
const int rd = LZ_decompress_read( decoder, new_data + new_pos,
new_data_size - new_pos );
if( rd < 0 )
{
if( LZ_decompress_total_out_size( decoder ) > 0 )
break; // trailing garbage
pp( "error reading member header" );
return 1;
}
if( lz_errno == LZ_mem_error )
{
pp( "not enough memory. Find a machine with more memory" );
return 1;
}
pp();
if( lz_errno == LZ_unexpected_eof )
{
if( verbosity >= 0 )
std::fprintf( stderr, "file ends unexpectedly at pos %lld\n",
LZ_decompress_total_in_size( decoder ) );
return 2;
std::fprintf( stderr, "LZ_decompress_read error in worker %d: %s.\n",
worker_id, LZ_strerror( LZ_decompress_errno( decoder ) ) );
fatal();
}
if( verbosity >= 0 )
std::fprintf( stderr, "LZ_decompress_read error: %s.\n",
LZ_strerror( LZ_decompress_errno( decoder ) ) );
return 1;
}
else if( out_size > 0 && outfd >= 0 )
new_pos += rd;
if( new_pos > new_data_size )
internal_error( "opacket size exceeded in worker" );
if( new_pos == new_data_size || LZ_decompress_finished( decoder ) == 1 )
{
const int wr = writeblock( outfd, out_buffer, out_size );
if( wr != out_size )
{ pp(); show_error( "write error", errno ); return 1; }
if( new_pos > 0 ) // make data packet
{
Packet * opacket = new Packet;
opacket->data = new_data;
opacket->size = new_pos;
courier.collect_packet( opacket, worker_id );
new_pos = 0;
new_data = new( std::nothrow ) uint8_t[new_data_size];
if( new_data == 0 ) { pp( "not enough memory" ); fatal(); }
}
if( LZ_decompress_finished( decoder ) == 1 ) break;
if( in_size == 0 && out_size == 0 )
internal_error( "library error (LZ_decompress_read)" );
if( LZ_decompress_finished( decoder ) == 1 )
{
LZ_decompress_reset( decoder );
Packet * opacket = new Packet; // end of member token
opacket->data = 0;
opacket->size = 0;
courier.collect_packet( opacket, worker_id );
break;
}
if( verbosity >= 1 )
{ if( testing ) std::fprintf( stderr, "ok\n" );
else std::fprintf( stderr, "done\n" ); }
}
if( rd == 0 ) break;
}
if( ipacket->data == 0 ) { delete ipacket; break; }
if( written == ipacket->size )
{ delete[] ipacket->data; delete ipacket; break; }
}
}
delete[] new_data;
if( LZ_decompress_total_in_size( decoder ) != 0 )
{ pp( "error, remaining data in decoder" ); fatal(); }
LZ_decompress_close( decoder );
return 0;
}
// get from courier the processed and sorted packets, and write
// their contents to the output file.
void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
{
while( true )
{
Packet * opacket = courier.deliver_packet();
if( opacket == 0 ) break; // queue is empty. all workers exited
out_size += opacket->size;
if( outfd >= 0 )
{
const int wr = writeblock( outfd, opacket->data, opacket->size );
if( wr != opacket->size )
{ pp(); show_error( "write error", errno ); fatal(); }
}
delete[] opacket->data;
delete opacket;
}
}
} // end namespace
int decompress( const int infd, const int outfd, const Pretty_print & pp,
const bool testing )
// init the courier, then start the splitter and the workers and
// call the muxer.
int decompress( const int num_workers, const int num_slots,
const int infd, const int outfd, const Pretty_print & pp,
const int debug_level, const bool testing )
{
LZ_Decoder * const decoder = LZ_decompress_open();
int retval;
in_size = 0;
out_size = 0;
const int packet_size = 1 << 20;
Packet_courier courier( num_workers, num_slots );
if( !decoder || LZ_decompress_errno( decoder ) != LZ_ok )
Splitter_arg splitter_arg;
splitter_arg.courier = &courier;
splitter_arg.pp = &pp;
splitter_arg.infd = infd;
splitter_arg.packet_size = packet_size;
pthread_t splitter_thread;
xcreate( &splitter_thread, splitter, &splitter_arg );
Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers];
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
if( worker_args == 0 || worker_threads == 0 )
{ pp( "not enough memory" ); fatal(); }
for( int i = 0; i < num_workers; ++i )
{
pp( "not enough memory" );
retval = 1;
worker_args[i].courier = &courier;
worker_args[i].pp = &pp;
worker_args[i].worker_id = i;
worker_args[i].packet_size = packet_size;
xcreate( &worker_threads[i], worker, &worker_args[i] );
}
else retval = do_decompress( decoder, infd, outfd, pp, testing );
LZ_decompress_close( decoder );
return retval;
muxer( courier, pp, outfd );
for( int i = num_workers - 1; i >= 0; --i )
xjoin( worker_threads[i] );
delete[] worker_threads; worker_threads = 0;
delete[] worker_args; worker_args = 0;
xjoin( splitter_thread );
if( verbosity >= 2 )
std::fprintf( stderr, "decompressed size %9lld, size %9lld. ",
out_size, in_size );
if( verbosity >= 1 )
{ if( testing ) std::fprintf( stderr, "ok\n" );
else std::fprintf( stderr, "done\n" ); }
if( debug_level & 1 )
std::fprintf( stderr,
"splitter tried to send a packet %8lu times\n"
"splitter had to wait %8lu times\n"
"any worker tried to consume from splitter %8lu times\n"
"any worker had to wait %8lu times\n"
"muxer tried to consume from workers %8lu times\n"
"muxer had to wait %8lu times\n",
courier.tally().check_counter,
courier.tally().wait_counter,
courier.icheck_counter,
courier.iwait_counter,
courier.ocheck_counter,
courier.owait_counter );
if( !courier.finished() ) internal_error( "courier not finished" );
return 0;
}

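The workers above run essentially the same lzlib decompression loop that the removed serial do_decompress used, only on member-aligned packets handed out by the splitter, with LZ_decompress_reset called between members. For comparison, a minimal serial filter using the same LZ_decompress_* calls could look like the sketch below; it is an illustration only, not code from this commit, and keeps error handling to a minimum.

// Minimal serial lzlib decompression filter: .lz data on stdin, plain
// data on stdout.  Illustrative sketch of the LZ_decompress_* loop.
#include <algorithm>
#include <cstdio>
#include <stdint.h>
#include <unistd.h>
#include <lzlib.h>

int main()
  {
  const int buffer_size = 65536;
  uint8_t in_buffer[buffer_size], out_buffer[buffer_size];

  LZ_Decoder * const decoder = LZ_decompress_open();
  if( !decoder || LZ_decompress_errno( decoder ) != LZ_ok )
    { std::fprintf( stderr, "LZ_decompress_open failed\n" ); return 1; }

  while( true )
    {
    // feed as much compressed data as the decoder currently accepts
    const int wanted = std::min( LZ_decompress_write_size( decoder ), buffer_size );
    if( wanted > 0 )
      {
      const int rd = read( STDIN_FILENO, in_buffer, wanted );
      if( rd < 0 ) { std::perror( "read" ); return 1; }
      if( rd == 0 ) LZ_decompress_finish( decoder );      // end of input
      else if( LZ_decompress_write( decoder, in_buffer, rd ) != rd )
        { std::fprintf( stderr, "LZ_decompress_write failed\n" ); return 1; }
      }
    // drain whatever decompressed data is available
    const int out = LZ_decompress_read( decoder, out_buffer, buffer_size );
    if( out < 0 )
      { std::fprintf( stderr, "LZ_decompress_read error: %s\n",
                      LZ_strerror( LZ_decompress_errno( decoder ) ) ); return 1; }
    if( out > 0 && write( STDOUT_FILENO, out_buffer, out ) != out )
      { std::perror( "write" ); return 1; }
    if( LZ_decompress_finished( decoder ) == 1 ) break;
    }
  LZ_decompress_close( decoder );
  return 0;
  }

In the parallel version the splitter finds member boundaries by scanning for the "LZIP" magic and checking the member size recorded in the preceding trailer, so each worker always receives whole members and can reset its decoder between them.
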
plzip.1

@ -1,5 +1,5 @@
.\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.36.
.TH PLZIP "1" "January 2010" "Plzip 0.4" "User Commands"
.TH PLZIP "1" "February 2010" "Plzip 0.5" "User Commands"
.SH NAME
Plzip \- data compressor based on the LZMA algorithm
.SH SYNOPSIS

plzip.info

@ -12,7 +12,7 @@ File: plzip.info, Node: Top, Next: Introduction, Up: (dir)
Plzip Manual
************
This manual is for Plzip (version 0.4, 31 January 2010).
This manual is for Plzip (version 0.5, 10 February 2010).
* Menu:
@ -34,19 +34,16 @@ File: plzip.info, Node: Introduction, Next: Invoking Plzip, Prev: Top, Up: T
1 Introduction
**************
Plzip is a massively parallel (multithreaded) data compressor compatible
with the lzip file format. The files produced by plzip are fully
compatible with lzip-1.4 or newer. Plzip is intended for faster
compression/decompression of big files on multiprocessor machines. On
files big enough, plzip can use hundreds of processors. Currently only
compression is performed in parallel. Parallel decompression is planned
to be implemented later.
Plzip is a massively parallel (multi-threaded), lossless data compressor
based on the LZMA algorithm, with very safe integrity checking and a
user interface similar to the one of gzip or bzip2. Plzip uses the lzip
file format; the files produced by plzip are fully compatible with
lzip-1.4 or newer.
Lzip is a lossless data compressor based on the LZMA algorithm, with
very safe integrity checking and a user interface similar to the one of
gzip or bzip2. Lzip decompresses almost as fast as gzip and compresses
better than bzip2, which makes it well suited for software distribution
and data archiving.
Plzip is intended for faster compression/decompression of big files
on multiprocessor machines, which makes it specially well suited for
distribution of big software files and large scale data archiving. On
files big enough, plzip can use hundreds of processors.
Plzip replaces every file given in the command line with a compressed
version of itself, with the name "original_name.lz". Each compressed
@ -112,9 +109,9 @@ The format for running plzip is:
`-B'
Set the input data block size in bytes. The input file will be
divided in chunks of this size before compression is performed.
Valid values range from 100kB to 1GiB. Default value is two times
the dictionary size. It is a waste of memory to choose a data size
smaller than the dictionary size.
Valid values range from 8KiB to 1GiB. Default value is two times
the dictionary size. It is a waste of memory to choose a data
size smaller than the dictionary size.
`--stdout'
`-c'
@ -306,10 +303,10 @@ Concept Index

Tag Table:
Node: Top223
Node: Introduction746
Node: Invoking Plzip3669
Node: File Format7358
Node: Problems9314
Node: Concept Index9843
Node: Introduction747
Node: Invoking Plzip3489
Node: File Format7178
Node: Problems9134
Node: Concept Index9663

End Tag Table

plzip.texinfo

@ -5,8 +5,8 @@
@finalout
@c %**end of header
@set UPDATED 31 January 2010
@set VERSION 0.4
@set UPDATED 10 February 2010
@set VERSION 0.5
@dircategory Data Compression
@direntry
@ -50,19 +50,16 @@ to copy, distribute and modify it.
@chapter Introduction
@cindex introduction
Plzip is a massively parallel (multithreaded) data compressor compatible
with the lzip file format. The files produced by plzip are fully
compatible with lzip-1.4 or newer. Plzip is intended for faster
compression/decompression of big files on multiprocessor machines. On
files big enough, plzip can use hundreds of processors. Currently only
compression is performed in parallel. Parallel decompression is planned
to be implemented later.
Plzip is a massively parallel (multi-threaded), lossless data compressor
based on the LZMA algorithm, with very safe integrity checking and a
user interface similar to the one of gzip or bzip2. Plzip uses the lzip
file format; the files produced by plzip are fully compatible with
lzip-1.4 or newer.
Lzip is a lossless data compressor based on the LZMA algorithm, with
very safe integrity checking and a user interface similar to the one of
gzip or bzip2. Lzip decompresses almost as fast as gzip and compresses
better than bzip2, which makes it well suited for software distribution
and data archiving.
Plzip is intended for faster compression/decompression of big files on
multiprocessor machines, which makes it specially well suited for
distribution of big software files and large scale data archiving. On
files big enough, plzip can use hundreds of processors.
Plzip replaces every file given in the command line with a compressed
version of itself, with the name "original_name.lz". Each compressed
@ -135,8 +132,8 @@ Print the version number of plzip on the standard output and exit.
@itemx -B
Set the input data block size in bytes. The input file will be divided
in chunks of this size before compression is performed. Valid values
range from 100kB to 1GiB. Default value is two times the dictionary
size. It is a waste of memory to choose a data size smaller than the
range from 8KiB to 1GiB. Default value is two times the dictionary size.
It is a waste of memory to choose a data size smaller than the
dictionary size.
@item --stdout

main.cc

@ -86,7 +86,7 @@ struct lzma_options
enum Mode { m_compress = 0, m_decompress, m_test };
std::string output_filename;
int outhandle = -1;
int outfd = -1;
bool delete_output_on_interrupt = false;
pthread_t main_thread;
pid_t main_thread_pid;
@ -227,7 +227,7 @@ int open_instream( const std::string & name, struct stat * in_statsp,
const Mode program_mode, const int eindex,
const bool force, const bool to_stdout ) throw()
{
int inhandle = -1;
int infd = -1;
if( program_mode == m_compress && !force && eindex >= 0 )
{
if( verbosity >= 0 )
@ -237,8 +237,8 @@ int open_instream( const std::string & name, struct stat * in_statsp,
}
else
{
inhandle = open( name.c_str(), O_RDONLY | o_binary );
if( inhandle < 0 )
infd = open( name.c_str(), O_RDONLY | o_binary );
if( infd < 0 )
{
if( verbosity >= 0 )
std::fprintf( stderr, "%s: Can't open input file `%s': %s.\n",
@ -246,7 +246,7 @@ int open_instream( const std::string & name, struct stat * in_statsp,
}
else
{
const int i = fstat( inhandle, in_statsp );
const int i = fstat( infd, in_statsp );
const mode_t & mode = in_statsp->st_mode;
if( i < 0 || !( S_ISREG( mode ) || ( to_stdout &&
( S_ISFIFO( mode ) || S_ISSOCK( mode ) ||
@ -256,12 +256,12 @@ int open_instream( const std::string & name, struct stat * in_statsp,
std::fprintf( stderr, "%s: input file `%s' is not a regular file%s.\n",
program_name, name.c_str(),
to_stdout ? "" : " and `--stdout' was not specified" );
close( inhandle );
inhandle = -1;
close( infd );
infd = -1;
}
}
}
return inhandle;
return infd;
}
@ -294,18 +294,18 @@ void set_d_outname( const std::string & name, const int i ) throw()
bool open_outstream( const bool force ) throw()
{
if( force )
outhandle = open( output_filename.c_str(),
outfd = open( output_filename.c_str(),
O_CREAT | O_TRUNC | O_WRONLY | o_binary,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH );
else outhandle = open( output_filename.c_str(),
else outfd = open( output_filename.c_str(),
O_CREAT | O_EXCL | O_WRONLY | o_binary,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH );
if( outhandle < 0 )
if( outfd < 0 )
{
if( errno == EEXIST ) outhandle = -2; else outhandle = -1;
if( errno == EEXIST ) outfd = -2; else outfd = -1;
if( verbosity >= 0 )
{
if( outhandle == -2 )
if( outfd == -2 )
std::fprintf( stderr, "%s: Output file %s already exists, skipping.\n",
program_name, output_filename.c_str() );
else
@ -313,19 +313,19 @@ bool open_outstream( const bool force ) throw()
program_name, output_filename.c_str(), std::strerror( errno ) );
}
}
return ( outhandle >= 0 );
return ( outfd >= 0 );
}
bool check_tty( const int inhandle, const Mode program_mode ) throw()
bool check_tty( const int infd, const Mode program_mode ) throw()
{
if( program_mode == m_compress && isatty( outhandle ) )
if( program_mode == m_compress && isatty( outfd ) )
{
show_error( "I won't write compressed data to a terminal.", 0, true );
return false;
}
if( ( program_mode == m_decompress || program_mode == m_test ) &&
isatty( inhandle ) )
isatty( infd ) )
{
show_error( "I won't read compressed data from a terminal.", 0, true );
return false;
@ -338,10 +338,11 @@ void cleanup_and_fail( const int retval ) throw()
{
if( delete_output_on_interrupt )
{
delete_output_on_interrupt = false;
if( verbosity >= 0 )
std::fprintf( stderr, "%s: Deleting output file `%s', if it exists.\n",
program_name, output_filename.c_str() );
if( outhandle >= 0 ) { close( outhandle ); outhandle = -1; }
if( outfd >= 0 ) { close( outfd ); outfd = -1; }
if( std::remove( output_filename.c_str() ) != 0 )
show_error( "WARNING: deletion of output file (apparently) failed." );
}
@ -355,11 +356,11 @@ void close_and_set_permissions( const struct stat * const in_statsp )
bool error = false;
if( in_statsp )
{
if( fchmod( outhandle, in_statsp->st_mode ) != 0 ) error = true;
else (void)fchown( outhandle, in_statsp->st_uid, in_statsp->st_gid );
if( fchmod( outfd, in_statsp->st_mode ) != 0 ) error = true;
else (void)fchown( outfd, in_statsp->st_uid, in_statsp->st_gid );
// fchown will in many cases return with EPERM, which can be safely ignored.
}
if( close( outhandle ) == 0 ) outhandle = -1;
if( close( outfd ) == 0 ) outfd = -1;
else cleanup_and_fail( 1 );
delete_output_on_interrupt = false;
if( !in_statsp ) return;
@ -388,11 +389,14 @@ extern "C" void signal_handler( int sig ) throw()
}
void set_signals() throw()
void set_signals( const bool to_file ) throw()
{
if( to_file )
{
signal( SIGHUP, signal_handler );
signal( SIGINT, signal_handler );
signal( SIGTERM, signal_handler );
}
signal( SIGUSR1, signal_handler );
}
@ -504,7 +508,7 @@ int main( const int argc, const char * argv[] )
lzma_options encoder_options = option_mapping[5]; // default = "-6"
int data_size = 0;
int debug_level = 0;
int inhandle = -1;
int infd = -1;
int num_workers = 0; // Start this many worker threads
Mode program_mode = m_compress;
bool force = false;
@ -574,7 +578,7 @@ int main( const int argc, const char * argv[] )
case '7': case '8': case '9':
encoder_options = option_mapping[code-'1']; break;
case 'b': break;
case 'B': data_size = getnum( arg, 0, 100000,
case 'B': data_size = getnum( arg, 0, 2 * LZ_min_dictionary_size(),
2 * LZ_max_dictionary_size() ); break;
case 'c': to_stdout = true; break;
case 'd': program_mode = m_decompress; break;
@ -608,7 +612,7 @@ int main( const int argc, const char * argv[] )
if( num_online <= 0 ) num_online = 1;
num_workers = std::min( num_online, max_workers );
}
const int num_slots = std::max( 1, ( num_workers * slots_per_worker ) - 1 );
const int num_slots = num_workers * slots_per_worker;
bool filenames_given = false;
for( ; argind < parser.arguments(); ++argind )
@ -618,13 +622,12 @@ int main( const int argc, const char * argv[] )
}
if( filenames.empty() ) filenames.push_back("-");
if( !to_stdout && program_mode != m_test &&
( filenames_given || default_output_filename.size() ) )
set_signals();
set_signals( !to_stdout && program_mode != m_test &&
( filenames_given || default_output_filename.size() ) );
Pretty_print pp( filenames );
if( program_mode == m_test )
outhandle = -1;
outfd = -1;
int retval = 0;
for( unsigned int i = 0; i < filenames.size(); ++i )
@ -635,11 +638,11 @@ int main( const int argc, const char * argv[] )
if( !filenames[i].size() || filenames[i] == "-" )
{
input_filename.clear();
inhandle = STDIN_FILENO;
infd = STDIN_FILENO;
if( program_mode != m_test )
{
if( to_stdout || !default_output_filename.size() )
outhandle = STDOUT_FILENO;
outfd = STDOUT_FILENO;
else
{
if( program_mode == m_compress )
@ -647,8 +650,8 @@ int main( const int argc, const char * argv[] )
else output_filename = default_output_filename;
if( !open_outstream( force ) )
{
if( outhandle == -1 && retval < 1 ) retval = 1;
close( inhandle ); inhandle = -1;
if( outfd == -1 && retval < 1 ) retval = 1;
close( infd ); infd = -1;
continue;
}
}
@ -658,12 +661,12 @@ int main( const int argc, const char * argv[] )
{
input_filename = filenames[i];
const int eindex = extension_index( input_filename );
inhandle = open_instream( input_filename, &in_stats, program_mode,
infd = open_instream( input_filename, &in_stats, program_mode,
eindex, force, to_stdout );
if( inhandle < 0 ) { if( retval < 1 ) retval = 1; continue; }
if( infd < 0 ) { if( retval < 1 ) retval = 1; continue; }
if( program_mode != m_test )
{
if( to_stdout ) outhandle = STDOUT_FILENO;
if( to_stdout ) outfd = STDOUT_FILENO;
else
{
if( program_mode == m_compress )
@ -671,15 +674,15 @@ int main( const int argc, const char * argv[] )
else set_d_outname( input_filename, eindex );
if( !open_outstream( force ) )
{
if( outhandle == -1 && retval < 1 ) retval = 1;
close( inhandle ); inhandle = -1;
if( outfd == -1 && retval < 1 ) retval = 1;
close( infd ); infd = -1;
continue;
}
}
}
}
if( !check_tty( inhandle, program_mode ) ) return 1;
if( !check_tty( infd, program_mode ) ) return 1;
if( output_filename.size() && !to_stdout && program_mode != m_test )
delete_output_on_interrupt = true;
@ -690,9 +693,10 @@ int main( const int argc, const char * argv[] )
if( program_mode == m_compress )
tmp = compress( data_size, encoder_options.dictionary_size,
encoder_options.match_len_limit, num_workers,
num_slots, inhandle, outhandle, debug_level );
num_slots, infd, outfd, pp, debug_level );
else
tmp = decompress( inhandle, outhandle, pp, program_mode == m_test );
tmp = decompress( num_workers, num_slots, infd, outfd, pp,
debug_level, program_mode == m_test );
if( tmp > retval ) retval = tmp;
if( tmp && program_mode != m_test ) cleanup_and_fail( retval );
@ -700,12 +704,12 @@ int main( const int argc, const char * argv[] )
close_and_set_permissions( in_statsp );
if( input_filename.size() )
{
close( inhandle ); inhandle = -1;
close( infd ); infd = -1;
if( !keep_input_files && !to_stdout && program_mode != m_test )
std::remove( input_filename.c_str() );
}
}
if( outhandle >= 0 && close( outhandle ) != 0 )
if( outfd >= 0 && close( outfd ) != 0 )
{
if( verbosity >= 0 )
std::fprintf( stderr, "%s: Can't close stdout: %s.\n",

plzip.h

@ -89,7 +89,7 @@ public:
{
xlock( &mutex );
++check_counter;
while( num_free == 0 )
while( num_free <= 0 )
{ ++wait_counter; xwait( &slot_av, &mutex ); }
--num_free;
xunlock( &mutex );
@ -98,7 +98,7 @@ public:
void leave_slot() // return a slot to the tally
{
xlock( &mutex );
if( num_free++ == 0 ) xsignal( &slot_av );
if( ++num_free == 1 ) xsignal( &slot_av );
xunlock( &mutex );
}
};
@ -107,13 +107,14 @@ public:
int compress( const int data_size, const int dictionary_size,
const int match_len_limit, const int num_workers,
const int num_slots, const int infd, const int outfd,
const int debug_level );
const Pretty_print & pp, const int debug_level );
/*-------------------- Defined in decompress.cc --------------------*/
int decompress( const int infd, const int outfd, const Pretty_print & pp,
const bool testing );
int decompress( const int num_workers, const int num_slots,
const int infd, const int outfd, const Pretty_print & pp,
const int debug_level, const bool testing );
/*----------------------- Defined in main.cc -----------------------*/

check.sh

@ -23,7 +23,7 @@ echo -n "testing plzip..."
cd "${objdir}"/tmp
cat "${testdir}"/test1 > in || framework_failure
cat in in in in in > in5 || framework_failure
cat in in in in > in4 || framework_failure
fail=0
"${LZIP}" -cd "${testdir}"/test1.lz > copy || fail=1
@ -32,7 +32,7 @@ cmp in copy || fail=1
for i in s4096 1 2 3 4 5 6 7 8; do
"${LZIP}" -k -$i in || fail=1
mv -f in.lz copy.lz || fail=1
echo -n "garbage" >> copy.lz || fail=1
# echo -n "garbage" >> copy.lz || fail=1
"${LZIP}" -df copy.lz || fail=1
cmp in copy || fail=1
echo -n .
@ -40,7 +40,7 @@ done
for i in s4096 1 2 3 4 5 6 7 8; do
"${LZIP}" -c -$i in > out || fail=1
echo -n "g" >> out || fail=1
# echo -n "g" >> out || fail=1
"${LZIP}" -cd out > copy || fail=1
cmp in copy || fail=1
echo -n .
@ -61,9 +61,9 @@ for i in s4096 1 2 3 4 5 6 7 8; do
done
for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16; do
"${LZIP}" -s4096 -n$i < in5 > out5 || fail=1
"${LZIP}" -d < out5 > copy5 || fail=1
cmp in5 copy5 || fail=1
"${LZIP}" -s4Ki -B8Ki -n$i < in4 > out4 || fail=1
"${LZIP}" -d -n$i < out4 > copy4 || fail=1
cmp in4 copy4 || fail=1
echo -n .
done