[Darshan-commits] [Darshan] branch, dev-modular, updated. 8f85fd08a49f9c2862ae1636b3ed46f609806239

Service Account git at mcs.anl.gov
Fri Feb 20 16:57:43 CST 2015


This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "".

The branch, dev-modular has been updated
       via  8f85fd08a49f9c2862ae1636b3ed46f609806239 (commit)
      from  efd500f045180f703b65ec537ed312c7bb13b40e (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit 8f85fd08a49f9c2862ae1636b3ed46f609806239
Author: Shane Snyder <ssnyder at mcs.anl.gov>
Date:   Fri Feb 20 16:56:39 2015 -0600

    log compression good to go, revamped logutils

-----------------------------------------------------------------------

Summary of changes:
 darshan-runtime/darshan-core.h      |    5 +
 darshan-runtime/lib/darshan-core.c  |  213 ++++++++++++++++++++++--------
 darshan-util/darshan-logutils.c     |  254 ++++++++++++++++++++++++++++++-----
 darshan-util/darshan-logutils.h     |    3 +-
 darshan-util/darshan-posix-parser.c |   42 ++++---
 5 files changed, 412 insertions(+), 105 deletions(-)


Diff of changes:
diff --git a/darshan-runtime/darshan-core.h b/darshan-runtime/darshan-core.h
index 7d98d71..8ecddaf 100644
--- a/darshan-runtime/darshan-core.h
+++ b/darshan-runtime/darshan-core.h
@@ -15,6 +15,10 @@
 /* TODO: enforce this when handing out ids */
 #define DARSHAN_CORE_MAX_RECORDS 1024
 
+/* default compression buffer size of 2 MiB */
+/* TODO: revisit this default size if we change memory per module */
+#define DARSHAN_COMP_BUF_SIZE (2 * 1024 * 1024)
+
 struct darshan_core_module
 {
     darshan_module_id id;
@@ -28,6 +32,7 @@ struct darshan_core_runtime
     char exe[DARSHAN_EXE_LEN+1];
     struct darshan_core_record_ref *rec_hash;
     struct darshan_core_module* mod_array[DARSHAN_MAX_MODS];
+    char comp_buf[DARSHAN_COMP_BUF_SIZE];
     double wtime_offset;
     char *trailing_data;
 };
diff --git a/darshan-runtime/lib/darshan-core.c b/darshan-runtime/lib/darshan-core.c
index 0bf770e..419291f 100644
--- a/darshan-runtime/lib/darshan-core.c
+++ b/darshan-runtime/lib/darshan-core.c
@@ -69,8 +69,10 @@ static void darshan_get_shared_record_ids(
     struct darshan_core_runtime *core, darshan_record_id *shared_recs);
 static int darshan_log_coll_open(
     char *logfile_name, MPI_File *log_fh);
+static int darshan_compress_buffer(void **pointers, int *lengths,
+    int count, char *comp_buf, int *comp_length);
 static int darshan_log_write_record_hash(
-    MPI_File log_fh, struct darshan_core_record_ref *rec_hash,
+    MPI_File log_fh, struct darshan_core_runtime *core,
     darshan_record_id *shared_recs, struct darshan_log_map *map);
 static int darshan_log_coll_write(
     MPI_File log_fh, void *buf, int count, struct darshan_log_map *map);
@@ -139,7 +141,6 @@ static void darshan_core_initialize(int *argc, char ***argv)
     if(internal_timing_flag)
         init_start = DARSHAN_MPI_CALL(PMPI_Wtime)();
 
-    DARSHAN_CORE_LOCK();
     /* setup darshan runtime if darshan is enabled and hasn't been initialized already */
     if(!getenv("DARSHAN_DISABLE") && !darshan_core)
     {
@@ -191,7 +192,6 @@ static void darshan_core_initialize(int *argc, char ***argv)
             darshan_core->trailing_data = darshan_get_exe_and_mounts(darshan_core);
         }
     }
-    DARSHAN_CORE_UNLOCK();
 
     if(internal_timing_flag)
     {
@@ -230,7 +230,7 @@ static void darshan_core_shutdown()
     long offset;
     struct darshan_header log_header;
     MPI_File log_fh;
-    MPI_Offset tmp_off;
+    MPI_Offset tmp_off = 0;
     MPI_Status status;
 
     if(getenv("DARSHAN_INTERNAL_TIMING"))
@@ -356,30 +356,37 @@ static void darshan_core_shutdown()
         return;
     }
 
