/* Plzip - A parallel version of the lzip data compressor
   Copyright (C) 2009 Laszlo Ersek.
   Copyright (C) 2009, 2010 Antonio Diaz Diaz.

   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
|
|
|
|
/* Enable 64-bit file offsets on 32-bit platforms. */
#define _FILE_OFFSET_BITS 64

#include <algorithm>
#include <cassert>
#include <cerrno>
#include <climits>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <vector>
#include <stdint.h>
#include <unistd.h>
#include <lzlib.h>

#include "main.h"
#include "plzip.h"

/* Fallback definitions for C libraries whose <climits> predates the
   long long limit macros. */
#ifndef LLONG_MAX
#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL
#endif
#ifndef LLONG_MIN
#define LLONG_MIN (-LLONG_MAX - 1LL)
#endif
#ifndef ULLONG_MAX
#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
#endif
|
|
|
|
|
|
namespace {
|
|
|
|
/* Running totals: bytes read from infd (by the splitter) and bytes
   written to outfd (by the muxer). Read at the end of muxer() for the
   statistics report; not accessed concurrently after the joins. */
long long in_size = 0;
long long out_size = 0;

/* Allocation hooks. muxer() points these at malloc/free, or at the
   tracing wrappers below when debug_level bit 1 (value 2) is set. */
void *(*mallocf)(size_t size);
void (*freef)(void *ptr);
|
|
|
|
|
|
/* Allocation-tracing wrapper around malloc(): logs every request and its
   result to stderr. The errno set by a failed malloc() is saved across
   the fprintf() call so callers still see the original failure cause. */
void * trace_malloc(size_t size)
{
  void * const block = malloc(size);
  const int saved_errno = ( block == 0 ) ? errno : 0;

  fprintf(stderr, "malloc(%lu) == %p\n", (long unsigned)size, block);
  if( block == 0 ) errno = saved_errno;
  return block;
}
|
|
|
|
|
|
/* Deallocation-tracing wrapper around free(): logs the pointer to
   stderr, then releases it. */
void trace_free(void *p)
{
  fprintf(stderr, "free(%p)\n", p);
  free(p);
}
|
|
|
|
|
|
void * xalloc(size_t size)
|
|
{
|
|
void *ret = (*mallocf)(size);
|
|
if( 0 == ret ) fail("(*mallocf)()", errno);
|
|
return ret;
|
|
}
|
|
|
|
|
|
/* Splitter to workers: one block of uncompressed input. plain[] is a
   pre-C99 flexible-array idiom — the struct is allocated with
   Splitter_arg::sizeof_S2w_blk bytes so plain[] holds sizeof_plain
   bytes of payload. */
struct S2w_blk
{
  unsigned long long id;  /* Block serial number as read from infd. */
  S2w_blk *next;          /* Next in queue. */
  size_t loaded;          /* # of bytes in plain, may be 0 for 1st. */
  unsigned char plain[1]; /* Data read from infd, allocated: sizeof_plain. */
};
|
|
|
|
|
|
/* Splitter-to-workers queue: a FIFO linked list of S2w_blk guarded by
   the av_or_eof condition (Cond is a project-declared condition/mutex
   helper — see plzip.h). Workers sleep on av_or_eof until a block is
   queued or the splitter sets eof. */
struct S2w_q
{
  Cond av_or_eof; /* New block available or splitter done. */
  S2w_blk *tail,  /* Splitter will append here. */
          *head;  /* Next ready worker shall compress this. */
  int eof;        /* Splitter done. */
};
|
|
|
|
|
|
void
|
|
s2w_q_init(S2w_q *s2w_q)
|
|
{
|
|
xinit(&s2w_q->av_or_eof);
|
|
s2w_q->tail = 0;
|
|
s2w_q->head = 0;
|
|
s2w_q->eof = 0;
|
|
}
|
|
|
|
|
|
void
|
|
s2w_q_uninit(S2w_q *s2w_q)
|
|
{
|
|
assert(0 != s2w_q->eof);
|
|
assert(0 == s2w_q->head);
|
|
assert(0 == s2w_q->tail);
|
|
xdestroy(&s2w_q->av_or_eof);
|
|
}
|
|
|
|
|
|
/* Workers to muxer data block: one compressed block. compr[] extends
   past the end of the struct (allocated with Worker_arg::sizeof_W2m_blk
   bytes, same flexible-array idiom as S2w_blk). */
struct W2m_blk
{
  unsigned long long id;  /* Block index as read from infd. */
  W2m_blk *next;          /* Next block in list (unordered). */
  int produced;           /* Number of bytes in compr. */
  unsigned char compr[1]; /* Data to write to outfd, alloc.: sizeof_compr. */
};
|
|
|
|
|
|
/* Workers-to-muxer queue: an unordered singly linked list of finished
   blocks; the muxer reorders them by id. 'needed' tells workers which
   id the muxer is waiting for, so they only signal when that block (or
   the last worker's exit) arrives. */
struct W2m_q
{
  Cond av_or_exit;           /* New block available or all workers exited. */
  unsigned long long needed; /* Block needed for resuming writing. */
  W2m_blk *head;             /* Block list (unordered). */
  unsigned working;          /* Number of workers still running. */
};
|
|
|
|
|
|
void
|
|
w2m_q_init(W2m_q *w2m_q, int num_workers)
|
|
{
|
|
assert(0 < num_workers);
|
|
xinit(&w2m_q->av_or_exit);
|
|
w2m_q->needed = 0;
|
|
w2m_q->head = 0;
|
|
w2m_q->working = num_workers;
|
|
}
|
|
|
|
|
|
void
|
|
w2m_q_uninit(W2m_q *w2m_q)
|
|
{
|
|
assert(0 == w2m_q->working);
|
|
assert(0 == w2m_q->head);
|
|
xdestroy(&w2m_q->av_or_exit);
|
|
}
|
|
|
|
|
|
// Muxer to splitter queue: counts free block slots. The splitter waits
// on av while num_free is zero; the muxer increments num_free (and
// signals on the 0 -> 1 transition) each time it retires a block, which
// bounds the number of blocks in flight to the initial slot count.
struct M2s_q
{
  Cond av;      // Free slot available
  int num_free; // Number of free slots

