[Darshan-commits] [Darshan] branch, dev-modular, updated. darshan-2.3.1-178-gcb808ec
Service Account
git at mcs.anl.gov
Fri Sep 18 17:26:46 CDT 2015
This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "Darshan".
The branch, dev-modular has been updated
via cb808ec2ab449f156045621f0e3998a544387adc (commit)
from 8659e6a33068878916580d5f69bb1d19101d72c9 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
commit cb808ec2ab449f156045621f0e3998a544387adc
Author: Shane Snyder <ssnyder at mcs.anl.gov>
Date: Fri Sep 18 17:26:08 2015 -0500
add bzip2 implementation to darshan-logutils
-----------------------------------------------------------------------
Summary of changes:
darshan-util/darshan-logutils.c | 304 +++++++++++++++++++++++++++++----------
1 files changed, 230 insertions(+), 74 deletions(-)
Diff of changes:
diff --git a/darshan-util/darshan-logutils.c b/darshan-util/darshan-logutils.c
index e3560c3..25725ac 100644
--- a/darshan-util/darshan-logutils.c
+++ b/darshan-util/darshan-logutils.c
@@ -41,6 +41,11 @@ static int darshan_log_dzwrite(darshan_fd fd, int region_id, void *buf, int len)
static int darshan_log_libz_read(darshan_fd fd, int region_id, void *buf, int len);
static int darshan_log_libz_write(darshan_fd fd, int region_id, void *buf, int len);
static int darshan_log_libz_flush(darshan_fd fd, int region_id);
+#ifdef HAVE_LIBBZ2
+static int darshan_log_bzip2_read(darshan_fd fd, int region_id, void *buf, int len);
+static int darshan_log_bzip2_write(darshan_fd fd, int region_id, void *buf, int len);
+static int darshan_log_bzip2_flush(darshan_fd fd, int region_id);
+#endif
static int darshan_log_dzload(darshan_fd fd, struct darshan_log_map map);
static int darshan_log_dzunload(darshan_fd fd, struct darshan_log_map *map_p);
@@ -697,6 +702,12 @@ void darshan_log_close(darshan_fd fd)
ret = darshan_log_libz_flush(fd, fd->dz_prev_reg_id);
if(ret == 0)
break;
+#ifdef HAVE_LIBBZ2
+ case DARSHAN_BZIP2_COMP:
+ ret = darshan_log_bzip2_flush(fd, fd->dz_prev_reg_id);
+ if(ret == 0)
+ break;
+#endif
default:
/* if flush fails, remove the output log file */
fd->err = -1;
@@ -934,6 +945,43 @@ static int darshan_log_dzinit(darshan_fd fd)
fd->dz_strm = tmp_zstrm;
break;
}
+#ifdef HAVE_LIBBZ2
+ case DARSHAN_BZIP2_COMP:
+ {
+ bz_stream *tmp_bzstrm = malloc(sizeof(*tmp_bzstrm));
+ if(!tmp_bzstrm)
+ {
+ free(fd->dz_buf);
+ return(-1);
+ }
+ tmp_bzstrm->bzalloc = NULL;
+ tmp_bzstrm->bzfree = NULL;
+ tmp_bzstrm->opaque = NULL;
+ tmp_bzstrm->avail_in = 0;
+ tmp_bzstrm->next_in = Z_NULL;
+
+ if(fd->o_flags == O_RDONLY)
+ {
+ /* read only file, init decompress algorithm */
+ ret = BZ2_bzDecompressInit(tmp_bzstrm, 1, 0);
+ }
+ else
+ {
+ /* write only file, init compress algorithm */
+ ret = BZ2_bzCompressInit(tmp_bzstrm, 9, 1, 30);
+ tmp_bzstrm->avail_out = DARSHAN_DEF_COMP_BUF_SZ;
+ tmp_bzstrm->next_out = (char *)fd->dz_buf;
+ }
+ if(ret != BZ_OK)
+ {
+ free(tmp_bzstrm);
+ free(fd->dz_buf);
+ return(-1);
+ }
+ fd->dz_strm = tmp_bzstrm;
+ break;
+ }
+#endif
default:
fprintf(stderr, "Error: invalid compression type.\n");
return(-1);
@@ -948,15 +996,20 @@ static void darshan_log_dzdestroy(darshan_fd fd)
{
case DARSHAN_ZLIB_COMP:
if(fd->o_flags == O_RDONLY)
- {
inflateEnd(fd->dz_strm);
- }
else
- {
deflateEnd(fd->dz_strm);
- }
free(fd->dz_strm);
break;
+#ifdef HAVE_LIBBZ2
+ case DARSHAN_BZIP2_COMP:
+ if(fd->o_flags == O_RDONLY)
+ BZ2_bzDecompressEnd(fd->dz_strm);
+ else
+ BZ2_bzCompressEnd(fd->dz_strm);
+ free(fd->dz_strm);
+ break;
+#endif
default:
fprintf(stderr, "Error: invalid compression type.\n");
}
@@ -974,6 +1027,11 @@ static int darshan_log_dzread(darshan_fd fd, int region_id, void *buf, int len)
case DARSHAN_ZLIB_COMP:
ret = darshan_log_libz_read(fd, region_id, buf, len);
break;
+#ifdef HAVE_LIBBZ2
+ case DARSHAN_BZIP2_COMP:
+ ret = darshan_log_bzip2_read(fd, region_id, buf, len);
+ break;
+#endif
default:
fprintf(stderr, "Error: invalid compression type.\n");
return(-1);
@@ -991,6 +1049,11 @@ static int darshan_log_dzwrite(darshan_fd fd, int region_id, void *buf, int len)
case DARSHAN_ZLIB_COMP:
ret = darshan_log_libz_write(fd, region_id, buf, len);
break;
+#ifdef HAVE_LIBBZ2
+ case DARSHAN_BZIP2_COMP:
+ ret = darshan_log_bzip2_write(fd, region_id, buf, len);
+ break;
+#endif
default:
fprintf(stderr, "Error: invalid compression type.\n");
return(-1);
@@ -1007,7 +1070,7 @@ static int darshan_log_libz_read(darshan_fd fd, int region_id, void *buf, int le
struct darshan_log_map map;
z_stream *z_strmp = (z_stream *)fd->dz_strm;
- assert(fd->dz_strm);
+ assert(z_strmp);
/* if new log region, we reload buffers and clear eor flag */
if(region_id != fd->dz_prev_reg_id)
@@ -1080,7 +1143,7 @@ static int darshan_log_libz_write(darshan_fd fd, int region_id, void *buf, int l
struct darshan_log_map *map_p;
z_stream *z_strmp = (z_stream *)fd->dz_strm;
- assert(fd->dz_strm);
+ assert(z_strmp);
/* if new log region, finish prev region's zstream and flush to log file */
if(region_id != fd->dz_prev_reg_id)
@@ -1149,6 +1212,8 @@ static int darshan_log_libz_flush(darshan_fd fd, int region_id)
struct darshan_log_map *map_p;
z_stream *z_strmp = (z_stream *)fd->dz_strm;
+ assert(z_strmp);
+
if(region_id == DARSHAN_JOB_REGION_ID)
map_p = &(fd->job_map);
else if(region_id == DARSHAN_REC_MAP_REGION_ID)
@@ -1185,110 +1250,201 @@ static int darshan_log_libz_flush(darshan_fd fd, int region_id)
return(0);
}
-#if 0
#ifdef HAVE_LIBBZ2
-static int darshan_bzip2_decomp(char* comp_buf, int comp_buf_sz,
- char* decomp_buf, int* inout_decomp_buf_sz)
+
+static int darshan_log_bzip2_read(darshan_fd fd, int region_id, void *buf, int len)
{
int ret;
- int total_out = 0;
- bz_stream tmp_stream;
-
- memset(&tmp_stream, 0, sizeof(tmp_stream));
- tmp_stream.bzalloc = NULL;
- tmp_stream.bzfree = NULL;
- tmp_stream.opaque = NULL;
- tmp_stream.next_in = comp_buf;
- tmp_stream.avail_in = comp_buf_sz;
- tmp_stream.next_out = decomp_buf;
- tmp_stream.avail_out = *inout_decomp_buf_sz;
-
- ret = BZ2_bzDecompressInit(&tmp_stream, 1, 0);
- if(ret != BZ_OK)
+ int total_bytes = 0;
+ int tmp_out_bytes;
+ struct darshan_log_map map;
+ bz_stream *bz_strmp = (bz_stream *)fd->dz_strm;
+
+ assert(bz_strmp);
+
+ /* if new log region, we reload buffers and clear eor flag */
+ if(region_id != fd->dz_prev_reg_id)
{
- return(-1);
+ bz_strmp->avail_in = 0;
+ fd->dz_eor = 0;
+ fd->dz_prev_reg_id = region_id;
}
- /* while we have not finished consuming all of the uncompressed input data */
- while(tmp_stream.avail_in)
+ if(region_id == DARSHAN_JOB_REGION_ID)
+ map = fd->job_map;
+ else if(region_id == DARSHAN_REC_MAP_REGION_ID)
+ map = fd->rec_map;
+ else
+ map = fd->mod_map[region_id];
+
+ bz_strmp->avail_out = len;
+ bz_strmp->next_out = buf;
+
+ /* we just decompress until the output buffer is full, assuming there
+ * is enough compressed data in file to satisfy the request size.
+ */
+ while(bz_strmp->avail_out)
{
- if(tmp_stream.avail_out == 0)
+ /* check if we need more compressed data */
+ if(bz_strmp->avail_in == 0)
{
- /* We ran out of buffer space for decompression. In theory,
- * we could just alloc more space, but probably just easier
- * to bump up the default size of the output buffer.
+ /* if the eor flag is set, clear it and return -- future
+ * reads of this log region will restart at the beginning
*/
- BZ2_bzDecompressEnd(&tmp_stream);
- return(-1);
+ if(fd->dz_eor)
+ {
+ fd->dz_eor = 0;
+ break;
+ }
+
+ /* read more data from input file */
+ ret = darshan_log_dzload(fd, map);
+ if(ret < 0)
+ return(-1);
+ assert(fd->dz_size > 0);
+
+ bz_strmp->avail_in = fd->dz_size;
+ bz_strmp->next_in = (char *)fd->dz_buf;
}
- /* decompress data */
- ret = BZ2_bzDecompress(&tmp_stream);
- if(ret != BZ_STREAM_END)
+ tmp_out_bytes = bz_strmp->total_out_lo32;
+ ret = BZ2_bzDecompress(bz_strmp);
+ if(ret != BZ_OK && ret != BZ_STREAM_END)
{
- BZ2_bzDecompressEnd(&tmp_stream);
+ fprintf(stderr, "Error: unable to decompress darshan log data.\n");
return(-1);
}
+ total_bytes += (bz_strmp->total_out_lo32 - tmp_out_bytes);
- assert(tmp_stream.total_out_hi32 == 0);
- total_out += tmp_stream.total_out_lo32;
- if(tmp_stream.avail_in)
+ /* reset the decompression if we encountered end of stream */
+ if(ret == BZ_STREAM_END)
{
- /* reinitialize bzip2 stream, we have more data to
- * decompress
- */
- BZ2_bzDecompressEnd(&tmp_stream);
- ret = BZ2_bzDecompressInit(&tmp_stream, 1, 0);
- if(ret != BZ_OK)
- {
- return(-1);
- }
+ BZ2_bzDecompressEnd(bz_strmp);
+ BZ2_bzDecompressInit(bz_strmp, 1, 0);
}
}
- BZ2_bzDecompressEnd(&tmp_stream);
- *inout_decomp_buf_sz = total_out;
- return(0);
+ return(total_bytes);
}
-static int darshan_bzip2_comp(char* decomp_buf, int decomp_buf_sz,
- char* comp_buf, int* inout_comp_buf_sz)
+static int darshan_log_bzip2_write(darshan_fd fd, int region_id, void *buf, int len)
{
int ret;
- bz_stream tmp_stream;
-
- memset(&tmp_stream, 0, sizeof(tmp_stream));
- tmp_stream.bzalloc = NULL;
- tmp_stream.bzfree = NULL;
- tmp_stream.opaque = NULL;
- tmp_stream.next_in = decomp_buf;
- tmp_stream.avail_in = decomp_buf_sz;
- tmp_stream.next_out = comp_buf;
- tmp_stream.avail_out = *inout_comp_buf_sz;
-
- ret = BZ2_bzCompressInit(&tmp_stream, 9, 1, 30);
- if(ret != BZ_OK)
+ int total_bytes = 0;
+ int tmp_in_bytes;
+ int tmp_out_bytes;
+ struct darshan_log_map *map_p;
+ bz_stream *bz_strmp = (bz_stream *)fd->dz_strm;
+
+ assert(bz_strmp);
+
+ /* if new log region, finish prev region's zstream and flush to log file */
+ if(region_id != fd->dz_prev_reg_id)
{
- return(-1);
+ /* error out if the region we are writing to precedes the previous
+ * region we wrote -- we shouldn't be moving backwards in the log
+ */
+ if(region_id < fd->dz_prev_reg_id)
+ return(-1);
+
+ if(fd->dz_prev_reg_id != DARSHAN_HEADER_REGION_ID)
+ {
+ ret = darshan_log_bzip2_flush(fd, fd->dz_prev_reg_id);
+ if(ret < 0)
+ return(-1);
+ }
+
+ fd->dz_prev_reg_id = region_id;
}
- /* compress data */
+ if(region_id == DARSHAN_JOB_REGION_ID)
+ map_p = &(fd->job_map);
+ else if(region_id == DARSHAN_REC_MAP_REGION_ID)
+ map_p = &(fd->rec_map);
+ else
+ map_p = &(fd->mod_map[region_id]);
+
+ bz_strmp->avail_in = len;
+ bz_strmp->next_in = buf;
+
+ /* compress input data until none left */
+ while(bz_strmp->avail_in)
+ {
+ /* if we are out of output, flush to log file */
+ if(bz_strmp->avail_out == 0)
+ {
+ assert(fd->dz_size == DARSHAN_DEF_COMP_BUF_SZ);
+
+ ret = darshan_log_dzunload(fd, map_p);
+ if(ret < 0)
+ return(-1);
+
+ bz_strmp->avail_out = DARSHAN_DEF_COMP_BUF_SZ;
+ bz_strmp->next_out = (char *)fd->dz_buf;
+ }
+
+ tmp_in_bytes = bz_strmp->total_in_lo32;
+ tmp_out_bytes = bz_strmp->total_out_lo32;
+ ret = BZ2_bzCompress(bz_strmp, BZ_RUN);
+ if(ret != BZ_RUN_OK)
+ {
+ fprintf(stderr, "Error: unable to compress darshan log data.\n");
+ return(-1);
+ }
+ total_bytes += (bz_strmp->total_in_lo32 - tmp_in_bytes);
+ fd->dz_size += (bz_strmp->total_out_lo32 - tmp_out_bytes);
+ }
+
+ return(total_bytes);
+}
+
+static int darshan_log_bzip2_flush(darshan_fd fd, int region_id)
+{
+ int ret;
+ int tmp_out_bytes;
+ struct darshan_log_map *map_p;
+ bz_stream *bz_strmp = (bz_stream *)fd->dz_strm;
+
+ assert(bz_strmp);
+
+ if(region_id == DARSHAN_JOB_REGION_ID)
+ map_p = &(fd->job_map);
+ else if(region_id == DARSHAN_REC_MAP_REGION_ID)
+ map_p = &(fd->rec_map);
+ else
+ map_p = &(fd->mod_map[region_id]);
+
+ /* make sure deflate finishes this stream */
+ bz_strmp->avail_in = 0;
+ bz_strmp->next_in = NULL;
do
{
- ret = BZ2_bzCompress(&tmp_stream, BZ_FINISH);
+ tmp_out_bytes = bz_strmp->total_out_lo32;
+ ret = BZ2_bzCompress(bz_strmp, BZ_FINISH);
if(ret < 0)
{
- BZ2_bzCompressEnd(&tmp_stream);
+ fprintf(stderr, "Error: unable to compress darshan log data.\n");
return(-1);
}
+ fd->dz_size += (bz_strmp->total_out_lo32 - tmp_out_bytes);
+
+ if(fd->dz_size)
+ {
+ /* flush to file */
+ if(darshan_log_dzunload(fd, map_p) < 0)
+ return(-1);
+
+ bz_strmp->avail_out = DARSHAN_DEF_COMP_BUF_SZ;
+ bz_strmp->next_out = (char *)fd->dz_buf;
+ }
} while (ret != BZ_STREAM_END);
- BZ2_bzCompressEnd(&tmp_stream);
- assert(tmp_stream.total_out_hi32 == 0);
- *inout_comp_buf_sz = tmp_stream.total_out_lo32;
+
+ BZ2_bzCompressEnd(bz_strmp);
+ BZ2_bzCompressInit(bz_strmp, 9, 1, 30);
return(0);
}
-#endif
+
#endif
static int darshan_log_dzload(darshan_fd fd, struct darshan_log_map map)
hooks/post-receive
--
More information about the Darshan-commits
mailing list