1
0
Fork 0

Merging upstream version 0.4.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-24 03:26:25 +01:00
parent 43289af9b1
commit 4ebe6b69a5
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
14 changed files with 750 additions and 727 deletions

View file

@ -1,3 +1,10 @@
2010-01-31 Antonio Diaz Diaz <ant_diaz@teleline.es>
* Version 0.4 released.
* main.cc (show_version): Show the version of lzlib being used.
* Code reorganization. Class Packet_courier now coordinates data
movement and synchronization among threads.
2010-01-24 Antonio Diaz Diaz <ant_diaz@teleline.es> 2010-01-24 Antonio Diaz Diaz <ant_diaz@teleline.es>
* Version 0.3 released. * Version 0.3 released.

View file

@ -7,7 +7,7 @@ INSTALL_DIR = $(INSTALL) -d -m 755
LIBS = -llz -lpthread LIBS = -llz -lpthread
SHELL = /bin/sh SHELL = /bin/sh
objs = arg_parser.o plzip.o main.o objs = arg_parser.o compress.o decompress.o main.o
.PHONY : all install install-info install-man install-strip \ .PHONY : all install install-info install-man install-strip \
@ -30,8 +30,9 @@ main.o : main.cc
$(objs) : Makefile $(objs) : Makefile
arg_parser.o : arg_parser.h arg_parser.o : arg_parser.h
compress.o : plzip.h
decompress.o : plzip.h
main.o : arg_parser.h plzip.h main.o : arg_parser.h plzip.h
plzip.o : plzip.h
doc : info man doc : info man
@ -53,30 +54,30 @@ check : all
@$(VPATH)/testsuite/check.sh $(VPATH)/testsuite @$(VPATH)/testsuite/check.sh $(VPATH)/testsuite
install : all install-info install-man install : all install-info install-man
if [ ! -d $(DESTDIR)$(bindir) ] ; then $(INSTALL_DIR) $(DESTDIR)$(bindir) ; fi if [ ! -d "$(DESTDIR)$(bindir)" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(bindir)" ; fi
$(INSTALL_PROGRAM) ./$(progname) $(DESTDIR)$(bindir)/$(progname) $(INSTALL_PROGRAM) ./$(progname) "$(DESTDIR)$(bindir)/$(progname)"
install-info : install-info :
if [ ! -d $(DESTDIR)$(infodir) ] ; then $(INSTALL_DIR) $(DESTDIR)$(infodir) ; fi if [ ! -d "$(DESTDIR)$(infodir)" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(infodir)" ; fi
$(INSTALL_DATA) $(VPATH)/doc/$(pkgname).info $(DESTDIR)$(infodir)/$(pkgname).info $(INSTALL_DATA) $(VPATH)/doc/$(pkgname).info "$(DESTDIR)$(infodir)/$(pkgname).info"
-install-info --info-dir=$(DESTDIR)$(infodir) $(DESTDIR)$(infodir)/$(pkgname).info -install-info --info-dir="$(DESTDIR)$(infodir)" "$(DESTDIR)$(infodir)/$(pkgname).info"
install-man : install-man :
if [ ! -d $(DESTDIR)$(mandir)/man1 ] ; then $(INSTALL_DIR) $(DESTDIR)$(mandir)/man1 ; fi if [ ! -d "$(DESTDIR)$(mandir)/man1" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(mandir)/man1" ; fi
$(INSTALL_DATA) $(VPATH)/doc/$(progname).1 $(DESTDIR)$(mandir)/man1/$(progname).1 $(INSTALL_DATA) $(VPATH)/doc/$(progname).1 "$(DESTDIR)$(mandir)/man1/$(progname).1"
install-strip : all install-strip : all
$(MAKE) INSTALL_PROGRAM='$(INSTALL_PROGRAM) -s' install $(MAKE) INSTALL_PROGRAM='$(INSTALL_PROGRAM) -s' install
uninstall : uninstall-info uninstall-man uninstall : uninstall-info uninstall-man
-rm -f $(DESTDIR)$(bindir)/$(progname) -rm -f "$(DESTDIR)$(bindir)/$(progname)"
uninstall-info : uninstall-info :
-install-info --info-dir=$(DESTDIR)$(infodir) --remove $(DESTDIR)$(infodir)/$(pkgname).info -install-info --info-dir="$(DESTDIR)$(infodir)" --remove "$(DESTDIR)$(infodir)/$(pkgname).info"
-rm -f $(DESTDIR)$(infodir)/$(pkgname).info -rm -f "$(DESTDIR)$(infodir)/$(pkgname).info"
uninstall-man : uninstall-man :
-rm -f $(DESTDIR)$(mandir)/man1/$(progname).1 -rm -f "$(DESTDIR)$(mandir)/man1/$(progname).1"
dist : doc dist : doc
ln -sf $(VPATH) $(DISTNAME) ln -sf $(VPATH) $(DISTNAME)

13
NEWS
View file

@ -1,11 +1,6 @@
Changes in version 0.3: Changes in version 0.4:
New option "--data-size" has been added. The option "--version" now shows the version of lzlib being used.
Output file is now removed if plzip is interrupted. A code reorganization has been made. The new class "Packet_courier" now
coordinates data movement and synchronization among threads.
This version automatically chooses the smallest possible dictionary size
for each member during compression, saving memory during decompression.
Regular files are now open in binary mode in non-POSIX platforms
defining the O_BINARY macro.

6
README
View file

@ -1,6 +1,10 @@
Description Description
Plzip is a parallel version of the lzip data compressor. Currently only Plzip is a massively parallel (multithreaded) data compressor compatible
with the lzip file format. The files produced by plzip are fully
compatible with lzip-1.4 or newer. Plzip is intended for faster
compression/decompression of big files on multiprocessor machines. On
files big enough, plzip can use hundreds of processors. Currently only
compression is performed in parallel. Parallel decompression is planned compression is performed in parallel. Parallel decompression is planned
to be implemented later. to be implemented later.

460
compress.cc Normal file
View file

@ -0,0 +1,460 @@
/* Plzip - A parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _FILE_OFFSET_BITS 64
#include <algorithm>
#include <cassert>
#include <cerrno>
#include <climits>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <queue>
#include <string>
#include <vector>
#include <pthread.h>
#include <stdint.h>
#include <unistd.h>
#include <lzlib.h>
#include "plzip.h"
#ifndef LLONG_MAX
#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL
#endif
#ifndef LLONG_MIN
#define LLONG_MIN (-LLONG_MAX - 1LL)
#endif
#ifndef ULLONG_MAX
#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
#endif
// Set up a mutex / condition variable pair with default attributes.
// The mutex is initialized first, then the condition variable. A non-zero
// status from either call is reported with show_error and followed by fatal().
void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex )
  {
  const int mutex_status = pthread_mutex_init( mutex, 0 );
  if( mutex_status != 0 )
    { show_error( "pthread_mutex_init", mutex_status ); fatal(); }

  const int cond_status = pthread_cond_init( cond, 0 );
  if( cond_status != 0 )
    { show_error( "pthread_cond_init", cond_status ); fatal(); }
  }
// Tear down a mutex / condition variable pair created by xinit.
// The condition variable is destroyed first, then the mutex. A non-zero
// status from either call is reported with show_error and followed by fatal().
void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex )
  {
  const int cond_status = pthread_cond_destroy( cond );
  if( cond_status != 0 )
    { show_error( "pthread_cond_destroy", cond_status ); fatal(); }

  const int mutex_status = pthread_mutex_destroy( mutex );
  if( mutex_status != 0 )
    { show_error( "pthread_mutex_destroy", mutex_status ); fatal(); }
  }
// Lock 'mutex'; a non-zero status is reported and followed by fatal().
void xlock( pthread_mutex_t * mutex )
  {
  const int status = pthread_mutex_lock( mutex );
  if( status == 0 ) return;
  show_error( "pthread_mutex_lock", status );
  fatal();
  }
// Unlock 'mutex'; a non-zero status is reported and followed by fatal().
void xunlock( pthread_mutex_t * mutex )
  {
  const int status = pthread_mutex_unlock( mutex );
  if( status == 0 ) return;
  show_error( "pthread_mutex_unlock", status );
  fatal();
  }
// Wait on 'cond' using 'mutex' (pthread_cond_wait); a non-zero status is
// reported and followed by fatal().
void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex )
  {
  const int status = pthread_cond_wait( cond, mutex );
  if( status == 0 ) return;
  show_error( "pthread_cond_wait", status );
  fatal();
  }
// Signal 'cond' (pthread_cond_signal); a non-zero status is reported and
// followed by fatal().
void xsignal( pthread_cond_t * cond )
  {
  const int status = pthread_cond_signal( cond );
  if( status == 0 ) return;
  show_error( "pthread_cond_signal", status );
  fatal();
  }
// Broadcast on 'cond' (pthread_cond_broadcast); a non-zero status is
// reported and followed by fatal().
void xbroadcast( pthread_cond_t * cond )
  {
  const int status = pthread_cond_broadcast( cond );
  if( status == 0 ) return;
  show_error( "pthread_cond_broadcast", status );
  fatal();
  }
// Start a thread with default attributes running 'routine( arg )' and store
// its handle in '*thread'; a non-zero status is reported and followed by
// fatal().
void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg )
  {
  const int status = pthread_create( thread, 0, routine, arg );
  if( status == 0 ) return;
  show_error( "pthread_create", status );
  fatal();
  }
// Join 'thread', discarding its return value; a non-zero status is reported
// and followed by fatal().
void xjoin( pthread_t thread )
  {
  const int status = pthread_join( thread, 0 );
  if( status == 0 ) return;
  show_error( "pthread_join", status );
  fatal();
  }
namespace {
// Byte totals for the current compress() call. compress() resets both to 0,
// splitter() adds every block it reads to in_size, and muxer() adds every
// packet it is handed to out_size.
long long in_size = 0;
long long out_size = 0;
// Unit of work passed splitter -> worker -> muxer through Packet_courier.
struct Packet // data block with a serial number
{
unsigned long long id; // serial number assigned as received
int size; // # of bytes in data
// Buffer allocated with new[]. A worker replaces it with the compressed
// buffer (and deletes the old one); the muxer deletes it after writing.
uint8_t * data;
};
// Coordinates packet movement among the splitter thread, the worker threads
// and the muxer.
//   Input side (guarded by imutex): packet_queue and eof.
//   Output side (guarded by omutex): circular_buffer, deliver_id and
//   num_working.
// receive_id is touched only by receive_packet, without a lock.
// Slot_tally is declared outside this file; it is used here to cap the
// number of packets in circulation (get_slot / leave_slot).
class Packet_courier // moves packets around
{
public:
// Statistics counters; compress() prints them when debug_level & 1.
unsigned long icheck_counter; // calls to distribute_packet
unsigned long iwait_counter; // times distribute_packet had to wait
unsigned long ocheck_counter; // calls to deliver_packet
unsigned long owait_counter; // times deliver_packet had to wait
private:
unsigned long long receive_id; // id assigned to next packet received
unsigned long long deliver_id; // id of next packet to be delivered
Slot_tally slot_tally;
std::queue< Packet * > packet_queue; // packets waiting for a worker
// Processed packets, stored at index id % num_slots until delivered.
std::vector< Packet * > circular_buffer;
int num_working; // Number of workers still running
const int num_slots; // max packets in circulation
pthread_mutex_t imutex;
pthread_cond_t iav_or_eof; // input packet available or splitter done
pthread_mutex_t omutex;
pthread_cond_t oav_or_exit; // output packet available or all workers exited
bool eof; // splitter done
public:
// num_workers: initial value of num_working.
// slots: size of the circular buffer and argument passed to slot_tally.
Packet_courier( const int num_workers, const int slots )
: icheck_counter( 0 ), iwait_counter( 0 ),
ocheck_counter( 0 ), owait_counter( 0 ),
receive_id( 0 ), deliver_id( 0 ),
slot_tally( slots ), circular_buffer( slots, (Packet *) 0 ),
num_working( num_workers ), num_slots( slots ), eof( false )
{ xinit( &iav_or_eof, &imutex ); xinit( &oav_or_exit, &omutex ); }
~Packet_courier()
{ xdestroy( &iav_or_eof, &imutex ); xdestroy( &oav_or_exit, &omutex ); }
// make a packet with data received from splitter
// The packet takes the next serial number and keeps the 'data' pointer
// (no copy is made). It is queued for the workers once a slot is obtained.
void receive_packet( uint8_t * const data, const int size )
{
Packet * ipacket = new Packet;
ipacket->id = receive_id++;
ipacket->size = size;
ipacket->data = data;
slot_tally.get_slot(); // wait for a free slot
xlock( &imutex );
packet_queue.push( ipacket );
xsignal( &iav_or_eof );
xunlock( &imutex );
}
// distribute a packet to a worker
// Waits while the queue is empty and eof is not set. Returns the packet at
// the front of the queue, or 0 when the queue is empty and eof is set. On a
// 0 return num_working is decremented, so the calling worker is counted as
// exited.
Packet * distribute_packet()
{
Packet * ipacket = 0;
xlock( &imutex );
++icheck_counter;
while( packet_queue.empty() && !eof )
{
++iwait_counter;
xwait( &iav_or_eof, &imutex );
}
if( !packet_queue.empty() )
{
ipacket = packet_queue.front();
packet_queue.pop();
}
xunlock( &imutex );
if( ipacket == 0 )
{
// Notify muxer when last worker exits
xlock( &omutex );
if( --num_working == 0 )
xsignal( &oav_or_exit );
xunlock( &omutex );
}
return ipacket;
}
// collect a packet from a worker
// The packet is stored at index id % num_slots. The muxer is signalled only
// when this packet is the one it is due to deliver next.
void collect_packet( Packet * const opacket )
{
xlock( &omutex );
// id collision shouldn't happen
assert( circular_buffer[opacket->id%num_slots] == 0 );
// Merge packet into circular buffer
circular_buffer[opacket->id%num_slots] = opacket;
if( opacket->id == deliver_id ) xsignal( &oav_or_exit );
xunlock( &omutex );
}
// deliver a packet to muxer
// Waits until the packet with id == deliver_id is in the circular buffer or
// num_working has dropped to 0. Returns that packet, or 0 if its entry is
// still empty. deliver_id is advanced in both cases; a slot is returned to
// the tally only when a packet is actually delivered.
Packet * deliver_packet()
{
xlock( &omutex );
++ocheck_counter;
while( circular_buffer[deliver_id%num_slots] == 0 && num_working > 0 )
{
++owait_counter;
xwait( &oav_or_exit, &omutex );
}
Packet * opacket = circular_buffer[deliver_id%num_slots];
circular_buffer[deliver_id%num_slots] = 0;
++deliver_id;
xunlock( &omutex );
if( opacket != 0 )
slot_tally.leave_slot(); // return a slot to the tally
return opacket;
}
// Sets eof and wakes every worker waiting in distribute_packet.
void finish() // splitter has no more packets to send
{
xlock( &imutex );
eof = true;
xbroadcast( &iav_or_eof );
xunlock( &imutex );
}
// Takes no lock. In this file it is only called by compress() after all
// threads have been joined.
bool finished() // all packets delivered to muxer
{
if( !slot_tally.all_free() || !eof || !packet_queue.empty() ||
num_working != 0 ) return false;
for( int i = 0; i < num_slots; ++i )
if( circular_buffer[i] != 0 ) return false;
return true;
}
// Read-only access to the slot tally; compress() uses it to print the
// tally's check_counter and wait_counter.
const Slot_tally & tally() const { return slot_tally; }
};
// Arguments handed to the splitter thread through its void * parameter.
struct Splitter_arg
{
Packet_courier * courier; // receives the blocks read
int infd; // file descriptor passed to readblock
int data_size; // bytes requested per block
};
// split data from input file into chunks and pass them to
// courier for packaging and distribution to workers.
void * splitter( void * arg )
{
const Splitter_arg & tmp = *(Splitter_arg *)arg;
Packet_courier & courier = *tmp.courier;
const int infd = tmp.infd;
const int data_size = tmp.data_size;
for( bool first_post = true; ; first_post = false )
{
uint8_t * data = new( std::nothrow ) uint8_t[data_size];
if( data == 0 ) { show_error( "not enough memory" ); fatal(); }
const int size = readblock( infd, data, data_size );
if( size != data_size && errno ) { show_error( "read", errno ); fatal(); }
if( size > 0 || first_post ) // first packet can be empty
{
in_size += size;
courier.receive_packet( data, size );
}
else
{
delete[] data;
courier.finish(); // no more packets to send
break;
}
}
return 0;
}
// Arguments shared by all worker threads (passed through their void *
// parameter).
struct Worker_arg
{
int dictionary_size; // upper limit for the per-packet dictionary size
int match_len_limit; // forwarded to LZ_compress_open
Packet_courier * courier; // source and destination of packets
};
// get packets from courier, replace their contents, and return
// them to courier.
// Thread routine; 'arg' points to a Worker_arg. Each packet is compressed
// with its own encoder, so each packet yields an independent compressed
// stream. Returns 0 when the courier has no more packets to hand out.
void * worker( void * arg )
{
const Worker_arg & tmp = *(Worker_arg *)arg;
const int dictionary_size = tmp.dictionary_size;
const int match_len_limit = tmp.match_len_limit;
Packet_courier & courier = *tmp.courier;
while( true )
{
Packet * packet = courier.distribute_packet();
if( packet == 0 ) break; // no more packets to process
// Output buffer size: input size plus 1/8 (rounded up) plus 42 bytes.
// NOTE(review): presumably an upper bound on the compressed size; nothing
// here handles the buffer filling before the encoder finishes - confirm
// the bound against lzlib.
const int compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 );
uint8_t * const new_data = new( std::nothrow ) uint8_t[compr_size];
if( new_data == 0 ) { show_error( "not enough memory" ); fatal(); }
// Use the smaller of the requested dictionary size and the packet size,
// but never less than the library minimum.
const int dict_size = std::max( LZ_min_dictionary_size(),
std::min( dictionary_size, packet->size ) );
LZ_Encoder * const encoder =
LZ_compress_open( dict_size, match_len_limit, LLONG_MAX );
if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
{ show_error( "LZ_compress_open failed." ); fatal(); }
int written = 0; // bytes of packet->data already fed to the encoder
int new_size = 0; // bytes of compressed output stored in new_data
while( true )
{
// Feed input only while the encoder reports room for it.
if( LZ_compress_write_size( encoder ) > 0 )
{
if( written < packet->size )
{
const int wr = LZ_compress_write( encoder, packet->data + written,
packet->size - written );
if( wr < 0 ) { show_error( "LZ_compress_write failed." ); fatal(); }
written += wr;
}
// All input has been handed over (also true for an empty packet).
if( written >= packet->size ) LZ_compress_finish( encoder );
}
// Drain whatever compressed output is available.
const int rd = LZ_compress_read( encoder, new_data + new_size,
compr_size - new_size );
if( rd < 0 ) { show_error( "LZ_compress_read failed." ); fatal(); }
new_size += rd;
assert( new_size <= compr_size );
if( LZ_compress_finished( encoder ) == 1 ) break;
}
if( LZ_compress_close( encoder ) < 0 )
{ show_error( "LZ_compress_close failed." ); fatal(); }
// Swap the uncompressed buffer for the compressed one; the packet keeps
// its id.
delete[] packet->data;
packet->size = new_size;
packet->data = new_data;
courier.collect_packet( packet );
}
return 0;
}
// Take the processed packets from the courier in the order it delivers them
// and write their contents to the output file. The loop ends when the
// courier delivers a null packet. A negative 'outfd' skips the write but
// still accounts for the packet size in out_size. Each packet and its data
// buffer are freed here.
void muxer( Packet_courier & courier, const int outfd )
  {
  for( Packet * ready = courier.deliver_packet(); ready != 0;
       ready = courier.deliver_packet() )
    {
    out_size += ready->size;
    if( outfd >= 0 &&
        writeblock( outfd, ready->data, ready->size ) != ready->size )
      { show_error( "write", errno ); fatal(); }
    delete[] ready->data;
    delete ready;
    }
  }
} // end namespace
// init the courier, then start the splitter and the workers and
// call the muxer.
int compress( const int data_size, const int dictionary_size,
const int match_len_limit, const int num_workers,
const int num_slots, const int infd, const int outfd,
const int debug_level )
{
in_size = 0;
out_size = 0;
Packet_courier courier( num_workers, num_slots );
Splitter_arg splitter_arg;
splitter_arg.courier = &courier;
splitter_arg.infd = infd;
splitter_arg.data_size = data_size;
pthread_t splitter_thread;
xcreate( &splitter_thread, splitter, &splitter_arg );
Worker_arg worker_arg;
worker_arg.dictionary_size = dictionary_size;
worker_arg.match_len_limit = match_len_limit;
worker_arg.courier = &courier;
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
if( worker_threads == 0 )
{ show_error( "not enough memory" ); fatal(); }
for( int i = 0; i < num_workers; ++i )
xcreate( &worker_threads[i], worker, &worker_arg );
muxer( courier, outfd );
for( int i = num_workers - 1; i >= 0; --i )
xjoin(worker_threads[i]);
delete[] worker_threads; worker_threads = 0;
xjoin( splitter_thread );
if( verbosity >= 1 )
{
if( in_size <= 0 || out_size <= 0 )
std::fprintf( stderr, "no data compressed.\n" );
else
std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
"%5.2f%% saved, %lld in, %lld out.\n",
(double)in_size / out_size,
( 8.0 * out_size ) / in_size,
100.0 * ( 1.0 - ( (double)out_size / in_size ) ),
in_size, out_size );
}
if( debug_level & 1 )
std::fprintf( stderr,
"splitter tried to send a packet %8lu times\n"
"splitter had to wait %8lu times\n"
"any worker tried to consume from splitter %8lu times\n"
"any worker had to wait %8lu times\n"
"muxer tried to consume from workers %8lu times\n"
"muxer had to wait %8lu times\n",
courier.tally().check_counter,
courier.tally().wait_counter,
courier.icheck_counter,
courier.iwait_counter,
courier.ocheck_counter,
courier.owait_counter );
assert( courier.finished() );
return 0;
}

10
configure vendored
View file

@ -1,16 +1,16 @@
#! /bin/sh #! /bin/sh
# configure script for Plzip - A parallel version of the lzip data compressor # configure script for Plzip - A parallel compressor compatible with lzip
# Copyright (C) 2009, 2010 Antonio Diaz Diaz. # Copyright (C) 2009, 2010 Antonio Diaz Diaz.
# #
# This configure script is free software: you have unlimited permission # This configure script is free software: you have unlimited permission
# to copy, distribute and modify it. # to copy, distribute and modify it.
# #
# Date of this version: 2010-01-24 # Date of this version: 2010-01-31
args= args=
no_create= no_create=
pkgname=plzip pkgname=plzip
pkgversion=0.3 pkgversion=0.4
progname=plzip progname=plzip
srctrigger=plzip.h srctrigger=plzip.h
@ -135,7 +135,7 @@ if [ -z "${CXX}" ] ; then # Let the user override the test.
fi fi
echo echo
if [ -z ${no_create} ] ; then if [ -z "${no_create}" ] ; then
echo "creating config.status" echo "creating config.status"
rm -f config.status rm -f config.status
cat > config.status << EOF cat > config.status << EOF
@ -166,7 +166,7 @@ echo "CXXFLAGS = ${CXXFLAGS}"
echo "LDFLAGS = ${LDFLAGS}" echo "LDFLAGS = ${LDFLAGS}"
rm -f Makefile rm -f Makefile
cat > Makefile << EOF cat > Makefile << EOF
# Makefile for Plzip - A parallel version of the lzip data compressor # Makefile for Plzip - A parallel compressor compatible with lzip
# Copyright (C) 2009, 2010 Antonio Diaz Diaz. # Copyright (C) 2009, 2010 Antonio Diaz Diaz.
# This file was generated automatically by configure. Do not edit. # This file was generated automatically by configure. Do not edit.
# #

123
decompress.cc Normal file
View file

@ -0,0 +1,123 @@
/* Plzip - A parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _FILE_OFFSET_BITS 64
#include <algorithm>
#include <cerrno>
#include <climits>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <queue>
#include <string>
#include <vector>
#include <pthread.h>
#include <stdint.h>
#include <unistd.h>
#include <lzlib.h>
#include "plzip.h"
namespace {
// Decompress everything readable from 'infd' using 'decoder', writing the
// output to 'outfd' (nothing is written when outfd is negative).
// 'pp' is called before or with each error message.
// Returns 0 on success, 2 if the input ends unexpectedly, and 1 on any other
// error (read/write error, bad member header, out of memory, other decoder
// error).
int do_decompress( LZ_Decoder * const decoder, const int infd, const int outfd,
const Pretty_print & pp, const bool testing )
{
const int in_buffer_size = 65536, out_buffer_size = 8 * in_buffer_size;
// Both buffers are automatic arrays (64 KiB + 512 KiB).
uint8_t in_buffer[in_buffer_size], out_buffer[out_buffer_size];
while( true )
{
// Read no more than the decoder reports it can accept right now.
int in_size = std::min( LZ_decompress_write_size( decoder ), in_buffer_size );
if( in_size > 0 )
{
const int max_in_size = in_size;
in_size = readblock( infd, in_buffer, max_in_size );
if( in_size != max_in_size && errno )
{ pp(); show_error( "read error", errno ); return 1; }
// Nothing read: tell the decoder that the input is complete.
if( in_size == 0 ) LZ_decompress_finish( decoder );
else if( in_size != LZ_decompress_write( decoder, in_buffer, in_size ) )
internal_error( "library error (LZ_decompress_write)" );
}
int out_size = LZ_decompress_read( decoder, out_buffer, out_buffer_size );
// std::fprintf( stderr, "%5d in_size, %6d out_size.\n", in_size, out_size );
if( out_size < 0 )
{
const LZ_Errno lz_errno = LZ_decompress_errno( decoder );
if( lz_errno == LZ_header_error )
{
// A bad header after some output has been produced is treated as
// trailing garbage and ends decompression successfully.
if( LZ_decompress_total_out_size( decoder ) > 0 )
break; // trailing garbage
pp( "error reading member header" );
return 1;
}
if( lz_errno == LZ_mem_error )
{
pp( "not enough memory. Find a machine with more memory" );
return 1;
}
pp();
if( lz_errno == LZ_unexpected_eof )
{
if( verbosity >= 0 )
std::fprintf( stderr, "file ends unexpectedly at pos %lld\n",
LZ_decompress_total_in_size( decoder ) );
return 2;
}
if( verbosity >= 0 )
std::fprintf( stderr, "LZ_decompress_read error: %s.\n",
LZ_strerror( LZ_decompress_errno( decoder ) ) );
return 1;
}
else if( out_size > 0 && outfd >= 0 )
{
const int wr = writeblock( outfd, out_buffer, out_size );
if( wr != out_size )
{ pp(); show_error( "write error", errno ); return 1; }
}
if( LZ_decompress_finished( decoder ) == 1 ) break;
// No input consumed and no output produced in this iteration, yet the
// decoder is not finished: treated as a library error.
if( in_size == 0 && out_size == 0 )
internal_error( "library error (LZ_decompress_read)" );
}
if( verbosity >= 1 )
{ if( testing ) std::fprintf( stderr, "ok\n" );
else std::fprintf( stderr, "done\n" ); }
return 0;
}
} // end namespace
// Decompress from 'infd' to 'outfd'.
// Opens an lzlib decoder, runs do_decompress with it and closes it again.
// If the decoder cannot be opened, "not enough memory" is reported through
// 'pp' and 1 is returned; otherwise the result of do_decompress is returned.
// 'testing' is forwarded to do_decompress unchanged.
int decompress( const int infd, const int outfd, const Pretty_print & pp,
                const bool testing )
  {
  LZ_Decoder * const decoder = LZ_decompress_open();
  const bool usable =
    ( decoder != 0 && LZ_decompress_errno( decoder ) == LZ_ok );

  int retval = 1;
  if( usable ) retval = do_decompress( decoder, infd, outfd, pp, testing );
  else pp( "not enough memory" );

  LZ_decompress_close( decoder );	// called even when opening failed
  return retval;
  }

View file

@ -1,12 +1,12 @@
.\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.36. .\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.36.
.TH PLZIP "1" "January 2010" "Plzip 0.3" "User Commands" .TH PLZIP "1" "January 2010" "Plzip 0.4" "User Commands"
.SH NAME .SH NAME
Plzip \- data compressor based on the LZMA algorithm Plzip \- data compressor based on the LZMA algorithm
.SH SYNOPSIS .SH SYNOPSIS
.B plzip .B plzip
[\fIoptions\fR] [\fIfiles\fR] [\fIoptions\fR] [\fIfiles\fR]
.SH DESCRIPTION .SH DESCRIPTION
Plzip \- A parallel version of the lzip data compressor. Plzip \- A parallel compressor compatible with lzip.
.SH OPTIONS .SH OPTIONS
.TP .TP
\fB\-h\fR, \fB\-\-help\fR \fB\-h\fR, \fB\-\-help\fR
@ -71,6 +71,7 @@ Plzip home page: http://www.nongnu.org/lzip/plzip.html
Copyright \(co 2009 Laszlo Ersek. Copyright \(co 2009 Laszlo Ersek.
.br .br
Copyright \(co 2010 Antonio Diaz Diaz. Copyright \(co 2010 Antonio Diaz Diaz.
Using Lzlib 0.9-rc1
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html> License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
.br .br
This is free software: you are free to change and redistribute it. This is free software: you are free to change and redistribute it.

View file

@ -3,7 +3,7 @@ plzip.texinfo.
INFO-DIR-SECTION Data Compression INFO-DIR-SECTION Data Compression
START-INFO-DIR-ENTRY START-INFO-DIR-ENTRY
* Plzip: (plzip). Parallel version of the lzip data compressor * Plzip: (plzip). Parallel compressor compatible with lzip
END-INFO-DIR-ENTRY END-INFO-DIR-ENTRY
 
@ -12,7 +12,7 @@ File: plzip.info, Node: Top, Next: Introduction, Up: (dir)
Plzip Manual Plzip Manual
************ ************
This manual is for Plzip (version 0.3, 24 January 2010). This manual is for Plzip (version 0.4, 31 January 2010).
* Menu: * Menu:
@ -34,11 +34,13 @@ File: plzip.info, Node: Introduction, Next: Invoking Plzip, Prev: Top, Up: T
1 Introduction 1 Introduction
************** **************
Plzip is a parallel version of the lzip data compressor. The files Plzip is a massively parallel (multithreaded) data compressor compatible
produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is with the lzip file format. The files produced by plzip are fully
intended for faster compression/decompression of big files on compatible with lzip-1.4 or newer. Plzip is intended for faster
multiprocessor machines. Currently only compression is performed in compression/decompression of big files on multiprocessor machines. On
parallel. Parallel decompression is planned to be implemented later. files big enough, plzip can use hundreds of processors. Currently only
compression is performed in parallel. Parallel decompression is planned
to be implemented later.
Lzip is a lossless data compressor based on the LZMA algorithm, with Lzip is a lossless data compressor based on the LZMA algorithm, with
very safe integrity checking and a user interface similar to the one of very safe integrity checking and a user interface similar to the one of
@ -303,11 +305,11 @@ Concept Index
 
Tag Table: Tag Table:
Node: Top227 Node: Top223
Node: Introduction750 Node: Introduction746
Node: Invoking Plzip3571 Node: Invoking Plzip3669
Node: File Format7260 Node: File Format7358
Node: Problems9216 Node: Problems9314
Node: Concept Index9745 Node: Concept Index9843
 
End Tag Table End Tag Table

View file

@ -5,12 +5,12 @@
@finalout @finalout
@c %**end of header @c %**end of header
@set UPDATED 24 January 2010 @set UPDATED 31 January 2010
@set VERSION 0.3 @set VERSION 0.4
@dircategory Data Compression @dircategory Data Compression
@direntry @direntry
* Plzip: (plzip). Parallel version of the lzip data compressor * Plzip: (plzip). Parallel compressor compatible with lzip
@end direntry @end direntry
@ -50,11 +50,13 @@ to copy, distribute and modify it.
@chapter Introduction @chapter Introduction
@cindex introduction @cindex introduction
Plzip is a parallel version of the lzip data compressor. The files Plzip is a massively parallel (multithreaded) data compressor compatible
produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is with the lzip file format. The files produced by plzip are fully
intended for faster compression/decompression of big files on compatible with lzip-1.4 or newer. Plzip is intended for faster
multiprocessor machines. Currently only compression is performed in compression/decompression of big files on multiprocessor machines. On
parallel. Parallel decompression is planned to be implemented later. files big enough, plzip can use hundreds of processors. Currently only
compression is performed in parallel. Parallel decompression is planned
to be implemented later.
Lzip is a lossless data compressor based on the LZMA algorithm, with Lzip is a lossless data compressor based on the LZMA algorithm, with
very safe integrity checking and a user interface similar to the one of very safe integrity checking and a user interface similar to the one of

128
main.cc
View file

@ -1,4 +1,4 @@
/* Plzip - A parallel version of the lzip data compressor /* Plzip - A parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek. Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009, 2010 Antonio Diaz Diaz. Copyright (C) 2009, 2010 Antonio Diaz Diaz.
@ -58,8 +58,6 @@
#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL #define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
#endif #endif
void internal_error( const char * msg );
namespace { namespace {
@ -93,44 +91,10 @@ bool delete_output_on_interrupt = false;
pthread_t main_thread; pthread_t main_thread;
pid_t main_thread_pid; pid_t main_thread_pid;
class Pretty_print
{
const char * const stdin_name;
const unsigned int stdin_name_len;
unsigned int longest_name;
std::string name_;
mutable bool first_post;
public:
Pretty_print( const std::vector< std::string > & filenames )
: stdin_name( "(stdin)" ), stdin_name_len( std::strlen( stdin_name ) ),
longest_name( 0 ), first_post( false )
{
for( unsigned int i = 0; i < filenames.size(); ++i )
{
const std::string & s = filenames[i];
const unsigned int len = ( ( s == "-" ) ? stdin_name_len : s.size() );
if( len > longest_name ) longest_name = len;
}
if( longest_name == 0 ) longest_name = stdin_name_len;
}
void set_name( const std::string & filename )
{
if( filename.size() && filename != "-" ) name_ = filename;
else name_ = stdin_name;
first_post = true;
}
void reset() const throw() { if( name_.size() ) first_post = true; }
const char * name() const throw() { return name_.c_str(); }
void operator()( const char * const msg = 0 ) const throw();
};
void show_help() throw() void show_help() throw()
{ {
std::printf( "%s - A parallel version of the lzip data compressor.\n", Program_name ); std::printf( "%s - A parallel compressor compatible with lzip.\n", Program_name );
std::printf( "\nUsage: %s [options] [files]\n", invocation_name ); std::printf( "\nUsage: %s [options] [files]\n", invocation_name );
std::printf( "\nOptions:\n" ); std::printf( "\nOptions:\n" );
std::printf( " -h, --help display this help and exit\n" ); std::printf( " -h, --help display this help and exit\n" );
@ -154,7 +118,7 @@ void show_help() throw()
std::printf( " --best alias for -9\n" ); std::printf( " --best alias for -9\n" );
if( verbosity > 0 ) if( verbosity > 0 )
{ {
std::printf( " -D, --debug=<level> (0-3) print debug statistics to stderr\n" ); std::printf( " -D, --debug=<level> (0-1) print debug statistics to stderr\n" );
} }
std::printf( "If no file names are given, %s compresses or decompresses\n", program_name ); std::printf( "If no file names are given, %s compresses or decompresses\n", program_name );
std::printf( "from standard input to standard output.\n" ); std::printf( "from standard input to standard output.\n" );
@ -170,6 +134,7 @@ void show_version() throw()
std::printf( "%s %s\n", Program_name, PROGVERSION ); std::printf( "%s %s\n", Program_name, PROGVERSION );
std::printf( "Copyright (C) 2009 Laszlo Ersek.\n" ); std::printf( "Copyright (C) 2009 Laszlo Ersek.\n" );
std::printf( "Copyright (C) %s Antonio Diaz Diaz.\n", program_year ); std::printf( "Copyright (C) %s Antonio Diaz Diaz.\n", program_year );
std::printf( "Using Lzlib %s\n", LZ_version() );
std::printf( "License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>\n" ); std::printf( "License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>\n" );
std::printf( "This is free software: you are free to change and redistribute it.\n" ); std::printf( "This is free software: you are free to change and redistribute it.\n" );
std::printf( "There is NO WARRANTY, to the extent permitted by law.\n" ); std::printf( "There is NO WARRANTY, to the extent permitted by law.\n" );
@ -413,89 +378,6 @@ void close_and_set_permissions( const struct stat * const in_statsp )
} }
// Runs the decompression loop: feeds data read from 'inhandle' to 'decoder'
// and writes whatever the decoder produces to 'outhandle' (a variable
// declared outside this function) when that handle is >= 0.
// Returns 0 on success (including when trailing garbage follows valid
// data), 2 when the input ends unexpectedly, and 1 on any other error.
// 'pp' is used to print the file-name prefix/diagnostics; 'testing' only
// selects the final "ok"/"done" message.
int do_decompress( LZ_Decoder * const decoder, const int inhandle,
const Pretty_print & pp, const bool testing )
{
const int in_buffer_size = 65536, out_buffer_size = 8 * in_buffer_size;
uint8_t in_buffer[in_buffer_size], out_buffer[out_buffer_size];
while( true )
{
// Never read more than the decoder reports it can accept right now.
int in_size = std::min( LZ_decompress_write_size( decoder ), in_buffer_size );
if( in_size > 0 )
{
const int max_in_size = in_size;
in_size = readblock( inhandle, in_buffer, max_in_size );
// A short read is an error only if errno is set
// (presumably readblock clears errno first -- confirm in main.cc).
if( in_size != max_in_size && errno )
{ pp(); show_error( "read error", errno ); return 1; }
// Nothing read: tell the decoder that no more input will arrive.
if( in_size == 0 ) LZ_decompress_finish( decoder );
else if( in_size != LZ_decompress_write( decoder, in_buffer, in_size ) )
internal_error( "library error (LZ_decompress_write)" );
}
int out_size = LZ_decompress_read( decoder, out_buffer, out_buffer_size );
// std::fprintf( stderr, "%5d in_size, %6d out_size.\n", in_size, out_size );
if( out_size < 0 )
{
const LZ_Errno lz_errno = LZ_decompress_errno( decoder );
if( lz_errno == LZ_header_error )
{
// A bad header after some output was already produced is treated as
// trailing garbage and ends the loop without an error.
if( LZ_decompress_total_out_size( decoder ) > 0 )
break; // trailing garbage
pp( "error reading member header" );
return 1;
}
if( lz_errno == LZ_mem_error )
{
pp( "not enough memory. Find a machine with more memory" );
return 1;
}
pp();
if( lz_errno == LZ_unexpected_eof )
{
if( verbosity >= 0 )
std::fprintf( stderr, "file ends unexpectedly at pos %lld\n",
LZ_decompress_total_in_size( decoder ) );
return 2;
}
// Any other decoder error: report its text and fail.
if( verbosity >= 0 )
std::fprintf( stderr, "LZ_decompress_read error: %s.\n",
LZ_strerror( LZ_decompress_errno( decoder ) ) );
return 1;
}
else if( out_size > 0 && outhandle >= 0 )
{
const int wr = writeblock( outhandle, out_buffer, out_size );
if( wr != out_size )
{ pp(); show_error( "write error", errno ); return 1; }
}
if( LZ_decompress_finished( decoder ) == 1 ) break;
// Nothing was fed and nothing came out, yet the decoder is not finished:
// no progress is possible, so report a library error.
if( in_size == 0 && out_size == 0 )
internal_error( "library error (LZ_decompress_read)" );
}
if( verbosity >= 1 )
{ if( testing ) std::fprintf( stderr, "ok\n" );
else std::fprintf( stderr, "done\n" ); }
return 0;
}
int decompress( const int inhandle, const Pretty_print & pp,
const bool testing )
{
LZ_Decoder * const decoder = LZ_decompress_open();
int retval;
if( !decoder || LZ_decompress_errno( decoder ) != LZ_ok )
{
pp( "not enough memory" );
retval = 1;
}
else retval = do_decompress( decoder, inhandle, pp, testing );
LZ_decompress_close( decoder );
return retval;
}
extern "C" void signal_handler( int sig ) throw() extern "C" void signal_handler( int sig ) throw()
{ {
if( !pthread_equal( pthread_self(), main_thread ) ) if( !pthread_equal( pthread_self(), main_thread ) )
@ -810,7 +692,7 @@ int main( const int argc, const char * argv[] )
encoder_options.match_len_limit, num_workers, encoder_options.match_len_limit, num_workers,
num_slots, inhandle, outhandle, debug_level ); num_slots, inhandle, outhandle, debug_level );
else else
tmp = decompress( inhandle, pp, program_mode == m_test ); tmp = decompress( inhandle, outhandle, pp, program_mode == m_test );
if( tmp > retval ) retval = tmp; if( tmp > retval ) retval = tmp;
if( tmp && program_mode != m_test ) cleanup_and_fail( retval ); if( tmp && program_mode != m_test ) cleanup_and_fail( retval );

548
plzip.cc
View file

@ -1,548 +0,0 @@
/* Plzip - A parallel version of the lzip data compressor
Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _FILE_OFFSET_BITS 64
#include <algorithm>
#include <cassert>
#include <cerrno>
#include <climits>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <vector>
#include <pthread.h>
#include <stdint.h>
#include <unistd.h>
#include <lzlib.h>
#include "plzip.h"
#ifndef LLONG_MAX
#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL
#endif
#ifndef LLONG_MIN
#define LLONG_MIN (-LLONG_MAX - 1LL)
#endif
#ifndef ULLONG_MAX
#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
#endif
namespace {
// Byte totals used by the statistics line printed in compress():
// in_size is increased by splitter() for every block read,
// out_size by muxer() for every compressed block it handles.
long long in_size = 0;
long long out_size = 0;
// Allocator hooks. compress() points them either at malloc/free or at
// trace_malloc/trace_free depending on the debug level.
void *(*mallocf)( size_t size );
void (*freef)( void *ptr );
// malloc() replacement that logs every request and its result to stderr.
// When the allocation fails, the errno value left by malloc() is preserved
// across the logging call so the caller still sees it.
void * trace_malloc( size_t size )
  {
  void * const block = malloc( size );
  const int malloc_errno = ( block == 0 ) ? errno : 0;
  std::fprintf( stderr, "malloc(%lu) == %p\n", (unsigned long)size, block );
  if( block == 0 ) errno = malloc_errno;
  return block;
  }
// free() replacement that logs the pointer to stderr before releasing it.
void trace_free( void *ptr )
  {
  // Print first: the pointer value is only meaningful before it is freed.
  std::fprintf( stderr, "free(%p)\n", ptr );
  free( ptr );
  }
// Allocates 'size' bytes through the currently selected allocator hook.
// On failure it reports "not enough memory" with errno and calls fatal().
void * xalloc( size_t size )
  {
  void * const mem = (*mallocf)( size );
  if( mem != 0 ) return mem;
  show_error( "not enough memory", errno );
  fatal();
  return mem;
  }
// Initializes 'mutex' and then 'cond' with default attributes.
// Any failure is reported with the pthread error code and is fatal.
void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex )
  {
  const int mutex_err = pthread_mutex_init( mutex, 0 );
  if( mutex_err != 0 )
    { show_error( "pthread_mutex_init", mutex_err ); fatal(); }

  const int cond_err = pthread_cond_init( cond, 0 );
  if( cond_err != 0 )
    { show_error( "pthread_cond_init", cond_err ); fatal(); }
  }
// Destroys 'cond' and then 'mutex' (the reverse of the xinit order).
// Any failure is reported with the pthread error code and is fatal.
void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex )
  {
  const int cond_err = pthread_cond_destroy( cond );
  if( cond_err != 0 )
    { show_error( "pthread_cond_destroy", cond_err ); fatal(); }

  const int mutex_err = pthread_mutex_destroy( mutex );
  if( mutex_err != 0 )
    { show_error( "pthread_mutex_destroy", mutex_err ); fatal(); }
  }
// Locks 'mutex'; a failure is reported and is fatal.
void xlock( pthread_mutex_t * mutex )
  {
  const int err = pthread_mutex_lock( mutex );
  if( err == 0 ) return;
  show_error( "pthread_mutex_lock", err );
  fatal();
  }
// Unlocks 'mutex'; a failure is reported and is fatal.
void xunlock( pthread_mutex_t * mutex )
  {
  const int err = pthread_mutex_unlock( mutex );
  if( err == 0 ) return;
  show_error( "pthread_mutex_unlock", err );
  fatal();
  }
// Waits on 'cond' with 'mutex'; a failure is reported and is fatal.
void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex )
  {
  const int err = pthread_cond_wait( cond, mutex );
  if( err == 0 ) return;
  show_error( "pthread_cond_wait", err );
  fatal();
  }
// Signals 'cond'; a failure is reported and is fatal.
void xsignal( pthread_cond_t * cond )
  {
  const int err = pthread_cond_signal( cond );
  if( err == 0 ) return;
  show_error( "pthread_cond_signal", err );
  fatal();
  }
// Broadcasts on 'cond'; a failure is reported and is fatal.
void xbroadcast( pthread_cond_t * cond )
  {
  const int err = pthread_cond_broadcast( cond );
  if( err == 0 ) return;
  show_error( "pthread_cond_broadcast", err );
  fatal();
  }
// Starts a thread with default attributes running 'routine( arg )' and
// stores its id in '*thread'; a failure is reported and is fatal.
void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg )
  {
  const int err = pthread_create( thread, 0, routine, arg );
  if( err == 0 ) return;
  show_error( "pthread_create", err );
  fatal();
  }
// Joins 'thread', discarding its return value; a failure is reported and
// is fatal.
void xjoin( pthread_t thread )
  {
  const int err = pthread_join( thread, 0 );
  if( err == 0 ) return;
  show_error( "pthread_join", err );
  fatal();
  }
// Counts the slots still available for in-flight blocks. splitter() takes
// a slot for every block it queues and waits while none is free; muxer()
// gives a slot back for every block it has finished with.
struct Slot_tally // Synchronizes splitter to muxer
{
unsigned long check_counter; // Times the splitter went to take a slot
unsigned long wait_counter; // Times the splitter had to wait for one
int num_free; // Number of free slots
pthread_mutex_t mutex; // Protects the fields above
pthread_cond_t slot_av; // Free slot available
// Starts with all 'slots' free and initializes the mutex/condition pair.
Slot_tally( const int slots )
: check_counter( 0 ), wait_counter( 0 ), num_free( slots )
{ xinit( &slot_av, &mutex ); }
~Slot_tally() { xdestroy( &slot_av, &mutex ); }
};
// One block of uncompressed input handed from the splitter to a worker.
// 'plain' is declared with a single element; blocks are allocated with
// sizeof (S2w_blk) + data_size - 1 bytes (see compress()), so the array
// really has room for data_size bytes.
struct S2w_blk // Splitter to worker data block
{
unsigned long long id; // Block serial number as read from infd
S2w_blk *next; // Next in queue
int loaded; // # of bytes in plain, may be 0 for 1st
uint8_t plain[1]; // Data read from infd, allocated: data_size
};
// Singly linked FIFO of input blocks: the splitter appends at 'tail',
// workers remove from 'head'. All fields are accessed under 'mutex'.
struct S2w_queue
{
S2w_blk * head; // Next ready worker shall compress this
S2w_blk * tail; // Splitter will append here
unsigned long check_counter; // Times a worker went to fetch a block
unsigned long wait_counter; // Times a worker had to wait for one
pthread_mutex_t mutex;
pthread_cond_t av_or_eof; // New block available or splitter done
bool eof; // Splitter done
// Starts empty, not at EOF, and initializes the mutex/condition pair.
S2w_queue()
: head( 0 ), tail( 0 ), check_counter( 0 ), wait_counter( 0 ), eof( false )
{ xinit( &av_or_eof, &mutex ); }
~S2w_queue() { xdestroy( &av_or_eof, &mutex ); }
};
// One block of compressed output handed from a worker to the muxer.
// 'compr' is declared with a single element; blocks are allocated with
// sizeof (W2m_blk) + compr_size - 1 bytes (see compress()), so the array
// really has room for compr_size bytes.
struct W2m_blk // Worker to muxer data block
{
unsigned long long id; // Block index as read from infd
W2m_blk *next; // Next block in list (unordered)
int produced; // Number of bytes in compr
uint8_t compr[1]; // Data to write to outfd, alloc.: compr_size
};
// Unordered list of compressed blocks: workers push at 'head', the muxer
// takes the whole list at once and reorders it by block id. All fields
// are accessed under 'mutex'.
struct W2m_queue
{
unsigned long long needed_id; // Block needed for resuming writing
W2m_blk *head; // Block list (unordered)
unsigned long check_counter; // Times the muxer went to fetch blocks
unsigned long wait_counter; // Times the muxer had to wait for them
int num_working; // Number of workers still running
pthread_mutex_t mutex;
pthread_cond_t av_or_exit; // New block available or all workers exited
// Starts empty with 'num_workers' workers counted as running and
// initializes the mutex/condition pair.
W2m_queue( const int num_workers )
: needed_id( 0 ), head( 0 ), check_counter( 0 ), wait_counter( 0 ),
num_working( num_workers )
{ xinit( &av_or_exit, &mutex ); }
~W2m_queue() { xdestroy( &av_or_exit, &mutex ); }
};
// Argument bundle passed to the splitter() thread routine.
struct Splitter_arg
{
Slot_tally * slot_tally; // Slot accounting shared with the muxer
S2w_queue * s2w_queue; // Queue the splitter fills for the workers
int infd; // File descriptor to read the input from
int data_size; // Maximum number of input bytes per block
int s2w_blk_size; // Bytes to allocate for each S2w_blk
};
// Thread routine (started from compress()) that reads the input in blocks
// of up to data_size bytes and appends them to the splitter-to-worker
// queue. 'arg' points to a Splitter_arg. Before queueing a block it takes
// one slot from the Slot_tally, waiting while none is free. When a read
// yields no data (other than for the very first block) it marks the queue
// as EOF, wakes all waiting workers and returns 0.
void * splitter( void * arg )
{
const Splitter_arg & tmp = *(Splitter_arg *)arg;
Slot_tally & slot_tally = *tmp.slot_tally;
S2w_queue & s2w_queue = *tmp.s2w_queue;
const int infd = tmp.infd;
const int data_size = tmp.data_size;
const int s2w_blk_size = tmp.s2w_blk_size;
for( unsigned long long id = 0; ; ++id )
{
S2w_blk * s2w_blk = (S2w_blk *)xalloc( s2w_blk_size );
// Fill block
const int rd = readblock( infd, s2w_blk->plain, data_size );
// A short read is fatal only if errno is set
// (presumably readblock clears errno first -- confirm in main.cc).
if( rd != data_size && errno ) { show_error( "read", errno ); fatal(); }
if( rd > 0 || id == 0 ) // first block can be empty
{
s2w_blk->id = id;
s2w_blk->next = 0;
s2w_blk->loaded = rd;
in_size += rd;
xlock( &slot_tally.mutex ); // Grab a free slot
++slot_tally.check_counter;
while( slot_tally.num_free == 0 )
{
++slot_tally.wait_counter;
xwait( &slot_tally.slot_av, &slot_tally.mutex );
}
--slot_tally.num_free;
xunlock( &slot_tally.mutex );
}
else
// End of input: discard the unused block; the null pointer marks EOF below.
{ (*freef)( s2w_blk ); s2w_blk = 0; }
xlock( &s2w_queue.mutex );
if( s2w_blk != 0 )
{
// Append at the tail and wake one worker.
if( s2w_queue.tail == 0 ) s2w_queue.head = s2w_blk;
else s2w_queue.tail->next = s2w_blk;
s2w_queue.tail = s2w_blk;
xsignal( &s2w_queue.av_or_eof );
}
else
{
// Wake every worker so that all of them can observe EOF.
s2w_queue.eof = true;
xbroadcast( &s2w_queue.av_or_eof );
}
xunlock( &s2w_queue.mutex );
if( s2w_blk == 0 ) break;
}
return 0;
}
// Compresses one input block ('s2w_blk') into a freshly allocated W2m_blk
// of w2m_blk_size bytes and pushes it onto the worker-to-muxer queue.
// Each block gets its own encoder, opened with a dictionary size limited
// to the block's length (but not below the library minimum).
// 'compr_size' is the capacity of the output block's 'compr' array.
// Every lzlib failure is reported and is fatal.
void work_compr( const int dictionary_size, const int match_len_limit,
const S2w_blk & s2w_blk, W2m_queue & w2m_queue,
const int compr_size, const int w2m_blk_size )
{
// Only the first block (id 0) is allowed to be empty.
assert( s2w_blk.loaded > 0 || s2w_blk.id == 0 );
W2m_blk * w2m_blk = (W2m_blk *)xalloc( w2m_blk_size );
const int dict_size = std::max( LZ_min_dictionary_size(),
std::min( dictionary_size, s2w_blk.loaded ) );
LZ_Encoder * const encoder =
LZ_compress_open( dict_size, match_len_limit, LLONG_MAX );
if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
{ show_error( "LZ_compress_open failed." ); fatal(); }
int written = 0; // Input bytes already given to the encoder
w2m_blk->produced = 0;
while( true )
{
// Feed input while the encoder accepts it; once everything has been
// written, tell the encoder that the input is complete.
if( LZ_compress_write_size( encoder ) > 0 )
{
if( written < s2w_blk.loaded )
{
const int wr = LZ_compress_write( encoder, s2w_blk.plain + written,
s2w_blk.loaded - written );
if( wr < 0 ) { show_error( "LZ_compress_write failed." ); fatal(); }
written += wr;
}
if( written >= s2w_blk.loaded ) LZ_compress_finish( encoder );
}
// The output block must still have room for more compressed data.
assert( w2m_blk->produced < compr_size );
const int rd = LZ_compress_read( encoder, w2m_blk->compr + w2m_blk->produced,
compr_size - w2m_blk->produced );
if( rd < 0 ) { show_error( "LZ_compress_read failed." ); fatal(); }
w2m_blk->produced += rd;
if( LZ_compress_finished( encoder ) == 1 ) break;
}
if( LZ_compress_close( encoder ) < 0 )
{ show_error( "LZ_compress_close failed." ); fatal(); }
w2m_blk->id = s2w_blk.id;
// Push block to muxer queue
xlock( &w2m_queue.mutex );
w2m_blk->next = w2m_queue.head;
w2m_queue.head = w2m_blk;
// Signal only when this is the block the muxer needs next.
if( w2m_blk->id == w2m_queue.needed_id ) xsignal( &w2m_queue.av_or_exit );
xunlock( &w2m_queue.mutex );
}
// Argument bundle shared by all worker() threads.
struct Worker_arg
{
int dictionary_size; // Upper limit for the encoder dictionary size
int match_len_limit; // Passed unchanged to LZ_compress_open
S2w_queue * s2w_queue; // Queue of input blocks to compress
W2m_queue * w2m_queue; // Queue receiving the compressed blocks
int compr_size; // Capacity of W2m_blk::compr
int w2m_blk_size; // Bytes to allocate for each W2m_blk
};
// Thread routine (started from compress()) that repeatedly takes the block
// at the head of the splitter-to-worker queue, compresses it with
// work_compr() and frees it. 'arg' points to a Worker_arg. It exits when
// the queue is empty and the splitter has signalled EOF; before returning
// 0 it decrements the count of running workers.
void * worker( void * arg )
{
const Worker_arg & tmp = *(Worker_arg *)arg;
const int dictionary_size = tmp.dictionary_size;
const int match_len_limit = tmp.match_len_limit;
S2w_queue & s2w_queue = *tmp.s2w_queue;
W2m_queue & w2m_queue = *tmp.w2m_queue;
const int compr_size = tmp.compr_size;
const int w2m_blk_size = tmp.w2m_blk_size;
while( true )
{
S2w_blk *s2w_blk;
// Grab a block to work on
xlock( &s2w_queue.mutex );
++s2w_queue.check_counter;
while( s2w_queue.head == 0 && !s2w_queue.eof )
{
++s2w_queue.wait_counter;
xwait( &s2w_queue.av_or_eof, &s2w_queue.mutex );
}
if( s2w_queue.head == 0 ) // No blocks available and splitter exited
{
xunlock( &s2w_queue.mutex );
break;
}
// Unlink the head; an emptied queue also has its tail cleared.
s2w_blk = s2w_queue.head;
s2w_queue.head = s2w_blk->next;
if( s2w_queue.head == 0 ) s2w_queue.tail = 0;
xunlock( &s2w_queue.mutex );
// Compression runs without holding the queue lock.
work_compr( dictionary_size, match_len_limit, *s2w_blk, w2m_queue,
compr_size, w2m_blk_size );
(*freef)( s2w_blk );
}
// Notify muxer when last worker exits
xlock( &w2m_queue.mutex );
if( --w2m_queue.num_working == 0 && w2m_queue.head == 0 )
xsignal( &w2m_queue.av_or_exit );
xunlock( &w2m_queue.mutex );
return 0;
}
// Collects compressed blocks from the workers, restores their original
// order and writes them to 'outfd' (nothing is written when outfd < 0,
// but sizes are still counted). Blocks are parked in a buffer of
// 'num_slots' entries indexed by id % num_slots until the next needed id
// is present. Every block written gives one slot back to 'slot_tally'.
// Returns when the queue is empty and no worker is running any more.
void muxer( Slot_tally & slot_tally, W2m_queue & w2m_queue,
const int num_slots, const int outfd )
{
unsigned long long needed_id = 0; // Id of the next block to write
std::vector< W2m_blk * > circular_buffer( num_slots, (W2m_blk *)0 );
// The queue mutex is held at the top of every iteration of this loop.
xlock( &w2m_queue.mutex );
while( true )
{
// Grab all available compressed blocks in one step
++w2m_queue.check_counter;
while( w2m_queue.head == 0 && w2m_queue.num_working > 0 )
{
++w2m_queue.wait_counter;
xwait( &w2m_queue.av_or_exit, &w2m_queue.mutex );
}
if( w2m_queue.head == 0 ) break; // queue is empty. all workers exited
W2m_blk * w2m_blk = w2m_queue.head;
w2m_queue.head = 0;
xunlock( &w2m_queue.mutex );
// Merge blocks fetched this time into circular buffer
do {
// id collision shouldn't happen
assert( circular_buffer[w2m_blk->id%num_slots] == 0 );
circular_buffer[w2m_blk->id%num_slots] = w2m_blk;
W2m_blk * next = w2m_blk->next;
w2m_blk->next = 0;
w2m_blk = next;
} while( w2m_blk != 0 );
// Write out initial continuous sequence of reordered blocks
while( true )
{
w2m_blk = circular_buffer[needed_id%num_slots];
if( w2m_blk == 0 ) break;
out_size += w2m_blk->produced;
if( outfd >= 0 )
{
const int wr = writeblock( outfd, w2m_blk->compr, w2m_blk->produced );
if( wr != w2m_blk->produced )
{ show_error( "write", errno ); fatal(); }
}
circular_buffer[needed_id%num_slots] = 0;
++needed_id;
// Return the slot; signal only if the tally had no free slot before.
xlock( &slot_tally.mutex );
if( slot_tally.num_free++ == 0 ) xsignal( &slot_tally.slot_av );
xunlock( &slot_tally.mutex );
(*freef)( w2m_blk );
}
// Re-take the queue lock and publish which block is needed next, so
// that workers know when to signal.
xlock( &w2m_queue.mutex );
w2m_queue.needed_id = needed_id;
}
xunlock( &w2m_queue.mutex );
// Every parked block must have been written by now.
for( int i = 0; i < num_slots; ++i )
if( circular_buffer[i] != 0 )
{ show_error( "circular buffer not empty" ); fatal(); }
}
} // end namespace
// Compresses everything readable from 'infd' to 'outfd' using one splitter
// thread, 'num_workers' worker threads and the calling thread as muxer.
//   data_size        maximum number of input bytes per block
//   dictionary_size  upper limit for the encoder dictionary size
//   match_len_limit  passed unchanged to the encoder
//   num_slots        maximum number of blocks in flight at any time
//   outfd            output descriptor; nothing is written when it is < 0
//   debug_level      bit 0 prints queue statistics, bit 1 traces allocations
// Returns 0; every error inside the pipeline terminates the process
// through fatal().
int compress( const int data_size, const int dictionary_size,
              const int match_len_limit, const int num_workers,
              const int num_slots, const int infd, const int outfd,
              const int debug_level )
  {
  if( debug_level & 2 ) { mallocf = trace_malloc; freef = trace_free; }
  else { mallocf = malloc; freef = free; }

  // The byte totals live at file scope. Reset them on every call so that
  // the statistics printed below describe this input only, not the sum of
  // all inputs compressed so far by this process.
  in_size = 0;
  out_size = 0;

  Slot_tally slot_tally( num_slots );
  S2w_queue s2w_queue;
  W2m_queue w2m_queue( num_workers );

  Splitter_arg splitter_arg;
  splitter_arg.slot_tally = &slot_tally;
  splitter_arg.s2w_queue = &s2w_queue;
  splitter_arg.infd = infd;
  splitter_arg.data_size = data_size;
  // S2w_blk already contains one byte of payload, hence the "- 1".
  splitter_arg.s2w_blk_size = sizeof (S2w_blk) + data_size - 1;

  pthread_t splitter_thread;
  xcreate( &splitter_thread, splitter, &splitter_arg );

  Worker_arg worker_arg;
  worker_arg.dictionary_size = dictionary_size;
  worker_arg.match_len_limit = match_len_limit;
  worker_arg.s2w_queue = &s2w_queue;
  worker_arg.w2m_queue = &w2m_queue;
  // Output capacity per block: 9/8 of the input size plus 26 bytes
  // (presumably header + trailer overhead -- confirm against lzlib).
  worker_arg.compr_size = 6 + 20 + ( ( data_size / 8 ) * 9 );
  worker_arg.w2m_blk_size = sizeof (W2m_blk) + worker_arg.compr_size - 1;

  pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
  if( worker_threads == 0 )
    { show_error( "not enough memory.", errno ); fatal(); }
  for( int i = 0; i < num_workers; ++i )
    xcreate( &worker_threads[i], worker, &worker_arg );

  // The calling thread writes the output until all workers have exited.
  muxer( slot_tally, w2m_queue, num_slots, outfd );

  for( int i = num_workers - 1; i >= 0; --i )
    xjoin(worker_threads[i]);
  delete[] worker_threads; worker_threads = 0;

  xjoin( splitter_thread );

  if( verbosity >= 1 )
    {
    if( in_size <= 0 || out_size <= 0 )
      std::fprintf( stderr, "no data compressed.\n" );
    else
      std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
                            "%5.2f%% saved, %lld in, %lld out.\n",
                    (double)in_size / out_size,
                    ( 8.0 * out_size ) / in_size,
                    100.0 * ( 1.0 - ( (double)out_size / in_size ) ),
                    in_size, out_size );
    }

  // Field width large enough for any unsigned long printed in decimal.
  const int FW = ( sizeof (unsigned long) * 8 ) / 3 + 1;
  if( debug_level & 1 )
    std::fprintf( stderr,
      "any worker tried to consume from splitter: %*lu\n"
      "any worker stalled : %*lu\n"
      "muxer tried to consume from workers : %*lu\n"
      "muxer stalled : %*lu\n"
      "splitter tried to fill a block : %*lu\n"
      "splitter stalled : %*lu\n",
      FW, s2w_queue.check_counter,
      FW, s2w_queue.wait_counter,
      FW, w2m_queue.check_counter,
      FW, w2m_queue.wait_counter,
      FW, slot_tally.check_counter,
      FW, slot_tally.wait_counter );

  // With all threads joined, every queue must be drained and every slot
  // must have been returned.
  assert( slot_tally.num_free == num_slots );
  assert( s2w_queue.eof );
  assert( s2w_queue.head == 0 );
  assert( s2w_queue.tail == 0 );
  assert( w2m_queue.num_working == 0 );
  assert( w2m_queue.head == 0 );
  return 0;
  }

102
plzip.h
View file

@ -1,4 +1,4 @@
/* Plzip - A parallel version of the lzip data compressor /* Plzip - A parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek. Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009, 2010 Antonio Diaz Diaz. Copyright (C) 2009, 2010 Antonio Diaz Diaz.
@ -16,19 +16,113 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// Keeps the name of the file currently being processed, plus the length
// of the longest name in the whole file list, for use by the message
// printer operator() (declared here, defined elsewhere).
// The names "" and "-" stand for standard input and are shown as "(stdin)".
class Pretty_print
  {
  const char * const stdin_name;
  const unsigned int stdin_name_len;
  unsigned int longest_name;
  std::string name_;
  mutable bool first_post;

public:
  // Records the longest displayed name among 'filenames'; if that length
  // is zero, the length of "(stdin)" is used instead.
  Pretty_print( const std::vector< std::string > & filenames )
    : stdin_name( "(stdin)" ), stdin_name_len( std::strlen( stdin_name ) ),
      longest_name( 0 ), first_post( false )
    {
    typedef std::vector< std::string >::const_iterator Name_iter;
    for( Name_iter it = filenames.begin(); it != filenames.end(); ++it )
      {
      unsigned int len = it->size();
      if( *it == "-" ) len = stdin_name_len;
      if( longest_name < len ) longest_name = len;
      }
    if( longest_name == 0 ) longest_name = stdin_name_len;
    }

  // Selects the current file name and sets the first_post flag.
  void set_name( const std::string & filename )
    {
    const bool is_stdin = ( filename.empty() || filename == "-" );
    if( is_stdin ) name_ = stdin_name;
    else name_ = filename;
    first_post = true;
    }

  // Sets the first_post flag again, but only once a name has been set.
  void reset() const throw() { if( !name_.empty() ) first_post = true; }
  const char * name() const throw() { return name_.c_str(); }
  void operator()( const char * const msg = 0 ) const throw();
  };
/*--------------------- Defined in compress.cc ---------------------*/
// Wrappers around the pthread primitives. The definitions are not part of
// this header; judging by names and signatures each one forwards to the
// matching pthread_* call (error handling to be confirmed in compress.cc).

// Set up / tear down a condition variable together with its mutex.
void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex );
void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex );
// Mutex locking.
void xlock( pthread_mutex_t * mutex );
void xunlock( pthread_mutex_t * mutex );
// Condition variable waiting and notification.
void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex );
void xsignal( pthread_cond_t * cond );
void xbroadcast( pthread_cond_t * cond );
// Thread creation and joining.
void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg );
void xjoin( pthread_t thread );
// Counting gate that limits how many slots may be in use at once.
// get_slot() takes a slot, waiting while none is free; leave_slot() gives
// one back. The two public counters record how often get_slot() was
// called and how often it had to wait.
class Slot_tally
  {
public:
  unsigned long check_counter;
  unsigned long wait_counter;

private:
  const int num_slots;          // total slots
  int num_free;                 // remaining free slots
  pthread_mutex_t mutex;
  pthread_cond_t slot_av;       // free slot available

public:
  // Starts with all 'slots' free and initializes the mutex/condition pair.
  Slot_tally( const int slots )
    : check_counter( 0 ), wait_counter( 0 ),
      num_slots( slots ), num_free( slots )
    {
    xinit( &slot_av, &mutex );
    }

  ~Slot_tally()
    {
    xdestroy( &slot_av, &mutex );
    }

  // True when no slot is currently taken. Reads num_free without locking.
  bool all_free()
    {
    return ( num_free == num_slots );
    }

  // wait for a free slot
  void get_slot()
    {
    xlock( &mutex );
    ++check_counter;
    while( num_free == 0 )
      {
      ++wait_counter;
      xwait( &slot_av, &mutex );
      }
    --num_free;
    xunlock( &mutex );
    }

  // return a slot to the tally
  void leave_slot()
    {
    xlock( &mutex );
    // A waiter can only exist if the tally was empty before this return.
    const bool was_empty = ( num_free == 0 );
    ++num_free;
    if( was_empty ) xsignal( &slot_av );
    xunlock( &mutex );
    }
  };
