[Darshan-commits] [Darshan] branch, dev-modular, updated. darshan-2.3.1-178-gcb808ec

Service Account git at mcs.anl.gov
Fri Sep 18 17:26:46 CDT 2015


This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "Darshan".

The branch, dev-modular has been updated
       via  cb808ec2ab449f156045621f0e3998a544387adc (commit)
      from  8659e6a33068878916580d5f69bb1d19101d72c9 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit cb808ec2ab449f156045621f0e3998a544387adc
Author: Shane Snyder <ssnyder at mcs.anl.gov>
Date:   Fri Sep 18 17:26:08 2015 -0500

    add bzip2 implementation to darshan-logutils

-----------------------------------------------------------------------

Summary of changes:
 darshan-util/darshan-logutils.c |  304 +++++++++++++++++++++++++++++----------
 1 file changed, 230 insertions(+), 74 deletions(-)


Diff of changes:
diff --git a/darshan-util/darshan-logutils.c b/darshan-util/darshan-logutils.c
index e3560c3..25725ac 100644
--- a/darshan-util/darshan-logutils.c
+++ b/darshan-util/darshan-logutils.c
@@ -41,6 +41,11 @@ static int darshan_log_dzwrite(darshan_fd fd, int region_id, void *buf, int len)
 static int darshan_log_libz_read(darshan_fd fd, int region_id, void *buf, int len);
 static int darshan_log_libz_write(darshan_fd fd, int region_id, void *buf, int len);
 static int darshan_log_libz_flush(darshan_fd fd, int region_id);
+#ifdef HAVE_LIBBZ2
+static int darshan_log_bzip2_read(darshan_fd fd, int region_id, void *buf, int len);
+static int darshan_log_bzip2_write(darshan_fd fd, int region_id, void *buf, int len);
+static int darshan_log_bzip2_flush(darshan_fd fd, int region_id);
+#endif
 static int darshan_log_dzload(darshan_fd fd, struct darshan_log_map map);
 static int darshan_log_dzunload(darshan_fd fd, struct darshan_log_map *map_p);
 
@@ -697,6 +702,12 @@ void darshan_log_close(darshan_fd fd)
                 ret = darshan_log_libz_flush(fd, fd->dz_prev_reg_id);
                 if(ret == 0)
                     break;
+#ifdef HAVE_LIBBZ2
+            case DARSHAN_BZIP2_COMP:
+                ret = darshan_log_bzip2_flush(fd, fd->dz_prev_reg_id);
+                if(ret == 0)
+                    break;
+#endif 
             default:
                 /* if flush fails, remove the output log file */
                 fd->err = -1;
@@ -934,6 +945,43 @@ static int darshan_log_dzinit(darshan_fd fd)
             fd->dz_strm = tmp_zstrm;
             break;
         }
+#ifdef HAVE_LIBBZ2
+        case DARSHAN_BZIP2_COMP:
+        {
+            bz_stream *tmp_bzstrm = malloc(sizeof(*tmp_bzstrm));
+            if(!tmp_bzstrm)
+            {
+                free(fd->dz_buf);
+                return(-1);
+            }
+            tmp_bzstrm->bzalloc = NULL;
+            tmp_bzstrm->bzfree = NULL;
+            tmp_bzstrm->opaque = NULL;
+            tmp_bzstrm->avail_in = 0;
+            tmp_bzstrm->next_in = Z_NULL;
+
+            if(fd->o_flags == O_RDONLY)
+            {
+                /* read only file, init decompress algorithm */
+                ret = BZ2_bzDecompressInit(tmp_bzstrm, 1, 0);
+            }
+            else
+            {
+                /* write only file, init compress algorithm */
+                ret = BZ2_bzCompressInit(tmp_bzstrm, 9, 1, 30);
+                tmp_bzstrm->avail_out = DARSHAN_DEF_COMP_BUF_SZ;
+                tmp_bzstrm->next_out = (char *)fd->dz_buf;
+            }
+            if(ret != BZ_OK)
+            {
+                free(tmp_bzstrm);
+                free(fd->dz_buf);
+                return(-1);
+            }
+            fd->dz_strm = tmp_bzstrm;
+            break;
+        }
+#endif
         default:
             fprintf(stderr, "Error: invalid compression type.\n");
             return(-1);
