1
0
Fork 0

Adding upstream version 0.3.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-24 03:25:20 +01:00
parent e748b84dff
commit e1ed3a8d42
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
11 changed files with 455 additions and 814 deletions

View file

@ -1,3 +1,13 @@
2010-01-24 Antonio Diaz Diaz <ant_diaz@teleline.es>
* Version 0.3 released.
* Implemented option "--data-size".
* Output file is now removed if plzip is interrupted.
* This version automatically chooses the smallest possible
dictionary size for each member during compression, saving
memory during decompression.
* main.cc: New constant "o_binary".
2010-01-17 Antonio Diaz Diaz <ant_diaz@teleline.es> 2010-01-17 Antonio Diaz Diaz <ant_diaz@teleline.es>
* Version 0.2 released. * Version 0.2 released.

View file

@ -30,8 +30,8 @@ main.o : main.cc
$(objs) : Makefile $(objs) : Makefile
arg_parser.o : arg_parser.h arg_parser.o : arg_parser.h
main.o : arg_parser.h main.h plzip.h main.o : arg_parser.h plzip.h
plzip.o : main.h plzip.h plzip.o : plzip.h
doc : info man doc : info man

13
NEWS
View file

@ -1,4 +1,11 @@
Changes in version 0.2: Changes in version 0.3:
Lzip options "--dictionary-size" and "--match-length" have been New option "--data-size" has been added.
implemented.
Output file is now removed if plzip is interrupted.
This version automatically chooses the smallest possible dictionary size
for each member during compression, saving memory during decompression.
Regular files are now opened in binary mode on non-POSIX platforms
defining the O_BINARY macro.

4
configure vendored
View file

@ -5,12 +5,12 @@
# This configure script is free software: you have unlimited permission # This configure script is free software: you have unlimited permission
# to copy, distribute and modify it. # to copy, distribute and modify it.
# #
# Date of this version: 2010-01-17 # Date of this version: 2010-01-24
args= args=
no_create= no_create=
pkgname=plzip pkgname=plzip
pkgversion=0.2 pkgversion=0.3
progname=plzip progname=plzip
srctrigger=plzip.h srctrigger=plzip.h

View file

@ -1,5 +1,5 @@
.\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.36. .\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.36.
.TH PLZIP "1" "January 2010" "Plzip 0.2" "User Commands" .TH PLZIP "1" "January 2010" "Plzip 0.3" "User Commands"
.SH NAME .SH NAME
Plzip \- data compressor based on the LZMA algorithm Plzip \- data compressor based on the LZMA algorithm
.SH SYNOPSIS .SH SYNOPSIS
@ -15,6 +15,9 @@ display this help and exit
\fB\-V\fR, \fB\-\-version\fR \fB\-V\fR, \fB\-\-version\fR
output version information and exit output version information and exit
.TP .TP
\fB\-B\fR, \fB\-\-data\-size=\fR<n>
set input data block size in bytes
.TP
\fB\-c\fR, \fB\-\-stdout\fR \fB\-c\fR, \fB\-\-stdout\fR
send output to standard output send output to standard output
.TP .TP

View file

@ -12,7 +12,7 @@ File: plzip.info, Node: Top, Next: Introduction, Up: (dir)
Plzip Manual Plzip Manual
************ ************
This manual is for Plzip (version 0.2, 17 January 2010). This manual is for Plzip (version 0.3, 24 January 2010).
* Menu: * Menu:
@ -34,9 +34,11 @@ File: plzip.info, Node: Introduction, Next: Invoking Plzip, Prev: Top, Up: T
1 Introduction 1 Introduction
************** **************
Plzip is a parallel version of the lzip data compressor. Currently only Plzip is a parallel version of the lzip data compressor. The files
compression is performed in parallel. Parallel decompression is planned produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is
to be implemented later. intended for faster compression/decompression of big files on
multiprocessor machines. Currently only compression is performed in
parallel. Parallel decompression is planned to be implemented later.
Lzip is a lossless data compressor based on the LZMA algorithm, with Lzip is a lossless data compressor based on the LZMA algorithm, with
very safe integrity checking and a user interface similar to the one of very safe integrity checking and a user interface similar to the one of
@ -104,6 +106,14 @@ The format for running plzip is:
`-V' `-V'
Print the version number of plzip on the standard output and exit. Print the version number of plzip on the standard output and exit.
`--data-size=SIZE'
`-B'
Set the input data block size in bytes. The input file will be
divided in chunks of this size before compression is performed.
Valid values range from 100kB to 1GiB. Default value is two times
the dictionary size. It is a waste of memory to choose a data size
smaller than the dictionary size.
`--stdout' `--stdout'
`-c' `-c'
Compress or decompress to standard output. Needed when reading Compress or decompress to standard output. Needed when reading
@ -295,9 +305,9 @@ Concept Index
Tag Table: Tag Table:
Node: Top227 Node: Top227
Node: Introduction750 Node: Introduction750
Node: Invoking Plzip3402 Node: Invoking Plzip3571
Node: File Format6747 Node: File Format7260
Node: Problems8703 Node: Problems9216
Node: Concept Index9232 Node: Concept Index9745
 
End Tag Table End Tag Table

View file

@ -5,8 +5,8 @@
@finalout @finalout
@c %**end of header @c %**end of header
@set UPDATED 17 January 2010 @set UPDATED 24 January 2010
@set VERSION 0.2 @set VERSION 0.3
@dircategory Data Compression @dircategory Data Compression
@direntry @direntry
@ -50,9 +50,11 @@ to copy, distribute and modify it.
@chapter Introduction @chapter Introduction
@cindex introduction @cindex introduction
Plzip is a parallel version of the lzip data compressor. Currently only Plzip is a parallel version of the lzip data compressor. The files
compression is performed in parallel. Parallel decompression is planned produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is
to be implemented later. intended for faster compression/decompression of big files on
multiprocessor machines. Currently only compression is performed in
parallel. Parallel decompression is planned to be implemented later.
Lzip is a lossless data compressor based on the LZMA algorithm, with Lzip is a lossless data compressor based on the LZMA algorithm, with
very safe integrity checking and a user interface similar to the one of very safe integrity checking and a user interface similar to the one of
@ -127,6 +129,14 @@ Print an informative help message describing the options and exit.
@itemx -V @itemx -V
Print the version number of plzip on the standard output and exit. Print the version number of plzip on the standard output and exit.
@item --data-size=@var{size}
@itemx -B
Set the input data block size in bytes. The input file will be divided
into chunks of this size before compression is performed. Valid values
range from 100kB to 1GiB. Default value is two times the dictionary
size. It is a waste of memory to choose a data size smaller than the
dictionary size.
@item --stdout @item --stdout
@itemx -c @itemx -c
Compress or decompress to standard output. Needed when reading from a Compress or decompress to standard output. Needed when reading from a

445
main.cc
View file

