[mpich2-commits] r7708 - mpich2/trunk/src/pm/hydra/pm/pmiserv

balaji at mcs.anl.gov
Thu Jan 13 05:09:17 CST 2011


Author: balaji
Date: 2011-01-13 05:09:17 -0600 (Thu, 13 Jan 2011)
New Revision: 7708

Modified:
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmip_cb.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmip_pmi_v1.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmip_pmi_v2.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmiserv_cb.c
Log:
Clean up the PMI FD activation. We now preload the PMI ranks when
launching processes. If we forked the MPI processes ourselves (PMI_FD
format), we preload the PMI FDs as well. If we are using a PMI_PORT,
we wait for the application to send a PMI initialization message
before recording the PMI FD.
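
As an illustration, the following is a minimal, standalone sketch of
the scheme described above. It is not the Hydra code itself:
proxy_state, find_pmi_fd() and on_pmi_command() are hypothetical
simplifications of the HYD_pmcd_pmip structures and the pmi_cb()
logic modified below.

    #include <stdio.h>

    #define NPROCS 4
    #define FD_UNSET (-1)

    struct proxy_state {
        int pmi_rank[NPROCS];      /* preloaded for every process at launch */
        int pmi_fd[NPROCS];        /* preloaded only in the PMI_FD case */
        int pmi_fd_active[NPROCS]; /* set once PMI traffic is seen on the fd */
    };

    /* Return the local index that owns this fd, or -1 if the fd has
     * not been recorded yet (the PMI_PORT case, before the PMI
     * initialization message arrives). */
    static int find_pmi_fd(struct proxy_state *s, int fd)
    {
        int i;
        for (i = 0; i < NPROCS; i++)
            if (s->pmi_fd[i] == fd)
                return i;
        return -1;
    }

    /* Called when a PMI command arrives on fd. If the fd is known,
     * mark it active; if not, activation is deferred until the PMI
     * initialization message maps a rank to this fd. */
    static void on_pmi_command(struct proxy_state *s, int fd)
    {
        int pid = find_pmi_fd(s, fd);
        if (pid != -1 && !s->pmi_fd_active[pid])
            s->pmi_fd_active[pid] = 1;
    }

    int main(void)
    {
        struct proxy_state s;
        int i;

        for (i = 0; i < NPROCS; i++) {
            s.pmi_rank[i] = i;        /* ranks are always preloaded */
            s.pmi_fd[i] = FD_UNSET;   /* PMI_PORT case: filled in later */
            s.pmi_fd_active[i] = 0;
        }

        s.pmi_fd[2] = 42;             /* pretend rank 2 just sent its init message */
        on_pmi_command(&s, 42);
        printf("fd 42 -> pid %d, active = %d\n",
               find_pmi_fd(&s, 42), s.pmi_fd_active[2]);
        return 0;
    }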

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmip_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmip_cb.c	2011-01-13 11:09:07 UTC (rev 7707)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmip_cb.c	2011-01-13 11:09:17 UTC (rev 7708)
@@ -209,7 +209,7 @@
 static HYD_status pmi_cb(int fd, HYD_event_t events, void *userp)
 {
     char *buf = NULL, *pmi_cmd = NULL, *args[HYD_NUM_TMP_STRINGS] = { 0 };
-    int closed, repeat, sent, i = -1, linelen, pid;
+    int closed, repeat, sent, i = -1, linelen, pid = -1;
     struct HYD_pmcd_hdr hdr;
     struct HYD_pmcd_pmip_pmi_handle *h;
     char ftb_event_payload[HYDT_FTB_MAX_PAYLOAD_DATA];
@@ -227,13 +227,12 @@
                             &linelen, &closed, 0);
     HYDU_ERR_POP(status, "unable to read PMI command\n");
 
-    /* If we used the PMI_FD format, try to find the PMI FD */
-    if (!using_pmi_port) {
-        for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++)
-            if (HYD_pmcd_pmip.downstream.pmi_fd[i] == fd)
-                break;
-        HYDU_ASSERT(i < HYD_pmcd_pmip.local.proxy_process_count, status);
-        pid = i;
+    /* Try to find the PMI FD */
+    for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++) {
+        if (HYD_pmcd_pmip.downstream.pmi_fd[i] == fd) {
+            pid = i;
+            break;
+        }
     }
 
     if (closed) {
@@ -242,8 +241,12 @@
          * get closed socket event as we deregister this socket as
          * soon as we get the finalize message. For non-PMI
          * applications, this is harder to identify, so we just let
-         * the user cleanup the processes on a failure. */
-        if (using_pmi_port || HYD_pmcd_pmip.downstream.pmi_fd_active[pid]) {
+         * the user clean up the processes on a failure.
+         *
+         * We check if we found the PMI FD, and if the FD is "PMI
+         * active" (which means that this is an MPI application).
+         */
+        if (pid != -1 && HYD_pmcd_pmip.downstream.pmi_fd_active[pid]) {
             MPL_snprintf(ftb_event_payload, HYDT_FTB_MAX_PAYLOAD_DATA,
                          "pgid:%d rank:%d",
                          HYD_pmcd_pmip.local.pgid, HYD_pmcd_pmip.downstream.pmi_rank[pid]);
@@ -282,8 +285,11 @@
         storage[storage_len] = 0;
     }
 
-    /* This is a PMI application */
-    if (!using_pmi_port)
+    /* We were able to read the PMI command correctly. If we could
+     * identify which PMI FD this is, activate it. If we could not,
+     * we will activate the FD when we get the PMI initialization
+     * command. */
+    if (pid != -1 && !HYD_pmcd_pmip.downstream.pmi_fd_active[pid])
         HYD_pmcd_pmip.downstream.pmi_fd_active[pid] = 1;
 
     do {
@@ -425,13 +431,12 @@
 
 static HYD_status launch_procs(void)
 {
-    int i, j, arg, process_id, pmi_rank;
+    int i, j, arg, process_id;
     char *str, *envstr, *list, *pmi_port;
     char *client_args[HYD_NUM_TMP_STRINGS];
     struct HYD_env *env, *force_env = NULL;
     struct HYD_exec *exec;
     struct HYD_pmcd_hdr hdr;
-    int *pmi_ranks;
     int sent, closed, pmi_fds[2] = { HYD_FD_UNSET, HYD_FD_UNSET };
     struct HYDT_bind_cpuset_t cpuset;
     char ftb_event_payload[HYDT_FTB_MAX_PAYLOAD_DATA];
@@ -443,15 +448,6 @@
     for (exec = HYD_pmcd_pmip.exec_list; exec; exec = exec->next)
         HYD_pmcd_pmip.local.proxy_process_count += exec->proc_count;
 
-    HYDU_MALLOC(pmi_ranks, int *, HYD_pmcd_pmip.local.proxy_process_count * sizeof(int),
-                status);
-    for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++) {
-        pmi_ranks[i] =
-            HYDU_local_to_global_id(i, HYD_pmcd_pmip.start_pid,
-                                    HYD_pmcd_pmip.local.proxy_core_count,
-                                    HYD_pmcd_pmip.system_global.global_core_count);
-    }
-
     HYDU_MALLOC(HYD_pmcd_pmip.downstream.out, int *,
                 HYD_pmcd_pmip.local.proxy_process_count * sizeof(int), status);
     HYDU_MALLOC(HYD_pmcd_pmip.downstream.err, int *,
@@ -467,17 +463,29 @@
     HYDU_MALLOC(HYD_pmcd_pmip.downstream.pmi_fd_active, int *,
                 HYD_pmcd_pmip.local.proxy_process_count * sizeof(int), status);
 
-    /* Initialize the PMI_FD and PMI FD active state */
+    /* Initialize the PMI_FD and PMI FD active state, and exit status */
     for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++) {
-        HYD_pmcd_pmip.downstream.pmi_rank[i] = -1;
+        /* The exit status is populated when the processes terminate */
+        HYD_pmcd_pmip.downstream.exit_status[i] = -1;
+
+        /* If we use PMI_FD, the pmi_fd and pmi_fd_active arrays will
+         * be filled out in this function. But if we are using
+         * PMI_PORT, we will fill them out later when the processes
+         * send the PMI initialization message. Note that non-MPI
+         * processes are never "PMI active" when we use the PMI
+         * PORT. */
         HYD_pmcd_pmip.downstream.pmi_fd[i] = HYD_FD_UNSET;
         HYD_pmcd_pmip.downstream.pmi_fd_active[i] = 0;
+
+        if (HYD_pmcd_pmip.system_global.pmi_rank == -1)
+            HYD_pmcd_pmip.downstream.pmi_rank[i] =
+                HYDU_local_to_global_id(i, HYD_pmcd_pmip.start_pid,
+                                        HYD_pmcd_pmip.local.proxy_core_count,
+                                        HYD_pmcd_pmip.system_global.global_core_count);
+        else
+            HYD_pmcd_pmip.downstream.pmi_rank[i] = HYD_pmcd_pmip.system_global.pmi_rank;
     }
 
-    /* Initialize the exit status */
-    for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++)
-        HYD_pmcd_pmip.downstream.exit_status[i] = -1;
-
     status = HYDT_bind_init(HYD_pmcd_pmip.local.local_binding ?
                             HYD_pmcd_pmip.local.local_binding :
                             HYD_pmcd_pmip.user_global.binding,
@@ -512,8 +520,9 @@
                      [HYD_pmcd_pmip.local.proxy_process_count - 1]);
         status = HYDT_ckpoint_restart(HYD_pmcd_pmip.local.pgid, HYD_pmcd_pmip.local.id,
                                       env, HYD_pmcd_pmip.local.proxy_process_count,
-                                      pmi_ranks,
-                                      pmi_ranks[0] ? NULL : &HYD_pmcd_pmip.downstream.in,
+                                      HYD_pmcd_pmip.downstream.pmi_rank,
+                                      HYD_pmcd_pmip.downstream.pmi_rank[0] ? NULL :
+                                      &HYD_pmcd_pmip.downstream.in,
                                       HYD_pmcd_pmip.downstream.out,
                                       HYD_pmcd_pmip.downstream.err,
                                       HYD_pmcd_pmip.downstream.pid);
@@ -610,36 +619,22 @@
                                 HYDU_strerror(errno));
 
         for (i = 0; i < exec->proc_count; i++) {
+            if (using_pmi_port) {
+                /* If we are using the PMI_PORT format */
 
-            if (HYD_pmcd_pmip.system_global.pmi_rank == -1)
-                pmi_rank = HYDU_local_to_global_id(process_id,
-                                                   HYD_pmcd_pmip.start_pid,
-                                                   HYD_pmcd_pmip.local.proxy_core_count,
-                                                   HYD_pmcd_pmip.
-                                                   system_global.global_core_count);
-            else
-                pmi_rank = HYD_pmcd_pmip.system_global.pmi_rank;
-
-            HYD_pmcd_pmip.downstream.pmi_rank[process_id] = pmi_rank;
-
-            if (HYD_pmcd_pmip.system_global.pmi_port ||
-                HYD_pmcd_pmip.user_global.ckpoint_prefix) {
-                /* If a global PMI_PORT is provided, or this is a
-                 * checkpointing case, use PMI_PORT format */
-
                 /* PMI_PORT */
                 status = HYDU_append_env_to_list("PMI_PORT", pmi_port, &force_env);
                 HYDU_ERR_POP(status, "unable to add env to list\n");
 
                 /* PMI_ID */
-                str = HYDU_int_to_str(pmi_rank);
+                str = HYDU_int_to_str(HYD_pmcd_pmip.downstream.pmi_rank[process_id]);
                 status = HYDU_append_env_to_list("PMI_ID", str, &force_env);
                 HYDU_ERR_POP(status, "unable to add env to list\n");
                 HYDU_FREE(str);
             }
             else {
                 /* PMI_RANK */
-                str = HYDU_int_to_str(pmi_rank);
+                str = HYDU_int_to_str(HYD_pmcd_pmip.downstream.pmi_rank[process_id]);
                 status = HYDU_append_env_to_list("PMI_RANK", str, &force_env);
                 HYDU_ERR_POP(status, "unable to add env to list\n");
                 HYDU_FREE(str);
@@ -677,7 +672,8 @@
 
             HYDT_bind_pid_to_cpuset(process_id, &cpuset);
             status = HYDU_create_process(client_args, force_env,
-                                         pmi_rank ? NULL : &HYD_pmcd_pmip.downstream.in,
+                                         HYD_pmcd_pmip.downstream.pmi_rank[process_id] ? NULL :
+                                         &HYD_pmcd_pmip.downstream.in,
                                          &HYD_pmcd_pmip.downstream.out[process_id],
                                          &HYD_pmcd_pmip.downstream.err[process_id],
                                          &HYD_pmcd_pmip.downstream.pid[process_id],
@@ -726,8 +722,6 @@
     HYDU_ERR_POP(status, "unable to register fd\n");
 
   fn_exit:
-    if (pmi_ranks)
-        HYDU_FREE(pmi_ranks);
     HYDU_FUNC_EXIT();
     return status;
 

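As an aside, the environment set up in launch_procs() above is what
lets the launched process pick its wire-up path. The sketch below
shows what the application side might do with it; this is a hedged
illustration, not code from this commit (the real logic lives in the
PMI client library), and it assumes a PMI_FD environment variable is
exported in the fork case, as the "PMI_FD format" wording implies.
The printed actions are placeholders.

    #include <stdio.h>
    #include <stdlib.h>

    int main(void)
    {
        const char *port = getenv("PMI_PORT");
        const char *fd = getenv("PMI_FD");

        if (port != NULL) {
            /* PMI_PORT format: connect to the port and identify
             * ourselves; the proxy records our fd only when this PMI
             * initialization message arrives. */
            const char *id = getenv("PMI_ID");
            printf("connect to %s and send PMI init as PMI_ID=%s\n",
                   port, id ? id : "?");
        }
        else if (fd != NULL) {
            /* PMI_FD format: the fd was inherited across fork(), so
             * the proxy already knows ("preloaded") which rank owns it. */
            const char *rank = getenv("PMI_RANK");
            printf("use inherited fd %s as PMI_RANK=%s\n",
                   fd, rank ? rank : "?");
        }
        else {
            /* No PMI environment: a non-MPI process; with PMI_PORT it
             * simply never becomes "PMI active". */
            printf("no PMI environment found\n");
        }
        return 0;
    }
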
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmip_pmi_v1.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmip_pmi_v1.c	2011-01-13 11:09:07 UTC (rev 7707)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmip_pmi_v1.c	2011-01-13 11:09:17 UTC (rev 7708)
@@ -139,22 +139,14 @@
     id = atoi(val);
 
     /* Store the PMI_ID to fd mapping */
-    for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++)
+    for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++) {
         if (HYD_pmcd_pmip.downstream.pmi_rank[i] == id) {
             HYD_pmcd_pmip.downstream.pmi_fd[i] = fd;
+            HYD_pmcd_pmip.downstream.pmi_fd_active[i] = 1;
             break;
         }
-
-    /* The rank has not been previously stored */
-    if (i == HYD_pmcd_pmip.local.proxy_process_count) {
-        for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++)
-            if (HYD_pmcd_pmip.downstream.pmi_rank[i] == -1)
-                break;
-        HYDU_ASSERT(i < HYD_pmcd_pmip.local.proxy_process_count, status);
-
-        HYD_pmcd_pmip.downstream.pmi_rank[i] = id;
-        HYD_pmcd_pmip.downstream.pmi_fd[i] = fd;
     }
+    HYDU_ASSERT(i < HYD_pmcd_pmip.local.proxy_process_count, status);
 
     i = 0;
     tmp[i++] = HYDU_strdup("cmd=initack\ncmd=set size=");

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmip_pmi_v2.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmip_pmi_v2.c	2011-01-13 11:09:07 UTC (rev 7707)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmip_pmi_v2.c	2011-01-13 11:09:17 UTC (rev 7708)
@@ -165,9 +165,14 @@
     id = atoi(rank_str);
 
     /* Store the PMI_RANK to fd mapping */
-    for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++)
-        if (HYD_pmcd_pmip.downstream.pmi_rank[i] == id)
+    for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++) {
+        if (HYD_pmcd_pmip.downstream.pmi_rank[i] == id) {
             HYD_pmcd_pmip.downstream.pmi_fd[i] = fd;
+            HYD_pmcd_pmip.downstream.pmi_fd_active[i] = 1;
+            break;
+        }
+    }
+    HYDU_ASSERT(i < HYD_pmcd_pmip.local.proxy_process_count, status);
 
     i = 0;
     /* FIXME: allow for multiple ranks per PMI ID */

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmiserv_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmiserv_cb.c	2011-01-13 11:09:07 UTC (rev 7707)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmiserv_cb.c	2011-01-13 11:09:17 UTC (rev 7708)
@@ -140,6 +140,7 @@
     proxy = (struct HYD_proxy *) userp;
 
     if (fd == STDIN_FILENO) {
+        HYD_pmcd_init_header(&hdr);
         hdr.cmd = STDIN;
     }
     else {