@@ -948,15 +996,20 @@ static void darshan_log_dzdestroy(darshan_fd fd)
     {
         case DARSHAN_ZLIB_COMP:
             if(fd->o_flags == O_RDONLY)
-            {
                 inflateEnd(fd->dz_strm);
-            }
             else
-            {
                 deflateEnd(fd->dz_strm);
-            }
             free(fd->dz_strm);
             break;
+#ifdef HAVE_LIBBZ2
+        case DARSHAN_BZIP2_COMP:
+            if(fd->o_flags == O_RDONLY)
+                BZ2_bzDecompressEnd(fd->dz_strm);
+            else
+                BZ2_bzCompressEnd(fd->dz_strm);
+            free(fd->dz_strm);
+            break;
+#endif
         default:
             fprintf(stderr, "Error: invalid compression type.\n");
     }
@@ -974,6 +1027,11 @@ static int darshan_log_dzread(darshan_fd fd, int region_id, void *buf, int len)
         case DARSHAN_ZLIB_COMP:
             ret = darshan_log_libz_read(fd, region_id, buf, len);
             break;
+#ifdef HAVE_LIBBZ2
+        case DARSHAN_BZIP2_COMP:
+            ret = darshan_log_bzip2_read(fd, region_id, buf, len);
+            break;
+#endif
         default:
             fprintf(stderr, "Error: invalid compression type.\n");
             return(-1);
@@ -991,6 +1049,11 @@ static int darshan_log_dzwrite(darshan_fd fd, int region_id, void *buf, int len)
         case DARSHAN_ZLIB_COMP:
             ret = darshan_log_libz_write(fd, region_id, buf, len);
             break;
+#ifdef HAVE_LIBBZ2
+        case DARSHAN_BZIP2_COMP:
+            ret = darshan_log_bzip2_write(fd, region_id, buf, len);
+            break;
+#endif
         default:
             fprintf(stderr, "Error: invalid compression type.\n");
             return(-1);