  M2s_q( const int slots )
  {
    xinit(&av);
    num_free = slots;
  }

  ~M2s_q() { xdestroy(&av); }
};
|
|
|
|
|
|
/* Read-only argument bundle passed to the splitter thread. */
struct Splitter_arg
{
  M2s_q *m2s_q;          /* Free-slot queue shared with the muxer. */
  S2w_q *s2w_q;          /* Output queue feeding the workers. */
  int infd;              /* Input file descriptor. */
  int sizeof_plain;      /* Payload capacity of each S2w_blk. */
  size_t sizeof_S2w_blk; /* Total allocation size of one S2w_blk. */
};
|
|
|
|
|
|
/* Splitter thread: reads sizeof_plain-byte chunks from infd and queues
   them for the worker threads, throttled by the muxer through
   m2s_q->num_free so at most the initial slot count of blocks is in
   flight. The first block (id 0) is queued even if empty, so the
   pipeline produces valid output for empty input. Returns 0. */
void * splitter( void * arg )
{
  const Splitter_arg & tmp = *(Splitter_arg *)arg;
  M2s_q *m2s_q = tmp.m2s_q;
  S2w_q *s2w_q = tmp.s2w_q;
  const int infd = tmp.infd;
  const int sizeof_plain = tmp.sizeof_plain;
  const size_t sizeof_s2w_blk = tmp.sizeof_S2w_blk;

  for( unsigned long long id = 0; ; ++id )
    {
    /* Grab a free slot (blocks until the muxer retires one). */
    xlock_pred(&m2s_q->av);
    while( m2s_q->num_free == 0 ) xwait(&m2s_q->av);
    --m2s_q->num_free;
    xunlock(&m2s_q->av);
    S2w_blk * s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk);

    /* Fill block. A short read with errno set is a real read error;
       a short read without errno is end of input. */
    const int rd = readblock( infd, (char *)s2w_blk->plain, sizeof_plain );
    if( rd != sizeof_plain && errno ) fail("read()", errno);

    if( rd == 0 && id != 0 )
      {
      /* EOF on first read, but not for first input block: discard the
         unused block and hand its slot straight back to the muxer's
         free count. */
      (*freef)(s2w_blk);
      xlock(&m2s_q->av);
      ++m2s_q->num_free;
      xunlock(&m2s_q->av);
      }
    else
      {
      s2w_blk->id = id;
      s2w_blk->next = 0;
      s2w_blk->loaded = rd;
      in_size += rd;
      }

    xlock(&s2w_q->av_or_eof);
    /* Queue was empty, so workers may be sleeping: wake them all —
       either a block is about to be appended or eof will be set. */
    if( s2w_q->head == 0 ) xbroadcast(&s2w_q->av_or_eof);

    if( rd > 0 || id == 0 )
      {
      /* Append to the FIFO (id 0 is appended even when empty). */
      if( s2w_q->tail == 0 ) s2w_q->head = s2w_blk;
      else s2w_q->tail->next = s2w_blk;
      s2w_q->tail = s2w_blk;
      }
    s2w_q->eof = ( rd == 0 );
    xunlock(&s2w_q->av_or_eof);

    if( rd <= 0 ) break;
    }
  return 0;
}
|
|
|
|
|
|
/* Drain pending compressed bytes from the encoder into w2m_blk->compr,
   appending after the 'produced' bytes already there. Never returns
   (fatal() aborts) if lzlib reports a read error. */
