[Darshan-commits] [Git][darshan/darshan][mmap-dev] 3 commits: allow job end time to be set by darshan-merge

Shane Snyder xgitlab at cels.anl.gov
Tue Mar 1 17:44:53 CST 2016


Shane Snyder pushed to branch mmap-dev at darshan / darshan


Commits:
f26baefc by Shane Snyder at 2016-02-26T12:34:02-06:00
allow job end time to be set by darshan-merge

- - - - -
d4a3d112 by Shane Snyder at 2016-02-29T17:22:57-06:00
add variance counter reduction logic to logutils

- - - - -
04063da7 by Shane Snyder at 2016-03-01T17:43:38-06:00
common access counters are now sorted in logs

common value counters are sorted first by decreasing counts,
and then by decreasing size.

- - - - -


5 changed files:

- darshan-runtime/darshan-common.h
- darshan-runtime/share/darshan-mmap-epilog.sh.in
- darshan-util/darshan-merge.c
- darshan-util/darshan-mpiio-logutils.c
- darshan-util/darshan-posix-logutils.c


Changes:

=====================================
darshan-runtime/darshan-common.h
=====================================
--- a/darshan-runtime/darshan-common.h
+++ b/darshan-runtime/darshan-common.h
@@ -66,7 +66,10 @@
 } while(0)
 
 /* potentially set or increment a common value counter, depending on the __count
- * for the given __value
+ * for the given __value. This macro ensures common values are stored first in
+ * decreasing order of their total count, and second by decreasing order of
+ * their value.
+
  *
  * NOTE: This macro is hardcoded to expect that Darshan will only track the 4
  * most common (i.e., frequently occuring) values. __val_p is a pointer to the
@@ -78,32 +81,50 @@
  */
 #define DARSHAN_COMMON_VAL_COUNTER_INC(__val_p, __cnt_p, __value, __count, __online_flag) do {\
     int i; \