@@ -1007,7 +1070,7 @@ static int darshan_log_libz_read(darshan_fd fd, int region_id, void *buf, int le
     struct darshan_log_map map;
     z_stream *z_strmp = (z_stream *)fd->dz_strm;
 
-    assert(fd->dz_strm);
+    assert(z_strmp);
 
     /* if new log region, we reload buffers and clear eor flag */
     if(region_id != fd->dz_prev_reg_id)
@@ -1080,7 +1143,7 @@ static int darshan_log_libz_write(darshan_fd fd, int region_id, void *buf, int l
     struct darshan_log_map *map_p;
     z_stream *z_strmp = (z_stream *)fd->dz_strm;
 
-    assert(fd->dz_strm);
+    assert(z_strmp);
 
     /* if new log region, finish prev region's zstream and flush to log file */
     if(region_id != fd->dz_prev_reg_id)
@@ -1149,6 +1212,8 @@ static int darshan_log_libz_flush(darshan_fd fd, int region_id)
     struct darshan_log_map *map_p;
     z_stream *z_strmp = (z_stream *)fd->dz_strm;
 
+    assert(z_strmp);
+
     if(region_id == DARSHAN_JOB_REGION_ID)
         map_p = &(fd->job_map);
     else if(region_id == DARSHAN_REC_MAP_REGION_ID)
@@ -1185,110 +1250,201 @@ static int darshan_log_libz_flush(darshan_fd fd, int region_id)
     return(0);
 }
 
-#if 0
 #ifdef HAVE_LIBBZ2
-static int darshan_bzip2_decomp(char* comp_buf, int comp_buf_sz,
-    char* decomp_buf, int* inout_decomp_buf_sz)
+
+static int darshan_log_bzip2_read(darshan_fd fd, int region_id, void *buf, int len)
 {
     int ret;
-    int total_out = 0;
-    bz_stream tmp_stream;
-
-    memset(&tmp_stream, 0, sizeof(tmp_stream));
-    tmp_stream.bzalloc = NULL;
-    tmp_stream.bzfree = NULL;
-    tmp_stream.opaque = NULL;
-    tmp_stream.next_in = comp_buf;
-    tmp_stream.avail_in = comp_buf_sz;
-    tmp_stream.next_out = decomp_buf;
-    tmp_stream.avail_out = *inout_decomp_buf_sz;
-
-    ret = BZ2_bzDecompressInit(&tmp_stream, 1, 0);
-    if(ret != BZ_OK)
+    int total_bytes = 0;
+    int tmp_out_bytes;
+    struct darshan_log_map map;
+    bz_stream *bz_strmp = (bz_stream *)fd->dz_strm;
+
+    assert(bz_strmp);
+
+    /* if new log region, we reload buffers and clear eor flag */
+    if(region_id != fd->dz_prev_reg_id)
     {
-        return(-1);
+        bz_strmp->avail_in = 0;
+        fd->dz_eor = 0;
+        fd->dz_prev_reg_id = region_id;
     }
 
-    /* while we have not finished consuming all of the uncompressed input data */
-    while(tmp_stream.avail_in)
+    if(region_id == DARSHAN_JOB_REGION_ID)
+        map = fd->job_map;
+    else if(region_id == DARSHAN_REC_MAP_REGION_ID)
+        map = fd->rec_map;
+    else
+        map = fd->mod_map[region_id];
+
+    bz_strmp->avail_out = len;
+    bz_strmp->next_out = buf;
+
+    /* we just decompress until the output buffer is full, assuming there
+     * is enough compressed data in file to satisfy the request size.
+     */
+    while(bz_strmp->avail_out)
     {
-        if(tmp_stream.avail_out == 0)
+        /* check if we need more compressed data */
+        if(bz_strmp->avail_in == 0)
         {
-            /* We ran out of buffer space for decompression.  In theory,
-             * we could just alloc more space, but probably just easier
-             * to bump up the default size of the output buffer.
+            /* if the eor flag is set, clear it and return -- future
+             * reads of this log region will restart at the beginning
              */
-            BZ2_bzDecompressEnd(&tmp_stream);
-            return(-1);
+            if(fd->dz_eor)
+            {
+                fd->dz_eor = 0;
+                break;
+            }
+
+            /* read more data from input file */
+            ret = darshan_log_dzload(fd, map);
+            if(ret < 0)
+                return(-1);
+            assert(fd->dz_size > 0);
+
+            bz_strmp->avail_in = fd->dz_size;
+            bz_strmp->next_in = (char *)fd->dz_buf;
         }
 
-        /* decompress data */
-        ret = BZ2_bzDecompress(&tmp_stream);
-        if(ret != BZ_STREAM_END)
+        tmp_out_bytes = bz_strmp->total_out_lo32;
+        ret = BZ2_bzDecompress(bz_strmp);
+        if(ret != BZ_OK && ret != BZ_STREAM_END)
         {
-            BZ2_bzDecompressEnd(&tmp_stream);
+            fprintf(stderr, "Error: unable to decompress darshan log data.\n");
             return(-1);
         }
+        total_bytes += (bz_strmp->total_out_lo32 - tmp_out_bytes);
 
-        assert(tmp_stream.total_out_hi32 == 0);
-        total_out += tmp_stream.total_out_lo32;
-        if(tmp_stream.avail_in)
+        /* reset the decompression if we encountered end of stream */
+        if(ret == BZ_STREAM_END)
         {
-            /* reinitialize bzip2 stream, we have more data to
-             * decompress
-             */
-            BZ2_bzDecompressEnd(&tmp_stream);
-            ret = BZ2_bzDecompressInit(&tmp_stream, 1, 0);
-            if(ret != BZ_OK)
-            {
-                return(-1);
-            }
+            BZ2_bzDecompressEnd(bz_strmp);
+            BZ2_bzDecompressInit(bz_strmp, 1, 0);
         }
     }
-    BZ2_bzDecompressEnd(&tmp_stream);
 
-    *inout_decomp_buf_sz = total_out;
-    return(0);
+    return(total_bytes);
 }
 
-static int darshan_bzip2_comp(char* decomp_buf, int decomp_buf_sz,
-    char* comp_buf, int* inout_comp_buf_sz)
+static int darshan_log_bzip2_write(darshan_fd fd, int region_id, void *buf, int len)
 {
     int ret;
-    bz_stream tmp_stream;
-
-    memset(&tmp_stream, 0, sizeof(tmp_stream));
-    tmp_stream.bzalloc = NULL;
-    tmp_stream.bzfree = NULL;
-    tmp_stream.opaque = NULL;
-    tmp_stream.next_in = decomp_buf;
-    tmp_stream.avail_in = decomp_buf_sz;
-    tmp_stream.next_out = comp_buf;
-    tmp_stream.avail_out = *inout_comp_buf_sz;
-
-    ret = BZ2_bzCompressInit(&tmp_stream, 9, 1, 30);
-    if(ret != BZ_OK)
+    int total_bytes = 0;
+    int tmp_in_bytes;
+    int tmp_out_bytes;
+    struct darshan_log_map *map_p;
+    bz_stream *bz_strmp = (bz_stream *)fd->dz_strm;
+
+    assert(bz_strmp);
+
+    /* if new log region, finish prev region's bzip2 stream and flush to log file */
+    if(region_id != fd->dz_prev_reg_id)
     {
-        return(-1);
+        /* error out if the region we are writing to precedes the previous
+         * region we wrote -- we shouldn't be moving backwards in the log
+         */
+        if(region_id < fd->dz_prev_reg_id)
+            return(-1);
+
+        if(fd->dz_prev_reg_id != DARSHAN_HEADER_REGION_ID)
+        {
+            ret = darshan_log_bzip2_flush(fd, fd->dz_prev_reg_id);
+            if(ret < 0)
+                return(-1);
+        }
+
+        fd->dz_prev_reg_id = region_id;
     }
 
-    /* compress data */
+    if(region_id == DARSHAN_JOB_REGION_ID)
+        map_p = &(fd->job_map);
+    else if(region_id == DARSHAN_REC_MAP_REGION_ID)
+        map_p = &(fd->rec_map);
+    else
+        map_p = &(fd->mod_map[region_id]);
+
+    bz_strmp->avail_in = len;
+    bz_strmp->next_in = buf;
+
+    /* compress input data until none left */
+    while(bz_strmp->avail_in)
+    {
+        /* if we are out of output, flush to log file */
+        if(bz_strmp->avail_out == 0)
+        {
+            assert(fd->dz_size == DARSHAN_DEF_COMP_BUF_SZ);
+
+            ret = darshan_log_dzunload(fd, map_p);
+            if(ret < 0)
+                return(-1);
+
+            bz_strmp->avail_out = DARSHAN_DEF_COMP_BUF_SZ;
+            bz_strmp->next_out = (char *)fd->dz_buf;
+        }
+
+        tmp_in_bytes = bz_strmp->total_in_lo32;
+        tmp_out_bytes = bz_strmp->total_out_lo32;
+        ret = BZ2_bzCompress(bz_strmp, BZ_RUN);
+        if(ret != BZ_RUN_OK)
+        {
+            fprintf(stderr, "Error: unable to compress darshan log data.\n");
+            return(-1);
+        }
+        total_bytes += (bz_strmp->total_in_lo32 - tmp_in_bytes);
+        fd->dz_size += (bz_strmp->total_out_lo32 - tmp_out_bytes);
+    }
+
+    return(total_bytes);
+}
+
+static int darshan_log_bzip2_flush(darshan_fd fd, int region_id)
+{
+    int ret;
+    int tmp_out_bytes;
+    struct darshan_log_map *map_p;
+    bz_stream *bz_strmp = (bz_stream *)fd->dz_strm;
+
+    assert(bz_strmp);
+
+    if(region_id == DARSHAN_JOB_REGION_ID)
+        map_p = &(fd->job_map);
+    else if(region_id == DARSHAN_REC_MAP_REGION_ID)
+        map_p = &(fd->rec_map);
+    else
+        map_p = &(fd->mod_map[region_id]);
+
+    /* make sure bzip2 finishes compressing this stream */
+    bz_strmp->avail_in = 0;
+    bz_strmp->next_in = NULL;
     do
     {
-        ret = BZ2_bzCompress(&tmp_stream, BZ_FINISH);
+        tmp_out_bytes = bz_strmp->total_out_lo32;
+        ret = BZ2_bzCompress(bz_strmp, BZ_FINISH);
         if(ret < 0)
         {
-            BZ2_bzCompressEnd(&tmp_stream);
+            fprintf(stderr, "Error: unable to compress darshan log data.\n");
             return(-1);
         }
+        fd->dz_size += (bz_strmp->total_out_lo32 - tmp_out_bytes);
+
+        if(fd->dz_size)
+        {
+            /* flush to file */
+            if(darshan_log_dzunload(fd, map_p) < 0)
+                return(-1);
+
+            bz_strmp->avail_out = DARSHAN_DEF_COMP_BUF_SZ;
+            bz_strmp->next_out = (char *)fd->dz_buf;
+        }
     } while (ret != BZ_STREAM_END);
-    BZ2_bzCompressEnd(&tmp_stream);
 
-    assert(tmp_stream.total_out_hi32 == 0);
-    *inout_comp_buf_sz = tmp_stream.total_out_lo32;
+    
+    BZ2_bzCompressEnd(bz_strmp);
+    BZ2_bzCompressInit(bz_strmp, 9, 1, 30);
     return(0);
 }
-#endif
+
 #endif
 
 static int darshan_log_dzload(darshan_fd fd, struct darshan_log_map map)


hooks/post-receive
--



More information about the Darshan-commits mailing list