[Darshan-commits] [Git][darshan/darshan][dev-stdio] implement shared reduction for stdio records

Philip Carns xgitlab at cels.anl.gov
Tue May 24 15:54:58 CDT 2016


Philip Carns pushed to branch dev-stdio at darshan / darshan


Commits:
3d7cc3c6 by Phil Carns at 2016-05-24T16:54:43-04:00
implement shared reduction for stdio records

- - - - -


2 changed files:

- darshan-runtime/lib/darshan-stdio.c
- darshan-stdio-log-format.h


Changes:

=====================================
darshan-runtime/lib/darshan-stdio.c
=====================================
--- a/darshan-runtime/lib/darshan-stdio.c
+++ b/darshan-runtime/lib/darshan-stdio.c
@@ -5,7 +5,6 @@
  */
 
 /* TODO list (general) for this module:
- * - implement reduction operator
  * - add stdio page to darshan-job-summary
  * - add regression test cases for all functions captured here
  *   - especially the scanf and printf variants with variable arguments
@@ -220,6 +219,9 @@ static void stdio_begin_shutdown(void);
 static void stdio_get_output_data(MPI_Comm mod_comm, darshan_record_id *shared_recs,
     int shared_rec_count, void **stdio_buf, int *stdio_buf_sz);
 static void stdio_shutdown(void);
+/* MPI_User_function that merges arrays of shared darshan_stdio_record entries */
+static void stdio_record_reduction_op(void* infile_v, void* inoutfile_v,
+    int *len, MPI_Datatype *datatype);
+/* qsort() comparator ordering darshan_stdio_record entries by descending rank */
+static int stdio_record_compare(const void* a_p, const void* b_p);
 
 #define STDIO_LOCK() pthread_mutex_lock(&stdio_runtime_mutex)
 #define STDIO_UNLOCK() pthread_mutex_unlock(&stdio_runtime_mutex)
@@ -1152,14 +1154,168 @@ static void stdio_get_output_data(
     void **stdio_buf,
     int *stdio_buf_sz)
 {
+    /* Return the module's record array and its size in bytes through
+     * *stdio_buf / *stdio_buf_sz.  Records for globally shared files
+     * (shared_recs) are first reduced onto rank 0 over mod_comm.
+     */
+    struct stdio_file_runtime *file;
+    int i;
+    struct darshan_stdio_record *red_send_buf = NULL;
+    MPI_Datatype red_type;
+    MPI_Op red_op;
+
     assert(stdio_runtime);
 
+    /* if there are globally shared files, do a shared file reduction */
+    /* NOTE: the shared file reduction is also skipped if the
+     * DARSHAN_DISABLE_SHARED_REDUCTION environment variable is set.
+     */
+    if(shared_rec_count && !getenv("DARSHAN_DISABLE_SHARED_REDUCTION"))
+    {
+        /* mark every shared record with rank -1 so the sort below groups them */
+        for(i = 0; i < shared_rec_count; i++)
+        {
+            HASH_FIND(hlink, stdio_runtime->file_hash, &shared_recs[i],
+                sizeof(darshan_record_id), file);
+            assert(file);
+
+            file->file_record->rank = -1;
+        }
+
+        /* sort the array of files descending by rank so that we get all of the
+         * shared files (marked by rank -1) in a contiguous portion at the end
+         * of the array.
+         * NOTE(review): this permutes file_record_array in place, so runtime
+         * pointers into it (e.g. file->file_record) may no longer refer to the
+         * same record afterwards; presumably fine at shutdown -- confirm.
+         */
+        qsort(stdio_runtime->file_record_array, stdio_runtime->file_array_ndx,
+            sizeof(struct darshan_stdio_record), stdio_record_compare);
+
+        /* the shared records are now the last shared_rec_count array entries */
+        red_send_buf =
+            &(stdio_runtime->file_record_array[stdio_runtime->file_array_ndx-shared_rec_count]);
+
+        /* construct a datatype for a STDIO file record.  This is serving no purpose
+         * except to make sure we can do a reduction on proper boundaries
+         */
+        DARSHAN_MPI_CALL(PMPI_Type_contiguous)(sizeof(struct darshan_stdio_record),
+            MPI_BYTE, &red_type);
+        DARSHAN_MPI_CALL(PMPI_Type_commit)(&red_type);
+
+        /* register a (commutative) STDIO file record reduction operator */
+        DARSHAN_MPI_CALL(PMPI_Op_create)(stdio_record_reduction_op, 1, &red_op);
+
+        /* Reduce shared STDIO file records.  Rank 0 reduces in place
+         * (MPI_IN_PLACE) directly into the tail of its own record array, so no
+         * receive buffer is allocated.  There is therefore no allocation that
+         * could fail and make rank 0 skip this collective while every other
+         * rank blocks inside it.
+         */
+        if(my_rank == 0)
+        {
+            DARSHAN_MPI_CALL(PMPI_Reduce)(MPI_IN_PLACE, red_send_buf,
+                shared_rec_count, red_type, red_op, 0, mod_comm);
+        }
+        else
+        {
+            /* the receive buffer is only significant at the root */
+            DARSHAN_MPI_CALL(PMPI_Reduce)(red_send_buf, NULL,
+                shared_rec_count, red_type, red_op, 0, mod_comm);
+
+            /* only rank 0 reports the reduced shared records; drop ours */
+            stdio_runtime->file_array_ndx -= shared_rec_count;
+        }
+
+        DARSHAN_MPI_CALL(PMPI_Type_free)(&red_type);
+        DARSHAN_MPI_CALL(PMPI_Op_free)(&red_op);
+    }
+
     *stdio_buf = (void *)(stdio_runtime->file_record_array);
     *stdio_buf_sz = stdio_runtime->file_array_ndx * sizeof(struct darshan_stdio_record);
 
     return;
 }
 