-    int set = 0; \
-    int64_t min = *(__cnt_p); \
-    int min_index = 0; \
-    int inc_count; \
+    int inc_count, total_count; \
+    int64_t tmp_val[4] = {0}; \
+    int64_t tmp_cnt[4] = {0}; \
+    int tmp_ndx = 0; \
     if(__value == 0) break; \
     if(__online_flag) \
         inc_count = 1; \
     else \
         inc_count = __count; \
     for(i=0; i<4; i++) { \
-        /* increment bucket if already exists */ \
         if(*(__val_p + i) == __value) { \
-            *(__cnt_p + i) += inc_count; \
-            set = 1; \
+            total_count = *(__cnt_p + i) + inc_count; \
             break; \
         } \
-        /* otherwise find the least frequently used bucket */ \
-        else if(*(__cnt_p + i) < min) { \
-            min = *(__cnt_p + i); \
-            min_index = i; \
+    } \
+    if(i == 4) total_count = __count; \
+    /* first, copy over any counters that should be sorted above this one \
+     * (counters with higher counts or equal counts and larger values) \
+     */ \
+    for(i=0;i < 4; i++) { \
+        if((*(__cnt_p + i) > total_count) || \
+           ((*(__cnt_p + i) == total_count) && (*(__val_p + i) > __value))) { \
+            tmp_val[tmp_ndx] = *(__val_p + i); \
+            tmp_cnt[tmp_ndx] = *(__cnt_p + i); \
+            tmp_ndx++; \
         } \
+        else break; \
     } \
-    if(!set && (__count > min)) { \
-        *(__cnt_p + min_index) = __count; \
-        *(__val_p + min_index) = __value; \
+    if(tmp_ndx == 4) break; /* all done, updated counter is not added */ \
+    /* next, add the updated counter */ \
+    tmp_val[tmp_ndx] = __value; \
+    tmp_cnt[tmp_ndx] = total_count; \
+    tmp_ndx++; \
+    /* last, copy over any remaining counters to make sure we have 4 sets total */ \
+    while(tmp_ndx != 4) { \
+        if(*(__val_p + i) != __value) { \
+            tmp_val[tmp_ndx] = *(__val_p + i); \
+            tmp_cnt[tmp_ndx] = *(__cnt_p + i); \
+            tmp_ndx++; \
+        } \
+        i++; \
     } \
+    memcpy(__val_p, tmp_val, 4*sizeof(int64_t)); \
+    memcpy(__cnt_p, tmp_cnt, 4*sizeof(int64_t)); \
 } while(0)
 
 /* maximum number of common values that darshan will track per file at


=====================================
darshan-runtime/share/darshan-mmap-epilog.sh.in
=====================================
--- a/darshan-runtime/share/darshan-mmap-epilog.sh.in
+++ b/darshan-runtime/share/darshan-mmap-epilog.sh.in
@@ -11,6 +11,8 @@ DARSHAN_INSTALL_DIR=@prefix@
 # use the log dir specified at configure time
 DARSHAN_LOG_DIR=@__DARSHAN_LOG_PATH@
 
+JOB_END=$(date +"%s")
+
 # use the default mmap log directory (/tmp), unless the 
 # env variable is set to something
 if [ -z "$DARSHAN_MMAP_LOGPATH" ]; then
@@ -53,7 +55,7 @@ if [ $SLURM_NNODES -gt 1 ]; then
     mkdir -p $NODE_LOG_DIR
 
     # construct the per-node log file and store in the output directory
-    $DARSHAN_INSTALL_DIR/bin/darshan-merge \
+    $DARSHAN_INSTALL_DIR/bin/darshan-merge --job-end-time $JOB_END \
         --output ${NODE_LOG_DIR}/${LOG_NAME_PRE}_${NODE_NAME}.darshan \
         $DARSHAN_MMAP_LOG_GLOB
 else
@@ -61,7 +63,7 @@ else
 
     # single node, just create the final output darshan log
     LOG_WRITE_START=$(date +%s)
-    $DARSHAN_INSTALL_DIR/bin/darshan-merge
+    $DARSHAN_INSTALL_DIR/bin/darshan-merge --job-end-time $JOB_END \
         --shared-redux --output ${OUTPUT_LOG_DIR}/${TMP_LOG} \
         $DARSHAN_MMAP_LOG_GLOB
     LOG_WRITE_END=$(date +%s)
@@ -72,4 +74,6 @@ else
     mv ${OUTPUT_LOG_DIR}/${TMP_LOG} ${OUTPUT_LOG_DIR}/${FINAL_LOG}
 fi
 
+rm -f $DARSHAN_MMAP_LOG_GLOB
+
 exit 0


=====================================
darshan-util/darshan-merge.c
=====================================
--- a/darshan-util/darshan-merge.c
+++ b/darshan-util/darshan-merge.c
@@ -11,8 +11,6 @@
 
 #define DEF_MOD_BUF_SIZE 1024 /* 1 KiB is enough for all current mod records ... */
 
-/* TODO: set job end timestamp? */
-
 struct darshan_shared_record_ref
 {
     darshan_record_id id;
@@ -28,23 +26,27 @@ void usage(char *exename)
     fprintf(stderr, "Options:\n");
     fprintf(stderr, "\t--output\t(REQUIRED) Full path of the output darshan log file.\n");
     fprintf(stderr, "\t--shared-redux\tReduce globally shared records into a single record.\n");
+    fprintf(stderr, "\t--job-end-time\tSet the output log's job end time (requires argument of seconds since Epoch).\n");
 
     exit(1);
 }
 
 void parse_args(int argc, char **argv, char ***infile_list, int *n_files,
-    char **outlog_path, int *shared_redux)
+    char **outlog_path, int *shared_redux, int64_t *job_end_time)
 {
     int index;
+    char *check;
     static struct option long_opts[] =
     {
-        {"shared-redux", no_argument, NULL, 's'},
         {"output", required_argument, NULL, 'o'},
+        {"shared-redux", no_argument, NULL, 's'},
+        {"job-end-time", required_argument, NULL, 'e'},
         {0, 0, 0, 0}
     };
 
     *shared_redux = 0;
     *outlog_path = NULL;
+    *job_end_time = 0;
 
     while(1)
     {
@@ -60,6 +62,14 @@ void parse_args(int argc, char **argv, char ***infile_list, int *n_files,
             case 'o':
                 *outlog_path = optarg;
                 break;
+            case 'e':
+                *job_end_time = strtol(optarg, &check, 10);
+                if(optarg == check)
+                {
+                    fprintf(stderr, "Error: unable to parse job end time value.\n");
+                    exit(1);
+                }
+                break;
             case '?':
             default:
                 usage(argv[0]);
@@ -85,7 +95,7 @@ int build_mod_shared_rec_hash(char **infile_list, int n_infiles,
     darshan_fd in_fd;
     struct darshan_base_record *base_rec;
     struct darshan_shared_record_ref *ref, *tmp;
-    int init = 0;
+    int init_rank = -1;
     int ret;
     int i;
 
@@ -104,9 +114,11 @@ int build_mod_shared_rec_hash(char **infile_list, int n_infiles,
         while((ret = mod_logutils[mod_id]->log_get_record(in_fd, mod_buf)) == 1)
         {
             base_rec = (struct darshan_base_record *)mod_buf;
+            if(init_rank == -1)
+                init_rank = base_rec->rank;
 
             /* initialize the hash with the first rank's records */
-            if(!init)
+            if(base_rec->rank == init_rank)
             {
                 struct darshan_base_record *agg_base;
 
@@ -128,10 +140,9 @@ int build_mod_shared_rec_hash(char **infile_list, int n_infiles,
                 ref->id = base_rec->id;
                 ref->ref_cnt = 1;
                 HASH_ADD(hlink, *shared_rec_hash, id, sizeof(darshan_record_id), ref);
-                init = 1;
             }
             else
-            {
+           {
                 /* search for this record in shared record hash */
                 HASH_FIND(hlink, *shared_rec_hash, &(base_rec->id),
                     sizeof(darshan_record_id), ref);
@@ -173,6 +184,7 @@ int main(int argc, char *argv[])
     char **infile_list;
     int n_infiles;
     int shared_redux;
+    int64_t job_end_time = 0;
     char *outlog_path;
     darshan_fd in_fd, merge_fd;
     struct darshan_job in_job, merge_job;
@@ -191,7 +203,7 @@ int main(int argc, char *argv[])
     int ret;
 
     /* grab command line arguments */
-    parse_args(argc, argv, &infile_list, &n_infiles, &outlog_path, &shared_redux);
+    parse_args(argc, argv, &infile_list, &n_infiles, &outlog_path, &shared_redux, &job_end_time);
 
     memset(&merge_job, 0, sizeof(struct darshan_job));
 
@@ -305,6 +317,10 @@ int main(int argc, char *argv[])
         darshan_log_close(in_fd);
     }
 
+    /* if a job end time was passed in, apply it to the output job */
+    if(job_end_time > 0)
+        merge_job.end_time = job_end_time;
+
     /* create the output "merged" log */
     merge_fd = darshan_log_create(outlog_path, DARSHAN_ZLIB_COMP, 1);
     if(merge_fd == NULL)


=====================================
darshan-util/darshan-mpiio-logutils.c
=====================================
--- a/darshan-util/darshan-mpiio-logutils.c
+++ b/darshan-util/darshan-mpiio-logutils.c
@@ -224,39 +224,33 @@ static void darshan_log_print_mpiio_file_diff(void *file_rec1, char *file_name1,
     return;
 }
 
+/* simple helper struct for determining time & byte variances */
+struct var_t
+{
+    double n;
+    double M;
+    double S;
+};
+
 static void darshan_log_agg_mpiio_files(void *rec, void *agg_rec, int init_flag)
 {
     struct darshan_mpiio_file *mpi_rec = (struct darshan_mpiio_file *)rec;
     struct darshan_mpiio_file *agg_mpi_rec = (struct darshan_mpiio_file *)agg_rec;
     int i, j, k;
-    int set;
-    int min_ndx;
-    int64_t min;
+    int total_count;
+    int64_t tmp_val[4];
+    int64_t tmp_cnt[4];
+    int tmp_ndx;
+    double old_M;
     double mpi_time = mpi_rec->fcounters[MPIIO_F_READ_TIME] +
         mpi_rec->fcounters[MPIIO_F_WRITE_TIME] +
         mpi_rec->fcounters[MPIIO_F_META_TIME];
-
-    /* special case initialization of shared record for
-     * first call of this function
-     */
-    if(init_flag)
-    {
-        /* set fastest/slowest rank counters according to root rank.
-         * these counters will be determined as the aggregation progresses.
-         */
-        agg_mpi_rec->counters[MPIIO_FASTEST_RANK] = mpi_rec->base_rec.rank;
-        agg_mpi_rec->counters[MPIIO_FASTEST_RANK_BYTES] =
-            mpi_rec->counters[MPIIO_BYTES_READ] +
-            mpi_rec->counters[MPIIO_BYTES_WRITTEN];
-        agg_mpi_rec->fcounters[MPIIO_F_FASTEST_RANK_TIME] = mpi_time;
-
-        agg_mpi_rec->counters[MPIIO_SLOWEST_RANK] =
-            agg_mpi_rec->counters[MPIIO_FASTEST_RANK];
-        agg_mpi_rec->counters[MPIIO_SLOWEST_RANK_BYTES] =
-            agg_mpi_rec->counters[MPIIO_FASTEST_RANK_BYTES];
-        agg_mpi_rec->fcounters[MPIIO_F_SLOWEST_RANK_TIME] =
-            agg_mpi_rec->fcounters[MPIIO_F_FASTEST_RANK_TIME];
-    }
+    double mpi_bytes = (double)mpi_rec->counters[MPIIO_BYTES_READ] +
+        mpi_rec->counters[MPIIO_BYTES_WRITTEN];
+    struct var_t *var_time_p = (struct var_t *)
+        ((char *)rec + sizeof(struct darshan_mpiio_file));
+    struct var_t *var_bytes_p = (struct var_t *)
+        ((char *)var_time_p + sizeof(struct var_t));
 
     for(i = 0; i < MPIIO_NUM_INDICES; i++)
     {
@@ -315,30 +309,69 @@ static void darshan_log_agg_mpiio_files(void *rec, void *agg_rec, int init_flag)
                 break;
             case MPIIO_ACCESS1_ACCESS:
                 /* increment common value counters */
+                if(mpi_rec->counters[i] == 0) break;
+
+                /* first, collapse duplicates */
                 for(j = i; j < i + 4; j++)
                 {
-                    min = agg_mpi_rec->counters[i + 4];
-                    min_ndx = 0;
-                    set = 0;
                     for(k = 0; k < 4; k++)
                     {
                         if(agg_mpi_rec->counters[i + k] == mpi_rec->counters[j])
                         {
                             agg_mpi_rec->counters[i + k + 4] += mpi_rec->counters[j + 4];
-                            set = 1;
+                            mpi_rec->counters[j] = mpi_rec->counters[j + 4] = 0;
+                        }
+                    }
+                }
+
+                /* second, add new counters */
+                for(j = i; j < i + 4; j++)
+                {
+                    tmp_ndx = 0;
+                    memset(tmp_val, 0, 4 * sizeof(int64_t));
+                    memset(tmp_cnt, 0, 4 * sizeof(int64_t));
+
+                    for(k = 0; k < 4; k++)
+                    {
+                        if(agg_mpi_rec->counters[i + k] == mpi_rec->counters[j])
+                        {
+                            total_count = agg_mpi_rec->counters[i + k + 4] +
+                                mpi_rec->counters[j + 4];
                             break;
                         }
-                        else if(agg_mpi_rec->counters[i + k + 4] < min)
+                    }
+                    if(k == 4) total_count = mpi_rec->counters[j + 4];
+
+                    for(k = 0; k < 4; k++)
+                    {
+                        if((agg_mpi_rec->counters[i + k + 4] > total_count) ||
+                           ((agg_mpi_rec->counters[i + k + 4] == total_count) &&
+                            (agg_mpi_rec->counters[i + k] > mpi_rec->counters[j])))
                         {
-                            min = agg_mpi_rec->counters[i + k + 4];
-                            min_ndx = k;
+                            tmp_val[tmp_ndx] = agg_mpi_rec->counters[i + k];
+                            tmp_cnt[tmp_ndx] = agg_mpi_rec->counters[i + k + 4];
+                            tmp_ndx++;
                         }
+                        else break;
                     }
-                    if(!set && (mpi_rec->counters[j + 4] > min))
+                    if(tmp_ndx == 4) break;
+
+                    tmp_val[tmp_ndx] = mpi_rec->counters[j];
+                    tmp_cnt[tmp_ndx] = mpi_rec->counters[j + 4];
+                    tmp_ndx++;
+
+                    while(tmp_ndx != 4)
                     {
-                        agg_mpi_rec->counters[i + min_ndx] = mpi_rec->counters[j];
-                        agg_mpi_rec->counters[i + min_ndx + 4] = mpi_rec->counters[j + 4];
+                        if(agg_mpi_rec->counters[i + k] != mpi_rec->counters[j])
+                        {
+                            tmp_val[tmp_ndx] = agg_mpi_rec->counters[i + k];
+                            tmp_cnt[tmp_ndx] = agg_mpi_rec->counters[i + k + 4];
+                            tmp_ndx++;
+                        }
+                        k++;
                     }
+                    memcpy(&(agg_mpi_rec->counters[i]), tmp_val, 4 * sizeof(int64_t));
+                    memcpy(&(agg_mpi_rec->counters[i + 4]), tmp_cnt, 4 * sizeof(int64_t));
                 }
                 break;
             case MPIIO_ACCESS2_ACCESS:
@@ -403,27 +436,80 @@ static void darshan_log_agg_mpiio_files(void *rec, void *agg_rec, int init_flag)
                 }
                 break;
             case MPIIO_F_FASTEST_RANK_TIME:
+                if(init_flag)
+                {
+                    /* set fastest rank counters according to root rank. these counters
+                     * will be determined as the aggregation progresses.
+                     */
+                    agg_mpi_rec->counters[MPIIO_FASTEST_RANK] = mpi_rec->base_rec.rank;
+                    agg_mpi_rec->counters[MPIIO_FASTEST_RANK_BYTES] = mpi_bytes;
+                    agg_mpi_rec->fcounters[MPIIO_F_FASTEST_RANK_TIME] = mpi_time;
+                }
+
                 if(mpi_time < agg_mpi_rec->fcounters[MPIIO_F_FASTEST_RANK_TIME])
                 {
                     agg_mpi_rec->counters[MPIIO_FASTEST_RANK] = mpi_rec->base_rec.rank;
-                    agg_mpi_rec->counters[MPIIO_FASTEST_RANK_BYTES] =
-                        mpi_rec->counters[MPIIO_BYTES_READ] +
-                        mpi_rec->counters[MPIIO_BYTES_WRITTEN];
+                    agg_mpi_rec->counters[MPIIO_FASTEST_RANK_BYTES] = mpi_bytes;
                     agg_mpi_rec->fcounters[MPIIO_F_FASTEST_RANK_TIME] = mpi_time;
                 }
                 break;
             case MPIIO_F_SLOWEST_RANK_TIME:
+                if(init_flag)
+                {
+                    /* set slowest rank counters according to root rank. these counters
+                     * will be determined as the aggregation progresses.
+                     */
+                    agg_mpi_rec->counters[MPIIO_SLOWEST_RANK] = mpi_rec->base_rec.rank;
+                    agg_mpi_rec->counters[MPIIO_SLOWEST_RANK_BYTES] = mpi_bytes;
+                    agg_mpi_rec->fcounters[MPIIO_F_SLOWEST_RANK_TIME] = mpi_time;
+                }
+
                 if(mpi_time > agg_mpi_rec->fcounters[MPIIO_F_SLOWEST_RANK_TIME])
                 {
                     agg_mpi_rec->counters[MPIIO_SLOWEST_RANK] = mpi_rec->base_rec.rank;
-                    agg_mpi_rec->counters[MPIIO_SLOWEST_RANK_BYTES] =
-                        mpi_rec->counters[MPIIO_BYTES_READ] +
-                        mpi_rec->counters[MPIIO_BYTES_WRITTEN];
+                    agg_mpi_rec->counters[MPIIO_SLOWEST_RANK_BYTES] = mpi_bytes;
                     agg_mpi_rec->fcounters[MPIIO_F_SLOWEST_RANK_TIME] = mpi_time;
                 }
                 break;
+            case MPIIO_F_VARIANCE_RANK_TIME:
+                if(init_flag)
+                {
+                    var_time_p->n = 1;
+                    var_time_p->M = mpi_time;
+                    var_time_p->S = 0;
+                }
+                else
+                {
+                    old_M = var_time_p->M;
+
+                    var_time_p->n++;
+                    var_time_p->M += (mpi_time - var_time_p->M) / var_time_p->n;
+                    var_time_p->S += (mpi_time - var_time_p->M) * (mpi_time - old_M);
+
+                    agg_mpi_rec->fcounters[MPIIO_F_VARIANCE_RANK_TIME] =
+                        var_time_p->S / var_time_p->n;
+                }
+                break;
+            case MPIIO_F_VARIANCE_RANK_BYTES:
+                if(init_flag)
+                {
+                    var_bytes_p->n = 1;
+                    var_bytes_p->M = mpi_bytes;
+                    var_bytes_p->S = 0;
+                }
+                else
+                {
+                    old_M = var_bytes_p->M;
+
+                    var_bytes_p->n++;
+                    var_bytes_p->M += (mpi_bytes - var_bytes_p->M) / var_bytes_p->n;
+                    var_bytes_p->S += (mpi_bytes - var_bytes_p->M) * (mpi_bytes - old_M);
+
+                    agg_mpi_rec->fcounters[MPIIO_F_VARIANCE_RANK_BYTES] =
+                        var_bytes_p->S / var_bytes_p->n;
+                }
+                break;
             default:
-                /* TODO: variance */
                 agg_mpi_rec->fcounters[i] = -1;
                 break;
         }


=====================================
darshan-util/darshan-posix-logutils.c
=====================================
--- a/darshan-util/darshan-posix-logutils.c
+++ b/darshan-util/darshan-posix-logutils.c
@@ -225,39 +225,33 @@ static void darshan_log_print_posix_file_diff(void *file_rec1, char *file_name1,
     return;
 }
 
+/* simple helper struct for determining time & byte variances */
+struct var_t
+{
+    double n;
+    double M;
+    double S;
+};
+
 static void darshan_log_agg_posix_files(void *rec, void *agg_rec, int init_flag)
 {
     struct darshan_posix_file *psx_rec = (struct darshan_posix_file *)rec;
     struct darshan_posix_file *agg_psx_rec = (struct darshan_posix_file *)agg_rec;
     int i, j, k;
-    int set;
-    int min_ndx;
-    int64_t min;
+    int total_count;
+    int64_t tmp_val[4];
+    int64_t tmp_cnt[4];
+    int tmp_ndx;
+    double old_M;
     double psx_time = psx_rec->fcounters[POSIX_F_READ_TIME] +
         psx_rec->fcounters[POSIX_F_WRITE_TIME] +
         psx_rec->fcounters[POSIX_F_META_TIME];
-
-    /* special case initialization of shared record for
-     * first call of this function
-     */
-    if(init_flag)
-    {
-        /* set fastest/slowest rank counters according to root rank.
-         * these counters will be determined as the aggregation progresses.
-         */
-        agg_psx_rec->counters[POSIX_FASTEST_RANK] = psx_rec->base_rec.rank;
-        agg_psx_rec->counters[POSIX_FASTEST_RANK_BYTES] =
-            psx_rec->counters[POSIX_BYTES_READ] +
-            psx_rec->counters[POSIX_BYTES_WRITTEN];
-        agg_psx_rec->fcounters[POSIX_F_FASTEST_RANK_TIME] = psx_time;
-
-        agg_psx_rec->counters[POSIX_SLOWEST_RANK] =
-            agg_psx_rec->counters[POSIX_FASTEST_RANK];
-        agg_psx_rec->counters[POSIX_SLOWEST_RANK_BYTES] =
-            agg_psx_rec->counters[POSIX_FASTEST_RANK_BYTES];
-        agg_psx_rec->fcounters[POSIX_F_SLOWEST_RANK_TIME] =
-            agg_psx_rec->fcounters[POSIX_F_FASTEST_RANK_TIME];
-    }
+    double psx_bytes = (double)psx_rec->counters[POSIX_BYTES_READ] +
+        psx_rec->counters[POSIX_BYTES_WRITTEN];
+    struct var_t *var_time_p = (struct var_t *)
+        ((char *)rec + sizeof(struct darshan_posix_file));
+    struct var_t *var_bytes_p = (struct var_t *)
+        ((char *)var_time_p + sizeof(struct var_t));
 
     for(i = 0; i < POSIX_NUM_INDICES; i++)
     {
@@ -332,30 +326,69 @@ static void darshan_log_agg_posix_files(void *rec, void *agg_rec, int init_flag)
             case POSIX_STRIDE1_STRIDE:
             case POSIX_ACCESS1_ACCESS:
                 /* increment common value counters */
+                if(psx_rec->counters[i] == 0) break;
+
+                /* first, collapse duplicates */
                 for(j = i; j < i + 4; j++)
                 {
-                    min = agg_psx_rec->counters[i + 4];
-                    min_ndx = 0;
-                    set = 0;
                     for(k = 0; k < 4; k++)
                     {
                         if(agg_psx_rec->counters[i + k] == psx_rec->counters[j])
                         {
                             agg_psx_rec->counters[i + k + 4] += psx_rec->counters[j + 4];
-                            set = 1;
+                            psx_rec->counters[j] = psx_rec->counters[j + 4] = 0;
+                        }
+                    }
+                }
+
+                /* second, add new counters */
+                for(j = i; j < i + 4; j++)
+                {
+                    tmp_ndx = 0;
+                    memset(tmp_val, 0, 4 * sizeof(int64_t));
+                    memset(tmp_cnt, 0, 4 * sizeof(int64_t));
+
+                    for(k = 0; k < 4; k++)
+                    {
+                        if(agg_psx_rec->counters[i + k] == psx_rec->counters[j])
+                        {
+                            total_count = agg_psx_rec->counters[i + k + 4] +
+                                psx_rec->counters[j + 4];
                             break;
                         }
-                        else if(agg_psx_rec->counters[i + k + 4] < min)
+                    }
+                    if(k == 4) total_count = psx_rec->counters[j + 4];
+
+                    for(k = 0; k < 4; k++)
+                    {
+                        if((agg_psx_rec->counters[i + k + 4] > total_count) ||
+                           ((agg_psx_rec->counters[i + k + 4] == total_count) &&
+                            (agg_psx_rec->counters[i + k] > psx_rec->counters[j])))
                         {
-                            min = agg_psx_rec->counters[i + k + 4];
-                            min_ndx = k;
+                            tmp_val[tmp_ndx] = agg_psx_rec->counters[i + k];
+                            tmp_cnt[tmp_ndx] = agg_psx_rec->counters[i + k + 4];
+                            tmp_ndx++;
                         }
+                        else break;
                     }
-                    if(!set && (psx_rec->counters[j + 4] > min))
+                    if(tmp_ndx == 4) break;
+
+                    tmp_val[tmp_ndx] = psx_rec->counters[j];
+                    tmp_cnt[tmp_ndx] = psx_rec->counters[j + 4];
+                    tmp_ndx++;
+
+                    while(tmp_ndx != 4)
                     {
-                        agg_psx_rec->counters[i + min_ndx] = psx_rec->counters[j];
-                        agg_psx_rec->counters[i + min_ndx + 4] = psx_rec->counters[j + 4];
+                        if(agg_psx_rec->counters[i + k] != psx_rec->counters[j])
+                        {
+                            tmp_val[tmp_ndx] = agg_psx_rec->counters[i + k];
+                            tmp_cnt[tmp_ndx] = agg_psx_rec->counters[i + k + 4];
+                            tmp_ndx++;
+                        }
+                        k++;
                     }
+                    memcpy(&(agg_psx_rec->counters[i]), tmp_val, 4 * sizeof(int64_t));
+                    memcpy(&(agg_psx_rec->counters[i + 4]), tmp_cnt, 4 * sizeof(int64_t));
                 }
                 break;
             case POSIX_STRIDE2_STRIDE:
@@ -427,27 +460,80 @@ static void darshan_log_agg_posix_files(void *rec, void *agg_rec, int init_flag)
                 }
                 break;
             case POSIX_F_FASTEST_RANK_TIME:
+                if(init_flag)
+                {
+                    /* set fastest rank counters according to root rank. these counters
+                     * will be determined as the aggregation progresses.
+                     */
+                    agg_psx_rec->counters[POSIX_FASTEST_RANK] = psx_rec->base_rec.rank;
+                    agg_psx_rec->counters[POSIX_FASTEST_RANK_BYTES] = psx_bytes;
+                    agg_psx_rec->fcounters[POSIX_F_FASTEST_RANK_TIME] = psx_time;
+                }
+
                 if(psx_time < agg_psx_rec->fcounters[POSIX_F_FASTEST_RANK_TIME])
                 {
                     agg_psx_rec->counters[POSIX_FASTEST_RANK] = psx_rec->base_rec.rank;
-                    agg_psx_rec->counters[POSIX_FASTEST_RANK_BYTES] =
-                        psx_rec->counters[POSIX_BYTES_READ] +
-                        psx_rec->counters[POSIX_BYTES_WRITTEN];
+                    agg_psx_rec->counters[POSIX_FASTEST_RANK_BYTES] = psx_bytes;
                     agg_psx_rec->fcounters[POSIX_F_FASTEST_RANK_TIME] = psx_time;
                 }
                 break;
             case POSIX_F_SLOWEST_RANK_TIME:
+                if(init_flag)
+                {
+                    /* set slowest rank counters according to root rank. these counters
+                     * will be determined as the aggregation progresses.
+                     */
+                    agg_psx_rec->counters[POSIX_SLOWEST_RANK] = psx_rec->base_rec.rank;
+                    agg_psx_rec->counters[POSIX_SLOWEST_RANK_BYTES] = psx_bytes;
+                    agg_psx_rec->fcounters[POSIX_F_SLOWEST_RANK_TIME] = psx_time;
+                }
+
                 if(psx_time > agg_psx_rec->fcounters[POSIX_F_SLOWEST_RANK_TIME])
                 {
                     agg_psx_rec->counters[POSIX_SLOWEST_RANK] = psx_rec->base_rec.rank;
-                    agg_psx_rec->counters[POSIX_SLOWEST_RANK_BYTES] =
-                        psx_rec->counters[POSIX_BYTES_READ] +
-                        psx_rec->counters[POSIX_BYTES_WRITTEN];
+                    agg_psx_rec->counters[POSIX_SLOWEST_RANK_BYTES] = psx_bytes;
                     agg_psx_rec->fcounters[POSIX_F_SLOWEST_RANK_TIME] = psx_time;
                 }
                 break;
+            case POSIX_F_VARIANCE_RANK_TIME:
+                if(init_flag)
+                {
+                    var_time_p->n = 1;
+                    var_time_p->M = psx_time;
+                    var_time_p->S = 0;
+                }
+                else
+                {
+                    old_M = var_time_p->M;
+
+                    var_time_p->n++;
+                    var_time_p->M += (psx_time - var_time_p->M) / var_time_p->n;
+                    var_time_p->S += (psx_time - var_time_p->M) * (psx_time - old_M);
+
+                    agg_psx_rec->fcounters[POSIX_F_VARIANCE_RANK_TIME] =
+                        var_time_p->S / var_time_p->n;
+                }
+                break;
+            case POSIX_F_VARIANCE_RANK_BYTES:
+                if(init_flag)
+                {
+                    var_bytes_p->n = 1;
+                    var_bytes_p->M = psx_bytes;
+                    var_bytes_p->S = 0;
+                }
+                else
+                {
+                    old_M = var_bytes_p->M;
+
+                    var_bytes_p->n++;
+                    var_bytes_p->M += (psx_bytes - var_bytes_p->M) / var_bytes_p->n;
+                    var_bytes_p->S += (psx_bytes - var_bytes_p->M) * (psx_bytes - old_M);
+
+                    agg_psx_rec->fcounters[POSIX_F_VARIANCE_RANK_BYTES] =
+                        var_bytes_p->S / var_bytes_p->n;
+                }
+                break;
             default:
-                /* TODO: variance */
                 agg_psx_rec->fcounters[i] = -1;
                 break;
         }



View it on GitLab: https://xgitlab.cels.anl.gov/darshan/darshan/compare/37b1bb02d5c0a9bb80b024890d8ebaf6a73d977b...04063da70950ee779371188c7937956eb89b4273
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.mcs.anl.gov/pipermail/darshan-commits/attachments/20160301/d2dbd17e/attachment-0001.html>


More information about the Darshan-commits mailing list