670 lines
15 KiB
C++
670 lines
15 KiB
C++
|
/* Plzip - A parallel version of the lzip data compressor
|
||
|
Copyright (C) 2009 Laszlo Ersek.
|
||
|
Copyright (C) 2009 Antonio Diaz Diaz.
|
||
|
|
||
|
This program is free software: you can redistribute it and/or modify
|
||
|
it under the terms of the GNU General Public License as published by
|
||
|
the Free Software Foundation, either version 3 of the License, or
|
||
|
(at your option) any later version.
|
||
|
|
||
|
This program is distributed in the hope that it will be useful,
|
||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
GNU General Public License for more details.
|
||
|
|
||
|
You should have received a copy of the GNU General Public License
|
||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
|
*/
|
||
|
|
||
|
#define _FILE_OFFSET_BITS 64
|
||
|
|
||
|
#include <cassert>
|
||
|
#include <cerrno>
|
||
|
#include <climits>
|
||
|
#include <csignal>
|
||
|
#include <cstdio>
|
||
|
#include <cstdlib>
|
||
|
#include <stdint.h>
|
||
|
#include <unistd.h>
|
||
|
#include <lzlib.h>
|
||
|
|
||
|
#include "main.h"
|
||
|
#include "plzip.h"
|
||
|
#include "lacos_rbtree.h"
|
||
|
|
||
|
|
||
|
/*
  Splitter-to-workers block: one chunk of input handed to a worker.
  "plain" is declared with a single element; per the field note, each block
  is allocated with room for sizeof_plain bytes.
*/
struct S2w_blk
{
  uint64_t id;             /* Serial number of the block, in infd read order. */
  S2w_blk *next;           /* Following block in the queue. */
  size_t loaded;           /* Valid bytes in plain; can be 0 for the first block. */
  unsigned char plain[1];  /* Bytes read from infd; allocated size: sizeof_plain. */
};
|
||
|
|
||
|
|
||
|
struct S2w_q
|
||
|
{
|
||
|
Cond av_or_eof; /* New block available or splitter done. */
|
||
|
S2w_blk *tail, /* Splitter will append here. */
|
||
|
*head; /* Next ready worker shall compress this. */
|
||
|
int eof; /* Splitter done. */
|
||
|
};
|
||
|
|
||
|
|
||
|
static void
|
||
|
s2w_q_init(S2w_q *s2w_q)
|
||
|
{
|
||
|
xinit(&s2w_q->av_or_eof);
|
||
|
s2w_q->tail = 0;
|
||
|
s2w_q->head = 0;
|
||
|
s2w_q->eof = 0;
|
||
|
}
|
||
|
|
||
|
|
||
|
static void
|
||
|
s2w_q_uninit(S2w_q *s2w_q)
|
||
|
{
|
||
|
assert(0 != s2w_q->eof);
|
||
|
assert(0 == s2w_q->head);
|
||
|
assert(0 == s2w_q->tail);
|
||
|
xdestroy(&s2w_q->av_or_eof);
|
||
|
}
|
||
|
|
||
|
|
||
|
/*
  Workers-to-muxer block: the compressed form of one input block.
  "compr" is declared with a single element; per the field note, each block
  is allocated with room for sizeof_compr bytes.
*/
struct W2m_blk
{
  uint64_t id;             /* Index of the originating input block. */
  W2m_blk *next;           /* Link to another block of the (unordered) list. */
  size_t produced;         /* How many bytes of compr are filled. */
  unsigned char compr[1];  /* Bytes destined for outfd; allocated size: sizeof_compr. */
};
|
||
|
|
||
|
|
||
|
static int
|
||
|
w2m_blk_cmp(const void *v_a, const void *v_b)
|
||
|
{
|
||
|
uint64_t a,
|
||
|
b;
|
||
|
|
||
|
a = ((const W2m_blk *)v_a)->id;
|
||
|
b = ((const W2m_blk *)v_b)->id;
|
||
|
|
||
|
return
|
||
|
a < b ? -1
|
||
|
: a > b ? 1
|
||
|
: 0;
|
||
|
}
|
||
|
|
||
|
|
||
|
struct W2m_q
|
||
|
{
|
||
|
Cond av_or_exit; /* New block available or all workers exited. */
|
||
|
uint64_t needed; /* Block needed for resuming writing. */
|
||
|
W2m_blk *head; /* Block list (unordered). */
|
||
|
unsigned working; /* Number of workers still running. */
|
||
|
};
|
||
|
|
||
|
|
||
|
static void
|
||
|
w2m_q_init(W2m_q *w2m_q, unsigned num_worker)
|
||
|
{
|
||
|
assert(0u < num_worker);
|
||
|
xinit(&w2m_q->av_or_exit);
|
||
|
w2m_q->needed = 0u;
|
||
|
w2m_q->head = 0;
|
||
|
w2m_q->working = num_worker;
|
||
|
}
|
||
|
|
||
|
|
||
|
static void
|
||
|
w2m_q_uninit(W2m_q *w2m_q)
|
||
|
{
|
||
|
assert(0u == w2m_q->working);
|
||
|
assert(0 == w2m_q->head);
|
||
|
xdestroy(&w2m_q->av_or_exit);
|
||
|
}
|
||
|
|
||
|
|
||
|
struct M2s_q /* Muxer to splitter. */
|
||
|
{
|
||
|
Cond av; /* Free slot available. */
|
||
|
unsigned num_free; /* Number of free slots. */
|
||
|
};
|
||
|
|
||
|
|
||
|
static void
|
||
|
m2s_q_init(M2s_q *m2s_q, unsigned num_free)
|
||
|
{
|
||
|
assert(0u < num_free);
|
||
|
xinit(&m2s_q->av);
|
||
|
m2s_q->num_free = num_free;
|
||
|
}
|
||
|
|
||
|
|
||
|
static void
|
||
|
m2s_q_uninit(M2s_q *m2s_q, unsigned num_free)
|
||
|
{
|
||
|
assert(m2s_q->num_free == num_free);
|
||
|
xdestroy(&m2s_q->av);
|
||
|
}
|
||
|
|
||
|
|
||
|
static void
|
||
|
split(M2s_q *m2s_q, S2w_q *s2w_q, int infd,
|
||
|
const size_t sizeof_plain, const size_t sizeof_s2w_blk)
|
||
|
{
|
||
|
uint64_t id;
|
||
|
ssize_t rd;
|
||
|
|
||
|
id = 0u;
|
||
|
do {
|
||
|
S2w_blk *s2w_blk;
|
||
|
size_t vacant;
|
||
|
|
||
|
/* Grab a free slot. */
|
||
|
xlock_pred(&m2s_q->av);
|
||
|
while (0u == m2s_q->num_free) {
|
||
|
xwait(&m2s_q->av);
|
||
|
}
|
||
|
--m2s_q->num_free;
|
||
|
xunlock(&m2s_q->av);
|
||
|
s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk);
|
||
|
|
||
|
/* Fill block. */
|
||
|
vacant = sizeof_plain;
|
||
|
do {
|
||
|
rd = read(infd, s2w_blk->plain + (sizeof_plain - vacant),
|
||
|
vacant > (size_t)SSIZE_MAX ? (size_t)SSIZE_MAX : vacant);
|
||
|
} while (0 < rd && 0u < (vacant -= (size_t)rd));
|
||
|
|
||
|
/* Read error. */
|
||
|
if (-1 == rd) {
|
||
|
fail("read()", errno);
|
||
|
}
|
||
|
|
||
|
if (sizeof_plain == vacant && 0u < id) {
|
||
|
/* EOF on first read, but not for first input block. */
|
||
|
(*freef)(s2w_blk);
|
||
|
xlock(&m2s_q->av);
|
||
|
++m2s_q->num_free;
|
||
|
xunlock(&m2s_q->av);
|
||
|
}
|
||
|
else {
|
||
|
s2w_blk->id = id;
|
||
|
s2w_blk->next = 0;
|
||
|
s2w_blk->loaded = sizeof_plain - vacant;
|
||
|
}
|
||
|
|
||
|
/* We either push a block, or set EOF, or both. */
|
||
|
assert(sizeof_plain > vacant || 0 == rd);
|
||
|
|
||
|
xlock(&s2w_q->av_or_eof);
|
||
|
if (0 == s2w_q->head) {
|
||
|
xbroadcast(&s2w_q->av_or_eof);
|
||
|
}
|
||
|
|
||
|
if (sizeof_plain > vacant || 0u == id) {
|
||
|
if (0 == s2w_q->tail) {
|
||
|
s2w_q->head = s2w_blk;
|
||
|
}
|
||
|
else {
|
||
|
s2w_q->tail->next = s2w_blk;
|
||
|
}
|
||
|
s2w_q->tail = s2w_blk;
|
||
|
}
|
||
|
s2w_q->eof = (0 == rd);
|
||
|
xunlock(&s2w_q->av_or_eof);
|
||
|
|
||
|
/*
|
||
|
If we didn't push a block, then this is bogus, but then we did set EOF,
|
||
|
so it doesn't matter, because we'll leave immediately.
|
||
|
*/
|
||
|
++id;
|
||
|
} while (0 < rd);
|
||
|
}
|
||
|
|
||
|
|
||
|
struct Split_arg
|
||
|
{
|
||
|
M2s_q *m2s_q;
|
||
|
S2w_q *s2w_q;
|
||
|
int infd;
|
||
|
size_t sizeof_plain,
|
||
|
sizeof_S2w_blk;
|
||
|
};
|
||
|
|
||
|
|
||
|
static void *
|
||
|
split_wrap(void *v_split_arg)
|
||
|
{
|
||
|
Split_arg *split_arg = (Split_arg *)v_split_arg;
|
||
|
|
||
|
split(
|
||
|
split_arg->m2s_q,
|
||
|
split_arg->s2w_q,
|
||
|
split_arg->infd,
|
||
|
split_arg->sizeof_plain,
|
||
|
split_arg->sizeof_S2w_blk
|
||
|
);
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
|
||
|
static void
|
||
|
work_lz_rd(W2m_blk *w2m_blk, const size_t sizeof_compr, void *lz)
|
||
|
{
|
||
|
int rd;
|
||
|
|
||
|
assert(w2m_blk->produced < sizeof_compr);
|
||
|
rd = LZ_compress_read(lz, w2m_blk->compr + w2m_blk->produced,
|
||
|
sizeof_compr - w2m_blk->produced);
|
||
|
if (-1 == rd) {
|
||
|
show_error( "LZ_compress_read() failed." );
|
||
|
fatal();
|
||
|
}
|
||
|
w2m_blk->produced += (size_t)rd;
|
||
|
}
|
||
|
|
||
|
|
||
|
/* Encoder parameters of one compression level (handed to LZ_compress_open()). */
struct Compr_lev
{
  unsigned dict_size;  /* Dictionary size, in bytes. */
  unsigned mx_match;   /* Passed as the encoder's match length limit. */
};
|
||
|
|
||
|
|
||
|
/*
  Table of compression levels, indexed by clidx (nine entries).
  Dictionary sizes are in bytes: N << 20 is N MiB.
*/
static const Compr_lev compr_lev[] = {
  {  1u << 20,  10u },
  {  1u << 20,  12u },
  {  1u << 20,  17u },
  {  2u << 20,  26u },
  {  4u << 20,  44u },
  {  8u << 20,  80u },
  { 16u << 20, 108u },
  { 16u << 20, 163u },
  { 32u << 20, 273u }
};
|
||
|
|
||
|
|
||
|
static void
|
||
|
work_compr(S2w_blk *s2w_blk, W2m_q *w2m_q, unsigned clidx,
|
||
|
const size_t sizeof_compr, const size_t sizeof_w2m_blk)
|
||
|
{
|
||
|
W2m_blk *w2m_blk;
|
||
|
|
||
|
assert(0u < s2w_blk->loaded || 0u == s2w_blk->id);
|
||
|
|
||
|
w2m_blk = (W2m_blk *)xalloc(sizeof_w2m_blk);
|
||
|
|
||
|
/* Single member compression. Settings like with lzip -6. */
|
||
|
{
|
||
|
void *lz;
|
||
|
size_t written;
|
||
|
|
||
|
lz = LZ_compress_open(compr_lev[clidx].dict_size,
|
||
|
compr_lev[clidx].mx_match, (uint64_t)-1 / 2u);
|
||
|
if (LZ_ok != LZ_compress_errno(lz)) {
|
||
|
show_error( "LZ_compress_open() failed." );
|
||
|
fatal();
|
||
|
}
|
||
|
|
||
|
written = 0u;
|
||
|
w2m_blk->produced = 0u;
|
||
|
while (written < s2w_blk->loaded) {
|
||
|
int wr;
|
||
|
|
||
|
wr = LZ_compress_write(lz, s2w_blk->plain + written,
|
||
|
s2w_blk->loaded - written);
|
||
|
if (-1 == wr) {
|
||
|
show_error( "LZ_compress_write() failed." );
|
||
|
fatal();
|
||
|
}
|
||
|
written += (size_t)wr;
|
||
|
|
||
|
work_lz_rd(w2m_blk, sizeof_compr, lz);
|
||
|
}
|
||
|
|
||
|
if (-1 == LZ_compress_finish(lz)) {
|
||
|
show_error( "LZ_compress_finish() failed." );
|
||
|
fatal();
|
||
|
}
|
||
|
|
||
|
while (!LZ_compress_finished(lz)) {
|
||
|
work_lz_rd(w2m_blk, sizeof_compr, lz);
|
||
|
}
|
||
|
|
||
|
if (-1 == LZ_compress_close(lz)) {
|
||
|
show_error( "LZ_compress_close() failed." );
|
||
|
fatal();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
w2m_blk->id = s2w_blk->id;
|
||
|
|
||
|
/* Push block to muxer. */
|
||
|
xlock(&w2m_q->av_or_exit);
|
||
|
w2m_blk->next = w2m_q->head;
|
||
|
w2m_q->head = w2m_blk;
|
||
|
if (w2m_blk->id == w2m_q->needed) {
|
||
|
xsignal(&w2m_q->av_or_exit);
|
||
|
}
|
||
|
xunlock(&w2m_q->av_or_exit);
|
||
|
}
|
||
|
|
||
|
|
||
|
static void
|
||
|
work(S2w_q *s2w_q, W2m_q *w2m_q, unsigned clidx,
|
||
|
const size_t sizeof_compr, const size_t sizeof_w2m_blk)
|
||
|
{
|
||
|
for (;;) {
|
||
|
S2w_blk *s2w_blk;
|
||
|
|
||
|
/* Grab a block to work on. */
|
||
|
xlock_pred(&s2w_q->av_or_eof);
|
||
|
while (0 == s2w_q->head && !s2w_q->eof) {
|
||
|
xwait(&s2w_q->av_or_eof);
|
||
|
}
|
||
|
if (0 == s2w_q->head) {
|
||
|
/* No blocks available and splitter exited. */
|
||
|
xunlock(&s2w_q->av_or_eof);
|
||
|
break;
|
||
|
}
|
||
|
s2w_blk = s2w_q->head;
|
||
|
s2w_q->head = s2w_blk->next;
|
||
|
if (0 == s2w_q->head) {
|
||
|
s2w_q->tail = 0;
|
||
|
}
|
||
|
xunlock(&s2w_q->av_or_eof);
|
||
|
|
||
|
work_compr(s2w_blk, w2m_q, clidx, sizeof_compr, sizeof_w2m_blk);
|
||
|
(*freef)(s2w_blk);
|
||
|
}
|
||
|
|
||
|
/* Notify muxer when last worker exits. */
|
||
|
xlock(&w2m_q->av_or_exit);
|
||
|
if (0u == --w2m_q->working && 0 == w2m_q->head) {
|
||
|
xsignal(&w2m_q->av_or_exit);
|
||
|
}
|
||
|
xunlock(&w2m_q->av_or_exit);
|
||
|
}
|
||
|
|
||
|
|
||
|
struct Work_arg
|
||
|
{
|
||
|
S2w_q *s2w_q;
|
||
|
W2m_q *w2m_q;
|
||
|
unsigned clidx;
|
||
|
size_t sizeof_compr,
|
||
|
sizeof_W2m_blk;
|
||
|
};
|
||
|
|
||
|
|
||
|
static void *
|
||
|
work_wrap(void *v_work_arg)
|
||
|
{
|
||
|
Work_arg *work_arg;
|
||
|
|
||
|
work_arg = (Work_arg *)v_work_arg;
|
||
|
work(
|
||
|
work_arg->s2w_q,
|
||
|
work_arg->w2m_q,
|
||
|
work_arg->clidx,
|
||
|
work_arg->sizeof_compr,
|
||
|
work_arg->sizeof_W2m_blk
|
||
|
);
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
|
||
|
static void *
|
||
|
reord_alloc(size_t size, void *)
|
||
|
{
|
||
|
return (*mallocf)(size);
|
||
|
}
|
||
|
|
||
|
|
||
|
static void
|
||
|
reord_dealloc(void *ptr, void *)
|
||
|
{
|
||
|
(*freef)(ptr);
|
||
|
}
|
||
|
|
||
|
|
||
|
static void
|
||
|
mux_write(M2s_q *m2s_q, lacos_rbtree_node **reord,
|
||
|
uint64_t *reord_needed, int outfd)
|
||
|
{
|
||
|
assert(0 != *reord);
|
||
|
|
||
|
/*
|
||
|
Go on until the tree becomes empty or the next block is found to be
|
||
|
missing.
|
||
|
*/
|
||
|
do {
|
||
|
lacos_rbtree_node *reord_head;
|
||
|
W2m_blk *reord_w2m_blk;
|
||
|
|
||
|
reord_head = lacos_rbtree_min(*reord);
|
||
|
assert(0 != reord_head);
|
||
|
|
||
|
reord_w2m_blk = (W2m_blk *)(*(void **)reord_head);
|
||
|
if (reord_w2m_blk->id != *reord_needed) {
|
||
|
break;
|
||
|
}
|
||
|
|
||
|
/* Write out "reord_w2m_blk". */
|
||
|
if (-1 != outfd) {
|
||
|
char unsigned *cp;
|
||
|
|
||
|
cp = reord_w2m_blk->compr;
|
||
|
while (reord_w2m_blk->produced > 0u) {
|
||
|
ssize_t written;
|
||
|
|
||
|
written = write(outfd, cp, reord_w2m_blk->produced > (size_t)SSIZE_MAX
|
||
|
? (size_t)SSIZE_MAX : reord_w2m_blk->produced);
|
||
|
if (-1 == written) {
|
||
|
fail("write()", errno);
|
||
|
}
|
||
|
|
||
|
reord_w2m_blk->produced -= (size_t)written;
|
||
|
cp += written;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
++*reord_needed;
|
||
|
|
||
|
xlock(&m2s_q->av);
|
||
|
if (0u == m2s_q->num_free++) {
|
||
|
xsignal(&m2s_q->av);
|
||
|
}
|
||
|
xunlock(&m2s_q->av);
|
||
|
|
||
|
lacos_rbtree_delete(
|
||
|
reord, /* new_root */
|
||
|
reord_head, /* old_node */
|
||
|
0, /* old_data */
|
||
|
reord_dealloc, /* dealloc() */
|
||
|
0 /* alloc_ctl */
|
||
|
);
|
||
|
|
||
|
/* Release "reord_w2m_blk". */
|
||
|
(*freef)(reord_w2m_blk);
|
||
|
} while (0 != *reord);
|
||
|
}
|
||
|
|
||
|
|
||
|
static void
|
||
|
mux(W2m_q *w2m_q, M2s_q *m2s_q, int outfd)
|
||
|
{
|
||
|
lacos_rbtree_node *reord;
|
||
|
uint64_t reord_needed;
|
||
|
|
||
|
reord = 0;
|
||
|
reord_needed = 0u;
|
||
|
|
||
|
xlock_pred(&w2m_q->av_or_exit);
|
||
|
for (;;) {
|
||
|
W2m_blk *w2m_blk;
|
||
|
|
||
|
/* Grab all available compressed blocks in one step. */
|
||
|
while (0 == w2m_q->head && 0u < w2m_q->working) {
|
||
|
xwait(&w2m_q->av_or_exit);
|
||
|
}
|
||
|
|
||
|
if (0 == w2m_q->head) {
|
||
|
/* w2m_q is empty and all workers exited */
|
||
|
break;
|
||
|
}
|
||
|
|
||
|
w2m_blk = w2m_q->head;
|
||
|
w2m_q->head = 0;
|
||
|
xunlock(&w2m_q->av_or_exit);
|
||
|
|
||
|
/* Merge blocks fetched this time into tree. */
|
||
|
do {
|
||
|
lacos_rbtree_node *new_node;
|
||
|
W2m_blk *next;
|
||
|
|
||
|
if (-1 == lacos_rbtree_insert(
|
||
|
&reord, /* new_root */
|
||
|
&new_node, /* new_node */
|
||
|
w2m_blk, /* new_data */
|
||
|
w2m_blk_cmp, /* cmp() */
|
||
|
reord_alloc, /* alloc() */
|
||
|
0 /* alloc_ctl */
|
||
|
)) {
|
||
|
/* id collision shouldn't happen */
|
||
|
assert(0 == new_node);
|
||
|
show_error( "lacos_rbtree_insert(): out of memory." );
|
||
|
fatal();
|
||
|
}
|
||
|
|
||
|
next = w2m_blk->next;
|
||
|
w2m_blk->next = 0;
|
||
|
w2m_blk = next;
|
||
|
} while (0 != w2m_blk);
|
||
|
|
||
|
/* Write out initial continuous sequence of reordered blocks. */
|
||
|
mux_write(m2s_q, &reord, &reord_needed, outfd);
|
||
|
|
||
|
xlock_pred(&w2m_q->av_or_exit);
|
||
|
w2m_q->needed = reord_needed;
|
||
|
}
|
||
|
xunlock(&w2m_q->av_or_exit);
|
||
|
|
||
|
assert(0 == reord);
|
||
|
}
|
||
|
|
||
|
|
||
|
static void
|
||
|
plzip(unsigned num_worker, unsigned num_slot, unsigned clidx, int print_cctrs,
|
||
|
int infd, int outfd)
|
||
|
{
|
||
|
S2w_q s2w_q;
|
||
|
W2m_q w2m_q;
|
||
|
M2s_q m2s_q;
|
||
|
Split_arg split_arg;
|
||
|
pthread_t splitter;
|
||
|
Work_arg work_arg;
|
||
|
pthread_t *worker;
|
||
|
unsigned i;
|
||
|
|
||
|
assert(clidx < sizeof compr_lev / sizeof compr_lev[0]);
|
||
|
|
||
|
s2w_q_init(&s2w_q);
|
||
|
w2m_q_init(&w2m_q, num_worker);
|
||
|
m2s_q_init(&m2s_q, num_slot);
|
||
|
|
||
|
#define SIZES(struc, arr, arsz_unsigned, arg) \
|
||
|
do { \
|
||
|
unsigned tmp; \
|
||
|
\
|
||
|
tmp = arsz_unsigned; \
|
||
|
if ((size_t)-1 < tmp) { \
|
||
|
show_error( "size_t overflow in sizeof_" #arr "." ); \
|
||
|
fatal(); \
|
||
|
} \
|
||
|
arg ## _arg . sizeof_ ## arr = tmp; \
|
||
|
\
|
||
|
if ((size_t)-1 - sizeof(struc) \
|
||
|
< arg ## _arg . sizeof_ ## arr - (size_t)1) { \
|
||
|
show_error( "size_t overflow in sizeof_" #struc "." ); \
|
||
|
fatal(); \
|
||
|
} \
|
||
|
arg ## _arg . sizeof_ ## struc = sizeof(struc) \
|
||
|
+ (arg ## _arg . sizeof_ ## arr - (size_t)1); \
|
||
|
} while (0)
|
||
|
|
||
|
split_arg.m2s_q = &m2s_q;
|
||
|
split_arg.s2w_q = &s2w_q;
|
||
|
split_arg.infd = infd;
|
||
|
SIZES(S2w_blk, plain, 2u * compr_lev[clidx].dict_size, split);
|
||
|
xcreate(&splitter, split_wrap, &split_arg);
|
||
|
|
||
|
work_arg.s2w_q = &s2w_q;
|
||
|
work_arg.w2m_q = &w2m_q;
|
||
|
work_arg.clidx = clidx;
|
||
|
SIZES(W2m_blk, compr, (4u + 1u + 1u)
|
||
|
+ ((unsigned)split_arg.sizeof_plain * 9u + 7u) / 8u + (4u + 8u + 8u),
|
||
|
work);
|
||
|
|
||
|
#undef SIZES
|
||
|
|
||
|
assert(0u < num_worker);
|
||
|
assert((size_t)-1 / sizeof *worker >= num_worker);
|
||
|
worker = (pthread_t *)xalloc(num_worker * sizeof *worker);
|
||
|
for (i = 0u; i < num_worker; ++i) {
|
||
|
xcreate(&worker[i], work_wrap, &work_arg);
|
||
|
}
|
||
|
|
||
|
mux(&w2m_q, &m2s_q, outfd);
|
||
|
|
||
|
i = num_worker;
|
||
|
do {
|
||
|
xjoin(worker[--i]);
|
||
|
} while (0u < i);
|
||
|
(*freef)(worker);
|
||
|
|
||
|
xjoin(splitter);
|
||
|
|
||
|
const int FW = ( sizeof(long unsigned) * 8 ) / 3 + 1;
|
||
|
if (print_cctrs && 0 > fprintf(stderr,
|
||
|
"any worker tried to consume from splitter: %*lu\n"
|
||
|
"any worker stalled : %*lu\n"
|
||
|
"muxer tried to consume from workers : %*lu\n"
|
||
|
"muxer stalled : %*lu\n"
|
||
|
"splitter tried to consume from muxer : %*lu\n"
|
||
|
"splitter stalled : %*lu\n",
|
||
|
FW, s2w_q.av_or_eof.ccount,
|
||
|
FW, s2w_q.av_or_eof.wcount,
|
||
|
FW, w2m_q.av_or_exit.ccount,
|
||
|
FW, w2m_q.av_or_exit.wcount,
|
||
|
FW, m2s_q.av.ccount,
|
||
|
FW, m2s_q.av.wcount) )
|
||
|
{
|
||
|
fatal();
|
||
|
}
|
||
|
|
||
|
m2s_q_uninit(&m2s_q, num_slot);
|
||
|
w2m_q_uninit(&w2m_q);
|
||
|
s2w_q_uninit(&s2w_q);
|
||
|
}
|
||
|
|
||
|
|
||
|
void *
|
||
|
plzip_wrap(void *v_arg)
|
||
|
{
|
||
|
Plzip_arg *arg = (Plzip_arg *)v_arg;
|
||
|
plzip(
|
||
|
arg->num_worker,
|
||
|
arg->num_slot,
|
||
|
arg->clidx,
|
||
|
arg->print_cctrs,
|
||
|
arg->infd,
|
||
|
arg->outfd
|
||
|
);
|
||
|
|
||
|
xraise(SIGUSR2);
|
||
|
return 0;
|
||
|
}
|