+/* qsort() comparator for struct darshan_stdio_record: orders records by rank,
+ * highest rank first.  Records marked with rank -1 therefore land at the end
+ * of the sorted array (assuming all other records carry a non-negative rank).
+ */
+static int stdio_record_compare(const void* a_p, const void* b_p)
+{
+    const struct darshan_stdio_record *lhs = a_p;
+    const struct darshan_stdio_record *rhs = b_p;
+
+    /* 1 when lhs has the lower rank, -1 when higher, 0 when equal */
+    return (lhs->rank < rhs->rank) - (lhs->rank > rhs->rank);
+}
+
+/* MPI user-defined reduction operator (registered with PMPI_Op_create) that
+ * merges *len stdio records from infile_v into inoutfile_v, element by
+ * element.  Each merged record keeps the input f_id and is given rank -1.
+ * Counters are combined as follows:
+ *   - sum:               STDIO_OPENS .. STDIO_BYTES_READ,
+ *                        STDIO_F_META_TIME .. STDIO_F_READ_TIME
+ *   - max:               STDIO_MAX_BYTE_READ .. STDIO_MAX_BYTE_WRITTEN,
+ *                        STDIO_F_OPEN_END_TIMESTAMP .. STDIO_F_READ_END_TIMESTAMP
+ *   - smallest non-zero: STDIO_F_OPEN_START_TIMESTAMP .. STDIO_F_READ_START_TIMESTAMP
+ * These index ranges rely on the counter ordering in darshan-stdio-log-format.h.
+ * The datatype argument is not used.
+ */
+static void stdio_record_reduction_op(void* infile_v, void* inoutfile_v,
+    int *len, MPI_Datatype *datatype)
+{
+    struct darshan_stdio_record *src = infile_v;
+    struct darshan_stdio_record *dst = inoutfile_v;
+    struct darshan_stdio_record merged;
+    int rec, idx;
+
+    assert(stdio_runtime);
+
+    for(rec = 0; rec < *len; rec++)
+    {
+        const struct darshan_stdio_record *a = &src[rec];
+        const struct darshan_stdio_record *b = &dst[rec];
+
+        memset(&merged, 0, sizeof(merged));
+        merged.f_id = a->f_id;
+        merged.rank = -1;
+
+        /* integer counters: totals, then high-water marks */
+        for(idx = STDIO_OPENS; idx <= STDIO_BYTES_READ; idx++)
+            merged.counters[idx] = a->counters[idx] + b->counters[idx];
+        for(idx = STDIO_MAX_BYTE_READ; idx <= STDIO_MAX_BYTE_WRITTEN; idx++)
+            merged.counters[idx] = (a->counters[idx] > b->counters[idx]) ?
+                a->counters[idx] : b->counters[idx];
+
+        /* floating point counters: cumulative times */
+        for(idx = STDIO_F_META_TIME; idx <= STDIO_F_READ_TIME; idx++)
+            merged.fcounters[idx] = a->fcounters[idx] + b->fcounters[idx];
+
+        /* start timestamps: earliest non-zero value, zero meaning "not set" */
+        for(idx = STDIO_F_OPEN_START_TIMESTAMP; idx <= STDIO_F_READ_START_TIMESTAMP; idx++)
+        {
+            int take_a = (b->fcounters[idx] == 0) ||
+                (a->fcounters[idx] > 0 && a->fcounters[idx] < b->fcounters[idx]);
+            merged.fcounters[idx] = take_a ? a->fcounters[idx] : b->fcounters[idx];
+        }
+
+        /* end timestamps: latest value */
+        for(idx = STDIO_F_OPEN_END_TIMESTAMP; idx <= STDIO_F_READ_END_TIMESTAMP; idx++)
+            merged.fcounters[idx] = (a->fcounters[idx] > b->fcounters[idx]) ?
+                a->fcounters[idx] : b->fcounters[idx];
+
+        dst[rec] = merged;
+    }
+}
+
 static void stdio_shutdown()
 {
     struct stdio_file_runtime_ref *ref, *tmp;


=====================================
darshan-stdio-log-format.h
=====================================
--- a/darshan-stdio-log-format.h
+++ b/darshan-stdio-log-format.h
@@ -13,48 +13,48 @@
 #define STDIO_COUNTERS \
     /* count of fopens */\
     X(STDIO_OPENS) \
-    /* maximum byte (offset) written */\
-    X(STDIO_MAX_BYTE_WRITTEN) \
-    /* total bytes written */ \
-    X(STDIO_BYTES_WRITTEN) \
-    /* number of writes */ \
-    X(STDIO_WRITES) \
-    /* maximum byte (offset) read */\
-    X(STDIO_MAX_BYTE_READ) \
-    /* total bytes read */ \
-    X(STDIO_BYTES_READ) \
     /* number of reads */ \
     X(STDIO_READS) \
+    /* number of writes */ \
+    X(STDIO_WRITES) \
     /* count of seeks */\
     X(STDIO_SEEKS) \
     /* count of flushes */\
     X(STDIO_FLUSHES) \
+    /* total bytes written */ \
+    X(STDIO_BYTES_WRITTEN) \
+    /* total bytes read */ \
+    X(STDIO_BYTES_READ) \
+    /* maximum byte (offset) read */\
+    X(STDIO_MAX_BYTE_READ) \
+    /* maximum byte (offset) written */\
+    X(STDIO_MAX_BYTE_WRITTEN) \
     /* end of counters */\
     X(STDIO_NUM_INDICES)
 
 #define STDIO_F_COUNTERS \
+    /* cumulative meta time */\
+    X(STDIO_F_META_TIME) \
+    /* cumulative write time */\
+    X(STDIO_F_WRITE_TIME) \
+    /* cumulative read time */\
+    X(STDIO_F_READ_TIME) \
     /* timestamp of first open */\
     X(STDIO_F_OPEN_START_TIMESTAMP) \
-    /* timestamp of last open completion */\
-    X(STDIO_F_OPEN_END_TIMESTAMP) \
     /* timestamp of first close */\
     X(STDIO_F_CLOSE_START_TIMESTAMP) \
-    /* timestamp of last close completion */\
-    X(STDIO_F_CLOSE_END_TIMESTAMP) \
     /* timestamp of first write */\
     X(STDIO_F_WRITE_START_TIMESTAMP) \
-    /* timestamp of last write completion */\
-    X(STDIO_F_WRITE_END_TIMESTAMP) \
     /* timestamp of first read */\
     X(STDIO_F_READ_START_TIMESTAMP) \
+    /* timestamp of last open completion */\
+    X(STDIO_F_OPEN_END_TIMESTAMP) \
+    /* timestamp of last close completion */\
+    X(STDIO_F_CLOSE_END_TIMESTAMP) \
+    /* timestamp of last write completion */\
+    X(STDIO_F_WRITE_END_TIMESTAMP) \
     /* timestamp of last read completion */\
     X(STDIO_F_READ_END_TIMESTAMP) \
-    /* cumulative meta time */\
-    X(STDIO_F_META_TIME) \
-    /* cumulative write time */\
-    X(STDIO_F_WRITE_TIME) \
-    /* cumulative read time */\
-    X(STDIO_F_READ_TIME) \
     /* end of counters */\
     X(STDIO_F_NUM_INDICES)
 



View it on GitLab: https://xgitlab.cels.anl.gov/darshan/darshan/commit/3d7cc3c6996bc031c9876ca274cec36ad8db3ceb
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.mcs.anl.gov/pipermail/darshan-commits/attachments/20160524/af7fc2a0/attachment.html>


More information about the Darshan-commits mailing list