void work_lz_rd(W2m_blk *w2m_blk, const int sizeof_compr, LZ_Encoder * lz)
{
  const int room = sizeof_compr - w2m_blk->produced;

  assert( room > 0 );
  const int rd = LZ_compress_read(lz, w2m_blk->compr + w2m_blk->produced,
                                  room);
  if( rd == -1 )
    {
    show_error( "LZ_compress_read() failed." );
    fatal();
    }
  w2m_blk->produced += rd;
}
|
|
|
|
|
|
/* Compress one input block into a freshly allocated W2m_blk and push it
   onto the muxer queue. Each block becomes an independent single-member
   stream, so the muxer can simply concatenate blocks in id order. */
void work_compr( const int dictionary_size, const int match_len_limit,
                 S2w_blk *s2w_blk, W2m_q *w2m_q,
                 const int sizeof_compr, const size_t sizeof_w2m_blk )
{
  W2m_blk *w2m_blk;

  /* Only the first block (id 0) may be empty. */
  assert(0 < s2w_blk->loaded || 0 == s2w_blk->id);

  w2m_blk = (W2m_blk *)xalloc(sizeof_w2m_blk);

  /* Single member compression. Settings like with lzip -6. */
  {
    LZ_Encoder * lz;
    size_t written;

    /* LLONG_MAX member size limit: never split the member. */
    lz = LZ_compress_open( dictionary_size, match_len_limit, LLONG_MAX );
    if( LZ_ok != LZ_compress_errno(lz) ) {
      show_error( "LZ_compress_open() failed." );
      fatal();
    }

    written = 0;
    w2m_blk->produced = 0;
    /* Feed plain data to the encoder, draining compressed output after
       each write so the encoder's internal buffer cannot stall. */
    while( written < s2w_blk->loaded ) {
      int wr;

      wr = LZ_compress_write(lz, s2w_blk->plain + written,
                             s2w_blk->loaded - written);
      if( -1 == wr ) {
        show_error( "LZ_compress_write() failed." );
        fatal();
      }
      written += (size_t)wr;

      work_lz_rd(w2m_blk, sizeof_compr, lz);
    }

    /* Mark end of member, then drain everything still buffered. */
    if( -1 == LZ_compress_finish(lz) ) {
      show_error( "LZ_compress_finish() failed." );
      fatal();
    }

    while( !LZ_compress_finished(lz) ) {
      work_lz_rd(w2m_blk, sizeof_compr, lz);
    }

    if( -1 == LZ_compress_close(lz) ) {
      show_error( "LZ_compress_close() failed." );
      fatal();
    }
  }

  w2m_blk->id = s2w_blk->id;

  /* Push block to muxer. The list is unordered (LIFO push); the muxer
     reorders by id. Signal only if this is the block the muxer is
     currently waiting to write. */
  xlock(&w2m_q->av_or_exit);
  w2m_blk->next = w2m_q->head;
  w2m_q->head = w2m_blk;
  if( w2m_blk->id == w2m_q->needed ) {
    xsignal(&w2m_q->av_or_exit);
  }
  xunlock(&w2m_q->av_or_exit);
}
|
|
|
|
|
|
/* Read-only argument bundle shared by all worker threads. */
struct Worker_arg
{
  int dictionary_size;   /* lzlib dictionary size for each member. */
  int match_len_limit;   /* lzlib match length limit. */
  S2w_q *s2w_q;          /* Input queue fed by the splitter. */
  W2m_q *w2m_q;          /* Output queue drained by the muxer. */
  int sizeof_compr;      /* Payload capacity of each W2m_blk. */
  size_t sizeof_W2m_blk; /* Total allocation size of one W2m_blk. */
};
|
|
|
|
|
|
/* Worker thread: repeatedly takes the next block from the splitter
   queue, compresses it via work_compr(), and frees it. Exits when the
   queue is empty and the splitter has signalled EOF; the last worker
   out wakes the muxer. Returns 0. */