@ -25,20 +25,17 @@
#define _FILE_OFFSET_BITS 64 #define _FILE_OFFSET_BITS 64
#include <algorithm> #include <algorithm>
#include <cassert>
#include <cerrno> #include <cerrno>
#include <climits> #include <climits>
#include <csignal> #include <csignal>
#include <cstdarg>
#include <cstddef>
#include <cstdio> #include <cstdio>
#include <cstdlib> #include <cstdlib>
#include <cstring> #include <cstring>
#include <string> #include <string>
#include <vector> #include <vector>
#include <fcntl.h> #include <fcntl.h>
#include <stdint.h>
#include <pthread.h> #include <pthread.h>
#include <stdint.h>
#include <unistd.h> #include <unistd.h>
#include <utime.h> #include <utime.h>
#include <sys/stat.h> #include <sys/stat.h>
@ -49,7 +46,6 @@
#endif #endif
#include "arg_parser.h" #include "arg_parser.h"
#include "main.h"
#include "plzip.h" #include "plzip.h"
#ifndef LLONG_MAX #ifndef LLONG_MAX
@ -72,6 +68,12 @@ const char * const Program_name = "Plzip";
const char * const program_name = "plzip"; const char * const program_name = "plzip";
const char * const program_year = "2010"; const char * const program_year = "2010";
#ifdef O_BINARY
const int o_binary = O_BINARY;
#else
const int o_binary = 0;
#endif
struct { const char * from; const char * to; } const known_extensions[] = { struct { const char * from; const char * to; } const known_extensions[] = {
{ ".lz", "" }, { ".lz", "" },
{ ".tlz", ".tar" }, { ".tlz", ".tar" },
@ -88,6 +90,8 @@ enum Mode { m_compress = 0, m_decompress, m_test };
std::string output_filename; std::string output_filename;
int outhandle = -1; int outhandle = -1;
bool delete_output_on_interrupt = false; bool delete_output_on_interrupt = false;
pthread_t main_thread;
pid_t main_thread_pid;
class Pretty_print class Pretty_print
{ {
@ -132,6 +136,7 @@ void show_help() throw()
std::printf( " -h, --help display this help and exit\n" ); std::printf( " -h, --help display this help and exit\n" );
std::printf( " -V, --version output version information and exit\n" ); std::printf( " -V, --version output version information and exit\n" );
// std::printf( " -b, --member-size=<n> set member size limit in bytes\n" ); // std::printf( " -b, --member-size=<n> set member size limit in bytes\n" );
std::printf( " -B, --data-size=<n> set input data block size in bytes\n" );
std::printf( " -c, --stdout send output to standard output\n" ); std::printf( " -c, --stdout send output to standard output\n" );
std::printf( " -d, --decompress decompress\n" ); std::printf( " -d, --decompress decompress\n" );
std::printf( " -f, --force overwrite existing output files\n" ); std::printf( " -f, --force overwrite existing output files\n" );
@ -267,7 +272,7 @@ int open_instream( const std::string & name, struct stat * in_statsp,
} }
else else
{ {
inhandle = open( name.c_str(), O_RDONLY ); inhandle = open( name.c_str(), O_RDONLY | o_binary );
if( inhandle < 0 ) if( inhandle < 0 )
{ {
if( verbosity >= 0 ) if( verbosity >= 0 )
@ -324,9 +329,11 @@ void set_d_outname( const std::string & name, const int i ) throw()
bool open_outstream( const bool force ) throw() bool open_outstream( const bool force ) throw()
{ {
if( force ) if( force )
outhandle = open( output_filename.c_str(), O_CREAT | O_TRUNC | O_WRONLY, outhandle = open( output_filename.c_str(),
O_CREAT | O_TRUNC | O_WRONLY | o_binary,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH ); S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH );
else outhandle = open( output_filename.c_str(), O_CREAT | O_EXCL | O_WRONLY, else outhandle = open( output_filename.c_str(),
O_CREAT | O_EXCL | O_WRONLY | o_binary,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH ); S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH );
if( outhandle < 0 ) if( outhandle < 0 )
{ {
@ -412,14 +419,13 @@ int do_decompress( LZ_Decoder * const decoder, const int inhandle,
const int in_buffer_size = 65536, out_buffer_size = 8 * in_buffer_size; const int in_buffer_size = 65536, out_buffer_size = 8 * in_buffer_size;
uint8_t in_buffer[in_buffer_size], out_buffer[out_buffer_size]; uint8_t in_buffer[in_buffer_size], out_buffer[out_buffer_size];
if( verbosity >= 1 ) pp();
while( true ) while( true )
{ {
int in_size = std::min( LZ_decompress_write_size( decoder ), in_buffer_size ); int in_size = std::min( LZ_decompress_write_size( decoder ), in_buffer_size );
if( in_size > 0 ) if( in_size > 0 )
{ {
const int max_in_size = in_size; const int max_in_size = in_size;
in_size = readblock( inhandle, (char *)in_buffer, max_in_size ); in_size = readblock( inhandle, in_buffer, max_in_size );
if( in_size != max_in_size && errno ) if( in_size != max_in_size && errno )
{ pp(); show_error( "read error", errno ); return 1; } { pp(); show_error( "read error", errno ); return 1; }
if( in_size == 0 ) LZ_decompress_finish( decoder ); if( in_size == 0 ) LZ_decompress_finish( decoder );
@ -458,7 +464,7 @@ int do_decompress( LZ_Decoder * const decoder, const int inhandle,
} }
else if( out_size > 0 && outhandle >= 0 ) else if( out_size > 0 && outhandle >= 0 )
{ {
const int wr = writeblock( outhandle, (char *)out_buffer, out_size ); const int wr = writeblock( outhandle, out_buffer, out_size );
if( wr != out_size ) if( wr != out_size )
{ pp(); show_error( "write error", errno ); return 1; } { pp(); show_error( "write error", errno ); return 1; }
} }
@ -490,9 +496,12 @@ int decompress( const int inhandle, const Pretty_print & pp,
} }
extern "C" void signal_handler( int ) throw() extern "C" void signal_handler( int sig ) throw()
{ {
show_error( "Control-C or similar caught, quitting." ); if( !pthread_equal( pthread_self(), main_thread ) )
kill( main_thread_pid, sig );
if( sig != SIGUSR1 )
show_error( "Control-C or similar caught, quitting." );
cleanup_and_fail( 1 ); cleanup_and_fail( 1 );
} }
@ -502,6 +511,7 @@ void set_signals() throw()
signal( SIGHUP, signal_handler ); signal( SIGHUP, signal_handler );
signal( SIGINT, signal_handler ); signal( SIGINT, signal_handler );
signal( SIGTERM, signal_handler ); signal( SIGTERM, signal_handler );
signal( SIGUSR1, signal_handler );
} }
} // end namespace } // end namespace
@ -551,372 +561,16 @@ void internal_error( const char * msg )
} }
/* Private stuff needed by fatal(). */ // This can be called from any thread, main thread or sub-threads alike, since
static pthread_t main_thread; // they all call common helper functions that call fatal() in case of an error.
//
static pid_t pid; void fatal() { signal_handler( SIGUSR1 ); }
/* Public utility variables and functions. */
/*
This can be called from any thread, main thread or sub-threads alike, since
they all call common helper functions that call fatal() in case of an error.
*/
void fatal()
{
if( pthread_equal(pthread_self(), main_thread) )
cleanup_and_fail( 1 );
else
{
if( 0 == kill(pid, SIGUSR1) )
pthread_exit(0);
}
_exit( 1 );
}
/* Print "<program_name>: <formatted message>: <strerror(err)>" on stderr,
   then call fatal().  'fmt' and the arguments after 'err' are printf-style. */
void fail(const char *fmt, int err, ...)
{
  va_list ap;

  /* The whole message is written under the stderr lock; the original
     author's note says this should also protect strerror(). */
  flockfile(stderr);
  (void)fprintf(stderr, "%s: ", program_name);
  va_start(ap, err);
  (void)vfprintf(stderr, fmt, ap);
  va_end(ap);
  (void)fprintf(stderr, ": %s\n", strerror(err));
  funlockfile(stderr);

  /* Original note: stderr is never fully buffered originally. */
  fatal();
}
/* Initialize 'cond': an error-checking mutex, a condition variable with
   default attributes, and both counters set to zero.  Every pthread error
   is reported through fail(). */
void xinit(Cond *cond)
{
  pthread_mutexattr_t mattr;
  int errcode;

  if( ( errcode = pthread_mutexattr_init(&mattr) ) != 0 )
    fail("pthread_mutexattr_init()", errcode);

  if( ( errcode = pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_ERRORCHECK) ) != 0 )
    fail("pthread_mutexattr_settype()", errcode);

  if( ( errcode = pthread_mutex_init(&cond->lock, &mattr) ) != 0 )
    fail("pthread_mutex_init()", errcode);

  if( ( errcode = pthread_mutexattr_destroy(&mattr) ) != 0 )
    fail("pthread_mutexattr_destroy()", errcode);

  if( ( errcode = pthread_cond_init(&cond->cond, 0) ) != 0 )
    fail("pthread_cond_init()", errcode);

  cond->ccount = 0;
  cond->wcount = 0;
}
/* Destroy the condition variable of 'cond' first, then its mutex.
   Every pthread error is reported through fail(). */
void xdestroy(Cond *cond)
{
  int errcode;

  if( ( errcode = pthread_cond_destroy(&cond->cond) ) != 0 )
    fail("pthread_cond_destroy()", errcode);

  if( ( errcode = pthread_mutex_destroy(&cond->lock) ) != 0 )
    fail("pthread_mutex_destroy()", errcode);
}
/* Lock the mutex of 'cond'; a pthread error is reported through fail(). */
void xlock(Cond *cond)
{
  const int errcode = pthread_mutex_lock(&cond->lock);
  if( errcode != 0 )
    fail("pthread_mutex_lock()", errcode);
}
/* Lock 'cond' and count one predicate check. */
void xlock_pred(Cond *cond)
{
  xlock(cond);
  cond->ccount += 1;
}
/* Unlock the mutex of 'cond'; a pthread error is reported through fail(). */
void xunlock(Cond *cond)
{
  const int errcode = pthread_mutex_unlock(&cond->lock);
  if( errcode != 0 )
    fail("pthread_mutex_unlock()", errcode);
}
/* Wait on the condition variable of 'cond' using its own mutex.
   Counts one wait before blocking and one predicate check after waking up. */
void xwait(Cond *cond)
{
  cond->wcount += 1;

  const int errcode = pthread_cond_wait(&cond->cond, &cond->lock);
  if( errcode != 0 )
    fail("pthread_cond_wait()", errcode);

  cond->ccount += 1;
}
/* Signal the condition variable of 'cond'; errors go through fail(). */
void xsignal(Cond *cond)
{
  const int errcode = pthread_cond_signal(&cond->cond);
  if( errcode != 0 )
    fail("pthread_cond_signal()", errcode);
}
/* Broadcast on the condition variable of 'cond'; errors go through fail(). */
void xbroadcast(Cond *cond)
{
  const int errcode = pthread_cond_broadcast(&cond->cond);
  if( errcode != 0 )
    fail("pthread_cond_broadcast()", errcode);
}
/* Start 'routine(arg)' in a new thread with default attributes and store
   its id in '*thread'; errors go through fail(). */
void xcreate(pthread_t *thread, void *(*routine)(void *), void *arg)
{
  const int errcode = pthread_create(thread, 0, routine, arg);
  if( errcode != 0 )
    fail("pthread_create()", errcode);
}
/* Join 'thread', discarding its return value; errors go through fail(). */
void xjoin(pthread_t thread)
{
  const int errcode = pthread_join(thread, 0);
  if( errcode != 0 )
    fail("pthread_join()", errcode);
}
/* Send 'sig' to the process whose id is stored in 'pid'; a kill() failure
   is reported through fail() with errno. */
void xraise(int sig)
{
  if( kill(pid, sig) == -1 )
    fail("kill()", errno);
}
/* Private stuff part 2. */
/* sigemptyset() wrapper; a failure is reported through fail() with errno. */
static void xsigemptyset(sigset_t *set)
{
  if( sigemptyset(set) == -1 )
    fail("sigemptyset()", errno);
}
/* sigaddset() wrapper; a failure is reported through fail() with errno. */
static void xsigaddset(sigset_t *set, int signo)
{
  if( sigaddset(set, signo) == -1 )
    fail("sigaddset()", errno);
}
/* pthread_sigmask() wrapper; the returned error code goes to fail(). */
static void xsigmask(int how, const sigset_t *set, sigset_t *oset)
{
  const int errcode = pthread_sigmask(how, set, oset);
  if( errcode != 0 )
    fail("pthread_sigmask()", errcode);
}
/* Install 'handler' for 'sig' with an empty sa_mask and no flags.
   A sigaction() failure is reported through fail() with errno. */
static void xsigaction(int sig, void (*handler)(int))
{
  struct sigaction action;

  action.sa_handler = handler;
  xsigemptyset(&action.sa_mask);
  action.sa_flags = 0;

  if( sigaction(sig, &action, 0) == -1 )
    fail("sigaction()", errno);
}
/* Small non-zero codes that sighandler() stores in place of signal numbers. */
enum Caught_sig { CS_INT = 1, CS_TERM, CS_USR1, CS_USR2 };
/* Code of the last signal seen by sighandler(); overwritten on every delivery. */
static volatile sig_atomic_t caught_sig;
/* Signal handler: records which of SIGINT, SIGTERM, SIGUSR1 or SIGUSR2
   arrived by storing the matching Caught_sig code in caught_sig.  Any
   other signal is treated as an internal error. */
extern "C" void sighandler( int sig )
{
  /* sig_atomic_t is nowhere required to be able to hold signal values,
     hence the translation to a small code. */
  if( sig == SIGINT ) caught_sig = CS_INT;
  else if( sig == SIGTERM ) caught_sig = CS_TERM;
  else if( sig == SIGUSR1 ) caught_sig = CS_USR1;
  else if( sig == SIGUSR2 ) caught_sig = CS_USR2;
  else internal_error( "caught signal not in set" );
}
/*
Run one compression job: fill a Muxer_arg from the parameters, start the
muxer thread on it, then suspend in sigsuspend() until a signal handled by
sighandler() arrives, act on the recorded caught_sig code, and finally join
the muxer thread.

encoder_options  supplies dictionary_size and match_len_limit
num_workers, num_slots, debug_level, infd, outfd
                 copied unchanged into the Muxer_arg
pp               invoked once up front when verbosity >= 1
unblocked        signal mask handed to sigsuspend() while waiting
*/
static void compress( const lzma_options & encoder_options, const int num_workers,
int debug_level, int num_slots, int infd, int outfd,
const Pretty_print & pp, const sigset_t *unblocked )
{
/*
We could wait for signals with either sigwait() or sigsuspend(). SUSv2
states about sigwait() that its effect on signal actions is unspecified.
SUSv3 still claims the same.
The SUSv2 description of sigsuspend() talks about both the thread and the
whole process being suspended until a signal arrives, although thread
suspension seems much more likely from the wording. They note that they
filed a clarification request for this. SUSv3 cleans this up and chooses
thread suspension which was more logical anyway.
I favor sigsuspend() because I need to re-raise SIGTERM and SIGINT, and
unspecified action behavior with sigwait() seems messy.
13-OCT-2009 lacos
*/
if( verbosity >= 1 ) pp();
/* Collect everything the muxer thread needs into a single argument. */
Muxer_arg muxer_arg;
muxer_arg.dictionary_size = encoder_options.dictionary_size;
muxer_arg.match_len_limit = encoder_options.match_len_limit;
muxer_arg.num_workers = num_workers;
muxer_arg.num_slots = num_slots;
muxer_arg.debug_level = debug_level;
muxer_arg.infd = infd;
muxer_arg.outfd = outfd;
pthread_t muxer_thread;
xcreate(&muxer_thread, muxer, &muxer_arg);
/* Unblock signals, wait for them, then block them again. */
{
int ret = sigsuspend(unblocked);
/* sigsuspend() is expected to return only by being interrupted. */
assert(-1 == ret && EINTR == errno);
}
switch( caught_sig ) {
case CS_INT:
case CS_TERM: // FIXME remove output file
{
int sig;
sigset_t mask;
sig = (CS_INT == caught_sig) ? SIGINT : SIGTERM;
/*
We might have inherited a SIG_IGN from the parent, but that would
make no sense here. 24-OCT-2009 lacos
*/
/* Restore the default action, re-raise the signal, then unblock it. */
xsigaction(sig, SIG_DFL);
xraise(sig);
xsigemptyset(&mask);
xsigaddset(&mask, sig);
xsigmask(SIG_UNBLOCK, &mask, 0);
}
/*
We shouldn't reach this point, but if we do for some reason, fall
through.
*/
case CS_USR1:
/* Error from a non-main thread via fatal(). */
fatal();
/* No break here: control continues into CS_USR2 if fatal() ever returns. */
case CS_USR2:
/* Muxer thread joined other sub-threads and finished successfully. */
break;
default:
/* Any other caught_sig value is treated as a programming error. */
assert(0);
}
xjoin(muxer_thread);
}
/* Switch the handling of SIGINT, SIGTERM, SIGUSR1 and SIGUSR2.
   block_n_catch != 0: block the four signals (previous mask saved in *oset)
   and route them to sighandler().
   block_n_catch == 0: restore the default actions, then reinstate the mask
   found in *oset. */
static void sigs_mod(int block_n_catch, sigset_t *oset)
{
  const int handled[] = { SIGINT, SIGTERM, SIGUSR1, SIGUSR2 };
  const int count = sizeof handled / sizeof handled[0];
  void (*handler)(int) = SIG_DFL;

  if( block_n_catch ) {
    sigset_t blocked;

    xsigemptyset(&blocked);
    for( int i = 0; i < count; ++i )
      xsigaddset(&blocked, handled[i]);
    /* Block first, so no handler runs before the caller is ready to wait. */
    xsigmask(SIG_BLOCK, &blocked, oset);
    handler = sighandler;
  }

  for( int i = 0; i < count; ++i )
    xsigaction(handled[i], handler);

  if( !block_n_catch )
    xsigmask(SIG_SETMASK, oset, 0);
}
// Returns the number of bytes really read. // Returns the number of bytes really read.
// If (returned value < size) and (errno == 0), means EOF was reached. // If (returned value < size) and (errno == 0), means EOF was reached.
// //
int readblock( const int fd, char * buf, const int size ) throw() int readblock( const int fd, uint8_t * buf, const int size ) throw()
{ {
int rest = size; int rest = size;
errno = 0; errno = 0;
@ -935,7 +589,7 @@ int readblock( const int fd, char * buf, const int size ) throw()
// Returns the number of bytes really written. // Returns the number of bytes really written.
// If (returned value < size), it is always an error. // If (returned value < size), it is always an error.
// //
int writeblock( const int fd, const char * buf, const int size ) throw() int writeblock( const int fd, const uint8_t * buf, const int size ) throw()
{ {
int rest = size; int rest = size;
errno = 0; errno = 0;
@ -966,6 +620,7 @@ int main( const int argc, const char * argv[] )
{ 1 << 24, 163 }, // -8 { 1 << 24, 163 }, // -8
{ 1 << 25, 273 } }; // -9 { 1 << 25, 273 } }; // -9
lzma_options encoder_options = option_mapping[5]; // default = "-6" lzma_options encoder_options = option_mapping[5]; // default = "-6"
int data_size = 0;
int debug_level = 0; int debug_level = 0;
int inhandle = -1; int inhandle = -1;
int num_workers = 0; // Start this many worker threads int num_workers = 0; // Start this many worker threads
@ -977,22 +632,18 @@ int main( const int argc, const char * argv[] )
std::string default_output_filename; std::string default_output_filename;
std::vector< std::string > filenames; std::vector< std::string > filenames;
invocation_name = argv[0]; invocation_name = argv[0];
main_thread = pthread_self();
main_thread_pid = getpid();
if( LZ_version()[0] != LZ_version_string[0] ) if( LZ_version()[0] != LZ_version_string[0] )
internal_error( "bad library version" ); internal_error( "bad library version" );
main_thread = pthread_self();
pid = getpid();
xsigaction(SIGPIPE, SIG_IGN);
xsigaction(SIGXFSZ, SIG_IGN);
const int slots_per_worker = 2; const int slots_per_worker = 2;
long max_workers = sysconf( _SC_THREAD_THREADS_MAX ); long max_workers = sysconf( _SC_THREAD_THREADS_MAX );
if( max_workers < 1 || max_workers > INT_MAX / slots_per_worker ) if( max_workers < 1 || max_workers > INT_MAX / slots_per_worker )
max_workers = INT_MAX / slots_per_worker; max_workers = INT_MAX / slots_per_worker;
if( max_workers > INT_MAX / (int)sizeof( pthread_t ) ) if( max_workers > INT_MAX / (int)sizeof (pthread_t) )
max_workers = INT_MAX / sizeof( pthread_t ); max_workers = INT_MAX / sizeof (pthread_t);
const Arg_parser::Option options[] = const Arg_parser::Option options[] =
{ {
@ -1006,6 +657,7 @@ int main( const int argc, const char * argv[] )
{ '8', 0, Arg_parser::no }, { '8', 0, Arg_parser::no },
{ '9', "best", Arg_parser::no }, { '9', "best", Arg_parser::no },
{ 'b', "member-size", Arg_parser::yes }, { 'b', "member-size", Arg_parser::yes },
{ 'B', "data-size", Arg_parser::yes },
{ 'c', "stdout", Arg_parser::no }, { 'c', "stdout", Arg_parser::no },
{ 'd', "decompress", Arg_parser::no }, { 'd', "decompress", Arg_parser::no },
{ 'D', "debug", Arg_parser::yes }, { 'D', "debug", Arg_parser::yes },
@ -1040,6 +692,8 @@ int main( const int argc, const char * argv[] )
case '7': case '8': case '9': case '7': case '8': case '9':
encoder_options = option_mapping[code-'1']; break; encoder_options = option_mapping[code-'1']; break;
case 'b': break; case 'b': break;
case 'B': data_size = getnum( arg, 0, 100000,
2 * LZ_max_dictionary_size() ); break;
case 'c': to_stdout = true; break; case 'c': to_stdout = true; break;
case 'd': program_mode = m_decompress; break; case 'd': program_mode = m_decompress; break;
case 'D': debug_level = getnum( arg, 0, 0, 3 ); case 'D': debug_level = getnum( arg, 0, 0, 3 );
@ -1048,8 +702,8 @@ int main( const int argc, const char * argv[] )
case 'h': show_help(); return 0; case 'h': show_help(); return 0;
case 'k': keep_input_files = true; break; case 'k': keep_input_files = true; break;
case 'm': encoder_options.match_len_limit = case 'm': encoder_options.match_len_limit =
getnum( arg, 0, LZ_min_match_len_limit(), getnum( arg, 0, LZ_min_match_len_limit(),
LZ_max_match_len_limit() ); break; LZ_max_match_len_limit() ); break;
case 'o': default_output_filename = arg; break; case 'o': default_output_filename = arg; break;
case 'n': num_workers = getnum( arg, 0, 1, max_workers ); break; case 'n': num_workers = getnum( arg, 0, 1, max_workers ); break;
case 'q': verbosity = -1; break; case 'q': verbosity = -1; break;
@ -1063,13 +717,16 @@ int main( const int argc, const char * argv[] )
} }
} }
if( data_size <= 0 )
data_size = 2 * std::max( 65536, encoder_options.dictionary_size );
if( num_workers <= 0 ) if( num_workers <= 0 )
{ {
long num_online = sysconf( _SC_NPROCESSORS_ONLN ); long num_online = sysconf( _SC_NPROCESSORS_ONLN );
if( num_online <= 0 ) num_online = 2; if( num_online <= 0 ) num_online = 1;
num_workers = std::min( num_online, max_workers ); num_workers = std::min( num_online, max_workers );
} }
const int num_slots = num_workers * slots_per_worker; const int num_slots = std::max( 1, ( num_workers * slots_per_worker ) - 1 );
bool filenames_given = false; bool filenames_given = false;
for( ; argind < parser.arguments(); ++argind ) for( ; argind < parser.arguments(); ++argind )
@ -1079,7 +736,9 @@ int main( const int argc, const char * argv[] )
} }
if( filenames.empty() ) filenames.push_back("-"); if( filenames.empty() ) filenames.push_back("-");
if( filenames_given && program_mode != m_compress ) set_signals(); if( !to_stdout && program_mode != m_test &&
( filenames_given || default_output_filename.size() ) )
set_signals();
Pretty_print pp( filenames ); Pretty_print pp( filenames );
if( program_mode == m_test ) if( program_mode == m_test )
@ -1144,14 +803,12 @@ int main( const int argc, const char * argv[] )
delete_output_on_interrupt = true; delete_output_on_interrupt = true;
const struct stat * const in_statsp = input_filename.size() ? &in_stats : 0; const struct stat * const in_statsp = input_filename.size() ? &in_stats : 0;
pp.set_name( input_filename ); pp.set_name( input_filename );
if( verbosity >= 1 ) pp();
int tmp = 0; int tmp = 0;
if( program_mode == m_compress ) if( program_mode == m_compress )
{ tmp = compress( data_size, encoder_options.dictionary_size,
sigset_t unblocked; encoder_options.match_len_limit, num_workers,
sigs_mod(1, &unblocked); num_slots, inhandle, outhandle, debug_level );
compress( encoder_options, num_workers, debug_level, num_slots, inhandle, outhandle, pp, &unblocked );
sigs_mod(0, &unblocked);
}
else else
tmp = decompress( inhandle, pp, program_mode == m_test ); tmp = decompress( inhandle, pp, program_mode == m_test );
if( tmp > retval ) retval = tmp; if( tmp > retval ) retval = tmp;

77
main.h
View file

@ -1,77 +0,0 @@
/* Plzip - A parallel version of the lzip data compressor
Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// Report 'msg' as an error. 'errcode' and 'help' are optional and default
// to 0 and false.
void show_error( const char * msg, const int errcode = 0, const bool help = false ) throw();
// Returns the number of bytes really read.
// If (returned value < size) and (errno == 0), means EOF was reached.
int readblock( const int fd, char * buf, const int size ) throw();
// Returns the number of bytes really written.
// If (returned value < size), it is always an error.
int writeblock( const int fd, const char * buf, const int size ) throw();
/* A mutex/condition-variable pair plus two usage counters. */
struct Cond
{
  pthread_mutex_t lock;   // lock this to protect the shared resource
  pthread_cond_t cond;    // trigger this if the predicate becomes true
  unsigned long ccount;   // incremented when checking the predicate
  unsigned long wcount;   // incremented when waiting is necessary
};
/* Terminate the process. */
void fatal();
/* Print the printf-style message built from 'fmt' and the arguments that
   follow 'err', append the error text for 'err', then call fatal().
   Under GCC the format attribute lets the compiler check the arguments
   (format string is parameter 1, variadic arguments start at parameter 3). */
void
fail(const char *fmt, int err, ...)
#ifdef __GNUC__
__attribute__((format(printf, 1, 3)))
#endif
;
/* Checked pthread/signal primitives.  On failure each of them calls fail(),
   which in turn calls fatal(). */
void xinit(Cond *cond);
void xdestroy(Cond *cond);
void xlock(Cond *cond);
void xlock_pred(Cond *cond);
void xunlock(Cond *cond);
void xwait(Cond *cond);
void xsignal(Cond *cond);
void xbroadcast(Cond *cond);
void xcreate(pthread_t *thread, void *(*routine)(void *), void *arg);
void xjoin(pthread_t thread);
void xraise(int sig);

641
plzip.cc
View file

@ -26,11 +26,11 @@
#include <cstdio> #include <cstdio>
#include <cstdlib> #include <cstdlib>
#include <vector> #include <vector>
#include <pthread.h>
#include <stdint.h> #include <stdint.h>
#include <unistd.h> #include <unistd.h>
#include <lzlib.h> #include <lzlib.h>
#include "main.h"
#include "plzip.h" #include "plzip.h"
#ifndef LLONG_MAX #ifndef LLONG_MAX
@ -49,282 +49,304 @@ namespace {
long long in_size = 0; long long in_size = 0;
long long out_size = 0; long long out_size = 0;
void *(*mallocf)(size_t size); void *(*mallocf)( size_t size );
void (*freef)(void *ptr); void (*freef)( void *ptr );
void * trace_malloc(size_t size) void * trace_malloc( size_t size )
{ {
int save_errno = 0; int save_errno = 0;
void * ret = malloc(size); void * ret = malloc( size );
if( ret == 0 ) save_errno = errno; if( ret == 0 ) save_errno = errno;
fprintf(stderr, "malloc(%lu) == %p\n", (long unsigned)size, ret); std::fprintf( stderr, "malloc(%lu) == %p\n", (unsigned long)size, ret );
if( ret == 0 ) errno = save_errno; if( ret == 0 ) errno = save_errno;
return ret; return ret;
} }
void trace_free(void *ptr) void trace_free( void *ptr )
{
fprintf(stderr, "free(%p)\n", ptr);
free(ptr);
}
/* Allocate 'size' bytes through the current (*mallocf) hook.
   A null result is reported through fail() with errno. */
void * xalloc(size_t size)
{
  void * const block = (*mallocf)(size);

  if( block == 0 )
    fail("(*mallocf)()", errno);
  return block;
}
/* One input block handed from the splitter to the workers; blocks are
   chained through 'next'. */
struct S2w_blk /* Splitter to workers. */
{
unsigned long long id; /* Block serial number as read from infd. */
S2w_blk *next; /* Next in queue. */
size_t loaded; /* # of bytes in plain, may be 0 for 1st. */
/* Declared with one element; per the original note the storage actually
   allocated is sizeof_plain bytes -- TODO confirm at the allocation site. */
unsigned char plain[1]; /* Data read from infd, allocated: sizeof_plain. */
};
/* Queue of input blocks going from the splitter to the workers. */
struct S2w_q
{
  Cond av_or_eof;   // new block available or splitter done
  S2w_blk *tail;    // splitter will append here
  S2w_blk *head;    // next ready worker shall compress this
  int eof;          // splitter done
};
/* Prepare an empty splitter-to-workers queue: Cond initialized, no blocks
   queued, eof flag cleared. */
void s2w_q_init(S2w_q *s2w_q)
{
  xinit(&s2w_q->av_or_eof);
  s2w_q->head = 0;
  s2w_q->tail = 0;
  s2w_q->eof = 0;
}
/* Tear down a splitter-to-workers queue.  The assertions require that the
   eof flag is set and that both head and tail are null before the Cond is
   destroyed. */
void
s2w_q_uninit(S2w_q *s2w_q)
{
assert(0 != s2w_q->eof);
assert(0 == s2w_q->head);
assert(0 == s2w_q->tail);
xdestroy(&s2w_q->av_or_eof);
}
/* One compressed block handed from a worker to the muxer; blocks are kept
   in an unordered list linked through 'next'. */
struct W2m_blk /* Workers to muxer data block. */
{
unsigned long long id; /* Block index as read from infd. */
W2m_blk *next; /* Next block in list (unordered). */
int produced; /* Number of bytes in compr. */
/* Declared with one element; per the original note the storage actually
   allocated is sizeof_compr bytes -- TODO confirm at the allocation site. */
unsigned char compr[1]; /* Data to write to outfd, alloc.: sizeof_compr. */
};
/* Workers-to-muxer queue: an unordered list of compressed blocks plus the
   bookkeeping fields described on each member below. */
struct W2m_q
{
Cond av_or_exit; /* New block available or all workers exited. */
unsigned long long needed; /* Block needed for resuming writing. */
W2m_blk *head; /* Block list (unordered). */
unsigned working; /* Number of workers still running. */
};
/* Prepare an empty workers-to-muxer queue for 'num_workers' workers
   (asserted to be positive): first needed block is 0, list empty, and
   'working' starts at num_workers. */
void
w2m_q_init(W2m_q *w2m_q, int num_workers)
{
assert(0 < num_workers);
xinit(&w2m_q->av_or_exit);
w2m_q->needed = 0;
w2m_q->head = 0;
w2m_q->working = num_workers;
}
/* Tear down a workers-to-muxer queue.  The assertions require that no
   worker is still counted as running and that the block list is empty
   before the Cond is destroyed. */
void
w2m_q_uninit(W2m_q *w2m_q)
{
assert(0 == w2m_q->working);
assert(0 == w2m_q->head);
xdestroy(&w2m_q->av_or_exit);
}
struct M2s_q // Muxer to splitter queue
{ {
Cond av; // Free slot available std::fprintf( stderr, "free(%p)\n", ptr );
int num_free; // Number of free slots free( ptr );
}
M2s_q( const int slots )
{
xinit(&av);
num_free = slots;
}
~M2s_q() { xdestroy(&av); } void * xalloc( size_t size )
{
void *ret = (*mallocf)( size );
if( ret == 0 ) { show_error( "not enough memory", errno ); fatal(); }
return ret;
}
// Initialize 'mutex' first and then 'cond', passing a null attribute pointer
// to both calls. A nonzero result from either call is reported through
// show_error and followed by fatal.
void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex )
  {
  const int mutex_err = pthread_mutex_init( mutex, 0 );
  if( mutex_err != 0 )
    {
    show_error( "pthread_mutex_init", mutex_err );
    fatal();
    }

  const int cond_err = pthread_cond_init( cond, 0 );
  if( cond_err != 0 )
    {
    show_error( "pthread_cond_init", cond_err );
    fatal();
    }
  }
// Destroy 'cond' first and then 'mutex' (the reverse of xinit). A nonzero
// result from either call is reported through show_error and followed by
// fatal.
void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex )
  {
  const int cond_err = pthread_cond_destroy( cond );
  if( cond_err != 0 )
    {
    show_error( "pthread_cond_destroy", cond_err );
    fatal();
    }

  const int mutex_err = pthread_mutex_destroy( mutex );
  if( mutex_err != 0 )
    {
    show_error( "pthread_mutex_destroy", mutex_err );
    fatal();
    }
  }
// Lock 'mutex'; a nonzero result is reported through show_error and
// followed by fatal.
void xlock( pthread_mutex_t * mutex )
  {
  const int errcode = pthread_mutex_lock( mutex );
  if( errcode == 0 ) return;
  show_error( "pthread_mutex_lock", errcode );
  fatal();
  }
// Unlock 'mutex'; a nonzero result is reported through show_error and
// followed by fatal.
void xunlock( pthread_mutex_t * mutex )
  {
  const int errcode = pthread_mutex_unlock( mutex );
  if( errcode == 0 ) return;
  show_error( "pthread_mutex_unlock", errcode );
  fatal();
  }
// Wait on 'cond' using 'mutex'; a nonzero result is reported through
// show_error and followed by fatal.
void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex )
  {
  const int errcode = pthread_cond_wait( cond, mutex );
  if( errcode == 0 ) return;
  show_error( "pthread_cond_wait", errcode );
  fatal();
  }
// Signal 'cond'; a nonzero result is reported through show_error and
// followed by fatal.
void xsignal( pthread_cond_t * cond )
  {
  const int errcode = pthread_cond_signal( cond );
  if( errcode == 0 ) return;
  show_error( "pthread_cond_signal", errcode );
  fatal();
  }
// Broadcast on 'cond'; a nonzero result is reported through show_error and
// followed by fatal.
void xbroadcast( pthread_cond_t * cond )
  {
  const int errcode = pthread_cond_broadcast( cond );
  if( errcode == 0 ) return;
  show_error( "pthread_cond_broadcast", errcode );
  fatal();
  }
// Start a thread running routine( arg ) and store its handle in '*thread'.
// A null attribute pointer is passed to pthread_create. A nonzero result is
// reported through show_error and followed by fatal.
void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg )
  {
  const int errcode = pthread_create( thread, 0, routine, arg );
  if( errcode == 0 ) return;
  show_error( "pthread_create", errcode );
  fatal();
  }
// Join 'thread', discarding its return value (a null pointer is passed as
// the result location). A nonzero result is reported through show_error and
// followed by fatal.
void xjoin( pthread_t thread )
  {
  const int errcode = pthread_join( thread, 0 );
  if( errcode == 0 ) return;
  show_error( "pthread_join", errcode );
  fatal();
  }
// Count of free block slots shared between the splitter and the muxer.
// The splitter takes a slot before queueing a block and waits on slot_av
// while none is free; the muxer gives a slot back after writing a block.
struct Slot_tally
  {
  unsigned long check_counter;	// times the splitter tried to take a slot
  unsigned long wait_counter;	// times the splitter had to wait for one
  int num_free;			// number of free slots
  pthread_mutex_t mutex;
  pthread_cond_t slot_av;	// free slot available

  // Start with 'slots' free slots and both counters at zero.
  Slot_tally( const int slots )
    {
    check_counter = 0;
    wait_counter = 0;
    num_free = slots;
    xinit( &slot_av, &mutex );
    }

  ~Slot_tally()
    {
    xdestroy( &slot_av, &mutex );
    }
  };
// One block of uncompressed input, allocated and filled by the splitter and
// later taken from the queue by a worker. Blocks are chained through 'next'.
struct S2w_blk // Splitter to worker data block
{
unsigned long long id; // Block serial number as read from infd
S2w_blk *next; // Next in queue
int loaded; // # of bytes in plain, may be 0 for 1st
// Declared with a single element, but each block is allocated with
// sizeof (S2w_blk) + data_size - 1 bytes, so 'plain' has to stay the last
// member.
uint8_t plain[1]; // Data read from infd, allocated: data_size
};
struct S2w_queue
{
S2w_blk * head; // Next ready worker shall compress this
S2w_blk * tail; // Splitter will append here
unsigned long check_counter;
unsigned long wait_counter;
pthread_mutex_t mutex;
pthread_cond_t av_or_eof; // New block available or splitter done
bool eof; // Splitter done
S2w_queue()
: head( 0 ), tail( 0 ), check_counter( 0 ), wait_counter( 0 ), eof( false )
{ xinit( &av_or_eof, &mutex ); }
~S2w_queue() { xdestroy( &av_or_eof, &mutex ); }
};
// One block of compressed output, allocated and filled by a worker and
// pushed onto the list consumed by the muxer.
struct W2m_blk // Worker to muxer data block
{
unsigned long long id; // Block index as read from infd
W2m_blk *next; // Next block in list (unordered)
int produced; // Number of bytes in compr
// Declared with a single element, but each block is allocated with
// sizeof (W2m_blk) + compr_size - 1 bytes, so 'compr' has to stay the last
// member.
uint8_t compr[1]; // Data to write to outfd, alloc.: compr_size
};
// Unordered list of compressed blocks handed from the workers to the muxer,
// together with the bookkeeping the muxer needs to know when to stop.
struct W2m_queue
{
// A worker signals av_or_exit when the block it pushes has this id.
unsigned long long needed_id; // Block needed for resuming writing
W2m_blk *head; // Block list (unordered)
// Incremented by the muxer each time it checks the list for blocks.
unsigned long check_counter;
// Incremented by the muxer each time it has to wait on av_or_exit.
unsigned long wait_counter;
int num_working; // Number of workers still running
pthread_mutex_t mutex;
pthread_cond_t av_or_exit; // New block available or all workers exited
// Start with an empty list, block 0 needed first, both counters at zero and
// all num_workers workers counted as running.
W2m_queue( const int num_workers )
: needed_id( 0 ), head( 0 ), check_counter( 0 ), wait_counter( 0 ),
num_working( num_workers )
{ xinit( &av_or_exit, &mutex ); }
~W2m_queue() { xdestroy( &av_or_exit, &mutex ); }
}; };
struct Splitter_arg struct Splitter_arg
{ {
M2s_q *m2s_q; Slot_tally * slot_tally;
S2w_q *s2w_q; S2w_queue * s2w_queue;
int infd; int infd;
int sizeof_plain; int data_size;
size_t sizeof_S2w_blk; int s2w_blk_size;
}; };
void * splitter( void * arg ) void * splitter( void * arg )
{ {
const Splitter_arg & tmp = *(Splitter_arg *)arg; const Splitter_arg & tmp = *(Splitter_arg *)arg;
M2s_q *m2s_q = tmp.m2s_q; Slot_tally & slot_tally = *tmp.slot_tally;
S2w_q *s2w_q = tmp.s2w_q; S2w_queue & s2w_queue = *tmp.s2w_queue;
const int infd = tmp.infd; const int infd = tmp.infd;
const int sizeof_plain = tmp.sizeof_plain; const int data_size = tmp.data_size;
const size_t sizeof_s2w_blk = tmp.sizeof_S2w_blk; const int s2w_blk_size = tmp.s2w_blk_size;
for( unsigned long long id = 0; ; ++id ) for( unsigned long long id = 0; ; ++id )
{ {
/* Grab a free slot. */ S2w_blk * s2w_blk = (S2w_blk *)xalloc( s2w_blk_size );
xlock_pred(&m2s_q->av);
while( m2s_q->num_free == 0 ) xwait(&m2s_q->av);
--m2s_q->num_free;
xunlock(&m2s_q->av);
S2w_blk * s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk);
/* Fill block. */ // Fill block
const int rd = readblock( infd, (char *)s2w_blk->plain, sizeof_plain ); const int rd = readblock( infd, s2w_blk->plain, data_size );
if( rd != sizeof_plain && errno ) fail("read()", errno); if( rd != data_size && errno ) { show_error( "read", errno ); fatal(); }
if( rd == 0 && id != 0 ) if( rd > 0 || id == 0 ) // first block can be empty
{
/* EOF on first read, but not for first input block. */
(*freef)(s2w_blk);
xlock(&m2s_q->av);
++m2s_q->num_free;
xunlock(&m2s_q->av);
}
else
{ {
s2w_blk->id = id; s2w_blk->id = id;
s2w_blk->next = 0; s2w_blk->next = 0;
s2w_blk->loaded = rd; s2w_blk->loaded = rd;
in_size += rd; in_size += rd;
xlock( &slot_tally.mutex ); // Grab a free slot
++slot_tally.check_counter;
while( slot_tally.num_free == 0 )
{
++slot_tally.wait_counter;
xwait( &slot_tally.slot_av, &slot_tally.mutex );
}
--slot_tally.num_free;
xunlock( &slot_tally.mutex );
} }
else
{ (*freef)( s2w_blk ); s2w_blk = 0; }
xlock(&s2w_q->av_or_eof); xlock( &s2w_queue.mutex );
if( s2w_q->head == 0 ) xbroadcast(&s2w_q->av_or_eof); if( s2w_blk != 0 )
if( rd > 0 || id == 0 )
{ {
if( s2w_q->tail == 0 ) s2w_q->head = s2w_blk; if( s2w_queue.tail == 0 ) s2w_queue.head = s2w_blk;
else s2w_q->tail->next = s2w_blk; else s2w_queue.tail->next = s2w_blk;
s2w_q->tail = s2w_blk; s2w_queue.tail = s2w_blk;
xsignal( &s2w_queue.av_or_eof );
} }
s2w_q->eof = ( rd == 0 ); else
xunlock(&s2w_q->av_or_eof); {
s2w_queue.eof = true;
xbroadcast( &s2w_queue.av_or_eof );
}
xunlock( &s2w_queue.mutex );
if( rd <= 0 ) break; if( s2w_blk == 0 ) break;
} }
return 0; return 0;
} }
/* Read compressed bytes from the encoder into the unused tail of
   w2m_blk->compr and add the number read to w2m_blk->produced. The block
   must still have room left (asserted). A result of -1 from
   LZ_compress_read is reported and followed by fatal(). */
void work_lz_rd(W2m_blk *w2m_blk, const int sizeof_compr, LZ_Encoder * lz)
{
  assert(w2m_blk->produced < sizeof_compr);

  const int room = sizeof_compr - w2m_blk->produced;
  const int rd = LZ_compress_read(lz, w2m_blk->compr + w2m_blk->produced, room);
  if( rd == -1 ) {
    show_error( "LZ_compress_read() failed." );
    fatal();
  }

  w2m_blk->produced += rd;
}
void work_compr( const int dictionary_size, const int match_len_limit, void work_compr( const int dictionary_size, const int match_len_limit,
S2w_blk *s2w_blk, W2m_q *w2m_q, const S2w_blk & s2w_blk, W2m_queue & w2m_queue,
const int sizeof_compr, const size_t sizeof_w2m_blk ) const int compr_size, const int w2m_blk_size )
{
W2m_blk *w2m_blk;
assert(0 < s2w_blk->loaded || 0 == s2w_blk->id);
w2m_blk = (W2m_blk *)xalloc(sizeof_w2m_blk);
/* Single member compression. Settings like with lzip -6. */
{ {
LZ_Encoder * lz; assert( s2w_blk.loaded > 0 || s2w_blk.id == 0 );
size_t written;
lz = LZ_compress_open( dictionary_size, match_len_limit, LLONG_MAX ); W2m_blk * w2m_blk = (W2m_blk *)xalloc( w2m_blk_size );
if( LZ_ok != LZ_compress_errno(lz) ) {
show_error( "LZ_compress_open() failed." );
fatal();
}
written = 0; const int dict_size = std::max( LZ_min_dictionary_size(),
w2m_blk->produced = 0; std::min( dictionary_size, s2w_blk.loaded ) );
while( written < s2w_blk->loaded ) { LZ_Encoder * const encoder =
int wr; LZ_compress_open( dict_size, match_len_limit, LLONG_MAX );
if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
{ show_error( "LZ_compress_open failed." ); fatal(); }
wr = LZ_compress_write(lz, s2w_blk->plain + written, int written = 0;
s2w_blk->loaded - written); w2m_blk->produced = 0;
if( -1 == wr ) { while( true )
show_error( "LZ_compress_write() failed." ); {
fatal(); if( LZ_compress_write_size( encoder ) > 0 )
{
if( written < s2w_blk.loaded )
{
const int wr = LZ_compress_write( encoder, s2w_blk.plain + written,
s2w_blk.loaded - written );
if( wr < 0 ) { show_error( "LZ_compress_write failed." ); fatal(); }
written += wr;
}
if( written >= s2w_blk.loaded ) LZ_compress_finish( encoder );
} }
written += (size_t)wr; assert( w2m_blk->produced < compr_size );
const int rd = LZ_compress_read( encoder, w2m_blk->compr + w2m_blk->produced,
work_lz_rd(w2m_blk, sizeof_compr, lz); compr_size - w2m_blk->produced );
if( rd < 0 ) { show_error( "LZ_compress_read failed." ); fatal(); }
w2m_blk->produced += rd;
if( LZ_compress_finished( encoder ) == 1 ) break;
} }
if( -1 == LZ_compress_finish(lz) ) { if( LZ_compress_close( encoder ) < 0 )
show_error( "LZ_compress_finish() failed." ); { show_error( "LZ_compress_close failed." ); fatal(); }
fatal();
}
while( !LZ_compress_finished(lz) ) { w2m_blk->id = s2w_blk.id;
work_lz_rd(w2m_blk, sizeof_compr, lz);
}
if( -1 == LZ_compress_close(lz) ) { // Push block to muxer queue
show_error( "LZ_compress_close() failed." ); xlock( &w2m_queue.mutex );
fatal(); w2m_blk->next = w2m_queue.head;
} w2m_queue.head = w2m_blk;
if( w2m_blk->id == w2m_queue.needed_id ) xsignal( &w2m_queue.av_or_exit );
xunlock( &w2m_queue.mutex );
} }
w2m_blk->id = s2w_blk->id;
/* Push block to muxer. */
xlock(&w2m_q->av_or_exit);
w2m_blk->next = w2m_q->head;
w2m_q->head = w2m_blk;
if( w2m_blk->id == w2m_q->needed ) {
xsignal(&w2m_q->av_or_exit);
}
xunlock(&w2m_q->av_or_exit);
}
struct Worker_arg struct Worker_arg
{ {
int dictionary_size; int dictionary_size;
int match_len_limit; int match_len_limit;
S2w_q *s2w_q; S2w_queue * s2w_queue;
W2m_q *w2m_q; W2m_queue * w2m_queue;
int sizeof_compr; int compr_size;
size_t sizeof_W2m_blk; int w2m_blk_size;
}; };
@ -333,64 +355,68 @@ void * worker( void * arg )
const Worker_arg & tmp = *(Worker_arg *)arg; const Worker_arg & tmp = *(Worker_arg *)arg;
const int dictionary_size = tmp.dictionary_size; const int dictionary_size = tmp.dictionary_size;
const int match_len_limit = tmp.match_len_limit; const int match_len_limit = tmp.match_len_limit;
S2w_q *s2w_q = tmp.s2w_q; S2w_queue & s2w_queue = *tmp.s2w_queue;
W2m_q *w2m_q = tmp.w2m_q; W2m_queue & w2m_queue = *tmp.w2m_queue;
const int sizeof_compr = tmp.sizeof_compr; const int compr_size = tmp.compr_size;
const size_t sizeof_w2m_blk = tmp.sizeof_W2m_blk; const int w2m_blk_size = tmp.w2m_blk_size;
while( true ) while( true )
{ {
S2w_blk *s2w_blk; S2w_blk *s2w_blk;
/* Grab a block to work on. */ // Grab a block to work on
xlock_pred(&s2w_q->av_or_eof); xlock( &s2w_queue.mutex );
while( 0 == s2w_q->head && !s2w_q->eof ) { ++s2w_queue.check_counter;
xwait(&s2w_q->av_or_eof); while( s2w_queue.head == 0 && !s2w_queue.eof )
} {
if( 0 == s2w_q->head ) { ++s2w_queue.wait_counter;
/* No blocks available and splitter exited. */ xwait( &s2w_queue.av_or_eof, &s2w_queue.mutex );
xunlock(&s2w_q->av_or_eof); }
if( s2w_queue.head == 0 ) // No blocks available and splitter exited
{
xunlock( &s2w_queue.mutex );
break; break;
} }
s2w_blk = s2w_q->head; s2w_blk = s2w_queue.head;
s2w_q->head = s2w_blk->next; s2w_queue.head = s2w_blk->next;
if( 0 == s2w_q->head ) { if( s2w_queue.head == 0 ) s2w_queue.tail = 0;
s2w_q->tail = 0; xunlock( &s2w_queue.mutex );
}
xunlock(&s2w_q->av_or_eof);
work_compr( dictionary_size, match_len_limit, s2w_blk, w2m_q, work_compr( dictionary_size, match_len_limit, *s2w_blk, w2m_queue,
sizeof_compr, sizeof_w2m_blk ); compr_size, w2m_blk_size );
(*freef)(s2w_blk); (*freef)( s2w_blk );
} }
/* Notify muxer when last worker exits. */ // Notify muxer when last worker exits
xlock(&w2m_q->av_or_exit); xlock( &w2m_queue.mutex );
if( 0 == --w2m_q->working && 0 == w2m_q->head ) { if( --w2m_queue.num_working == 0 && w2m_queue.head == 0 )
xsignal(&w2m_q->av_or_exit); xsignal( &w2m_queue.av_or_exit );
} xunlock( &w2m_queue.mutex );
xunlock(&w2m_q->av_or_exit);
return 0; return 0;
} }
void muxer_loop( W2m_q *w2m_q, M2s_q *m2s_q, const int num_slots, const int outfd ) void muxer( Slot_tally & slot_tally, W2m_queue & w2m_queue,
const int num_slots, const int outfd )
{ {
unsigned long long needed_id = 0; unsigned long long needed_id = 0;
std::vector< W2m_blk * > circular_buffer( num_slots, (W2m_blk *)0 ); std::vector< W2m_blk * > circular_buffer( num_slots, (W2m_blk *)0 );
xlock_pred(&w2m_q->av_or_exit); xlock( &w2m_queue.mutex );
while( true ) while( true )
{ {
/* Grab all available compressed blocks in one step. */ // Grab all available compressed blocks in one step
while( w2m_q->head == 0 && w2m_q->working > 0 ) ++w2m_queue.check_counter;
xwait(&w2m_q->av_or_exit); while( w2m_queue.head == 0 && w2m_queue.num_working > 0 )
{
++w2m_queue.wait_counter;
xwait( &w2m_queue.av_or_exit, &w2m_queue.mutex );
}
if( w2m_queue.head == 0 ) break; // queue is empty. all workers exited
if( w2m_q->head == 0 ) break; // queue is empty. all workers exited W2m_blk * w2m_blk = w2m_queue.head;
w2m_queue.head = 0;
W2m_blk * w2m_blk = w2m_q->head; xunlock( &w2m_queue.mutex );
w2m_q->head = 0;
xunlock(&w2m_q->av_or_exit);
// Merge blocks fetched this time into circular buffer // Merge blocks fetched this time into circular buffer
do { do {
@ -405,30 +431,31 @@ void muxer_loop( W2m_q *w2m_q, M2s_q *m2s_q, const int num_slots, const int outf
// Write out initial continuous sequence of reordered blocks // Write out initial continuous sequence of reordered blocks
while( true ) while( true )
{ {
W2m_blk * needed_w2m_blk = circular_buffer[needed_id%num_slots]; w2m_blk = circular_buffer[needed_id%num_slots];
if( needed_w2m_blk == 0 ) break; if( w2m_blk == 0 ) break;
out_size += needed_w2m_blk->produced; out_size += w2m_blk->produced;
if( outfd >= 0 ) if( outfd >= 0 )
{ {
const int wr = writeblock( outfd, (char *)needed_w2m_blk->compr, needed_w2m_blk->produced ); const int wr = writeblock( outfd, w2m_blk->compr, w2m_blk->produced );
if( wr != needed_w2m_blk->produced ) fail("write()", errno); if( wr != w2m_blk->produced )
{ show_error( "write", errno ); fatal(); }
} }
circular_buffer[needed_id%num_slots] = 0; circular_buffer[needed_id%num_slots] = 0;
++needed_id; ++needed_id;
xlock(&m2s_q->av); xlock( &slot_tally.mutex );
if( 0 == m2s_q->num_free++ ) xsignal(&m2s_q->av); if( slot_tally.num_free++ == 0 ) xsignal( &slot_tally.slot_av );
xunlock(&m2s_q->av); xunlock( &slot_tally.mutex );
(*freef)(needed_w2m_blk); (*freef)( w2m_blk );
} }
xlock_pred(&w2m_q->av_or_exit); xlock( &w2m_queue.mutex );
w2m_q->needed = needed_id; w2m_queue.needed_id = needed_id;
} }
xunlock(&w2m_q->av_or_exit); xunlock( &w2m_queue.mutex );
for( int i = 0; i < num_slots; ++i ) for( int i = 0; i < num_slots; ++i )
if( circular_buffer[i] != 0 ) if( circular_buffer[i] != 0 )
@ -438,57 +465,49 @@ void muxer_loop( W2m_q *w2m_q, M2s_q *m2s_q, const int num_slots, const int outf
} // end namespace } // end namespace
void * muxer( void * arg ) int compress( const int data_size, const int dictionary_size,
const int match_len_limit, const int num_workers,
const int num_slots, const int infd, const int outfd,
const int debug_level )
{ {
const Muxer_arg & tmp = *(Muxer_arg *)arg;
const int dictionary_size = tmp.dictionary_size;
const int match_len_limit = tmp.match_len_limit;
const int num_workers = tmp.num_workers;
const int num_slots = tmp.num_slots;
const int debug_level = tmp.debug_level;
const int infd = tmp.infd;
const int outfd = tmp.outfd;
S2w_q s2w_q;
W2m_q w2m_q;
if( debug_level & 2 ) { mallocf = trace_malloc; freef = trace_free; } if( debug_level & 2 ) { mallocf = trace_malloc; freef = trace_free; }
else { mallocf = malloc; freef = free; } else { mallocf = malloc; freef = free; }
s2w_q_init(&s2w_q); Slot_tally slot_tally( num_slots );
w2m_q_init(&w2m_q, num_workers); S2w_queue s2w_queue;
M2s_q m2s_q( num_slots ); W2m_queue w2m_queue( num_workers );
Splitter_arg splitter_arg; Splitter_arg splitter_arg;
splitter_arg.m2s_q = &m2s_q; splitter_arg.slot_tally = &slot_tally;
splitter_arg.s2w_q = &s2w_q; splitter_arg.s2w_queue = &s2w_queue;
splitter_arg.infd = infd; splitter_arg.infd = infd;
splitter_arg.sizeof_plain = 2 * std::max( 65536, dictionary_size ); splitter_arg.data_size = data_size;
splitter_arg.sizeof_S2w_blk = sizeof(S2w_blk) + splitter_arg.sizeof_plain - 1; splitter_arg.s2w_blk_size = sizeof (S2w_blk) + data_size - 1;
pthread_t splitter_thread; pthread_t splitter_thread;
xcreate(&splitter_thread, splitter, &splitter_arg); xcreate( &splitter_thread, splitter, &splitter_arg );
Worker_arg worker_arg; Worker_arg worker_arg;
worker_arg.dictionary_size = dictionary_size; worker_arg.dictionary_size = dictionary_size;
worker_arg.match_len_limit = match_len_limit; worker_arg.match_len_limit = match_len_limit;
worker_arg.s2w_q = &s2w_q; worker_arg.s2w_queue = &s2w_queue;
worker_arg.w2m_q = &w2m_q; worker_arg.w2m_queue = &w2m_queue;
worker_arg.sizeof_compr = 6 + 20 + ( ( splitter_arg.sizeof_plain / 8 ) * 9 ); worker_arg.compr_size = 6 + 20 + ( ( data_size / 8 ) * 9 );
worker_arg.sizeof_W2m_blk = sizeof(W2m_blk) + worker_arg.sizeof_compr - 1; worker_arg.w2m_blk_size = sizeof (W2m_blk) + worker_arg.compr_size - 1;
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
if( worker_threads == 0 ) fail("not enough memory.", errno); if( worker_threads == 0 )
{ show_error( "not enough memory.", errno ); fatal(); }
for( int i = 0; i < num_workers; ++i ) for( int i = 0; i < num_workers; ++i )
xcreate(&worker_threads[i], worker, &worker_arg); xcreate( &worker_threads[i], worker, &worker_arg );
muxer_loop( &w2m_q, &m2s_q, num_slots, outfd ); muxer( slot_tally, w2m_queue, num_slots, outfd );
for( int i = num_workers - 1; i >= 0; --i ) for( int i = num_workers - 1; i >= 0; --i )
xjoin(worker_threads[i]); xjoin(worker_threads[i]);
delete[] worker_threads; worker_threads = 0; delete[] worker_threads; worker_threads = 0;
xjoin(splitter_thread); xjoin( splitter_thread );
if( verbosity >= 1 ) if( verbosity >= 1 )
{ {
@ -503,27 +522,27 @@ void * muxer( void * arg )
in_size, out_size ); in_size, out_size );
} }
const int FW = ( sizeof(long unsigned) * 8 ) / 3 + 1; const int FW = ( sizeof (unsigned long) * 8 ) / 3 + 1;
if( ( debug_level & 1 ) && 0 > fprintf(stderr, if( debug_level & 1 )
std::fprintf( stderr,
"any worker tried to consume from splitter: %*lu\n" "any worker tried to consume from splitter: %*lu\n"
"any worker stalled : %*lu\n" "any worker stalled : %*lu\n"
"muxer tried to consume from workers : %*lu\n" "muxer tried to consume from workers : %*lu\n"
"muxer stalled : %*lu\n" "muxer stalled : %*lu\n"
"splitter tried to consume from muxer : %*lu\n" "splitter tried to fill a block : %*lu\n"
"splitter stalled : %*lu\n", "splitter stalled : %*lu\n",
FW, s2w_q.av_or_eof.ccount, FW, s2w_queue.check_counter,
FW, s2w_q.av_or_eof.wcount, FW, s2w_queue.wait_counter,
FW, w2m_q.av_or_exit.ccount, FW, w2m_queue.check_counter,
FW, w2m_q.av_or_exit.wcount, FW, w2m_queue.wait_counter,
FW, m2s_q.av.ccount, FW, slot_tally.check_counter,
FW, m2s_q.av.wcount) ) FW, slot_tally.wait_counter );
{
fatal();
}
assert( m2s_q.num_free == num_slots ); assert( slot_tally.num_free == num_slots );
w2m_q_uninit(&w2m_q); assert( s2w_queue.eof );
s2w_q_uninit(&s2w_q); assert( s2w_queue.head == 0 );
xraise(SIGUSR2); assert( s2w_queue.tail == 0 );
assert( w2m_queue.num_working == 0 );
assert( w2m_queue.head == 0 );
return 0; return 0;
} }

24
plzip.h
View file

@ -16,17 +16,19 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
struct Muxer_arg int compress( const int data_size, const int dictionary_size,
{ const int match_len_limit, const int num_workers,
int dictionary_size; const int num_slots, const int infd, const int outfd,
int match_len_limit; const int debug_level );
int num_workers;
int num_slots;
int debug_level;
int infd;
int outfd;
};
void * muxer( void * arg );
/*----------------------- Defined in main.cc -----------------------*/
// Reports an error. Callers in this commit pass the name of the failing call
// (or a full message) in 'msg' and, where they have one, errno or a pthread
// return value in 'errcode'.
void show_error( const char * msg, const int errcode = 0, const bool help = false ) throw();
// Used by the splitter to read up to 'size' bytes into 'buf'. The splitter
// treats a return value different from 'size' with errno set as a read error.
int readblock( const int fd, uint8_t * buf, const int size ) throw();
// Used by the muxer to write 'size' bytes from 'buf'. The muxer treats any
// return value different from 'size' as a write error.
int writeblock( const int fd, const uint8_t * buf, const int size ) throw();
void fatal(); // Terminate the process
extern int verbosity; extern int verbosity;