Adding upstream version 0.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
parent
e1ed3a8d42
commit
393d6b555b
14 changed files with 750 additions and 727 deletions
|
@ -1,3 +1,10 @@
|
||||||
|
2010-01-31 Antonio Diaz Diaz <ant_diaz@teleline.es>
|
||||||
|
|
||||||
|
* Version 0.4 released.
|
||||||
|
* main.cc (show_version): Show the version of lzlib being used.
|
||||||
|
* Code reorganization. Class Packet_courier now coordinates data
|
||||||
|
movement and synchronization among threads.
|
||||||
|
|
||||||
2010-01-24 Antonio Diaz Diaz <ant_diaz@teleline.es>
|
2010-01-24 Antonio Diaz Diaz <ant_diaz@teleline.es>
|
||||||
|
|
||||||
* Version 0.3 released.
|
* Version 0.3 released.
|
||||||
|
|
27
Makefile.in
27
Makefile.in
|
@ -7,7 +7,7 @@ INSTALL_DIR = $(INSTALL) -d -m 755
|
||||||
LIBS = -llz -lpthread
|
LIBS = -llz -lpthread
|
||||||
SHELL = /bin/sh
|
SHELL = /bin/sh
|
||||||
|
|
||||||
objs = arg_parser.o plzip.o main.o
|
objs = arg_parser.o compress.o decompress.o main.o
|
||||||
|
|
||||||
|
|
||||||
.PHONY : all install install-info install-man install-strip \
|
.PHONY : all install install-info install-man install-strip \
|
||||||
|
@ -30,8 +30,9 @@ main.o : main.cc
|
||||||
|
|
||||||
$(objs) : Makefile
|
$(objs) : Makefile
|
||||||
arg_parser.o : arg_parser.h
|
arg_parser.o : arg_parser.h
|
||||||
|
compress.o : plzip.h
|
||||||
|
decompress.o : plzip.h
|
||||||
main.o : arg_parser.h plzip.h
|
main.o : arg_parser.h plzip.h
|
||||||
plzip.o : plzip.h
|
|
||||||
|
|
||||||
doc : info man
|
doc : info man
|
||||||
|
|
||||||
|
@ -53,30 +54,30 @@ check : all
|
||||||
@$(VPATH)/testsuite/check.sh $(VPATH)/testsuite
|
@$(VPATH)/testsuite/check.sh $(VPATH)/testsuite
|
||||||
|
|
||||||
install : all install-info install-man
|
install : all install-info install-man
|
||||||
if [ ! -d $(DESTDIR)$(bindir) ] ; then $(INSTALL_DIR) $(DESTDIR)$(bindir) ; fi
|
if [ ! -d "$(DESTDIR)$(bindir)" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(bindir)" ; fi
|
||||||
$(INSTALL_PROGRAM) ./$(progname) $(DESTDIR)$(bindir)/$(progname)
|
$(INSTALL_PROGRAM) ./$(progname) "$(DESTDIR)$(bindir)/$(progname)"
|
||||||
|
|
||||||
install-info :
|
install-info :
|
||||||
if [ ! -d $(DESTDIR)$(infodir) ] ; then $(INSTALL_DIR) $(DESTDIR)$(infodir) ; fi
|
if [ ! -d "$(DESTDIR)$(infodir)" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(infodir)" ; fi
|
||||||
$(INSTALL_DATA) $(VPATH)/doc/$(pkgname).info $(DESTDIR)$(infodir)/$(pkgname).info
|
$(INSTALL_DATA) $(VPATH)/doc/$(pkgname).info "$(DESTDIR)$(infodir)/$(pkgname).info"
|
||||||
-install-info --info-dir=$(DESTDIR)$(infodir) $(DESTDIR)$(infodir)/$(pkgname).info
|
-install-info --info-dir="$(DESTDIR)$(infodir)" "$(DESTDIR)$(infodir)/$(pkgname).info"
|
||||||
|
|
||||||
install-man :
|
install-man :
|
||||||
if [ ! -d $(DESTDIR)$(mandir)/man1 ] ; then $(INSTALL_DIR) $(DESTDIR)$(mandir)/man1 ; fi
|
if [ ! -d "$(DESTDIR)$(mandir)/man1" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(mandir)/man1" ; fi
|
||||||
$(INSTALL_DATA) $(VPATH)/doc/$(progname).1 $(DESTDIR)$(mandir)/man1/$(progname).1
|
$(INSTALL_DATA) $(VPATH)/doc/$(progname).1 "$(DESTDIR)$(mandir)/man1/$(progname).1"
|
||||||
|
|
||||||
install-strip : all
|
install-strip : all
|
||||||
$(MAKE) INSTALL_PROGRAM='$(INSTALL_PROGRAM) -s' install
|
$(MAKE) INSTALL_PROGRAM='$(INSTALL_PROGRAM) -s' install
|
||||||
|
|
||||||
uninstall : uninstall-info uninstall-man
|
uninstall : uninstall-info uninstall-man
|
||||||
-rm -f $(DESTDIR)$(bindir)/$(progname)
|
-rm -f "$(DESTDIR)$(bindir)/$(progname)"
|
||||||
|
|
||||||
uninstall-info :
|
uninstall-info :
|
||||||
-install-info --info-dir=$(DESTDIR)$(infodir) --remove $(DESTDIR)$(infodir)/$(pkgname).info
|
-install-info --info-dir="$(DESTDIR)$(infodir)" --remove "$(DESTDIR)$(infodir)/$(pkgname).info"
|
||||||
-rm -f $(DESTDIR)$(infodir)/$(pkgname).info
|
-rm -f "$(DESTDIR)$(infodir)/$(pkgname).info"
|
||||||
|
|
||||||
uninstall-man :
|
uninstall-man :
|
||||||
-rm -f $(DESTDIR)$(mandir)/man1/$(progname).1
|
-rm -f "$(DESTDIR)$(mandir)/man1/$(progname).1"
|
||||||
|
|
||||||
dist : doc
|
dist : doc
|
||||||
ln -sf $(VPATH) $(DISTNAME)
|
ln -sf $(VPATH) $(DISTNAME)
|
||||||
|
|
13
NEWS
13
NEWS
|
@ -1,11 +1,6 @@
|
||||||
Changes in version 0.3:
|
Changes in version 0.4:
|
||||||
|
|
||||||
New option "--data-size" has been added.
|
The option "--version" now shows the version of lzlib being used.
|
||||||
|
|
||||||
Output file is now removed if plzip is interrupted.
|
A code reorganization has been made. The new class "Packet_courier" now
|
||||||
|
coordinates data movement and synchronization among threads.
|
||||||
This version automatically chooses the smallest possible dictionary size
|
|
||||||
for each member during compression, saving memory during decompression.
|
|
||||||
|
|
||||||
Regular files are now open in binary mode in non-POSIX platforms
|
|
||||||
defining the O_BINARY macro.
|
|
||||||
|
|
6
README
6
README
|
@ -1,6 +1,10 @@
|
||||||
Description
|
Description
|
||||||
|
|
||||||
Plzip is a parallel version of the lzip data compressor. Currently only
|
Plzip is a massively parallel (multithreaded) data compressor compatible
|
||||||
|
with the lzip file format. The files produced by plzip are fully
|
||||||
|
compatible with lzip-1.4 or newer. Plzip is intended for faster
|
||||||
|
compression/decompression of big files on multiprocessor machines. On
|
||||||
|
files big enough, plzip can use hundreds of processors. Currently only
|
||||||
compression is performed in parallel. Parallel decompression is planned
|
compression is performed in parallel. Parallel decompression is planned
|
||||||
to be implemented later.
|
to be implemented later.
|
||||||
|
|
||||||
|
|
460
compress.cc
Normal file
460
compress.cc
Normal file
|
@ -0,0 +1,460 @@
|
||||||
|
/* Plzip - A parallel compressor compatible with lzip
|
||||||
|
Copyright (C) 2009 Laszlo Ersek.
|
||||||
|
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
|
||||||
|
|
||||||
|
This program is free software: you can redistribute it and/or modify
|
||||||
|
it under the terms of the GNU General Public License as published by
|
||||||
|
the Free Software Foundation, either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
This program is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _FILE_OFFSET_BITS 64
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <cassert>
|
||||||
|
#include <cerrno>
|
||||||
|
#include <climits>
|
||||||
|
#include <csignal>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <cstring>
|
||||||
|
#include <queue>
|
||||||
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <lzlib.h>
|
||||||
|
|
||||||
|
#include "plzip.h"
|
||||||
|
|
||||||
|
#ifndef LLONG_MAX
|
||||||
|
#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL
|
||||||
|
#endif
|
||||||
|
#ifndef LLONG_MIN
|
||||||
|
#define LLONG_MIN (-LLONG_MAX - 1LL)
|
||||||
|
#endif
|
||||||
|
#ifndef ULLONG_MAX
|
||||||
|
#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex )
|
||||||
|
{
|
||||||
|
int ret = pthread_mutex_init( mutex, 0 );
|
||||||
|
if( ret != 0 ) { show_error( "pthread_mutex_init", ret ); fatal(); }
|
||||||
|
|
||||||
|
ret = pthread_cond_init( cond, 0 );
|
||||||
|
if( ret != 0 ) { show_error( "pthread_cond_init", ret ); fatal(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex )
|
||||||
|
{
|
||||||
|
int ret = pthread_cond_destroy( cond );
|
||||||
|
if( ret != 0 ) { show_error( "pthread_cond_destroy", ret ); fatal(); }
|
||||||
|
|
||||||
|
ret = pthread_mutex_destroy( mutex );
|
||||||
|
if( ret != 0 ) { show_error( "pthread_mutex_destroy", ret ); fatal(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void xlock( pthread_mutex_t * mutex )
|
||||||
|
{
|
||||||
|
int ret = pthread_mutex_lock( mutex );
|
||||||
|
if( ret != 0 ) { show_error( "pthread_mutex_lock", ret ); fatal(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void xunlock( pthread_mutex_t * mutex )
|
||||||
|
{
|
||||||
|
int ret = pthread_mutex_unlock( mutex );
|
||||||
|
if( ret != 0 ) { show_error( "pthread_mutex_unlock", ret ); fatal(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex )
|
||||||
|
{
|
||||||
|
int ret = pthread_cond_wait( cond, mutex );
|
||||||
|
if( ret != 0 ) { show_error( "pthread_cond_wait", ret ); fatal(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void xsignal( pthread_cond_t * cond )
|
||||||
|
{
|
||||||
|
int ret = pthread_cond_signal( cond );
|
||||||
|
if( ret != 0 ) { show_error( "pthread_cond_signal", ret ); fatal(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void xbroadcast( pthread_cond_t * cond )
|
||||||
|
{
|
||||||
|
int ret = pthread_cond_broadcast( cond );
|
||||||
|
if( ret != 0 ) { show_error( "pthread_cond_broadcast", ret ); fatal(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg )
|
||||||
|
{
|
||||||
|
int ret = pthread_create( thread, 0, routine, arg );
|
||||||
|
if( ret != 0 ) { show_error( "pthread_create", ret ); fatal(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void xjoin( pthread_t thread )
|
||||||
|
{
|
||||||
|
int ret = pthread_join( thread, 0 );
|
||||||
|
if( ret != 0 ) { show_error( "pthread_join", ret ); fatal(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
long long in_size = 0;
|
||||||
|
long long out_size = 0;
|
||||||
|
|
||||||
|
|
||||||
|
struct Packet // data block with a serial number
|
||||||
|
{
|
||||||
|
unsigned long long id; // serial number assigned as received
|
||||||
|
int size; // # of bytes in data
|
||||||
|
uint8_t * data;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
class Packet_courier // moves packets around
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
unsigned long icheck_counter;
|
||||||
|
unsigned long iwait_counter;
|
||||||
|
unsigned long ocheck_counter;
|
||||||
|
unsigned long owait_counter;
|
||||||
|
private:
|
||||||
|
unsigned long long receive_id; // id assigned to next packet received
|
||||||
|
unsigned long long deliver_id; // id of next packet to be delivered
|
||||||
|
Slot_tally slot_tally;
|
||||||
|
std::queue< Packet * > packet_queue;
|
||||||
|
std::vector< Packet * > circular_buffer;
|
||||||
|
int num_working; // Number of workers still running
|
||||||
|
const int num_slots; // max packets in circulation
|
||||||
|
pthread_mutex_t imutex;
|
||||||
|
pthread_cond_t iav_or_eof; // input packet available or splitter done
|
||||||
|
pthread_mutex_t omutex;
|
||||||
|
pthread_cond_t oav_or_exit; // output packet available or all workers exited
|
||||||
|
bool eof; // splitter done
|
||||||
|
|
||||||
|
public:
|
||||||
|
Packet_courier( const int num_workers, const int slots )
|
||||||
|
: icheck_counter( 0 ), iwait_counter( 0 ),
|
||||||
|
ocheck_counter( 0 ), owait_counter( 0 ),
|
||||||
|
receive_id( 0 ), deliver_id( 0 ),
|
||||||
|
slot_tally( slots ), circular_buffer( slots, (Packet *) 0 ),
|
||||||
|
num_working( num_workers ), num_slots( slots ), eof( false )
|
||||||
|
{ xinit( &iav_or_eof, &imutex ); xinit( &oav_or_exit, &omutex ); }
|
||||||
|
|
||||||
|
~Packet_courier()
|
||||||
|
{ xdestroy( &iav_or_eof, &imutex ); xdestroy( &oav_or_exit, &omutex ); }
|
||||||
|
|
||||||
|
// make a packet with data received from splitter
|
||||||
|
void receive_packet( uint8_t * const data, const int size )
|
||||||
|
{
|
||||||
|
Packet * ipacket = new Packet;
|
||||||
|
ipacket->id = receive_id++;
|
||||||
|
ipacket->size = size;
|
||||||
|
ipacket->data = data;
|
||||||
|
slot_tally.get_slot(); // wait for a free slot
|
||||||
|
xlock( &imutex );
|
||||||
|
packet_queue.push( ipacket );
|
||||||
|
xsignal( &iav_or_eof );
|
||||||
|
xunlock( &imutex );
|
||||||
|
}
|
||||||
|
|
||||||
|
// distribute a packet to a worker
|
||||||
|
Packet * distribute_packet()
|
||||||
|
{
|
||||||
|
Packet * ipacket = 0;
|
||||||
|
xlock( &imutex );
|
||||||
|
++icheck_counter;
|
||||||
|
while( packet_queue.empty() && !eof )
|
||||||
|
{
|
||||||
|
++iwait_counter;
|
||||||
|
xwait( &iav_or_eof, &imutex );
|
||||||
|
}
|
||||||
|
if( !packet_queue.empty() )
|
||||||
|
{
|
||||||
|
ipacket = packet_queue.front();
|
||||||
|
packet_queue.pop();
|
||||||
|
}
|
||||||
|
xunlock( &imutex );
|
||||||
|
if( ipacket == 0 )
|
||||||
|
{
|
||||||
|
// Notify muxer when last worker exits
|
||||||
|
xlock( &omutex );
|
||||||
|
if( --num_working == 0 )
|
||||||
|
xsignal( &oav_or_exit );
|
||||||
|
xunlock( &omutex );
|
||||||
|
}
|
||||||
|
return ipacket;
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect a packet from a worker
|
||||||
|
void collect_packet( Packet * const opacket )
|
||||||
|
{
|
||||||
|
xlock( &omutex );
|
||||||
|
// id collision shouldn't happen
|
||||||
|
assert( circular_buffer[opacket->id%num_slots] == 0 );
|
||||||
|
// Merge packet into circular buffer
|
||||||
|
circular_buffer[opacket->id%num_slots] = opacket;
|
||||||
|
if( opacket->id == deliver_id ) xsignal( &oav_or_exit );
|
||||||
|
xunlock( &omutex );
|
||||||
|
}
|
||||||
|
|
||||||
|
// deliver a packet to muxer
|
||||||
|
Packet * deliver_packet()
|
||||||
|
{
|
||||||
|
xlock( &omutex );
|
||||||
|
++ocheck_counter;
|
||||||
|
while( circular_buffer[deliver_id%num_slots] == 0 && num_working > 0 )
|
||||||
|
{
|
||||||
|
++owait_counter;
|
||||||
|
xwait( &oav_or_exit, &omutex );
|
||||||
|
}
|
||||||
|
Packet * opacket = circular_buffer[deliver_id%num_slots];
|
||||||
|
circular_buffer[deliver_id%num_slots] = 0;
|
||||||
|
++deliver_id;
|
||||||
|
xunlock( &omutex );
|
||||||
|
if( opacket != 0 )
|
||||||
|
slot_tally.leave_slot(); // return a slot to the tally
|
||||||
|
return opacket;
|
||||||
|
}
|
||||||
|
|
||||||
|
void finish() // splitter has no more packets to send
|
||||||
|
{
|
||||||
|
xlock( &imutex );
|
||||||
|
eof = true;
|
||||||
|
xbroadcast( &iav_or_eof );
|
||||||
|
xunlock( &imutex );
|
||||||
|
}
|
||||||
|
|
||||||
|
bool finished() // all packets delivered to muxer
|
||||||
|
{
|
||||||
|
if( !slot_tally.all_free() || !eof || !packet_queue.empty() ||
|
||||||
|
num_working != 0 ) return false;
|
||||||
|
for( int i = 0; i < num_slots; ++i )
|
||||||
|
if( circular_buffer[i] != 0 ) return false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
const Slot_tally & tally() const { return slot_tally; }
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
struct Splitter_arg
|
||||||
|
{
|
||||||
|
Packet_courier * courier;
|
||||||
|
int infd;
|
||||||
|
int data_size;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
// split data from input file into chunks and pass them to
|
||||||
|
// courier for packaging and distribution to workers.
|
||||||
|
void * splitter( void * arg )
|
||||||
|
{
|
||||||
|
const Splitter_arg & tmp = *(Splitter_arg *)arg;
|
||||||
|
Packet_courier & courier = *tmp.courier;
|
||||||
|
const int infd = tmp.infd;
|
||||||
|
const int data_size = tmp.data_size;
|
||||||
|
|
||||||
|
for( bool first_post = true; ; first_post = false )
|
||||||
|
{
|
||||||
|
uint8_t * data = new( std::nothrow ) uint8_t[data_size];
|
||||||
|
if( data == 0 ) { show_error( "not enough memory" ); fatal(); }
|
||||||
|
const int size = readblock( infd, data, data_size );
|
||||||
|
if( size != data_size && errno ) { show_error( "read", errno ); fatal(); }
|
||||||
|
|
||||||
|
if( size > 0 || first_post ) // first packet can be empty
|
||||||
|
{
|
||||||
|
in_size += size;
|
||||||
|
courier.receive_packet( data, size );
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
delete[] data;
|
||||||
|
courier.finish(); // no more packets to send
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
struct Worker_arg
|
||||||
|
{
|
||||||
|
int dictionary_size;
|
||||||
|
int match_len_limit;
|
||||||
|
Packet_courier * courier;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
// get packets from courier, replace their contents, and return
|
||||||
|
// them to courier.
|
||||||
|
void * worker( void * arg )
|
||||||
|
{
|
||||||
|
const Worker_arg & tmp = *(Worker_arg *)arg;
|
||||||
|
const int dictionary_size = tmp.dictionary_size;
|
||||||
|
const int match_len_limit = tmp.match_len_limit;
|
||||||
|
Packet_courier & courier = *tmp.courier;
|
||||||
|
|
||||||
|
while( true )
|
||||||
|
{
|
||||||
|
Packet * packet = courier.distribute_packet();
|
||||||
|
if( packet == 0 ) break; // no more packets to process
|
||||||
|
|
||||||
|
const int compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 );
|
||||||
|
uint8_t * const new_data = new( std::nothrow ) uint8_t[compr_size];
|
||||||
|
if( new_data == 0 ) { show_error( "not enough memory" ); fatal(); }
|
||||||
|
const int dict_size = std::max( LZ_min_dictionary_size(),
|
||||||
|
std::min( dictionary_size, packet->size ) );
|
||||||
|
LZ_Encoder * const encoder =
|
||||||
|
LZ_compress_open( dict_size, match_len_limit, LLONG_MAX );
|
||||||
|
if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
|
||||||
|
{ show_error( "LZ_compress_open failed." ); fatal(); }
|
||||||
|
|
||||||
|
int written = 0;
|
||||||
|
int new_size = 0;
|
||||||
|
while( true )
|
||||||
|
{
|
||||||
|
if( LZ_compress_write_size( encoder ) > 0 )
|
||||||
|
{
|
||||||
|
if( written < packet->size )
|
||||||
|
{
|
||||||
|
const int wr = LZ_compress_write( encoder, packet->data + written,
|
||||||
|
packet->size - written );
|
||||||
|
if( wr < 0 ) { show_error( "LZ_compress_write failed." ); fatal(); }
|
||||||
|
written += wr;
|
||||||
|
}
|
||||||
|
if( written >= packet->size ) LZ_compress_finish( encoder );
|
||||||
|
}
|
||||||
|
const int rd = LZ_compress_read( encoder, new_data + new_size,
|
||||||
|
compr_size - new_size );
|
||||||
|
if( rd < 0 ) { show_error( "LZ_compress_read failed." ); fatal(); }
|
||||||
|
new_size += rd;
|
||||||
|
assert( new_size <= compr_size );
|
||||||
|
if( LZ_compress_finished( encoder ) == 1 ) break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if( LZ_compress_close( encoder ) < 0 )
|
||||||
|
{ show_error( "LZ_compress_close failed." ); fatal(); }
|
||||||
|
|
||||||
|
delete[] packet->data;
|
||||||
|
packet->size = new_size;
|
||||||
|
packet->data = new_data;
|
||||||
|
courier.collect_packet( packet );
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// get from courier the processed and sorted packets, and write
|
||||||
|
// their contents to the output file.
|
||||||
|
void muxer( Packet_courier & courier, const int outfd )
|
||||||
|
{
|
||||||
|
while( true )
|
||||||
|
{
|
||||||
|
Packet * opacket = courier.deliver_packet();
|
||||||
|
if( opacket == 0 ) break; // queue is empty. all workers exited
|
||||||
|
|
||||||
|
out_size += opacket->size;
|
||||||
|
|
||||||
|
if( outfd >= 0 )
|
||||||
|
{
|
||||||
|
const int wr = writeblock( outfd, opacket->data, opacket->size );
|
||||||
|
if( wr != opacket->size )
|
||||||
|
{ show_error( "write", errno ); fatal(); }
|
||||||
|
}
|
||||||
|
delete[] opacket->data;
|
||||||
|
delete opacket;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // end namespace
|
||||||
|
|
||||||
|
|
||||||
|
// init the courier, then start the splitter and the workers and
|
||||||
|
// call the muxer.
|
||||||
|
int compress( const int data_size, const int dictionary_size,
|
||||||
|
const int match_len_limit, const int num_workers,
|
||||||
|
const int num_slots, const int infd, const int outfd,
|
||||||
|
const int debug_level )
|
||||||
|
{
|
||||||
|
in_size = 0;
|
||||||
|
out_size = 0;
|
||||||
|
Packet_courier courier( num_workers, num_slots );
|
||||||
|
|
||||||
|
Splitter_arg splitter_arg;
|
||||||
|
splitter_arg.courier = &courier;
|
||||||
|
splitter_arg.infd = infd;
|
||||||
|
splitter_arg.data_size = data_size;
|
||||||
|
|
||||||
|
pthread_t splitter_thread;
|
||||||
|
xcreate( &splitter_thread, splitter, &splitter_arg );
|
||||||
|
|
||||||
|
Worker_arg worker_arg;
|
||||||
|
worker_arg.dictionary_size = dictionary_size;
|
||||||
|
worker_arg.match_len_limit = match_len_limit;
|
||||||
|
worker_arg.courier = &courier;
|
||||||
|
|
||||||
|
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
|
||||||
|
if( worker_threads == 0 )
|
||||||
|
{ show_error( "not enough memory" ); fatal(); }
|
||||||
|
for( int i = 0; i < num_workers; ++i )
|
||||||
|
xcreate( &worker_threads[i], worker, &worker_arg );
|
||||||
|
|
||||||
|
muxer( courier, outfd );
|
||||||
|
|
||||||
|
for( int i = num_workers - 1; i >= 0; --i )
|
||||||
|
xjoin(worker_threads[i]);
|
||||||
|
delete[] worker_threads; worker_threads = 0;
|
||||||
|
|
||||||
|
xjoin( splitter_thread );
|
||||||
|
|
||||||
|
if( verbosity >= 1 )
|
||||||
|
{
|
||||||
|
if( in_size <= 0 || out_size <= 0 )
|
||||||
|
std::fprintf( stderr, "no data compressed.\n" );
|
||||||
|
else
|
||||||
|
std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
|
||||||
|
"%5.2f%% saved, %lld in, %lld out.\n",
|
||||||
|
(double)in_size / out_size,
|
||||||
|
( 8.0 * out_size ) / in_size,
|
||||||
|
100.0 * ( 1.0 - ( (double)out_size / in_size ) ),
|
||||||
|
in_size, out_size );
|
||||||
|
}
|
||||||
|
|
||||||
|
if( debug_level & 1 )
|
||||||
|
std::fprintf( stderr,
|
||||||
|
"splitter tried to send a packet %8lu times\n"
|
||||||
|
"splitter had to wait %8lu times\n"
|
||||||
|
"any worker tried to consume from splitter %8lu times\n"
|
||||||
|
"any worker had to wait %8lu times\n"
|
||||||
|
"muxer tried to consume from workers %8lu times\n"
|
||||||
|
"muxer had to wait %8lu times\n",
|
||||||
|
courier.tally().check_counter,
|
||||||
|
courier.tally().wait_counter,
|
||||||
|
courier.icheck_counter,
|
||||||
|
courier.iwait_counter,
|
||||||
|
courier.ocheck_counter,
|
||||||
|
courier.owait_counter );
|
||||||
|
|
||||||
|
assert( courier.finished() );
|
||||||
|
return 0;
|
||||||
|
}
|
10
configure
vendored
10
configure
vendored
|
@ -1,16 +1,16 @@
|
||||||
#! /bin/sh
|
#! /bin/sh
|
||||||
# configure script for Plzip - A parallel version of the lzip data compressor
|
# configure script for Plzip - A parallel compressor compatible with lzip
|
||||||
# Copyright (C) 2009, 2010 Antonio Diaz Diaz.
|
# Copyright (C) 2009, 2010 Antonio Diaz Diaz.
|
||||||
#
|
#
|
||||||
# This configure script is free software: you have unlimited permission
|
# This configure script is free software: you have unlimited permission
|
||||||
# to copy, distribute and modify it.
|
# to copy, distribute and modify it.
|
||||||
#
|
#
|
||||||
# Date of this version: 2010-01-24
|
# Date of this version: 2010-01-31
|
||||||
|
|
||||||
args=
|
args=
|
||||||
no_create=
|
no_create=
|
||||||
pkgname=plzip
|
pkgname=plzip
|
||||||
pkgversion=0.3
|
pkgversion=0.4
|
||||||
progname=plzip
|
progname=plzip
|
||||||
srctrigger=plzip.h
|
srctrigger=plzip.h
|
||||||
|
|
||||||
|
@ -135,7 +135,7 @@ if [ -z "${CXX}" ] ; then # Let the user override the test.
|
||||||
fi
|
fi
|
||||||
|
|
||||||
echo
|
echo
|
||||||
if [ -z ${no_create} ] ; then
|
if [ -z "${no_create}" ] ; then
|
||||||
echo "creating config.status"
|
echo "creating config.status"
|
||||||
rm -f config.status
|
rm -f config.status
|
||||||
cat > config.status << EOF
|
cat > config.status << EOF
|
||||||
|
@ -166,7 +166,7 @@ echo "CXXFLAGS = ${CXXFLAGS}"
|
||||||
echo "LDFLAGS = ${LDFLAGS}"
|
echo "LDFLAGS = ${LDFLAGS}"
|
||||||
rm -f Makefile
|
rm -f Makefile
|
||||||
cat > Makefile << EOF
|
cat > Makefile << EOF
|
||||||
# Makefile for Plzip - A parallel version of the lzip data compressor
|
# Makefile for Plzip - A parallel compressor compatible with lzip
|
||||||
# Copyright (C) 2009, 2010 Antonio Diaz Diaz.
|
# Copyright (C) 2009, 2010 Antonio Diaz Diaz.
|
||||||
# This file was generated automatically by configure. Do not edit.
|
# This file was generated automatically by configure. Do not edit.
|
||||||
#
|
#
|
||||||
|
|
123
decompress.cc
Normal file
123
decompress.cc
Normal file
|
@ -0,0 +1,123 @@
|
||||||
|
/* Plzip - A parallel compressor compatible with lzip
|
||||||
|
Copyright (C) 2009 Laszlo Ersek.
|
||||||
|
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
|
||||||
|
|
||||||
|
This program is free software: you can redistribute it and/or modify
|
||||||
|
it under the terms of the GNU General Public License as published by
|
||||||
|
the Free Software Foundation, either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
This program is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _FILE_OFFSET_BITS 64
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <cerrno>
|
||||||
|
#include <climits>
|
||||||
|
#include <csignal>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <cstring>
|
||||||
|
#include <queue>
|
||||||
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <lzlib.h>
|
||||||
|
|
||||||
|
#include "plzip.h"
|
||||||
|
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
int do_decompress( LZ_Decoder * const decoder, const int infd, const int outfd,
|
||||||
|
const Pretty_print & pp, const bool testing )
|
||||||
|
{
|
||||||
|
const int in_buffer_size = 65536, out_buffer_size = 8 * in_buffer_size;
|
||||||
|
uint8_t in_buffer[in_buffer_size], out_buffer[out_buffer_size];
|
||||||
|
|
||||||
|
while( true )
|
||||||
|
{
|
||||||
|
int in_size = std::min( LZ_decompress_write_size( decoder ), in_buffer_size );
|
||||||
|
if( in_size > 0 )
|
||||||
|
{
|
||||||
|
const int max_in_size = in_size;
|
||||||
|
in_size = readblock( infd, in_buffer, max_in_size );
|
||||||
|
if( in_size != max_in_size && errno )
|
||||||
|
{ pp(); show_error( "read error", errno ); return 1; }
|
||||||
|
if( in_size == 0 ) LZ_decompress_finish( decoder );
|
||||||
|
else if( in_size != LZ_decompress_write( decoder, in_buffer, in_size ) )
|
||||||
|
internal_error( "library error (LZ_decompress_write)" );
|
||||||
|
}
|
||||||
|
int out_size = LZ_decompress_read( decoder, out_buffer, out_buffer_size );
|
||||||
|
// std::fprintf( stderr, "%5d in_size, %6d out_size.\n", in_size, out_size );
|
||||||
|
if( out_size < 0 )
|
||||||
|
{
|
||||||
|
const LZ_Errno lz_errno = LZ_decompress_errno( decoder );
|
||||||
|
if( lz_errno == LZ_header_error )
|
||||||
|
{
|
||||||
|
if( LZ_decompress_total_out_size( decoder ) > 0 )
|
||||||
|
break; // trailing garbage
|
||||||
|
pp( "error reading member header" );
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if( lz_errno == LZ_mem_error )
|
||||||
|
{
|
||||||
|
pp( "not enough memory. Find a machine with more memory" );
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
pp();
|
||||||
|
if( lz_errno == LZ_unexpected_eof )
|
||||||
|
{
|
||||||
|
if( verbosity >= 0 )
|
||||||
|
std::fprintf( stderr, "file ends unexpectedly at pos %lld\n",
|
||||||
|
LZ_decompress_total_in_size( decoder ) );
|
||||||
|
return 2;
|
||||||
|
}
|
||||||
|
if( verbosity >= 0 )
|
||||||
|
std::fprintf( stderr, "LZ_decompress_read error: %s.\n",
|
||||||
|
LZ_strerror( LZ_decompress_errno( decoder ) ) );
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
else if( out_size > 0 && outfd >= 0 )
|
||||||
|
{
|
||||||
|
const int wr = writeblock( outfd, out_buffer, out_size );
|
||||||
|
if( wr != out_size )
|
||||||
|
{ pp(); show_error( "write error", errno ); return 1; }
|
||||||
|
}
|
||||||
|
if( LZ_decompress_finished( decoder ) == 1 ) break;
|
||||||
|
if( in_size == 0 && out_size == 0 )
|
||||||
|
internal_error( "library error (LZ_decompress_read)" );
|
||||||
|
}
|
||||||
|
if( verbosity >= 1 )
|
||||||
|
{ if( testing ) std::fprintf( stderr, "ok\n" );
|
||||||
|
else std::fprintf( stderr, "done\n" ); }
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // end namespace
|
||||||
|
|
||||||
|
|
||||||
|
int decompress( const int infd, const int outfd, const Pretty_print & pp,
|
||||||
|
const bool testing )
|
||||||
|
{
|
||||||
|
LZ_Decoder * const decoder = LZ_decompress_open();
|
||||||
|
int retval;
|
||||||
|
|
||||||
|
if( !decoder || LZ_decompress_errno( decoder ) != LZ_ok )
|
||||||
|
{
|
||||||
|
pp( "not enough memory" );
|
||||||
|
retval = 1;
|
||||||
|
}
|
||||||
|
else retval = do_decompress( decoder, infd, outfd, pp, testing );
|
||||||
|
LZ_decompress_close( decoder );
|
||||||
|
return retval;
|
||||||
|
}
|
|
@ -1,12 +1,12 @@
|
||||||
.\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.36.
|
.\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.36.
|
||||||
.TH PLZIP "1" "January 2010" "Plzip 0.3" "User Commands"
|
.TH PLZIP "1" "January 2010" "Plzip 0.4" "User Commands"
|
||||||
.SH NAME
|
.SH NAME
|
||||||
Plzip \- data compressor based on the LZMA algorithm
|
Plzip \- data compressor based on the LZMA algorithm
|
||||||
.SH SYNOPSIS
|
.SH SYNOPSIS
|
||||||
.B plzip
|
.B plzip
|
||||||
[\fIoptions\fR] [\fIfiles\fR]
|
[\fIoptions\fR] [\fIfiles\fR]
|
||||||
.SH DESCRIPTION
|
.SH DESCRIPTION
|
||||||
Plzip \- A parallel version of the lzip data compressor.
|
Plzip \- A parallel compressor compatible with lzip.
|
||||||
.SH OPTIONS
|
.SH OPTIONS
|
||||||
.TP
|
.TP
|
||||||
\fB\-h\fR, \fB\-\-help\fR
|
\fB\-h\fR, \fB\-\-help\fR
|
||||||
|
@ -71,6 +71,7 @@ Plzip home page: http://www.nongnu.org/lzip/plzip.html
|
||||||
Copyright \(co 2009 Laszlo Ersek.
|
Copyright \(co 2009 Laszlo Ersek.
|
||||||
.br
|
.br
|
||||||
Copyright \(co 2010 Antonio Diaz Diaz.
|
Copyright \(co 2010 Antonio Diaz Diaz.
|
||||||
|
Using Lzlib 0.9-rc1
|
||||||
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
|
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
|
||||||
.br
|
.br
|
||||||
This is free software: you are free to change and redistribute it.
|
This is free software: you are free to change and redistribute it.
|
||||||
|
|
|
@ -3,7 +3,7 @@ plzip.texinfo.
|
||||||
|
|
||||||
INFO-DIR-SECTION Data Compression
|
INFO-DIR-SECTION Data Compression
|
||||||
START-INFO-DIR-ENTRY
|
START-INFO-DIR-ENTRY
|
||||||
* Plzip: (plzip). Parallel version of the lzip data compressor
|
* Plzip: (plzip). Parallel compressor compatible with lzip
|
||||||
END-INFO-DIR-ENTRY
|
END-INFO-DIR-ENTRY
|
||||||
|
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@ File: plzip.info, Node: Top, Next: Introduction, Up: (dir)
|
||||||
Plzip Manual
|
Plzip Manual
|
||||||
************
|
************
|
||||||
|
|
||||||
This manual is for Plzip (version 0.3, 24 January 2010).
|
This manual is for Plzip (version 0.4, 31 January 2010).
|
||||||
|
|
||||||
* Menu:
|
* Menu:
|
||||||
|
|
||||||
|
@ -34,11 +34,13 @@ File: plzip.info, Node: Introduction, Next: Invoking Plzip, Prev: Top, Up: T
|
||||||
1 Introduction
|
1 Introduction
|
||||||
**************
|
**************
|
||||||
|
|
||||||
Plzip is a parallel version of the lzip data compressor. The files
|
Plzip is a massively parallel (multithreaded) data compressor compatible
|
||||||
produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is
|
with the lzip file format. The files produced by plzip are fully
|
||||||
intended for faster compression/decompression of big files on
|
compatible with lzip-1.4 or newer. Plzip is intended for faster
|
||||||
multiprocessor machines. Currently only compression is performed in
|
compression/decompression of big files on multiprocessor machines. On
|
||||||
parallel. Parallel decompression is planned to be implemented later.
|
files big enough, plzip can use hundreds of processors. Currently only
|
||||||
|
compression is performed in parallel. Parallel decompression is planned
|
||||||
|
to be implemented later.
|
||||||
|
|
||||||
Lzip is a lossless data compressor based on the LZMA algorithm, with
|
Lzip is a lossless data compressor based on the LZMA algorithm, with
|
||||||
very safe integrity checking and a user interface similar to the one of
|
very safe integrity checking and a user interface similar to the one of
|
||||||
|
@ -303,11 +305,11 @@ Concept Index
|
||||||
|
|
||||||
|
|
||||||
Tag Table:
|
Tag Table:
|
||||||
Node: Top227
|
Node: Top223
|
||||||
Node: Introduction750
|
Node: Introduction746
|
||||||
Node: Invoking Plzip3571
|
Node: Invoking Plzip3669
|
||||||
Node: File Format7260
|
Node: File Format7358
|
||||||
Node: Problems9216
|
Node: Problems9314
|
||||||
Node: Concept Index9745
|
Node: Concept Index9843
|
||||||
|
|
||||||
End Tag Table
|
End Tag Table
|
||||||
|
|
|
@ -5,12 +5,12 @@
|
||||||
@finalout
|
@finalout
|
||||||
@c %**end of header
|
@c %**end of header
|
||||||
|
|
||||||
@set UPDATED 24 January 2010
|
@set UPDATED 31 January 2010
|
||||||
@set VERSION 0.3
|
@set VERSION 0.4
|
||||||
|
|
||||||
@dircategory Data Compression
|
@dircategory Data Compression
|
||||||
@direntry
|
@direntry
|
||||||
* Plzip: (plzip). Parallel version of the lzip data compressor
|
* Plzip: (plzip). Parallel compressor compatible with lzip
|
||||||
@end direntry
|
@end direntry
|
||||||
|
|
||||||
|
|
||||||
|
@ -50,11 +50,13 @@ to copy, distribute and modify it.
|
||||||
@chapter Introduction
|
@chapter Introduction
|
||||||
@cindex introduction
|
@cindex introduction
|
||||||
|
|
||||||
Plzip is a parallel version of the lzip data compressor. The files
|
Plzip is a massively parallel (multithreaded) data compressor compatible
|
||||||
produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is
|
with the lzip file format. The files produced by plzip are fully
|
||||||
intended for faster compression/decompression of big files on
|
compatible with lzip-1.4 or newer. Plzip is intended for faster
|
||||||
multiprocessor machines. Currently only compression is performed in
|
compression/decompression of big files on multiprocessor machines. On
|
||||||
parallel. Parallel decompression is planned to be implemented later.
|
files big enough, plzip can use hundreds of processors. Currently only
|
||||||
|
compression is performed in parallel. Parallel decompression is planned
|
||||||
|
to be implemented later.
|
||||||
|
|
||||||
Lzip is a lossless data compressor based on the LZMA algorithm, with
|
Lzip is a lossless data compressor based on the LZMA algorithm, with
|
||||||
very safe integrity checking and a user interface similar to the one of
|
very safe integrity checking and a user interface similar to the one of
|
||||||
|
|
128
main.cc
128
main.cc
|
@ -1,4 +1,4 @@
|
||||||
/* Plzip - A parallel version of the lzip data compressor
|
/* Plzip - A parallel compressor compatible with lzip
|
||||||
Copyright (C) 2009 Laszlo Ersek.
|
Copyright (C) 2009 Laszlo Ersek.
|
||||||
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
|
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
|
||||||
|
|
||||||
|
@ -58,8 +58,6 @@
|
||||||
#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
|
#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
void internal_error( const char * msg );
|
|
||||||
|
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
|
@ -93,44 +91,10 @@ bool delete_output_on_interrupt = false;
|
||||||
pthread_t main_thread;
|
pthread_t main_thread;
|
||||||
pid_t main_thread_pid;
|
pid_t main_thread_pid;
|
||||||
|
|
||||||
class Pretty_print
|
|
||||||
{
|
|
||||||
const char * const stdin_name;
|
|
||||||
const unsigned int stdin_name_len;
|
|
||||||
unsigned int longest_name;
|
|
||||||
std::string name_;
|
|
||||||
mutable bool first_post;
|
|
||||||
|
|
||||||
public:
|
|
||||||
Pretty_print( const std::vector< std::string > & filenames )
|
|
||||||
: stdin_name( "(stdin)" ), stdin_name_len( std::strlen( stdin_name ) ),
|
|
||||||
longest_name( 0 ), first_post( false )
|
|
||||||
{
|
|
||||||
for( unsigned int i = 0; i < filenames.size(); ++i )
|
|
||||||
{
|
|
||||||
const std::string & s = filenames[i];
|
|
||||||
const unsigned int len = ( ( s == "-" ) ? stdin_name_len : s.size() );
|
|
||||||
if( len > longest_name ) longest_name = len;
|
|
||||||
}
|
|
||||||
if( longest_name == 0 ) longest_name = stdin_name_len;
|
|
||||||
}
|
|
||||||
|
|
||||||
void set_name( const std::string & filename )
|
|
||||||
{
|
|
||||||
if( filename.size() && filename != "-" ) name_ = filename;
|
|
||||||
else name_ = stdin_name;
|
|
||||||
first_post = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void reset() const throw() { if( name_.size() ) first_post = true; }
|
|
||||||
const char * name() const throw() { return name_.c_str(); }
|
|
||||||
void operator()( const char * const msg = 0 ) const throw();
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
void show_help() throw()
|
void show_help() throw()
|
||||||
{
|
{
|
||||||
std::printf( "%s - A parallel version of the lzip data compressor.\n", Program_name );
|
std::printf( "%s - A parallel compressor compatible with lzip.\n", Program_name );
|
||||||
std::printf( "\nUsage: %s [options] [files]\n", invocation_name );
|
std::printf( "\nUsage: %s [options] [files]\n", invocation_name );
|
||||||
std::printf( "\nOptions:\n" );
|
std::printf( "\nOptions:\n" );
|
||||||
std::printf( " -h, --help display this help and exit\n" );
|
std::printf( " -h, --help display this help and exit\n" );
|
||||||
|
@ -154,7 +118,7 @@ void show_help() throw()
|
||||||
std::printf( " --best alias for -9\n" );
|
std::printf( " --best alias for -9\n" );
|
||||||
if( verbosity > 0 )
|
if( verbosity > 0 )
|
||||||
{
|
{
|
||||||
std::printf( " -D, --debug=<level> (0-3) print debug statistics to stderr\n" );
|
std::printf( " -D, --debug=<level> (0-1) print debug statistics to stderr\n" );
|
||||||
}
|
}
|
||||||
std::printf( "If no file names are given, %s compresses or decompresses\n", program_name );
|
std::printf( "If no file names are given, %s compresses or decompresses\n", program_name );
|
||||||
std::printf( "from standard input to standard output.\n" );
|
std::printf( "from standard input to standard output.\n" );
|
||||||
|
@ -170,6 +134,7 @@ void show_version() throw()
|
||||||
std::printf( "%s %s\n", Program_name, PROGVERSION );
|
std::printf( "%s %s\n", Program_name, PROGVERSION );
|
||||||
std::printf( "Copyright (C) 2009 Laszlo Ersek.\n" );
|
std::printf( "Copyright (C) 2009 Laszlo Ersek.\n" );
|
||||||
std::printf( "Copyright (C) %s Antonio Diaz Diaz.\n", program_year );
|
std::printf( "Copyright (C) %s Antonio Diaz Diaz.\n", program_year );
|
||||||
|
std::printf( "Using Lzlib %s\n", LZ_version() );
|
||||||
std::printf( "License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>\n" );
|
std::printf( "License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>\n" );
|
||||||
std::printf( "This is free software: you are free to change and redistribute it.\n" );
|
std::printf( "This is free software: you are free to change and redistribute it.\n" );
|
||||||
std::printf( "There is NO WARRANTY, to the extent permitted by law.\n" );
|
std::printf( "There is NO WARRANTY, to the extent permitted by law.\n" );
|
||||||
|
@ -413,89 +378,6 @@ void close_and_set_permissions( const struct stat * const in_statsp )
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int do_decompress( LZ_Decoder * const decoder, const int inhandle,
|
|
||||||
const Pretty_print & pp, const bool testing )
|
|
||||||
{
|
|
||||||
const int in_buffer_size = 65536, out_buffer_size = 8 * in_buffer_size;
|
|
||||||
uint8_t in_buffer[in_buffer_size], out_buffer[out_buffer_size];
|
|
||||||
|
|
||||||
while( true )
|
|
||||||
{
|
|
||||||
int in_size = std::min( LZ_decompress_write_size( decoder ), in_buffer_size );
|
|
||||||
if( in_size > 0 )
|
|
||||||
{
|
|
||||||
const int max_in_size = in_size;
|
|
||||||
in_size = readblock( inhandle, in_buffer, max_in_size );
|
|
||||||
if( in_size != max_in_size && errno )
|
|
||||||
{ pp(); show_error( "read error", errno ); return 1; }
|
|
||||||
if( in_size == 0 ) LZ_decompress_finish( decoder );
|
|
||||||
else if( in_size != LZ_decompress_write( decoder, in_buffer, in_size ) )
|
|
||||||
internal_error( "library error (LZ_decompress_write)" );
|
|
||||||
}
|
|
||||||
int out_size = LZ_decompress_read( decoder, out_buffer, out_buffer_size );
|
|
||||||
// std::fprintf( stderr, "%5d in_size, %6d out_size.\n", in_size, out_size );
|
|
||||||
if( out_size < 0 )
|
|
||||||
{
|
|
||||||
const LZ_Errno lz_errno = LZ_decompress_errno( decoder );
|
|
||||||
if( lz_errno == LZ_header_error )
|
|
||||||
{
|
|
||||||
if( LZ_decompress_total_out_size( decoder ) > 0 )
|
|
||||||
break; // trailing garbage
|
|
||||||
pp( "error reading member header" );
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
if( lz_errno == LZ_mem_error )
|
|
||||||
{
|
|
||||||
pp( "not enough memory. Find a machine with more memory" );
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
pp();
|
|
||||||
if( lz_errno == LZ_unexpected_eof )
|
|
||||||
{
|
|
||||||
if( verbosity >= 0 )
|
|
||||||
std::fprintf( stderr, "file ends unexpectedly at pos %lld\n",
|
|
||||||
LZ_decompress_total_in_size( decoder ) );
|
|
||||||
return 2;
|
|
||||||
}
|
|
||||||
if( verbosity >= 0 )
|
|
||||||
std::fprintf( stderr, "LZ_decompress_read error: %s.\n",
|
|
||||||
LZ_strerror( LZ_decompress_errno( decoder ) ) );
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
else if( out_size > 0 && outhandle >= 0 )
|
|
||||||
{
|
|
||||||
const int wr = writeblock( outhandle, out_buffer, out_size );
|
|
||||||
if( wr != out_size )
|
|
||||||
{ pp(); show_error( "write error", errno ); return 1; }
|
|
||||||
}
|
|
||||||
if( LZ_decompress_finished( decoder ) == 1 ) break;
|
|
||||||
if( in_size == 0 && out_size == 0 )
|
|
||||||
internal_error( "library error (LZ_decompress_read)" );
|
|
||||||
}
|
|
||||||
if( verbosity >= 1 )
|
|
||||||
{ if( testing ) std::fprintf( stderr, "ok\n" );
|
|
||||||
else std::fprintf( stderr, "done\n" ); }
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int decompress( const int inhandle, const Pretty_print & pp,
|
|
||||||
const bool testing )
|
|
||||||
{
|
|
||||||
LZ_Decoder * const decoder = LZ_decompress_open();
|
|
||||||
int retval;
|
|
||||||
|
|
||||||
if( !decoder || LZ_decompress_errno( decoder ) != LZ_ok )
|
|
||||||
{
|
|
||||||
pp( "not enough memory" );
|
|
||||||
retval = 1;
|
|
||||||
}
|
|
||||||
else retval = do_decompress( decoder, inhandle, pp, testing );
|
|
||||||
LZ_decompress_close( decoder );
|
|
||||||
return retval;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
extern "C" void signal_handler( int sig ) throw()
|
extern "C" void signal_handler( int sig ) throw()
|
||||||
{
|
{
|
||||||
if( !pthread_equal( pthread_self(), main_thread ) )
|
if( !pthread_equal( pthread_self(), main_thread ) )
|
||||||
|
@ -810,7 +692,7 @@ int main( const int argc, const char * argv[] )
|
||||||
encoder_options.match_len_limit, num_workers,
|
encoder_options.match_len_limit, num_workers,
|
||||||
num_slots, inhandle, outhandle, debug_level );
|
num_slots, inhandle, outhandle, debug_level );
|
||||||
else
|
else
|
||||||
tmp = decompress( inhandle, pp, program_mode == m_test );
|
tmp = decompress( inhandle, outhandle, pp, program_mode == m_test );
|
||||||
if( tmp > retval ) retval = tmp;
|
if( tmp > retval ) retval = tmp;
|
||||||
if( tmp && program_mode != m_test ) cleanup_and_fail( retval );
|
if( tmp && program_mode != m_test ) cleanup_and_fail( retval );
|
||||||
|
|
||||||
|
|
548
plzip.cc
548
plzip.cc
|
@ -1,548 +0,0 @@
|
||||||
/* Plzip - A parallel version of the lzip data compressor
|
|
||||||
Copyright (C) 2009 Laszlo Ersek.
|
|
||||||
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
|
|
||||||
|
|
||||||
This program is free software: you can redistribute it and/or modify
|
|
||||||
it under the terms of the GNU General Public License as published by
|
|
||||||
the Free Software Foundation, either version 3 of the License, or
|
|
||||||
(at your option) any later version.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#define _FILE_OFFSET_BITS 64
|
|
||||||
|
|
||||||
#include <algorithm>
|
|
||||||
#include <cassert>
|
|
||||||
#include <cerrno>
|
|
||||||
#include <climits>
|
|
||||||
#include <csignal>
|
|
||||||
#include <cstdio>
|
|
||||||
#include <cstdlib>
|
|
||||||
#include <vector>
|
|
||||||
#include <pthread.h>
|
|
||||||
#include <stdint.h>
|
|
||||||
#include <unistd.h>
|
|
||||||
#include <lzlib.h>
|
|
||||||
|
|
||||||
#include "plzip.h"
|
|
||||||
|
|
||||||
#ifndef LLONG_MAX
|
|
||||||
#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL
|
|
||||||
#endif
|
|
||||||
#ifndef LLONG_MIN
|
|
||||||
#define LLONG_MIN (-LLONG_MAX - 1LL)
|
|
||||||
#endif
|
|
||||||
#ifndef ULLONG_MAX
|
|
||||||
#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
|
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
namespace {
|
|
||||||
|
|
||||||
long long in_size = 0;
|
|
||||||
long long out_size = 0;
|
|
||||||
|
|
||||||
void *(*mallocf)( size_t size );
|
|
||||||
void (*freef)( void *ptr );
|
|
||||||
|
|
||||||
|
|
||||||
void * trace_malloc( size_t size )
|
|
||||||
{
|
|
||||||
int save_errno = 0;
|
|
||||||
|
|
||||||
void * ret = malloc( size );
|
|
||||||
if( ret == 0 ) save_errno = errno;
|
|
||||||
std::fprintf( stderr, "malloc(%lu) == %p\n", (unsigned long)size, ret );
|
|
||||||
if( ret == 0 ) errno = save_errno;
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void trace_free( void *ptr )
|
|
||||||
{
|
|
||||||
std::fprintf( stderr, "free(%p)\n", ptr );
|
|
||||||
free( ptr );
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void * xalloc( size_t size )
|
|
||||||
{
|
|
||||||
void *ret = (*mallocf)( size );
|
|
||||||
if( ret == 0 ) { show_error( "not enough memory", errno ); fatal(); }
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex )
|
|
||||||
{
|
|
||||||
int ret = pthread_mutex_init( mutex, 0 );
|
|
||||||
if( ret != 0 ) { show_error( "pthread_mutex_init", ret ); fatal(); }
|
|
||||||
|
|
||||||
ret = pthread_cond_init( cond, 0 );
|
|
||||||
if( ret != 0 ) { show_error( "pthread_cond_init", ret ); fatal(); }
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex )
|
|
||||||
{
|
|
||||||
int ret = pthread_cond_destroy( cond );
|
|
||||||
if( ret != 0 ) { show_error( "pthread_cond_destroy", ret ); fatal(); }
|
|
||||||
|
|
||||||
ret = pthread_mutex_destroy( mutex );
|
|
||||||
if( ret != 0 ) { show_error( "pthread_mutex_destroy", ret ); fatal(); }
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void xlock( pthread_mutex_t * mutex )
|
|
||||||
{
|
|
||||||
int ret = pthread_mutex_lock( mutex );
|
|
||||||
if( ret != 0 ) { show_error( "pthread_mutex_lock", ret ); fatal(); }
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void xunlock( pthread_mutex_t * mutex )
|
|
||||||
{
|
|
||||||
int ret = pthread_mutex_unlock( mutex );
|
|
||||||
if( ret != 0 ) { show_error( "pthread_mutex_unlock", ret ); fatal(); }
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex )
|
|
||||||
{
|
|
||||||
int ret = pthread_cond_wait( cond, mutex );
|
|
||||||
if( ret != 0 ) { show_error( "pthread_cond_wait", ret ); fatal(); }
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void xsignal( pthread_cond_t * cond )
|
|
||||||
{
|
|
||||||
int ret = pthread_cond_signal( cond );
|
|
||||||
if( ret != 0 ) { show_error( "pthread_cond_signal", ret ); fatal(); }
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void xbroadcast( pthread_cond_t * cond )
|
|
||||||
{
|
|
||||||
int ret = pthread_cond_broadcast( cond );
|
|
||||||
if( ret != 0 ) { show_error( "pthread_cond_broadcast", ret ); fatal(); }
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg )
|
|
||||||
{
|
|
||||||
int ret = pthread_create( thread, 0, routine, arg );
|
|
||||||
if( ret != 0 ) { show_error( "pthread_create", ret ); fatal(); }
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void xjoin( pthread_t thread )
|
|
||||||
{
|
|
||||||
int ret = pthread_join( thread, 0 );
|
|
||||||
if( ret != 0 ) { show_error( "pthread_join", ret ); fatal(); }
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
struct Slot_tally // Synchronizes splitter to muxer
|
|
||||||
{
|
|
||||||
unsigned long check_counter;
|
|
||||||
unsigned long wait_counter;
|
|
||||||
int num_free; // Number of free slots
|
|
||||||
pthread_mutex_t mutex;
|
|
||||||
pthread_cond_t slot_av; // Free slot available
|
|
||||||
|
|
||||||
Slot_tally( const int slots )
|
|
||||||
: check_counter( 0 ), wait_counter( 0 ), num_free( slots )
|
|
||||||
{ xinit( &slot_av, &mutex ); }
|
|
||||||
|
|
||||||
~Slot_tally() { xdestroy( &slot_av, &mutex ); }
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
struct S2w_blk // Splitter to worker data block
|
|
||||||
{
|
|
||||||
unsigned long long id; // Block serial number as read from infd
|
|
||||||
S2w_blk *next; // Next in queue
|
|
||||||
int loaded; // # of bytes in plain, may be 0 for 1st
|
|
||||||
uint8_t plain[1]; // Data read from infd, allocated: data_size
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
struct S2w_queue
|
|
||||||
{
|
|
||||||
S2w_blk * head; // Next ready worker shall compress this
|
|
||||||
S2w_blk * tail; // Splitter will append here
|
|
||||||
unsigned long check_counter;
|
|
||||||
unsigned long wait_counter;
|
|
||||||
pthread_mutex_t mutex;
|
|
||||||
pthread_cond_t av_or_eof; // New block available or splitter done
|
|
||||||
bool eof; // Splitter done
|
|
||||||
|
|
||||||
S2w_queue()
|
|
||||||
: head( 0 ), tail( 0 ), check_counter( 0 ), wait_counter( 0 ), eof( false )
|
|
||||||
{ xinit( &av_or_eof, &mutex ); }
|
|
||||||
|
|
||||||
~S2w_queue() { xdestroy( &av_or_eof, &mutex ); }
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
struct W2m_blk // Worker to muxer data block
|
|
||||||
{
|
|
||||||
unsigned long long id; // Block index as read from infd
|
|
||||||
W2m_blk *next; // Next block in list (unordered)
|
|
||||||
int produced; // Number of bytes in compr
|
|
||||||
uint8_t compr[1]; // Data to write to outfd, alloc.: compr_size
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
struct W2m_queue
|
|
||||||
{
|
|
||||||
unsigned long long needed_id; // Block needed for resuming writing
|
|
||||||
W2m_blk *head; // Block list (unordered)
|
|
||||||
unsigned long check_counter;
|
|
||||||
unsigned long wait_counter;
|
|
||||||
int num_working; // Number of workers still running
|
|
||||||
pthread_mutex_t mutex;
|
|
||||||
pthread_cond_t av_or_exit; // New block available or all workers exited
|
|
||||||
|
|
||||||
W2m_queue( const int num_workers )
|
|
||||||
: needed_id( 0 ), head( 0 ), check_counter( 0 ), wait_counter( 0 ),
|
|
||||||
num_working( num_workers )
|
|
||||||
{ xinit( &av_or_exit, &mutex ); }
|
|
||||||
|
|
||||||
~W2m_queue() { xdestroy( &av_or_exit, &mutex ); }
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
struct Splitter_arg
|
|
||||||
{
|
|
||||||
Slot_tally * slot_tally;
|
|
||||||
S2w_queue * s2w_queue;
|
|
||||||
int infd;
|
|
||||||
int data_size;
|
|
||||||
int s2w_blk_size;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
void * splitter( void * arg )
|
|
||||||
{
|
|
||||||
const Splitter_arg & tmp = *(Splitter_arg *)arg;
|
|
||||||
Slot_tally & slot_tally = *tmp.slot_tally;
|
|
||||||
S2w_queue & s2w_queue = *tmp.s2w_queue;
|
|
||||||
const int infd = tmp.infd;
|
|
||||||
const int data_size = tmp.data_size;
|
|
||||||
const int s2w_blk_size = tmp.s2w_blk_size;
|
|
||||||
|
|
||||||
for( unsigned long long id = 0; ; ++id )
|
|
||||||
{
|
|
||||||
S2w_blk * s2w_blk = (S2w_blk *)xalloc( s2w_blk_size );
|
|
||||||
|
|
||||||
// Fill block
|
|
||||||
const int rd = readblock( infd, s2w_blk->plain, data_size );
|
|
||||||
if( rd != data_size && errno ) { show_error( "read", errno ); fatal(); }
|
|
||||||
|
|
||||||
if( rd > 0 || id == 0 ) // first block can be empty
|
|
||||||
{
|
|
||||||
s2w_blk->id = id;
|
|
||||||
s2w_blk->next = 0;
|
|
||||||
s2w_blk->loaded = rd;
|
|
||||||
in_size += rd;
|
|
||||||
xlock( &slot_tally.mutex ); // Grab a free slot
|
|
||||||
++slot_tally.check_counter;
|
|
||||||
while( slot_tally.num_free == 0 )
|
|
||||||
{
|
|
||||||
++slot_tally.wait_counter;
|
|
||||||
xwait( &slot_tally.slot_av, &slot_tally.mutex );
|
|
||||||
}
|
|
||||||
--slot_tally.num_free;
|
|
||||||
xunlock( &slot_tally.mutex );
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{ (*freef)( s2w_blk ); s2w_blk = 0; }
|
|
||||||
|
|
||||||
xlock( &s2w_queue.mutex );
|
|
||||||
if( s2w_blk != 0 )
|
|
||||||
{
|
|
||||||
if( s2w_queue.tail == 0 ) s2w_queue.head = s2w_blk;
|
|
||||||
else s2w_queue.tail->next = s2w_blk;
|
|
||||||
s2w_queue.tail = s2w_blk;
|
|
||||||
xsignal( &s2w_queue.av_or_eof );
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
s2w_queue.eof = true;
|
|
||||||
xbroadcast( &s2w_queue.av_or_eof );
|
|
||||||
}
|
|
||||||
xunlock( &s2w_queue.mutex );
|
|
||||||
|
|
||||||
if( s2w_blk == 0 ) break;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void work_compr( const int dictionary_size, const int match_len_limit,
|
|
||||||
const S2w_blk & s2w_blk, W2m_queue & w2m_queue,
|
|
||||||
const int compr_size, const int w2m_blk_size )
|
|
||||||
{
|
|
||||||
assert( s2w_blk.loaded > 0 || s2w_blk.id == 0 );
|
|
||||||
|
|
||||||
W2m_blk * w2m_blk = (W2m_blk *)xalloc( w2m_blk_size );
|
|
||||||
|
|
||||||
const int dict_size = std::max( LZ_min_dictionary_size(),
|
|
||||||
std::min( dictionary_size, s2w_blk.loaded ) );
|
|
||||||
LZ_Encoder * const encoder =
|
|
||||||
LZ_compress_open( dict_size, match_len_limit, LLONG_MAX );
|
|
||||||
if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
|
|
||||||
{ show_error( "LZ_compress_open failed." ); fatal(); }
|
|
||||||
|
|
||||||
int written = 0;
|
|
||||||
w2m_blk->produced = 0;
|
|
||||||
while( true )
|
|
||||||
{
|
|
||||||
if( LZ_compress_write_size( encoder ) > 0 )
|
|
||||||
{
|
|
||||||
if( written < s2w_blk.loaded )
|
|
||||||
{
|
|
||||||
const int wr = LZ_compress_write( encoder, s2w_blk.plain + written,
|
|
||||||
s2w_blk.loaded - written );
|
|
||||||
if( wr < 0 ) { show_error( "LZ_compress_write failed." ); fatal(); }
|
|
||||||
written += wr;
|
|
||||||
}
|
|
||||||
if( written >= s2w_blk.loaded ) LZ_compress_finish( encoder );
|
|
||||||
}
|
|
||||||
assert( w2m_blk->produced < compr_size );
|
|
||||||
const int rd = LZ_compress_read( encoder, w2m_blk->compr + w2m_blk->produced,
|
|
||||||
compr_size - w2m_blk->produced );
|
|
||||||
if( rd < 0 ) { show_error( "LZ_compress_read failed." ); fatal(); }
|
|
||||||
w2m_blk->produced += rd;
|
|
||||||
if( LZ_compress_finished( encoder ) == 1 ) break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if( LZ_compress_close( encoder ) < 0 )
|
|
||||||
{ show_error( "LZ_compress_close failed." ); fatal(); }
|
|
||||||
|
|
||||||
w2m_blk->id = s2w_blk.id;
|
|
||||||
|
|
||||||
// Push block to muxer queue
|
|
||||||
xlock( &w2m_queue.mutex );
|
|
||||||
w2m_blk->next = w2m_queue.head;
|
|
||||||
w2m_queue.head = w2m_blk;
|
|
||||||
if( w2m_blk->id == w2m_queue.needed_id ) xsignal( &w2m_queue.av_or_exit );
|
|
||||||
xunlock( &w2m_queue.mutex );
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
struct Worker_arg
|
|
||||||
{
|
|
||||||
int dictionary_size;
|
|
||||||
int match_len_limit;
|
|
||||||
S2w_queue * s2w_queue;
|
|
||||||
W2m_queue * w2m_queue;
|
|
||||||
int compr_size;
|
|
||||||
int w2m_blk_size;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
void * worker( void * arg )
|
|
||||||
{
|
|
||||||
const Worker_arg & tmp = *(Worker_arg *)arg;
|
|
||||||
const int dictionary_size = tmp.dictionary_size;
|
|
||||||
const int match_len_limit = tmp.match_len_limit;
|
|
||||||
S2w_queue & s2w_queue = *tmp.s2w_queue;
|
|
||||||
W2m_queue & w2m_queue = *tmp.w2m_queue;
|
|
||||||
const int compr_size = tmp.compr_size;
|
|
||||||
const int w2m_blk_size = tmp.w2m_blk_size;
|
|
||||||
|
|
||||||
while( true )
|
|
||||||
{
|
|
||||||
S2w_blk *s2w_blk;
|
|
||||||
|
|
||||||
// Grab a block to work on
|
|
||||||
xlock( &s2w_queue.mutex );
|
|
||||||
++s2w_queue.check_counter;
|
|
||||||
while( s2w_queue.head == 0 && !s2w_queue.eof )
|
|
||||||
{
|
|
||||||
++s2w_queue.wait_counter;
|
|
||||||
xwait( &s2w_queue.av_or_eof, &s2w_queue.mutex );
|
|
||||||
}
|
|
||||||
if( s2w_queue.head == 0 ) // No blocks available and splitter exited
|
|
||||||
{
|
|
||||||
xunlock( &s2w_queue.mutex );
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
s2w_blk = s2w_queue.head;
|
|
||||||
s2w_queue.head = s2w_blk->next;
|
|
||||||
if( s2w_queue.head == 0 ) s2w_queue.tail = 0;
|
|
||||||
xunlock( &s2w_queue.mutex );
|
|
||||||
|
|
||||||
work_compr( dictionary_size, match_len_limit, *s2w_blk, w2m_queue,
|
|
||||||
compr_size, w2m_blk_size );
|
|
||||||
(*freef)( s2w_blk );
|
|
||||||
}
|
|
||||||
|
|
||||||
// Notify muxer when last worker exits
|
|
||||||
xlock( &w2m_queue.mutex );
|
|
||||||
if( --w2m_queue.num_working == 0 && w2m_queue.head == 0 )
|
|
||||||
xsignal( &w2m_queue.av_or_exit );
|
|
||||||
xunlock( &w2m_queue.mutex );
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void muxer( Slot_tally & slot_tally, W2m_queue & w2m_queue,
|
|
||||||
const int num_slots, const int outfd )
|
|
||||||
{
|
|
||||||
unsigned long long needed_id = 0;
|
|
||||||
std::vector< W2m_blk * > circular_buffer( num_slots, (W2m_blk *)0 );
|
|
||||||
|
|
||||||
xlock( &w2m_queue.mutex );
|
|
||||||
while( true )
|
|
||||||
{
|
|
||||||
// Grab all available compressed blocks in one step
|
|
||||||
++w2m_queue.check_counter;
|
|
||||||
while( w2m_queue.head == 0 && w2m_queue.num_working > 0 )
|
|
||||||
{
|
|
||||||
++w2m_queue.wait_counter;
|
|
||||||
xwait( &w2m_queue.av_or_exit, &w2m_queue.mutex );
|
|
||||||
}
|
|
||||||
if( w2m_queue.head == 0 ) break; // queue is empty. all workers exited
|
|
||||||
|
|
||||||
W2m_blk * w2m_blk = w2m_queue.head;
|
|
||||||
w2m_queue.head = 0;
|
|
||||||
xunlock( &w2m_queue.mutex );
|
|
||||||
|
|
||||||
// Merge blocks fetched this time into circular buffer
|
|
||||||
do {
|
|
||||||
// id collision shouldn't happen
|
|
||||||
assert( circular_buffer[w2m_blk->id%num_slots] == 0 );
|
|
||||||
circular_buffer[w2m_blk->id%num_slots] = w2m_blk;
|
|
||||||
W2m_blk * next = w2m_blk->next;
|
|
||||||
w2m_blk->next = 0;
|
|
||||||
w2m_blk = next;
|
|
||||||
} while( w2m_blk != 0 );
|
|
||||||
|
|
||||||
// Write out initial continuous sequence of reordered blocks
|
|
||||||
while( true )
|
|
||||||
{
|
|
||||||
w2m_blk = circular_buffer[needed_id%num_slots];
|
|
||||||
if( w2m_blk == 0 ) break;
|
|
||||||
|
|
||||||
out_size += w2m_blk->produced;
|
|
||||||
|
|
||||||
if( outfd >= 0 )
|
|
||||||
{
|
|
||||||
const int wr = writeblock( outfd, w2m_blk->compr, w2m_blk->produced );
|
|
||||||
if( wr != w2m_blk->produced )
|
|
||||||
{ show_error( "write", errno ); fatal(); }
|
|
||||||
}
|
|
||||||
circular_buffer[needed_id%num_slots] = 0;
|
|
||||||
++needed_id;
|
|
||||||
|
|
||||||
xlock( &slot_tally.mutex );
|
|
||||||
if( slot_tally.num_free++ == 0 ) xsignal( &slot_tally.slot_av );
|
|
||||||
xunlock( &slot_tally.mutex );
|
|
||||||
|
|
||||||
(*freef)( w2m_blk );
|
|
||||||
}
|
|
||||||
|
|
||||||
xlock( &w2m_queue.mutex );
|
|
||||||
w2m_queue.needed_id = needed_id;
|
|
||||||
}
|
|
||||||
xunlock( &w2m_queue.mutex );
|
|
||||||
|
|
||||||
for( int i = 0; i < num_slots; ++i )
|
|
||||||
if( circular_buffer[i] != 0 )
|
|
||||||
{ show_error( "circular buffer not empty" ); fatal(); }
|
|
||||||
}
|
|
||||||
|
|
||||||
} // end namespace
|
|
||||||
|
|
||||||
|
|
||||||
int compress( const int data_size, const int dictionary_size,
|
|
||||||
const int match_len_limit, const int num_workers,
|
|
||||||
const int num_slots, const int infd, const int outfd,
|
|
||||||
const int debug_level )
|
|
||||||
{
|
|
||||||
if( debug_level & 2 ) { mallocf = trace_malloc; freef = trace_free; }
|
|
||||||
else { mallocf = malloc; freef = free; }
|
|
||||||
|
|
||||||
Slot_tally slot_tally( num_slots );
|
|
||||||
S2w_queue s2w_queue;
|
|
||||||
W2m_queue w2m_queue( num_workers );
|
|
||||||
|
|
||||||
Splitter_arg splitter_arg;
|
|
||||||
splitter_arg.slot_tally = &slot_tally;
|
|
||||||
splitter_arg.s2w_queue = &s2w_queue;
|
|
||||||
splitter_arg.infd = infd;
|
|
||||||
splitter_arg.data_size = data_size;
|
|
||||||
splitter_arg.s2w_blk_size = sizeof (S2w_blk) + data_size - 1;
|
|
||||||
|
|
||||||
pthread_t splitter_thread;
|
|
||||||
xcreate( &splitter_thread, splitter, &splitter_arg );
|
|
||||||
|
|
||||||
Worker_arg worker_arg;
|
|
||||||
worker_arg.dictionary_size = dictionary_size;
|
|
||||||
worker_arg.match_len_limit = match_len_limit;
|
|
||||||
worker_arg.s2w_queue = &s2w_queue;
|
|
||||||
worker_arg.w2m_queue = &w2m_queue;
|
|
||||||
worker_arg.compr_size = 6 + 20 + ( ( data_size / 8 ) * 9 );
|
|
||||||
worker_arg.w2m_blk_size = sizeof (W2m_blk) + worker_arg.compr_size - 1;
|
|
||||||
|
|
||||||
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
|
|
||||||
if( worker_threads == 0 )
|
|
||||||
{ show_error( "not enough memory.", errno ); fatal(); }
|
|
||||||
for( int i = 0; i < num_workers; ++i )
|
|
||||||
xcreate( &worker_threads[i], worker, &worker_arg );
|
|
||||||
|
|
||||||
muxer( slot_tally, w2m_queue, num_slots, outfd );
|
|
||||||
|
|
||||||
for( int i = num_workers - 1; i >= 0; --i )
|
|
||||||
xjoin(worker_threads[i]);
|
|
||||||
delete[] worker_threads; worker_threads = 0;
|
|
||||||
|
|
||||||
xjoin( splitter_thread );
|
|
||||||
|
|
||||||
if( verbosity >= 1 )
|
|
||||||
{
|
|
||||||
if( in_size <= 0 || out_size <= 0 )
|
|
||||||
std::fprintf( stderr, "no data compressed.\n" );
|
|
||||||
else
|
|
||||||
std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
|
|
||||||
"%5.2f%% saved, %lld in, %lld out.\n",
|
|
||||||
(double)in_size / out_size,
|
|
||||||
( 8.0 * out_size ) / in_size,
|
|
||||||
100.0 * ( 1.0 - ( (double)out_size / in_size ) ),
|
|
||||||
in_size, out_size );
|
|
||||||
}
|
|
||||||
|
|
||||||
const int FW = ( sizeof (unsigned long) * 8 ) / 3 + 1;
|
|
||||||
if( debug_level & 1 )
|
|
||||||
std::fprintf( stderr,
|
|
||||||
"any worker tried to consume from splitter: %*lu\n"
|
|
||||||
"any worker stalled : %*lu\n"
|
|
||||||
"muxer tried to consume from workers : %*lu\n"
|
|
||||||
"muxer stalled : %*lu\n"
|
|
||||||
"splitter tried to fill a block : %*lu\n"
|
|
||||||
"splitter stalled : %*lu\n",
|
|
||||||
FW, s2w_queue.check_counter,
|
|
||||||
FW, s2w_queue.wait_counter,
|
|
||||||
FW, w2m_queue.check_counter,
|
|
||||||
FW, w2m_queue.wait_counter,
|
|
||||||
FW, slot_tally.check_counter,
|
|
||||||
FW, slot_tally.wait_counter );
|
|
||||||
|
|
||||||
assert( slot_tally.num_free == num_slots );
|
|
||||||
assert( s2w_queue.eof );
|
|
||||||
assert( s2w_queue.head == 0 );
|
|
||||||
assert( s2w_queue.tail == 0 );
|
|
||||||
assert( w2m_queue.num_working == 0 );
|
|
||||||
assert( w2m_queue.head == 0 );
|
|
||||||
return 0;
|
|
||||||
}
|
|
102
plzip.h
102
plzip.h
|
@ -1,4 +1,4 @@
|
||||||
/* Plzip - A parallel version of the lzip data compressor
|
/* Plzip - A parallel compressor compatible with lzip
|
||||||
Copyright (C) 2009 Laszlo Ersek.
|
Copyright (C) 2009 Laszlo Ersek.
|
||||||
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
|
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
|
||||||
|
|
||||||
|
@ -16,19 +16,113 @@
|
||||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
class Pretty_print
|
||||||
|
{
|
||||||
|
const char * const stdin_name;
|
||||||
|
const unsigned int stdin_name_len;
|
||||||
|
unsigned int longest_name;
|
||||||
|
std::string name_;
|
||||||
|
mutable bool first_post;
|
||||||
|
|
||||||
|
public:
|
||||||
|
Pretty_print( const std::vector< std::string > & filenames )
|
||||||
|
: stdin_name( "(stdin)" ), stdin_name_len( std::strlen( stdin_name ) ),
|
||||||
|
longest_name( 0 ), first_post( false )
|
||||||
|
{
|
||||||
|
for( unsigned int i = 0; i < filenames.size(); ++i )
|
||||||
|
{
|
||||||
|
const std::string & s = filenames[i];
|
||||||
|
const unsigned int len = ( ( s == "-" ) ? stdin_name_len : s.size() );
|
||||||
|
if( len > longest_name ) longest_name = len;
|
||||||
|
}
|
||||||
|
if( longest_name == 0 ) longest_name = stdin_name_len;
|
||||||
|
}
|
||||||
|
|
||||||
|
void set_name( const std::string & filename )
|
||||||
|
{
|
||||||
|
if( filename.size() && filename != "-" ) name_ = filename;
|
||||||
|
else name_ = stdin_name;
|
||||||
|
first_post = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void reset() const throw() { if( name_.size() ) first_post = true; }
|
||||||
|
const char * name() const throw() { return name_.c_str(); }
|
||||||
|
void operator()( const char * const msg = 0 ) const throw();
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/*--------------------- Defined in compress.cc ---------------------*/
|
||||||
|
|
||||||
|
void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex );
|
||||||
|
void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex );
|
||||||
|
void xlock( pthread_mutex_t * mutex );
|
||||||
|
void xunlock( pthread_mutex_t * mutex );
|
||||||
|
void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex );
|
||||||
|
void xsignal( pthread_cond_t * cond );
|
||||||
|
void xbroadcast( pthread_cond_t * cond );
|
||||||
|
void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg );
|
||||||
|
void xjoin( pthread_t thread );
|
||||||
|
|
||||||
|
|
||||||
|
class Slot_tally
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
unsigned long check_counter;
|
||||||
|
unsigned long wait_counter;
|
||||||
|
private:
|
||||||
|
const int num_slots; // total slots
|
||||||
|
int num_free; // remaining free slots
|
||||||
|
pthread_mutex_t mutex;
|
||||||
|
pthread_cond_t slot_av; // free slot available
|
||||||
|
|
||||||
|
public:
|
||||||
|
Slot_tally( const int slots )
|
||||||
|
: check_counter( 0 ), wait_counter( 0 ),
|
||||||
|
num_slots( slots ), num_free( slots )
|
||||||
|
{ xinit( &slot_av, &mutex ); }
|
||||||
|
|
||||||
|
~Slot_tally() { xdestroy( &slot_av, &mutex ); }
|
||||||
|
|
||||||
|
bool all_free() { return ( num_free == num_slots ); }
|
||||||
|
|
||||||
|
void get_slot() // wait for a free slot
|
||||||
|
{
|
||||||
|
xlock( &mutex );
|
||||||
|
++check_counter;
|
||||||
|
while( num_free == 0 )
|
||||||
|
{ ++wait_counter; xwait( &slot_av, &mutex ); }
|
||||||
|
--num_free;
|
||||||
|
xunlock( &mutex );
|
||||||
|
}
|
||||||
|
|
||||||
|
void leave_slot() // return a slot to the tally
|
||||||
|
{
|
||||||
|
xlock( &mutex );
|
||||||
|
if( num_free++ == 0 ) xsignal( &slot_av );
|
||||||
|
xunlock( &mutex );
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
int compress( const int data_size, const int dictionary_size,
|
int compress( const int data_size, const int dictionary_size,
|
||||||
const int match_len_limit, const int num_workers,
|
const int match_len_limit, const int num_workers,
|
||||||
const int num_slots, const int infd, const int outfd,
|
const int num_slots, const int infd, const int outfd,
|
||||||
const int debug_level );
|
const int debug_level );
|
||||||
|
|
||||||
|
|
||||||
|
/*-------------------- Defined in decompress.cc --------------------*/
|
||||||
|
|
||||||
|
int decompress( const int infd, const int outfd, const Pretty_print & pp,
|
||||||
|
const bool testing );
|
||||||
|
|
||||||
|
|
||||||
/*----------------------- Defined in main.cc -----------------------*/
|
/*----------------------- Defined in main.cc -----------------------*/
|
||||||
|
|
||||||
|
extern int verbosity;
|
||||||
|
|
||||||
void show_error( const char * msg, const int errcode = 0, const bool help = false ) throw();
|
void show_error( const char * msg, const int errcode = 0, const bool help = false ) throw();
|
||||||
|
void internal_error( const char * msg );
|
||||||
int readblock( const int fd, uint8_t * buf, const int size ) throw();
|
int readblock( const int fd, uint8_t * buf, const int size ) throw();
|
||||||
int writeblock( const int fd, const uint8_t * buf, const int size ) throw();
|
int writeblock( const int fd, const uint8_t * buf, const int size ) throw();
|
||||||
|
|
||||||
|
|
||||||
void fatal(); // Terminate the process
|
void fatal(); // Terminate the process
|
||||||
|
|
||||||
extern int verbosity;
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
#! /bin/sh
|
#! /bin/sh
|
||||||
# check script for Plzip - A parallel version of the lzip data compressor
|
# check script for Plzip - A parallel compressor compatible with lzip
|
||||||
# Copyright (C) 2009, 2010 Antonio Diaz Diaz.
|
# Copyright (C) 2009, 2010 Antonio Diaz Diaz.
|
||||||
#
|
#
|
||||||
# This script is free software: you have unlimited permission
|
# This script is free software: you have unlimited permission
|
||||||
|
|
Loading…
Add table
Reference in a new issue