int compress( const int data_size, const int dictionary_size, int compress( const int data_size, const int dictionary_size,
const int match_len_limit, const int num_workers, const int match_len_limit, const int num_workers,
const int num_slots, const int infd, const int outfd, const int num_slots, const int infd, const int outfd,
const int debug_level ); const int debug_level );
/*-------------------- Defined in decompress.cc --------------------*/
int decompress( const int infd, const int outfd, const Pretty_print & pp,
const bool testing );
/*----------------------- Defined in main.cc -----------------------*/ /*----------------------- Defined in main.cc -----------------------*/
extern int verbosity;
void show_error( const char * msg, const int errcode = 0, const bool help = false ) throw(); void show_error( const char * msg, const int errcode = 0, const bool help = false ) throw();
void internal_error( const char * msg );
int readblock( const int fd, uint8_t * buf, const int size ) throw(); int readblock( const int fd, uint8_t * buf, const int size ) throw();
int writeblock( const int fd, const uint8_t * buf, const int size ) throw(); int writeblock( const int fd, const uint8_t * buf, const int size ) throw();
void fatal(); // Terminate the process void fatal(); // Terminate the process
extern int verbosity;

View file

@ -1,5 +1,5 @@
#! /bin/sh #! /bin/sh
# check script for Plzip - A parallel version of the lzip data compressor # check script for Plzip - A parallel compressor compatible with lzip
# Copyright (C) 2009, 2010 Antonio Diaz Diaz. # Copyright (C) 2009, 2010 Antonio Diaz Diaz.
# #
# This script is free software: you have unlimited permission # This script is free software: you have unlimited permission