-    /* rank 0 is responsible for writing the darshan job information */
+    /* rank 0 is responsible for writing the compressed darshan job information */
     if(my_rank == 0)
     {
-        unsigned char tmp_buf[DARSHAN_JOB_RECORD_SIZE];
-        unsigned char *tmp_ptr;
-
-        /* pack the job info and exe/mount info into a buffer for writing */
-        tmp_ptr = tmp_buf;
-        memcpy(tmp_ptr, &final_core->log_job, sizeof(struct darshan_job));
-        tmp_ptr += sizeof(struct darshan_job);
-        memcpy(tmp_ptr, final_core->trailing_data, DARSHAN_EXE_LEN+1);
-
-        /* write the job information, making sure to prealloc space for the log header */
-        all_ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, sizeof(struct darshan_header),
-                tmp_buf, DARSHAN_JOB_RECORD_SIZE, MPI_BYTE, &status);
-        if(all_ret != MPI_SUCCESS)
+        void *pointers[2] = {&final_core->log_job, final_core->trailing_data};
+        int lengths[2] = {sizeof(struct darshan_job), DARSHAN_EXE_LEN+1};
+        int comp_buf_sz = 0;
+        
+        /* compress the job info and the trailing mount/exe data */
+        all_ret = darshan_compress_buffer(pointers, lengths, 2,
+            final_core->comp_buf, &comp_buf_sz);
+        if(all_ret)
         {
-            fprintf(stderr, "darshan library warning: unable to write job data to log file %s\n",
-                    logfile_name);
+            fprintf(stderr, "darshan library warning: unable to compress job data\n");
             unlink(logfile_name);
         }
+        else
+        {
+            /* write the job information, preallocing space for the log header */
+            all_ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(
+                log_fh, sizeof(struct darshan_header), final_core->comp_buf,
+                comp_buf_sz, MPI_BYTE, &status);
+            if(all_ret != MPI_SUCCESS)
+            {
+                fprintf(stderr, "darshan library warning: unable to write job data to log file %s\n",
+                        logfile_name);
+                unlink(logfile_name);
+            }
 
-        /* TODO: after compression is added, this should be fixed */
-        log_header.rec_map.off = sizeof(struct darshan_header) + DARSHAN_JOB_RECORD_SIZE;
+            /* set the beginning offset of record hash, which follows the job info just written */
+            log_header.rec_map.off = sizeof(struct darshan_header) + comp_buf_sz;
+        }
     }
 
     /* error out if unable to write job information */
@@ -392,8 +399,7 @@ static void darshan_core_shutdown()
     }
 
     /* write the record name->id hash to the log file */
-    ret = darshan_log_write_record_hash(log_fh, final_core->rec_hash,
-        shared_recs, &log_header.rec_map);
+    ret = darshan_log_write_record_hash(log_fh, final_core, shared_recs, &log_header.rec_map);
 
     /* error out if unable to write record hash */
     DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