void * worker( void * arg )
{
  const Worker_arg & tmp = *(Worker_arg *)arg;
  const int dictionary_size = tmp.dictionary_size;
  const int match_len_limit = tmp.match_len_limit;
  S2w_q *s2w_q = tmp.s2w_q;
  W2m_q *w2m_q = tmp.w2m_q;
  const int sizeof_compr = tmp.sizeof_compr;
  const size_t sizeof_w2m_blk = tmp.sizeof_W2m_blk;

  while( true )
    {
    S2w_blk *s2w_blk;

    /* Grab a block to work on. */
    xlock_pred(&s2w_q->av_or_eof);
    while( 0 == s2w_q->head && !s2w_q->eof ) {
      xwait(&s2w_q->av_or_eof);
    }
    if( 0 == s2w_q->head ) {
      /* No blocks available and splitter exited. */
      xunlock(&s2w_q->av_or_eof);
      break;
    }
    /* Unlink the head block; reset tail if the queue became empty. */
    s2w_blk = s2w_q->head;
    s2w_q->head = s2w_blk->next;
    if( 0 == s2w_q->head ) {
      s2w_q->tail = 0;
    }
    xunlock(&s2w_q->av_or_eof);

    /* Compress outside the lock, then release the input block. */
    work_compr( dictionary_size, match_len_limit, s2w_blk, w2m_q,
                sizeof_compr, sizeof_w2m_blk );
    (*freef)(s2w_blk);
    }

  /* Notify muxer when last worker exits (only needed if no block is
     pending to wake it). */
  xlock(&w2m_q->av_or_exit);
  if( 0 == --w2m_q->working && 0 == w2m_q->head ) {
    xsignal(&w2m_q->av_or_exit);
  }
  xunlock(&w2m_q->av_or_exit);
  return 0;
}
|
|
|
|
|
|
/* Muxer: collects compressed blocks from the workers (which finish in
   arbitrary order), reorders them by id in a circular buffer of
   num_slots entries, and writes them to outfd in sequence. Each block
   written returns one free slot to the splitter via m2s_q. Runs until
   all workers have exited and their queue is drained. */
void muxer_loop( W2m_q *w2m_q, M2s_q *m2s_q, const int num_slots, const int outfd )
{
  unsigned long long needed_id = 0; /* id of the next block to write */
  std::vector< W2m_blk * > circular_buffer( num_slots, (W2m_blk *)0 );

  xlock_pred(&w2m_q->av_or_exit);
  while( true )
    {
    /* Grab all available compressed blocks in one step. */
    while( w2m_q->head == 0 && w2m_q->working > 0 )
      xwait(&w2m_q->av_or_exit);

    if( w2m_q->head == 0 ) break; // queue is empty. all workers exited

    W2m_blk * w2m_blk = w2m_q->head;
    w2m_q->head = 0;
    xunlock(&w2m_q->av_or_exit);

    // Merge blocks fetched this time into circular buffer. A slot
    // cannot be occupied because at most num_slots blocks are in
    // flight (enforced by m2s_q).
    do {
      // id collision shouldn't happen
      assert( circular_buffer[w2m_blk->id%num_slots] == 0 );
      circular_buffer[w2m_blk->id%num_slots] = w2m_blk;
      W2m_blk * next = w2m_blk->next;
      w2m_blk->next = 0;
      w2m_blk = next;
    } while( w2m_blk != 0 );

    // Write out initial continuous sequence of reordered blocks
    while( true )
      {
      W2m_blk * needed_w2m_blk = circular_buffer[needed_id%num_slots];
      if( needed_w2m_blk == 0 ) break;

      out_size += needed_w2m_blk->produced;

      /* NOTE(review): negative outfd apparently means "discard
         output" — confirm against the caller. */
      if( outfd >= 0 )
        {
        const int wr = writeblock( outfd, (char *)needed_w2m_blk->compr, needed_w2m_blk->produced );
        if( wr != needed_w2m_blk->produced ) fail("write()", errno);
        }
      circular_buffer[needed_id%num_slots] = 0;
      ++needed_id;

      /* Return the freed slot to the splitter; signal only on the
         0 -> 1 transition, when the splitter may be blocked. */
      xlock(&m2s_q->av);
      if( 0 == m2s_q->num_free++ ) xsignal(&m2s_q->av);
      xunlock(&m2s_q->av);

      (*freef)(needed_w2m_blk);
      }

    xlock_pred(&w2m_q->av_or_exit);
    /* Publish the id we now need so workers only signal for it. */
    w2m_q->needed = needed_id;
    }
  xunlock(&w2m_q->av_or_exit);

  /* All workers exited: any block still buffered means a gap in the id
     sequence, which should be impossible. */
  for( int i = 0; i < num_slots; ++i )
    if( circular_buffer[i] != 0 )
      { show_error( "circular buffer not empty" ); fatal(); }
}
|
|
|
|
} // end namespace
|
|
|
|
|
|
/* Compression driver (thread entry point): sets up the three queues,
   starts the splitter and worker threads, runs the muxer loop in this
   thread, joins everything, reports statistics, and finally raises
   SIGUSR2 (presumably handled by the main thread as a completion
   signal — confirm in main.cc). Returns 0. */