@@ -402,7 +408,7 @@ static void darshan_core_shutdown()
     {
         if(my_rank == 0)
         {
-            fprintf(stderr, "darshan library warning: unable to write record map to log file %s\n",
+            fprintf(stderr, "darshan library warning: unable to write record hash to log file %s\n",
                 logfile_name);
             unlink(logfile_name);
         }
@@ -423,7 +429,8 @@ static void darshan_core_shutdown()
         struct darshan_core_module* this_mod = final_core->mod_array[i];
         MPI_Comm mod_comm;
         void* mod_buf = NULL;
-        int mod_buf_size = 0;
+        int mod_buf_sz = 0;
+        int comp_buf_sz = 0;
 
         if(!global_mod_use_count[i])
         {
@@ -447,19 +454,33 @@ static void darshan_core_shutdown()
         if(local_mod_use[i])
         {
             /* get output buffer from module */
-            this_mod->mod_funcs.get_output_data(mod_comm, &mod_buf, &mod_buf_size);
+            this_mod->mod_funcs.get_output_data(mod_comm, &mod_buf, &mod_buf_sz);
         }
 
-        /* set the starting offset of this module */
-        if(tmp_off == 0)
-            tmp_off = log_header.rec_map.off + log_header.rec_map.len;
+        /* compress the module buffer */
+        ret = darshan_compress_buffer((void**)&mod_buf, &mod_buf_sz, 1,
+            final_core->comp_buf, &comp_buf_sz);
+        if(ret == 0)
+        {
+            /* set the starting offset of this module */
+            if(tmp_off == 0)
+                tmp_off = log_header.rec_map.off + log_header.rec_map.len;
 
-        log_header.mod_map[i].off = tmp_off;
+            log_header.mod_map[i].off = tmp_off;
 
-        /* write module data buffer to the darshan log file */
-        ret = darshan_log_coll_write(log_fh, mod_buf, mod_buf_size, &log_header.mod_map[i]);
+            /* write (compressed) module data buffer to the darshan log file */
+            ret = darshan_log_coll_write(log_fh, final_core->comp_buf, comp_buf_sz,
+                &log_header.mod_map[i]);
+        }
+        else
+        {
+            /* error in compression, participate in collective write to avoid
+             * deadlock, but preserve return value to terminate darshan_core_shutdown.
+             */
+            (void)darshan_log_coll_write(log_fh, NULL, 0, &log_header.mod_map[i]);
+        }
 
-        /* error out if unable to write this module's data */
+        /* error out if the compression or collective write failed */
         DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
             MPI_LOR, MPI_COMM_WORLD);
         if(all_ret != 0)
@@ -1050,10 +1071,81 @@ static int darshan_log_coll_open(char *logfile_name, MPI_File *log_fh)
     return(0);
 }
 
+static int darshan_compress_buffer(void **pointers, int *lengths, int count,
+    char *comp_buf, int *comp_length)
+{
+    int ret = 0;
+    int i;
+    int total_target = 0;
+    z_stream tmp_stream;
+
+    memset(&tmp_stream, 0, sizeof(tmp_stream));
+    tmp_stream.zalloc = Z_NULL;
+    tmp_stream.zfree = Z_NULL;
+    tmp_stream.opaque = Z_NULL;
+
+    /* initialize the zlib compression parameters */
+    /* TODO: check these parameters? */
+//    ret = deflateInit2(&tmp_stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
+//        31, 8, Z_DEFAULT_STRATEGY);
+    ret = deflateInit(&tmp_stream, Z_DEFAULT_COMPRESSION);
+    if(ret != Z_OK)
+    {
+        return(-1);
+    }
+
+    tmp_stream.next_out = comp_buf;
+    tmp_stream.avail_out = DARSHAN_COMP_BUF_SIZE;
+
+    /* loop over the input pointers */
+    for(i = 0; i < count; i++)
+    {
+        total_target += lengths[i];
+        tmp_stream.next_in = pointers[i];
+        tmp_stream.avail_in = lengths[i];
+        /* while we have not finished consuming all of the data available to
+         * this point in the loop
+         */
+        while(tmp_stream.total_in < total_target)
+        {
+            if(tmp_stream.avail_out == 0)
+            {
+                /* We ran out of buffer space for compression.  In theory,
+                 * we could start using some of the file_array buffer space
+                 * without having to malloc again.  In practice, this case 
+                 * is going to be practically impossible to hit.
+                 */
+                deflateEnd(&tmp_stream);
+                return(-1);
+            }
+
+            /* compress data */
+            ret = deflate(&tmp_stream, Z_NO_FLUSH);
+            if(ret != Z_OK)
+            {
+                deflateEnd(&tmp_stream);
+                return(-1);
+            }
+        }
+    }
+
+    /* flush compression and end */
+    ret = deflate(&tmp_stream, Z_FINISH);
+    if(ret != Z_STREAM_END)
+    {
+        deflateEnd(&tmp_stream);
+        return(-1);
+    }
+    deflateEnd(&tmp_stream);
+
+    *comp_length = tmp_stream.total_out;
+    return(0);
+}
+
 /* NOTE: the map written to file may contain duplicate id->name entries if a
  *       record is opened by multiple ranks, but not all ranks
  */
-static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_record_ref *rec_hash,
+static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_runtime *core,
     darshan_record_id *shared_recs, struct darshan_log_map *map)
 {
     int i;
@@ -1062,20 +1154,20 @@ static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_re
     uint32_t name_len;
     size_t record_sz;
     size_t hash_buf_sz = 0;
-    unsigned char *hash_buf;
-    unsigned char *hash_buf_off;
+    char *hash_buf;
+    char *hash_buf_off;
     MPI_Status status;
 
-    /* non-root ranks (rank > 0) remove shared records from their map --
+    /* non-root ranks (rank > 0) remove shared records from their hash --
      * these records will be written by rank 0
      */
     if(my_rank > 0)
     {
         for(i=0; (i<DARSHAN_CORE_MAX_RECORDS && shared_recs[i]); i++)
         {
-            HASH_FIND(hlink, rec_hash, &shared_recs[i], sizeof(darshan_record_id), ref);
+            HASH_FIND(hlink, core->rec_hash, &shared_recs[i], sizeof(darshan_record_id), ref);
             assert(ref); /* this id had better be in the hash ... */
-            HASH_DELETE(hlink, rec_hash, ref);
+            HASH_DELETE(hlink, core->rec_hash, ref);
             if(ref->rec.name) free(ref->rec.name);
             free(ref);
         }
@@ -1090,19 +1182,19 @@ static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_re
         return(-1);
     }
 
-    /* serialize the record map into a buffer for writing */
+    /* serialize the record hash into a buffer for writing */
     hash_buf_off = hash_buf;
-    HASH_ITER(hlink, rec_hash, ref, tmp)
+    HASH_ITER(hlink, core->rec_hash, ref, tmp)
     {
         name_len = strlen(ref->rec.name);
         record_sz = sizeof(darshan_record_id) + sizeof(int) + name_len;
         /* make sure there is room in the buffer for this record */
         if((hash_buf_off + record_sz) > (hash_buf + hash_buf_sz))
         {
-            unsigned char *tmp_buf;
+            char *tmp_buf;
             size_t old_buf_sz;
 
-            /* if no room, reallocate the map buffer at twice the current size */
+            /* if no room, reallocate the hash buffer at twice the current size */
             old_buf_sz = hash_buf_off - hash_buf;
             hash_buf_sz *= 2;
             tmp_buf = malloc(hash_buf_sz);
@@ -1118,8 +1210,8 @@ static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_re
             hash_buf_off = hash_buf + old_buf_sz;
         }
 
-        /* now serialize the record into the map buffer.
-         * NOTE: darshan record map serialization method: 
+        /* now serialize the record into the hash buffer.
+         * NOTE: darshan record hash serialization method: 
          *          ... darshan_record_id | (uint32_t) path_len | path ...
          */
         *((darshan_record_id *)hash_buf_off) = ref->rec.id;
@@ -1130,11 +1222,27 @@ static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_re
         hash_buf_off += name_len;
     }
 
-    /* collectively write out the record map to the darshan log */
+    /* collectively write out the record hash to the darshan log */
     if(hash_buf_off > hash_buf)
     {
-        /* we have records to contribute to the collective write of the record map */
-        ret = darshan_log_coll_write(log_fh, hash_buf, (hash_buf_off-hash_buf), map);
+        int hash_buf_sz = hash_buf_off - hash_buf;
+        int comp_buf_sz = 0;
+
+        /* compress the record hash buffer */
+        ret = darshan_compress_buffer((void **)&hash_buf, &hash_buf_sz, 1,
+            core->comp_buf, &comp_buf_sz);
+        if(ret < 0)
+        {
+            /* participate in collective write to avoid deadlock, but preserve
+             * the compression error to pass back to the caller.
+             */
+            (void)darshan_log_coll_write(log_fh, NULL, 0, map);
+        }
+        else
+        {
+            /* we have records to contribute to the write of the record hash */
+            ret = darshan_log_coll_write(log_fh, core->comp_buf, comp_buf_sz, map);
+        }
     }
     else
     {
@@ -1217,14 +1325,11 @@ void darshan_core_register_module(
     struct darshan_core_module* mod;
     *runtime_mem_limit = 0;
 
-    DARSHAN_CORE_LOCK();
     if(!darshan_core || (id >= DARSHAN_MAX_MODS))
-    {
-        DARSHAN_CORE_UNLOCK();
         return;
-    }
 
     /* see if this module is already registered */
+    DARSHAN_CORE_LOCK();
     if(darshan_core->mod_array[id])
     {
         /* if module is already registered just return */
@@ -1273,8 +1378,8 @@ void darshan_core_lookup_record_id(
     /* hash the input name to get a unique id for this record */
     tmp_id = darshan_hash(name, len, 0);
 
-    DARSHAN_CORE_LOCK();
     /* check to see if we've already stored the id->name mapping for this record */
+    DARSHAN_CORE_LOCK();
     HASH_FIND(hlink, darshan_core->rec_hash, &tmp_id, sizeof(darshan_record_id), ref);
     if(!ref)
     {
diff --git a/darshan-util/darshan-logutils.c b/darshan-util/darshan-logutils.c
index 7fe7632..a801157 100644
--- a/darshan-util/darshan-logutils.c
+++ b/darshan-util/darshan-logutils.c
@@ -23,6 +23,9 @@
 
 #include "darshan-logutils.h"
 
+/* default to a decompression buffer size of 4 MiB */
+#define DARSHAN_DEF_DECOMP_BUF_SZ (4*1024*1024)
+
 struct darshan_fd_s
 {
     int pf;
@@ -38,7 +41,8 @@ struct darshan_fd_s
 static int darshan_log_seek(darshan_fd fd, off_t offset);
 static int darshan_log_read(darshan_fd fd, void *buf, int len);
 static int darshan_log_write(darshan_fd fd, void *buf, int len);
-
+static int darshan_decompress_buffer(char *comp_buf, int comp_buf_sz,
+    char *decomp_buf, int *inout_decomp_buf_sz);
 
 /* darshan_log_open()
  *
@@ -150,24 +154,45 @@ int darshan_log_getheader(darshan_fd fd, struct darshan_header *header)
  */
 int darshan_log_getjob(darshan_fd fd, struct darshan_job *job)
 {
+    char *comp_buf;
     char job_buf[DARSHAN_JOB_RECORD_SIZE] = {0};
+    int job_buf_sz = DARSHAN_JOB_RECORD_SIZE;
     int ret;
 
+    /* allocate a buffer to store the (compressed) darshan job info */
+    comp_buf = malloc(fd->job_map.len);
+    if(!comp_buf)
+        return(-1);
+
     ret = darshan_log_seek(fd, fd->job_map.off);
     if(ret < 0)
     {
         fprintf(stderr, "Error: unable to seek in darshan log file.\n");
+        free(comp_buf);
         return(ret);
     }
 
     /* read the job data from the log file */
-    ret = darshan_log_read(fd, job_buf, fd->job_map.len);
+    ret = darshan_log_read(fd, comp_buf, fd->job_map.len);
     if(ret < fd->job_map.len)
     {
         fprintf(stderr, "Error: invalid darshan log file (failed to read job data).\n");
+        free(comp_buf);
         return(-1);
     }
 
+    /* decompress the job data */
+    ret = darshan_decompress_buffer(comp_buf, fd->job_map.len,
+        job_buf, &job_buf_sz);
+    if(ret < 0)
+    {
+        fprintf(stderr, "Error: unable to decompress darshan job data.\n");
+        free(comp_buf);
+        return(-1);
+    }
+    assert(job_buf_sz == DARSHAN_JOB_RECORD_SIZE);
+    free(comp_buf);
+
     memcpy(job, job_buf, sizeof(*job));
 
     if(fd->swap_flag)
@@ -181,7 +206,8 @@ int darshan_log_getjob(darshan_fd fd, struct darshan_job *job)
     }
 
     /* save trailing job data, so exe and mount information can be retrieved later */
-    fd->exe_mnt_data = malloc(DARSHAN_EXE_LEN+1);
+    if(!fd->exe_mnt_data)
+        fd->exe_mnt_data = malloc(DARSHAN_EXE_LEN+1);
     if(!fd->exe_mnt_data)
         return(-1);
     memcpy(fd->exe_mnt_data, &job_buf[sizeof(*job)], DARSHAN_EXE_LEN+1);
@@ -193,9 +219,17 @@ int darshan_log_getexe(darshan_fd fd, char *buf)
 {
     char *newline;
 
-    /* TODO: try reading log job one more time to set this buffer up */
+    /* if the exe/mount info has not been saved yet, read in the job
+     * header to get this data.
+     */
     if(!fd->exe_mnt_data)
-        return(-1);
+    {
+        struct darshan_job job;
+        (void)darshan_log_getjob(fd, &job);
+
+        if(!fd->exe_mnt_data)
+            return(-1);
+    }
 
     newline = strchr(fd->exe_mnt_data, '\n');
 
@@ -219,11 +253,18 @@ int darshan_log_getmounts(darshan_fd fd, char*** mnt_pts,
     char *pos;
     int array_index = 0;
 
-    /* TODO: try reading log job one more time to set this buffer up */
+    /* if the exe/mount info has not been saved yet, read in the job
+     * header to get this data.
+     */
     if(!fd->exe_mnt_data)
-        return(-1);
+    {
+        struct darshan_job job;
+        (void)darshan_log_getjob(fd, &job);
+
+        if(!fd->exe_mnt_data)
+            return(-1);
+    }
 
-    /* count entries */
     *count = 0;
     pos = fd->exe_mnt_data;
     while((pos = strchr(pos, '\n')) != NULL)
@@ -278,37 +319,51 @@ int darshan_log_getmounts(darshan_fd fd, char*** mnt_pts,
  */
 int darshan_log_gethash(darshan_fd fd, struct darshan_record_ref **hash)
 {
-    unsigned char *hash_buf;
-    unsigned char *buf_ptr;
+    char *comp_buf;
+    char hash_buf[DARSHAN_DEF_DECOMP_BUF_SZ] = {0};
+    int hash_buf_sz = DARSHAN_DEF_DECOMP_BUF_SZ;
+    char *buf_ptr;
     darshan_record_id *rec_id_ptr;
     uint32_t *path_len_ptr;
     char *path_ptr;
     struct darshan_record_ref *ref;
     int ret;
 
+    /* allocate a buffer to store the (compressed, serialized) darshan record hash */
+    comp_buf = malloc(fd->rec_map.len);
+    if(!comp_buf)
+        return(-1);
+
     ret = darshan_log_seek(fd, fd->rec_map.off);
     if(ret < 0)
     {
         fprintf(stderr, "Error: unable to seek in darshan log file.\n");
+        free(comp_buf);
         return(ret);
     }
 
-    /* allocate a buffer to store the (serialized) darshan record hash */
-    hash_buf = malloc(fd->rec_map.len);
-    if(!hash_buf)
-        return(-1);
-
-    /* read the record map from the log file */
-    ret = darshan_log_read(fd, hash_buf, fd->rec_map.len);
+    /* read the record hash from the log file */
+    ret = darshan_log_read(fd, comp_buf, fd->rec_map.len);
     if(ret < fd->rec_map.len)
     {
         fprintf(stderr, "Error: invalid darshan log file (failed to read record hash).\n");
-        free(hash_buf);
+        free(comp_buf);
         return(-1);
     }
 
+    /* decompress the record hash buffer */
+    ret = darshan_decompress_buffer(comp_buf, fd->rec_map.len,
+        hash_buf, &hash_buf_sz);
+    if(ret < 0)
+    {
+        fprintf(stderr, "Error: unable to decompress darshan job data.\n");
+        free(comp_buf);
+        return(-1);
+    }
+    free(comp_buf);
+
     buf_ptr = hash_buf;
-    while(buf_ptr < (hash_buf + fd->rec_map.len))
+    while(buf_ptr < (hash_buf + hash_buf_sz))
     {
         /* get pointers for each field of this darshan record */
         /* NOTE: darshan record hash serialization method: 
@@ -324,13 +379,11 @@ int darshan_log_gethash(darshan_fd fd, struct darshan_record_ref **hash)
         ref = malloc(sizeof(*ref));
         if(!ref)
         {
-            free(hash_buf);
             return(-1);
         }
         ref->rec.name = malloc(*path_len_ptr + 1);
         if(!ref->rec.name)
         {
-            free(hash_buf);
             free(ref);
             return(-1);
         }
@@ -351,16 +404,88 @@ int darshan_log_gethash(darshan_fd fd, struct darshan_record_ref **hash)
         HASH_ADD(hlink, *hash, rec.id, sizeof(darshan_record_id), ref);
     }
 
-    free(hash_buf);
+    return(0);
+}
+
+int darshan_log_getmod(darshan_fd fd, int mod_id, void **mod_buf,
+    int *mod_buf_sz)
+{
+    char *comp_buf;
+    char *tmp_buf;
+    int tmp_buf_sz;
+    int ret;
+    *mod_buf = NULL;
+    *mod_buf_sz = 0;
+
+    if(mod_id < 0 || mod_id >= DARSHAN_MAX_MODS)
+    {
+        fprintf(stderr, "Error: invalid Darshan module id.\n");
+        return(-1);
+    }
+
+    if(fd->mod_map[mod_id].len == 0)
+    {
+        /* this module has no data in the log */
+        return(0);
+    }
+
+    comp_buf = malloc(fd->mod_map[mod_id].len);
+    if(!comp_buf)
+        return(-1);
+
+    ret = darshan_log_seek(fd, fd->mod_map[mod_id].off);
+    if(ret < 0)
+    {
+        fprintf(stderr, "Error: unable to seek in darshan log file.\n");
+        free(comp_buf);
+        return(ret);
+    }
+
+    /* read the given module's (compressed) data from the log file */
+    ret = darshan_log_read(fd, comp_buf, fd->mod_map[mod_id].len);
+    if(ret < fd->mod_map[mod_id].len)
+    {
+        fprintf(stderr, "Error: invalid darshan log file (failed to read module %s data).\n",
+            darshan_module_names[mod_id]);
+        free(comp_buf);
+        return(-1);
+    }
+
+    tmp_buf_sz = DARSHAN_DEF_DECOMP_BUF_SZ;
+    tmp_buf = malloc(DARSHAN_DEF_DECOMP_BUF_SZ);
+    if(!tmp_buf)
+    {
+        free(comp_buf);
+        return(-1);
+    }
+
+    /* decompress this module's data */
+    ret = darshan_decompress_buffer(comp_buf, fd->mod_map[mod_id].len, tmp_buf,
+        &tmp_buf_sz);
+    if(ret < 0)
+    {
+        fprintf(stderr, "Error: unable to decompress module %s data.\n",
+            darshan_module_names[mod_id]);
+        free(tmp_buf);
+        return(-1);
+    }
+    free(comp_buf);
+
+    /* pass back the final decompressed data pointer */
+    *mod_buf = tmp_buf;
+    *mod_buf_sz = tmp_buf_sz;
 
     return(0);
 }
 
+#if 0
 /* TODO: hardcoded for posix -- what can we do generally?
  *       different function for each module and a way to map to this function?
  */
 int darshan_log_getfile(darshan_fd fd, struct darshan_posix_file *file)
 {
+    char *comp_buf;
+    char hash_buf[DARSHAN_DEF_DECOMP_BUF_SZ] = {0};
     int ret;
     const char* err_string;
     int i;
@@ -416,6 +541,7 @@ int darshan_log_getfile(darshan_fd fd, struct darshan_posix_file *file)
     fprintf(stderr, "Error: %s\n", err_string);
     return(-1);
 }
+#endif
 
 /* darshan_log_close()
  *
@@ -436,6 +562,25 @@ void darshan_log_close(darshan_fd fd)
 
 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
 
+/* return 0 on successful seek to offset, -1 on failure.
+ */
+static int darshan_log_seek(darshan_fd fd, off_t offset)
+{
+    off_t ret_off;
+
+    if(fd->pos == offset)
+        return(0);
+
+    ret_off = lseek(fd->pf, offset, SEEK_SET);
+    if(ret_off == offset)
+    {
+        fd->pos = offset;
+        return(0);
+    }
+
+    return(-1);
+}
+
 /* return amount written on success, -1 on failure.
  */
 static int darshan_log_write(darshan_fd fd, void* buf, int len)
@@ -470,21 +615,64 @@ static int darshan_log_read(darshan_fd fd, void* buf, int len)
     return(-1);
 }
 
-/* return 0 on successful seek to offset, -1 on failure.
- */
-static int darshan_log_seek(darshan_fd fd, off_t offset)
+/* TODO bz2 compression support */
+static int darshan_decompress_buffer(char *comp_buf, int comp_buf_sz,
+    char *decomp_buf, int *inout_decomp_buf_sz)
 {
-    off_t ret_off;
+    int ret;
+    z_stream tmp_stream;
+
+    memset(&tmp_stream, 0, sizeof(tmp_stream));
+    tmp_stream.zalloc = Z_NULL;
+    tmp_stream.zfree = Z_NULL;
+    tmp_stream.opaque = Z_NULL;
+
+    /* initialize the zlib decompression parameters */
+    /* TODO: check these parameters? */
+    //ret = inflateInit2(&tmp_stream, 31);
+    ret = inflateInit(&tmp_stream);
+    if(ret != Z_OK)
+    {
+        return(-1);
+    }
 
-    if(fd->pos == offset)
-        return(0);
+    tmp_stream.next_in = comp_buf;
+    tmp_stream.avail_in = comp_buf_sz;
+    tmp_stream.next_out = decomp_buf;
+    tmp_stream.avail_out = *inout_decomp_buf_sz;
 
-    ret_off = lseek(fd->pf, offset, SEEK_SET);
-    if(ret_off == offset)
+    /* while we have not finished consuming all of the compressed input data */
+    while(tmp_stream.total_in < comp_buf_sz)
     {
-        fd->pos = offset;
-        return(0);
+        if(tmp_stream.avail_out == 0)
+        {
+            /* We ran out of buffer space for decompression.  In theory,
+             * we could just alloc more space, but probably just easier
+             * to bump up the default size of the output buffer.
+             */
+            inflateEnd(&tmp_stream);
+            return(-1);
+        }
+
+        /* decompress data */
+        ret = inflate(&tmp_stream, Z_NO_FLUSH);
+        if(ret != Z_STREAM_END)
+        {
+            inflateEnd(&tmp_stream);
+            return(-1);
+        }
     }
+    inflateEnd(&tmp_stream);
 
-    return(-1);
+    *inout_decomp_buf_sz = tmp_stream.total_out;
+    return(0);
 }
+
+/*
+ * Local variables:
+ *  c-indent-level: 4
+ *  c-basic-offset: 4
+ * End:
+ *
+ * vim: ts=8 sts=4 sw=4 expandtab
+ */
diff --git a/darshan-util/darshan-logutils.h b/darshan-util/darshan-logutils.h
index fbaa0ac..d1d2ee3 100644
--- a/darshan-util/darshan-logutils.h
+++ b/darshan-util/darshan-logutils.h
@@ -22,10 +22,11 @@ darshan_fd darshan_log_open(const char *name, const char* mode);
 int darshan_log_getheader(darshan_fd file, struct darshan_header *header);
 int darshan_log_getjob(darshan_fd file, struct darshan_job *job);
 int darshan_log_gethash(darshan_fd file, struct darshan_record_ref **hash);
-int darshan_log_getfile(darshan_fd fd, struct darshan_posix_file *file);
 int darshan_log_getexe(darshan_fd fd, char *buf);
 int darshan_log_getmounts(darshan_fd fd, char*** mnt_pts,
     char*** fs_types, int* count);
+int darshan_log_getmod(darshan_fd fd, int mod_id, void **mod_buf,
+    int *mod_buf_sz);
 void darshan_log_close(darshan_fd file);
 
 /* convenience macros for printing out counters */
diff --git a/darshan-util/darshan-posix-parser.c b/darshan-util/darshan-posix-parser.c
index c4a453e..394d0e3 100644
--- a/darshan-util/darshan-posix-parser.c
+++ b/darshan-util/darshan-posix-parser.c
@@ -32,11 +32,12 @@ int main(int argc, char **argv)
     int mount_count;
     char** mnt_pts;
     char** fs_types;
-    struct darshan_posix_file next_rec;
     time_t tmp_time = 0;
     char *token;
     char *save;
     char buffer[DARSHAN_JOB_METADATA_LEN];
+    struct darshan_posix_file *posix_mod_buf;
+    int posix_mod_buf_sz = 0;
 
     assert(argc == 2);
     filename = argv[1];
@@ -80,8 +81,8 @@ int main(int argc, char **argv)
 
     /* print job summary */
     printf("# darshan log version: %s\n", header.version_string);
-    printf("# size of POSIX file statistics: %zu bytes\n", sizeof(next_rec));
-    printf("# size of job statistics: %zu bytes\n", sizeof(job));
+    printf("# size of POSIX file statistics: %zu bytes\n", sizeof(struct darshan_posix_file));
+    printf("# size of job statistics: %zu bytes\n", sizeof(struct darshan_job));
     printf("# exe: %s\n", tmp_string);
     printf("# uid: %" PRId64 "\n", job.uid);
     printf("# jobid: %" PRId64 "\n", job.jobid);
@@ -132,6 +133,13 @@ int main(int argc, char **argv)
         printf("# mount entry:\t%s\t%s\n", mnt_pts[i], fs_types[i]);
     }
 
+    uint64_t job_size = header.rec_map.off - sizeof(struct darshan_header);
+    printf("\nDarshan log file module sizes:\n");
+    printf("\tHEADER size = %"PRIu64"\n\tJOB size = %"PRIu64"\n\tREC_HASH size = %"PRIu64"\n",
+        sizeof(struct darshan_header), job_size, header.rec_map.len);
+    printf("\t%s MODULE size = %"PRIu64"\n", darshan_module_names[DARSHAN_POSIX_MOD],
+        header.mod_map[DARSHAN_POSIX_MOD].len);
+
     /* read hash of darshan records */
     ret = darshan_log_gethash(file, &rec_hash);
     if(ret < 0)
@@ -141,15 +149,18 @@ int main(int argc, char **argv)
         return(-1);
     }
 
-    /* try to retrieve first record (may not exist) */
-    ret = darshan_log_getfile(file, &next_rec);
-    if(ret < 0)
+    printf("\n*** FILE RECORD DATA ***\n");
+
+    /* get the log data for the POSIX module */
+    ret = darshan_log_getmod(file, DARSHAN_POSIX_MOD, (void**)&posix_mod_buf, &posix_mod_buf_sz);
+    if(ret < 0 || posix_mod_buf_sz % sizeof(struct darshan_posix_file))
     {
-        fprintf(stderr, "Error: failed to parse log file.\n");
-        fflush(stderr);
+        fprintf(stderr, "darshan_log_getmod() failed to read POSIX module data.\n");
+        darshan_log_close(file);
         return(-1);
     }
-    if(ret == 0)
+
+    if(posix_mod_buf_sz == 0)
     {
         /* it looks like the app didn't open any files */
         printf("# no files opened.\n");
@@ -157,11 +168,10 @@ int main(int argc, char **argv)
         return(0);
     }
 
-    /* iterate the posix file records stored in the darshan log */
-    printf("\n*** FILE RECORD DATA ***\n");
-    i = 0;
-    do{
-        struct darshan_record_ref *ref;
+    /* loop over the POSIX file records and print out counters */
+    for(i = 0; i < (posix_mod_buf_sz / sizeof(struct darshan_posix_file)); i++)
+    {
+        struct darshan_posix_file next_rec = posix_mod_buf[i];
 
         /* get the pathname for this record */
         HASH_FIND(hlink, rec_hash, &next_rec.f_id, sizeof(darshan_record_id), ref);
@@ -172,9 +182,7 @@ int main(int argc, char **argv)
         printf("\t\tPOSIX_OPENS:\t%"PRIu64"\n\t\tF_OPEN_TIMESTAMP:\t%lf\n\t\tF_CLOSE_TIMESTAMP:\t%lf\n",
             next_rec.counters[CP_POSIX_OPENS], next_rec.fcounters[CP_F_OPEN_TIMESTAMP],
             next_rec.fcounters[CP_F_CLOSE_TIMESTAMP]);
-
-        i++;
-    } while((ret = darshan_log_getfile(file, &next_rec)) == 1);
+    }
 
     /* free mount info */
     for(i=0; i<mount_count; i++)


hooks/post-receive
--



More information about the Darshan-commits mailing list