void * muxer( void * arg )
{
  const Muxer_arg & tmp = *(Muxer_arg *)arg;
  const int dictionary_size = tmp.dictionary_size;
  const int match_len_limit = tmp.match_len_limit;
  const int num_workers = tmp.num_workers;
  const int num_slots = tmp.num_slots;
  const int debug_level = tmp.debug_level;
  const int infd = tmp.infd;
  const int outfd = tmp.outfd;
  S2w_q s2w_q;
  W2m_q w2m_q;

  /* debug_level bit 1 (value 2) enables allocation tracing. */
  if( debug_level & 2 ) { mallocf = trace_malloc; freef = trace_free; }
  else { mallocf = malloc; freef = free; }

  s2w_q_init(&s2w_q);
  w2m_q_init(&w2m_q, num_workers);
  M2s_q m2s_q( num_slots );

  Splitter_arg splitter_arg;
  splitter_arg.m2s_q = &m2s_q;
  splitter_arg.s2w_q = &s2w_q;
  splitter_arg.infd = infd;
  /* Each input block holds two dictionaries' worth of data, at least
     128 KiB. The "- 1" accounts for the plain[1] member already in the
     struct. */
  splitter_arg.sizeof_plain = 2 * std::max( 65536, dictionary_size );
  splitter_arg.sizeof_S2w_blk = sizeof(S2w_blk) + splitter_arg.sizeof_plain - 1;

  pthread_t splitter_thread;
  xcreate(&splitter_thread, splitter, &splitter_arg);

  Worker_arg worker_arg;
  worker_arg.dictionary_size = dictionary_size;
  worker_arg.match_len_limit = match_len_limit;
  worker_arg.s2w_q = &s2w_q;
  worker_arg.w2m_q = &w2m_q;
  /* Worst-case compressed size: fixed per-member overhead (6 + 20,
     presumably lzip header + trailer — confirm against the format
     spec) plus a 9/8 expansion of the plain data. */
  worker_arg.sizeof_compr = 6 + 20 + ( ( splitter_arg.sizeof_plain / 8 ) * 9 );
  worker_arg.sizeof_W2m_blk = sizeof(W2m_blk) + worker_arg.sizeof_compr - 1;

  pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
  if( worker_threads == 0 ) fail("not enough memory.", errno);
  for( int i = 0; i < num_workers; ++i )
    xcreate(&worker_threads[i], worker, &worker_arg);

  /* The muxer runs in this thread. */
  muxer_loop( &w2m_q, &m2s_q, num_slots, outfd );

  for( int i = num_workers - 1; i >= 0; --i )
    xjoin(worker_threads[i]);
  delete[] worker_threads; worker_threads = 0;

  xjoin(splitter_thread);

  /* Compression-ratio report (all threads joined, totals are final). */
  if( verbosity >= 1 )
    {
    if( in_size <= 0 || out_size <= 0 )
      std::fprintf( stderr, "no data compressed.\n" );
    else
      std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
                            "%5.2f%% saved, %lld in, %lld out.\n",
                    (double)in_size / out_size,
                    ( 8.0 * out_size ) / in_size,
                    100.0 * ( 1.0 - ( (double)out_size / in_size ) ),
                    in_size, out_size );
    }

  /* Field width wide enough for any unsigned long in decimal. debug
     level bit 0 (value 1) prints the contention counters kept by the
     Cond helpers. */
  const int FW = ( sizeof(long unsigned) * 8 ) / 3 + 1;
  if( ( debug_level & 1 ) && 0 > fprintf(stderr,
        "any worker tried to consume from splitter: %*lu\n"
        "any worker stalled                       : %*lu\n"
        "muxer tried to consume from workers      : %*lu\n"
        "muxer stalled                            : %*lu\n"
        "splitter tried to consume from muxer     : %*lu\n"
        "splitter stalled                         : %*lu\n",
        FW, s2w_q.av_or_eof.ccount,
        FW, s2w_q.av_or_eof.wcount,
        FW, w2m_q.av_or_exit.ccount,
        FW, w2m_q.av_or_exit.wcount,
        FW, m2s_q.av.ccount,
        FW, m2s_q.av.wcount) )
    {
    fatal();
    }

  /* Every slot must have been returned before the queues are torn
     down. */
  assert( m2s_q.num_free == num_slots );
  w2m_q_uninit(&w2m_q);
  s2w_q_uninit(&s2w_q);
  xraise(SIGUSR2);
  return 0;
}
|