[mpich2-commits] r4222 - in mpich2/trunk/src/pm/hydra: . bootstrap/fork bootstrap/slurm bootstrap/ssh control/consys include launcher/mpiexec launcher/utils pm/pmiserv utils/launch utils/sock

balaji at mcs.anl.gov balaji at mcs.anl.gov
Sun Mar 29 22:57:55 CDT 2009


Author: balaji
Date: 2009-03-29 22:57:55 -0500 (Sun, 29 Mar 2009)
New Revision: 4222

Added:
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_common.h
Removed:
   mpich2/trunk/src/pm/hydra/control/consys/consys_close.c
Modified:
   mpich2/trunk/src/pm/hydra/README
   mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c
   mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c
   mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c
   mpich2/trunk/src/pm/hydra/control/consys/Makefile.sm
   mpich2/trunk/src/pm/hydra/control/consys/consys_finalize.c
   mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c
   mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c
   mpich2/trunk/src/pm/hydra/include/hydra.h
   mpich2/trunk/src/pm/hydra/include/hydra_base.h
   mpich2/trunk/src/pm/hydra/include/hydra_utils.h
   mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c
   mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c
   mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c
   mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c
   mpich2/trunk/src/pm/hydra/utils/launch/allocate.c
   mpich2/trunk/src/pm/hydra/utils/sock/sock.c
Log:
Merge the code paths for the runtime and persistent launch modes so
that we don't need to maintain duplicate code.


Modified: mpich2/trunk/src/pm/hydra/README
===================================================================
--- mpich2/trunk/src/pm/hydra/README	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/README	2009-03-30 03:57:55 UTC (rev 4222)
@@ -198,3 +198,24 @@
 enable it, you should use the option --enable-x to mpiexec.
 
  $ mpiexec --enable-x -f hosts -n 4 ./app
+
+
+Persistent-mode Proxies
+-----------------------
+
+Hydra also allows proxies to be launched in persistent mode on the
+system (e.g., by a system administrator). To boot the proxies, run
+applications on them, and shut them down again, use:
+
+ $ mpiexec --boot-proxies
+
+ $ mpiexec --use-persistent -n 4 ./app1
+
+ $ mpiexec --use-persistent -n 4 ./app2
+
+ $ mpiexec --use-persistent -n 4 ./app3
+
+ $ mpiexec --shutdown-proxies
+
+Persistent mode can also be selected by setting the environment
+variable HYDRA_LAUNCH_MODE=persistent.

Modified: mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -43,10 +43,6 @@
         for (arg = 0; client_arg[arg]; arg++)
             HYDU_FREE(client_arg[arg]);
 
-        /* For the remaining processes, set the stdin fd to -1 */
-        if (process_id != 0)
-            handle.in = -1;
-
         process_id++;
     }
 

Modified: mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -54,10 +54,6 @@
         for (arg = 0; client_arg[arg]; arg++)
             HYDU_FREE(client_arg[arg]);
 
-        /* For the remaining processes, set the stdin fd to -1 */
-        if (process_id != 0)
-            handle.in = -1;
-
         process_id++;
     }
 

Modified: mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -63,10 +63,6 @@
         for (arg = 0; client_arg[arg]; arg++)
             HYDU_FREE(client_arg[arg]);
 
-        /* For the remaining processes, set the stdin fd to -1 */
-        if (process_id != 0)
-            handle.in = -1;
-
         process_id++;
     }
 

Modified: mpich2/trunk/src/pm/hydra/control/consys/Makefile.sm
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/Makefile.sm	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/control/consys/Makefile.sm	2009-03-30 03:57:55 UTC (rev 4222)
@@ -7,7 +7,7 @@
 HYDRA_LIB_PATH = ../../lib
 
 libhydra_a_DIR = ${HYDRA_LIB_PATH}
-libhydra_a_SOURCES = consys_launch.c consys_wait.c consys_close.c consys_finalize.c
+libhydra_a_SOURCES = consys_launch.c consys_wait.c consys_finalize.c
 INCLUDES = -I${abs_srcdir}/../../include \
 	-I${abs_srcdir}/../../../../include \
 	-I../../include \
@@ -15,5 +15,4 @@
 	-I${abs_srcdir}/../../launcher/utils \
 	-I${abs_srcdir}/../include \
 	-I${abs_srcdir}/../utils \
-	-I${abs_srcdir}/../../pm/include \
-	-I${abs_srcdir}/../../demux
+	-I${abs_srcdir}/../../pm/include

Deleted: mpich2/trunk/src/pm/hydra/control/consys/consys_close.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_close.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_close.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -1,45 +0,0 @@
-/* -*- Mode: C; c-basic-offset:4 ; -*- */
-/*
- *  (C) 2008 by Argonne National Laboratory.
- *      See COPYRIGHT in top-level directory.
- */
-
-#include "hydra.h"
-#include "hydra_utils.h"
-#include "csi.h"
-#include "pmci.h"
-#include "demux.h"
-
-extern HYD_Handle handle;
-
-HYD_Status HYD_CSI_close_fd(int fd)
-{
-    struct HYD_Partition *partition;
-    HYD_Status status = HYD_SUCCESS;
-
-    HYDU_FUNC_ENTER();
-
-    /* Deregister the FD with the demux engine and close it. */
-    status = HYD_DMX_deregister_fd(fd);
-    HYDU_ERR_SETANDJUMP1(status, status, "error deregistering fd %d\n", fd);
-    close(fd);
-
-    /* Find the FD in the handle and remove it. */
-    for (partition = handle.partition_list; partition; partition = partition->next) {
-        if (partition->out == fd) {
-            partition->out = -1;
-            goto fn_exit;
-        }
-        if (partition->err == fd) {
-            partition->err = -1;
-            goto fn_exit;
-        }
-    }
-
-  fn_exit:
-    HYDU_FUNC_EXIT();
-    return status;
-
-  fn_fail:
-    goto fn_exit;
-}

Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_finalize.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_finalize.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_finalize.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -7,7 +7,6 @@
 #include "hydra.h"
 #include "csi.h"
 #include "pmci.h"
-#include "demux.h"
 
 HYD_Status HYD_CSI_finalize(void)
 {
@@ -18,9 +17,6 @@
     status = HYD_PMCI_finalize();
     HYDU_ERR_POP(status, "error returned from PM finalize\n");
 
-    status = HYD_DMX_finalize();
-    HYDU_ERR_POP(status, "error returned from demux finalize\n");
-
   fn_exit:
     HYDU_FUNC_EXIT();
     return status;

Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -8,7 +8,6 @@
 #include "hydra_utils.h"
 #include "csi.h"
 #include "pmci.h"
-#include "demux.h"
 
 extern HYD_Handle handle;
 
@@ -23,32 +22,6 @@
     status = HYD_PMCI_launch_procs();
     HYDU_ERR_POP(status, "PM returned error while launching processes\n");
 
-    for (partition = handle.partition_list; partition; partition = partition->next) {
-        status = HYD_DMX_register_fd(1, &partition->out, HYD_STDOUT, NULL, handle.stdout_cb);
-        HYDU_ERR_POP(status, "demux returned error registering fd\n");
-
-        if (partition->err != -1) {
-            status =
-                HYD_DMX_register_fd(1, &partition->err, HYD_STDOUT, NULL, handle.stderr_cb);
-            HYDU_ERR_POP(status, "demux returned error registering fd\n");
-        }
-    }
-
-    if (handle.in != -1) {      /* Only process_id 0 */
-        status = HYDU_sock_set_nonblock(handle.in);
-        HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
-
-        stdin_fd = 0;
-        status = HYDU_sock_set_nonblock(stdin_fd);
-        HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
-
-        handle.stdin_buf_count = 0;
-        handle.stdin_buf_offset = 0;
-
-        status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL, handle.stdin_cb);
-        HYDU_ERR_POP(status, "demux returned error registering fd\n");
-    }
-
   fn_exit:
     HYDU_FUNC_EXIT();
     return status;

Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -8,7 +8,6 @@
 #include "csi.h"
 #include "csiu.h"
 #include "pmci.h"
-#include "demux.h"
 
 extern HYD_Handle handle;
 
@@ -20,33 +19,11 @@
 
     HYDU_FUNC_ENTER();
 
-    while (1) {
-        /* Wait for some event to occur */
-        status = HYD_DMX_wait_for_event(HYDU_time_left(handle.start, handle.timeout));
-        HYDU_ERR_POP(status, "error waiting for event\n");
+    /* Make sure all the processes have terminated. The process
+     * manager control device will take care of that. */
+    status = HYD_PMCI_wait_for_completion();
+    HYDU_ERR_POP(status, "error waiting for completion\n");
 
-        /* Check to see if there's any open read socket left; if there
-         * are, we will just wait for more events. */
-        sockets_open = 0;
-        for (partition = handle.partition_list; partition; partition = partition->next) {
-            if (partition->out != -1 || partition->err != -1) {
-                sockets_open++;
-                break;
-            }
-        }
-
-        if (sockets_open && HYDU_time_left(handle.start, handle.timeout))
-            continue;
-
-        /* Make sure all the processes have terminated. The process
-         * manager control device will take care of that. */
-        status = HYD_PMCI_wait_for_completion();
-        HYDU_ERR_POP(status, "error waiting for completion\n");
-
-        /* We are done */
-        break;
-    }
-
   fn_exit:
     HYDU_FUNC_EXIT();
     return status;

Modified: mpich2/trunk/src/pm/hydra/include/hydra.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra.h	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/include/hydra.h	2009-03-30 03:57:55 UTC (rev 4222)
@@ -14,16 +14,9 @@
 struct HYD_Handle_ {
     char *base_path;
     int proxy_port;
-    /* The persistent proxy is different from the centralized proxy
-     * and hence needs its own port - pproxy_port */
-    int pproxy_port;
+    HYD_Launch_mode_t launch_mode;
+
     char *bootstrap;
-    /* FIXME: We should define a proxy type instead of all these
-     * flags... proxy_type = PROXY_LAUNCHER | PROXY_TERMINATOR
-     */
-    int is_proxy_launcher;
-    int is_proxy_terminator;
-    int is_proxy_remote;
     HYD_Binding binding;
     char *user_bind_map;
 
@@ -66,7 +59,4 @@
 
 extern HYD_Handle handle;
 
-#define HYD_PROXY_NAME "pmi_proxy"
-#define HYD_PPROXY_PORT 8677
-
 #endif /* HYDRA_H_INCLUDED */

Modified: mpich2/trunk/src/pm/hydra/include/hydra_base.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra_base.h	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/include/hydra_base.h	2009-03-30 03:57:55 UTC (rev 4222)
@@ -80,6 +80,18 @@
 } HYD_Status;
 
 
+/* Launch mode */
+typedef enum {
+    HYD_LAUNCH_UNSET,
+    HYD_LAUNCH_RUNTIME,
+
+    /* For persistent proxies */
+    HYD_LAUNCH_BOOT,
+    HYD_LAUNCH_SHUTDOWN,
+    HYD_LAUNCH_PERSISTENT
+} HYD_Launch_mode_t;
+
+
 /* Environment information */
 typedef struct HYD_Env {
     char *env_name;
@@ -139,6 +151,7 @@
     int out;
     int err;
     int exit_status;
+    int control_fd;
     char *proxy_args[HYD_NUM_TMP_STRINGS];      /* Full argument list */
 
     struct HYD_Partition *next;

Modified: mpich2/trunk/src/pm/hydra/include/hydra_utils.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra_utils.h	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/include/hydra_utils.h	2009-03-30 03:57:55 UTC (rev 4222)
@@ -217,8 +217,8 @@
 HYD_Status HYDU_sock_set_nonblock(int fd);
 HYD_Status HYDU_sock_set_cloexec(int fd);
 HYD_Status HYDU_sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed);
-HYD_Status HYDU_sock_stdin_cb(int fd, HYD_Event_t events, char *buf, int *buf_count,
-                              int *buf_offset, int *closed);
+HYD_Status HYDU_sock_stdin_cb(int fd, HYD_Event_t events, int stdin_fd, char *buf,
+                              int *buf_count, int *buf_offset, int *closed);
 
 
 /* Memory utilities */

Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -11,6 +11,39 @@
 
 extern HYD_Handle handle;
 
+static HYD_Status close_fd(int fd)
+{
+    struct HYD_Partition *partition;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    /* Deregister the FD with the demux engine and close it. */
+    status = HYD_DMX_deregister_fd(fd);
+    HYDU_ERR_SETANDJUMP1(status, status, "error deregistering fd %d\n", fd);
+    close(fd);
+
+    /* Find the FD in the handle and remove it. */
+    for (partition = handle.partition_list; partition; partition = partition->next) {
+        if (partition->out == fd) {
+            partition->out = -1;
+            goto fn_exit;
+        }
+        if (partition->err == fd) {
+            partition->err = -1;
+            goto fn_exit;
+        }
+    }
+
+fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+fn_fail:
+    goto fn_exit;
+}
+
+
 HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events, void *userp)
 {
     int closed;
@@ -24,7 +57,7 @@
                          fd, HYDU_strerror(errno));
 
     if (closed) {
-        status = HYD_CSI_close_fd(fd);
+        status = close_fd(fd);
         HYDU_ERR_SETANDJUMP2(status, status, "socket close error on fd %d: %s\n",
                              fd, HYDU_strerror(errno));
         goto fn_exit;
@@ -51,7 +84,7 @@
                          fd, HYDU_strerror(errno))
 
         if (closed) {
-        status = HYD_CSI_close_fd(fd);
+        status = close_fd(fd);
         HYDU_ERR_SETANDJUMP2(status, status, "socket close error on fd %d (%s)\n",
                              fd, HYDU_strerror(errno));
         goto fn_exit;
@@ -73,12 +106,12 @@
 
     HYDU_FUNC_ENTER();
 
-    status = HYDU_sock_stdin_cb(handle.in, events, handle.stdin_tmp_buf,
+    status = HYDU_sock_stdin_cb(handle.in, events, 0, handle.stdin_tmp_buf,
                                 &handle.stdin_buf_count, &handle.stdin_buf_offset, &closed);
     HYDU_ERR_POP(status, "stdin callback error\n");
 
     if (closed) {
-        status = HYD_CSI_close_fd(fd);
+        status = close_fd(fd);
         HYDU_ERR_SETANDJUMP2(status, status, "socket close error on fd %d (errno: %d)\n",
                              fd, errno);
 

Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -18,13 +18,6 @@
     printf("Usage: ./mpiexec [global opts] [exec1 local opts] : [exec2 local opts] : ...\n\n");
 
     printf("Global Options (passed to all executables):\n");
-    printf("\t--verbose                        [Verbose mode]\n");
-    printf("\t--version                        [Version information]\n");
-    printf("\t--enable-x/--disable-x           [Enable or disable X forwarding]\n");
-    printf("\t--proxy-port                     [Port on which proxies can listen]\n");
-    printf("\t--bootstrap                      [Bootstrap server to use]\n");
-    printf("\t--binding                        [Process binding]");
-
     printf("\t-genv {name} {value}             [Environment variable name and value]\n");
     printf("\t-genvlist {env1,env2,...}        [Environment variable list to pass]\n");
     printf("\t-genvnone                        [Do not pass any environment variables]\n");
@@ -40,10 +33,20 @@
     printf("\t-envlist {env1,env2,...}         [Environment variable list to pass]\n");
     printf("\t-envnone                         [Do not pass any environment variables]\n");
     printf("\t-envall                          [Pass all environment variables (default)]\n");
-    printf
-        ("\t{exec_name} {args}               [Name of the executable to run and its arguments]\n");
+    printf("\t{exec_name} {args}               [Executable name to run and arguments]\n");
 
     printf("\n");
+
+    printf("Hydra specific options (treated as global):\n");
+    printf("\t--verbose                        [Verbose mode]\n");
+    printf("\t--version                        [Version information]\n");
+    printf("\t--enable-x/--disable-x           [Enable or disable X forwarding]\n");
+    printf("\t--proxy-port                     [Port on which proxies can listen]\n");
+    printf("\t--bootstrap                      [Bootstrap server to use]\n");
+    printf("\t--binding                        [Process binding]");
+    printf("\t--boot-proxies                   [Boot proxies to run in persistent mode]\n");
+    printf("\t--shutdown-proxies               [Shutdown persistent mode proxies]\n");
+    printf("\t--use-persistent                 [Use persistent mode proxies to launch]\n");
 }
 
 
@@ -51,7 +54,7 @@
 {
     struct HYD_Partition *partition;
     int exit_status = 0;
-    int timeout;
+    int timeout, stdin_fd;
     HYD_Status status = HYD_SUCCESS;
 
     HYDU_FUNC_ENTER();
@@ -79,10 +82,6 @@
     if (handle.debug)
         HYD_LCHU_print_params();
 
-    handle.stdout_cb = HYD_LCHI_stdout_cb;
-    handle.stderr_cb = HYD_LCHI_stderr_cb;
-    handle.stdin_cb = HYD_LCHI_stdin_cb;
-
     HYDU_time_set(&handle.start, NULL); /* NULL implies right now */
     if (getenv("MPIEXEC_TIMEOUT"))
         timeout = atoi(getenv("MPIEXEC_TIMEOUT"));
@@ -95,6 +94,44 @@
     status = HYD_CSI_launch_procs();
     HYDU_ERR_POP(status, "control system error launching processes\n");
 
+    /* During shutdown, no processes are launched, so there is nothing
+     * to wait for. If the launch command didn't return an error, we
+     * are OK; just return a success. */
+    /* FIXME: We are assuming a working model for the process manager
+     * here. We need to get how many processes the PM has launched
+     * instead of assuming this. For example, it is possible to have a
+     * PM implementation that launches separate "new" proxies on a
+     * different port and kills the original proxies using them. */
+    if (handle.launch_mode == HYD_LAUNCH_SHUTDOWN) {
+        exit_status = 0;
+        goto fn_exit;
+    }
+
+    /* Setup stdout/stderr/stdin handlers */
+    for (partition = handle.partition_list; partition; partition = partition->next) {
+        status = HYD_DMX_register_fd(1, &partition->out, HYD_STDOUT, NULL,
+                                     HYD_LCHI_stdout_cb);
+        HYDU_ERR_POP(status, "demux returned error registering fd\n");
+
+        status = HYD_DMX_register_fd(1, &partition->err, HYD_STDOUT, NULL,
+                                     HYD_LCHI_stderr_cb);
+        HYDU_ERR_POP(status, "demux returned error registering fd\n");
+    }
+
+    status = HYDU_sock_set_nonblock(handle.in);
+    HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
+
+    stdin_fd = 0;
+    status = HYDU_sock_set_nonblock(stdin_fd);
+    HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
+
+    handle.stdin_buf_count = 0;
+    handle.stdin_buf_offset = 0;
+
+    status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL, HYD_LCHI_stdin_cb);
+    HYDU_ERR_POP(status, "demux returned error registering fd\n");
+
+
     /* Wait for their completion */
     status = HYD_CSI_wait_for_completion();
     HYDU_ERR_POP(status, "control system error waiting for completion\n");

Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -70,27 +70,28 @@
         }
 
         if (!strcmp(*argv, "--boot-proxies")) {
-            /* FIXME: Prevent usage of incompatible params */
-            handle.bootstrap = HYDU_strdup("ssh");
-            handle.is_proxy_launcher = 1;
-            handle.prop = HYD_ENV_PROP_ALL;
+            HYDU_ERR_CHKANDJUMP(status, handle.launch_mode != HYD_LAUNCH_UNSET,
+                                HYD_INTERNAL_ERROR, "duplicate launch mode\n");
+            handle.launch_mode = HYD_LAUNCH_BOOT;
             continue;
         }
 
-        if (!strcmp(*argv, "--remote-proxy")) {
-            /* FIXME: We should get rid of this option eventually.
-             * This should be the default case. The centralized
-             * version should use an option like "--local-proxy"
-             */
-            handle.is_proxy_remote = 1;
-            handle.prop = HYD_ENV_PROP_ALL;
+        if (!strcmp(*argv, "--shutdown-proxies")) {
+            HYDU_ERR_CHKANDJUMP(status, handle.launch_mode != HYD_LAUNCH_UNSET,
+                                HYD_INTERNAL_ERROR, "duplicate launch mode\n");
+            handle.launch_mode = HYD_LAUNCH_SHUTDOWN;
             continue;
         }
 
-        if (!strcmp(*argv, "--shutdown-proxies")) {
-            handle.is_proxy_remote = 1;
-            handle.is_proxy_terminator = 1;
-            handle.prop = HYD_ENV_PROP_ALL;
+        if (!strcmp(*argv, "--use-persistent") || !strcmp(*argv, "--use-runtime")) {
+            HYDU_ERR_CHKANDJUMP(status, handle.launch_mode != HYD_LAUNCH_UNSET,
+                                HYD_INTERNAL_ERROR, "duplicate launch mode\n");
+
+            if (!strcmp(*argv, "--use-persistent"))
+                handle.launch_mode = HYD_LAUNCH_PERSISTENT;
+            else
+                handle.launch_mode = HYD_LAUNCH_RUNTIME;
+
             continue;
         }
 
@@ -262,17 +263,6 @@
             continue;
         }
 
-        if (!strcmp(str[0], "--pproxy-port")) {
-            if (!str[1]) {
-                INCREMENT_ARGV(status);
-                str[1] = *argv;
-            }
-            HYDU_ERR_CHKANDJUMP(status, handle.pproxy_port != -1, HYD_INTERNAL_ERROR,
-                                "duplicate persistent proxy port\n");
-            handle.pproxy_port = atoi(str[1]);
-            continue;
-        }
-
         if (*argv[0] == '-')
             HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "unrecognized argument\n");
 
@@ -299,46 +289,15 @@
             break;
 
         continue;
-
     }
-    /* In the case of the proxy launcher, aka --boot-proxies, there is no executable
-     * specified */
-    if (handle.is_proxy_launcher || handle.is_proxy_terminator) {
 
-        status = HYD_LCHU_get_current_exec_info(&exec_info);
-        HYDU_ERR_POP(status, "get_current_exec_info returned error\n");
-
-        exec_info->exec[0] = HYDU_strdup(HYD_PROXY_NAME " --persistent");
-        exec_info->exec[1] = NULL;
-        if (exec_info->exec_proc_count == 0)
-            exec_info->exec_proc_count = 1;
-
-        env_name = HYDU_strdup("HYD_PROXY_PORT");
-        env_value = HYDU_int_to_str(handle.pproxy_port);
-
-        status = HYDU_env_create(&env, env_name, env_value);
-        HYDU_ERR_POP(status, "unable to create env struct\n");
-
-        HYDU_append_env_to_list(*env, &exec_info->user_env);
-    }
-
-
+    /* First set all the variables that do not depend on the launch mode */
     tmp = getenv("MPIEXEC_DEBUG");
     if (handle.debug == -1 && tmp)
         handle.debug = atoi(tmp) ? 1 : 0;
     if (handle.debug == -1)
         handle.debug = 0;
 
-    if (handle.exec_info_list == NULL)
-        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "no local options set\n");
-
-    if (handle.wdir == NULL) {
-        HYDU_MALLOC(handle.wdir, char *, HYDRA_MAX_PATH, status);
-        if (getcwd(handle.wdir, HYDRA_MAX_PATH) == NULL)
-            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
-                                "allocated space is too small for absolute path\n");
-    }
-
     tmp = getenv("HYDRA_BOOTSTRAP");
     if (handle.bootstrap == NULL && tmp)
         handle.bootstrap = HYDU_strdup(tmp);
@@ -351,43 +310,82 @@
     if (handle.host_file == NULL)
         handle.host_file = HYDU_strdup("HYDRA_USE_LOCALHOST");
 
+    if (handle.proxy_port == -1)
+        handle.proxy_port = HYD_DEFAULT_PROXY_PORT;
+
+    tmp = getenv("HYDRA_LAUNCH_MODE");
+    if (handle.launch_mode == HYD_LAUNCH_UNSET && tmp) {
+        if (!strcmp(tmp, "persistent"))
+            handle.launch_mode = HYD_LAUNCH_PERSISTENT;
+        else if (!strcmp(tmp, "runtime"))
+            handle.launch_mode = HYD_LAUNCH_RUNTIME;
+    }
+    if (handle.launch_mode == HYD_LAUNCH_UNSET)
+        handle.launch_mode = HYD_LAUNCH_RUNTIME;
+
+    /* Get the base path for the proxy */
+    if (handle.wdir == NULL) {
+        HYDU_MALLOC(handle.wdir, char *, HYDRA_MAX_PATH, status);
+        if (getcwd(handle.wdir, HYDRA_MAX_PATH) == NULL)
+            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
+                                "allocated space is too small for absolute path\n");
+    }
     status = HYDU_get_base_path(progname, handle.wdir, &handle.base_path);
     HYDU_ERR_POP(status, "unable to get base path\n");
 
-    tmp = getenv("HYDRA_BINDING");
-    if (handle.binding == HYD_BIND_UNSET && tmp)
-        handle.binding = !strcmp(tmp, "none") ? HYD_BIND_NONE :
-            !strcmp(tmp, "rr") ? HYD_BIND_RR :
-            !strcmp(tmp, "buddy") ? HYD_BIND_BUDDY :
-            !strcmp(tmp, "pack") ? HYD_BIND_PACK : HYD_BIND_USER;
-    if (handle.binding == HYD_BIND_UNSET)
-        handle.binding = HYD_BIND_NONE;
+    /* Proxy setup or teardown */
+    if ((handle.launch_mode == HYD_LAUNCH_BOOT) ||
+        (handle.launch_mode == HYD_LAUNCH_SHUTDOWN)) {
 
-    /* Check environment for setting the global environment */
-    tmp = getenv("HYDRA_ENV");
-    if (handle.prop == HYD_ENV_PROP_UNSET && tmp)
-        handle.prop = !strcmp(tmp, "all") ? HYD_ENV_PROP_ALL : HYD_ENV_PROP_NONE;
+        /* NULL out variables we don't care about */
+        HYDU_ERR_CHKANDJUMP(status, handle.prop != HYD_ENV_PROP_UNSET, HYD_INTERNAL_ERROR,
+                            "environment setting in proxy launch mode\n");
+        handle.prop = HYD_ENV_PROP_NONE;
 
-    /* Make sure local executable is set */
-    local_env_set = 0;
-    for (exec_info = handle.exec_info_list; exec_info; exec_info = exec_info->next) {
-        if (exec_info->exec[0] == NULL)
-            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "no executable specified\n");
+        HYDU_ERR_CHKANDJUMP(status, handle.binding != HYD_BIND_UNSET, HYD_INTERNAL_ERROR,
+                            "binding setting in proxy launch mode\n");
+        handle.binding = HYD_BIND_UNSET;
 
-        if (exec_info->exec_proc_count == 0)
-            exec_info->exec_proc_count = 1;
-
-        if (exec_info->prop != HYD_ENV_PROP_UNSET)
-            local_env_set = 1;
+        HYDU_ERR_CHKANDJUMP(status, handle.exec_info_list, HYD_INTERNAL_ERROR,
+                            "executables specified in proxy launch mode\n");
     }
+    else { /* Application launch */
+        if (handle.exec_info_list == NULL)
+            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "no local options set\n");
 
-    /* If no global or local environment is set, use the default */
-    if ((handle.prop == HYD_ENV_PROP_UNSET) && (local_env_set == 0))
-        handle.prop = HYD_ENV_PROP_ALL;
+        /* Check environment for setting binding */
+        tmp = getenv("HYDRA_BINDING");
+        if (handle.binding == HYD_BIND_UNSET && tmp)
+            handle.binding = !strcmp(tmp, "none") ? HYD_BIND_NONE :
+                !strcmp(tmp, "rr") ? HYD_BIND_RR :
+                !strcmp(tmp, "buddy") ? HYD_BIND_BUDDY :
+                !strcmp(tmp, "pack") ? HYD_BIND_PACK : HYD_BIND_USER;
+        if (handle.binding == HYD_BIND_UNSET)
+            handle.binding = HYD_BIND_NONE;
 
-    if (handle.proxy_port == -1)
-        handle.proxy_port = HYD_DEFAULT_PROXY_PORT;
+        /* Check environment for setting the global environment */
+        tmp = getenv("HYDRA_ENV");
+        if (handle.prop == HYD_ENV_PROP_UNSET && tmp)
+            handle.prop = !strcmp(tmp, "all") ? HYD_ENV_PROP_ALL : HYD_ENV_PROP_NONE;
 
+        /* Make sure local executable is set */
+        local_env_set = 0;
+        for (exec_info = handle.exec_info_list; exec_info; exec_info = exec_info->next) {
+            if (exec_info->exec[0] == NULL)
+                HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "no executable specified\n");
+
+            if (exec_info->exec_proc_count == 0)
+                exec_info->exec_proc_count = 1;
+
+            if (exec_info->prop != HYD_ENV_PROP_UNSET)
+                local_env_set = 1;
+        }
+
+        /* If no global or local environment is set, use the default */
+        if ((handle.prop == HYD_ENV_PROP_UNSET) && (local_env_set == 0))
+            handle.prop = HYD_ENV_PROP_ALL;
+    }
+
   fn_exit:
     for (i = 0; i < 4; i++)
         if (str[i])

Modified: mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -14,11 +14,9 @@
 {
     handle.base_path = NULL;
     handle.proxy_port = -1;
-    handle.pproxy_port = HYD_PPROXY_PORT;
+    handle.launch_mode = HYD_LAUNCH_UNSET;
+
     handle.bootstrap = NULL;
-    handle.is_proxy_launcher = 0;
-    handle.is_proxy_terminator = 0;
-    handle.is_proxy_remote = 0;
     handle.binding = HYD_BIND_UNSET;
     handle.user_bind_map = NULL;
 

Added: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_common.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_common.h	                        (rev 0)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_common.h	2009-03-30 03:57:55 UTC (rev 4222)
@@ -0,0 +1,20 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ *  (C) 2008 by Argonne National Laboratory.
+ *      See COPYRIGHT in top-level directory.
+ */
+
+#ifndef PMI_COMMON_H_INCLUDED
+#define PMI_COMMON_H_INCLUDED
+
+/* Commands the launcher (mpiexec) sends to a proxy; see HYD_PMCD_pmi_proxy_listen_cb */
+enum HYD_PMCD_pmi_proxy_cmds {
+    PROC_INFO,                  /* handed to HYD_PMCD_pmi_proxy_procinfo(fd) */
+    KILL_JOB,                   /* proxy calls HYD_PMCD_pmi_proxy_killjob() */
+    PROXY_SHUTDOWN,             /* proxy calls exit(-1); no cleanup yet (FIXME in handler) */
+    USE_AS_STDOUT,              /* sending fd is stored as out_upstream_fd */
+    USE_AS_STDERR,              /* sending fd is stored as err_upstream_fd */
+    USE_AS_STDIN                /* sending fd is stored as in_upstream_fd */
+};
+
+#endif /* PMI_COMMON_H_INCLUDED */

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -8,7 +8,7 @@
 #include "pmi_handle.h"
 #include "pmi_handle_v1.h"
 
-extern HYD_Handle handle;
+HYD_Handle handle;
 HYD_PMCD_pmi_pg_t *pg_list = NULL;
 
 struct HYD_PMCD_pmi_handle *HYD_PMCD_pmi_v1;

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -11,7 +11,7 @@
 #include "pmi_handle.h"
 #include "pmi_handle_v1.h"
 
-extern HYD_Handle handle;
+HYD_Handle handle;
 HYD_PMCD_pmi_pg_t *pg_list;
 
 /* TODO: abort, create_kvs, destroy_kvs, getbyidx, spawn */

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -12,97 +12,96 @@
 struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
 int HYD_PMCD_pmi_proxy_listenfd;
 
-static HYD_Status HYD_PMCD_pmi_pproxy_start(void)
+static HYD_Status wait_for_procs_to_finish(void)
 {
-    /* If this function exits... its always an error */
-    HYD_Status status = HYD_INTERNAL_ERROR;
-    int ret = 0;
-    pid_t proc_id = -1;
-    struct rlimit rl;
+    int i, out_count, err_count, count, pid, ret_status;
+    HYD_Status status = HYD_SUCCESS;
 
-    umask(0);
+    while (1) {
+        /* Wait for some event to occur */
+        status = HYD_DMX_wait_for_event(-1);
+        HYDU_ERR_POP(status, "demux engine error waiting for event\n");
 
-    /* Get the limit of fds */
-    ret = getrlimit(RLIMIT_NOFILE, &rl);
-    if (ret == -1)
-        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "getrlimit() failed (%s)\n",
-                             HYDU_strerror(errno));
+        /* Check to see if there's any open read socket left; if there
+         * are, we will just wait for more events. */
+        out_count = 0;
+        err_count = 0;
+        for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++) {
+            if (HYD_PMCD_pmi_proxy_params.out[i] != -1)
+                out_count++;
+            if (HYD_PMCD_pmi_proxy_params.err[i] != -1)
+                err_count++;
 
-    proc_id = fork();
-    if (proc_id > 0) {
-        /* Ignore exit from child proc - persistent pmi proxy */
-        status = HYDU_set_signal(SIGCHLD, SIG_IGN);
-        HYDU_ERR_POP(status, "Setting SIGCHLD handler to SIG_IGN failed\n");
+            if (out_count && err_count)
+                break;
+        }
 
-        /* Parent process exits */
-        if (!HYD_PMCD_pmi_proxy_params.debug)
-            exit(0);
+        if (HYD_PMCD_pmi_proxy_params.procs_are_launched) {
+            if (out_count == 0)
+                close(HYD_PMCD_pmi_proxy_params.out_upstream_fd);
+
+            if (err_count == 0)
+                close(HYD_PMCD_pmi_proxy_params.err_upstream_fd);
+
+            /* We are done */
+            if (!out_count && !err_count)
+                break;
+        }
     }
-    else if (proc_id == 0) {
-        /* Child proc continues */
-        int i;
-        pid_t spid;
-        spid = setsid();
-        if (spid == -1)
-            HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "setsid() failed(%s)\n",
-                                 HYDU_strerror(errno));
 
-        if (!HYD_PMCD_pmi_proxy_params.debug)
-            for (i = 0; i < rl.rlim_max; i++)
-                close(i);
-        /* FIXME: dup(0,1,2) to "/dev/null" */
+    /* FIXME: If we did not break out yet, add a small usleep to yield
+     * CPU here. We can not just sleep for the remaining time, as the
+     * timeout value might be large and the application might exit
+     * much quicker. Note that the sched_yield() call is broken on
+     * newer linux kernel versions and should not be used. */
+    /* Once all the sockets are closed, wait for all the processes to
+     * finish. We poll here, but hopefully not for too long. */
+    do {
+        if (HYD_PMCD_pmi_proxy_params.procs_are_launched == 0)
+            break;
 
-        if (getenv("HYD_PROXY_PORT"))
-            HYD_PMCD_pmi_proxy_params.proxy_port = atoi(getenv("HYD_PROXY_PORT"));
-        else
-            HYD_PMCD_pmi_proxy_params.proxy_port = -1;
+        pid = waitpid(-1, &ret_status, WNOHANG);
 
-        status = HYDU_sock_listen(&HYD_PMCD_pmi_proxy_listenfd, NULL,
-                                  (uint16_t *) & HYD_PMCD_pmi_proxy_params.proxy_port);
-        HYDU_ERR_POP(status, "unable to listen on socket\n");
+        /* Find the pid and mark it as complete. */
+        if (pid > 0)
+            for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
+                if (HYD_PMCD_pmi_proxy_params.pid[i] == pid)
+                    HYD_PMCD_pmi_proxy_params.exit_status[i] = WEXITSTATUS(ret_status);
 
-        /* Register the listening socket with the demux engine */
-        status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_listenfd, HYD_STDOUT, NULL,
-                                     HYD_PMCD_pmi_proxy_listen_cb);
-        HYDU_ERR_POP(status, "unable to register fd\n");
-    }
-    else {
-        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "fork() failed (%s) \n",
-                             HYDU_strerror(errno));
-    }
+        /* Check how many more processes are pending */
+        count = 0;
+        for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++) {
+            if (HYD_PMCD_pmi_proxy_params.exit_status[i] == -1) {
+                count++;
+                break;
+            }
+        }
 
+        if (count == 0)
+            break;
 
-    while (1) {
-        status = HYD_DMX_wait_for_event(-1);
+        /* Check if there are any messages from the launcher */
+        status = HYD_DMX_wait_for_event(0);
         HYDU_ERR_POP(status, "demux engine error waiting for event\n");
-    }
+    } while (1);
 
   fn_exit:
     return status;
+
   fn_fail:
     goto fn_exit;
 }
 
 int main(int argc, char **argv)
 {
-    int i, j, arg, count, pid, ret_status;
-    int stdin_fd, process_id, core, pmi_id, rem;
-    char *str;
-    char *client_args[HYD_NUM_TMP_STRINGS];
-    HYD_Env_t *env;
+    int i, ret_status;
     struct HYD_Partition_exec *exec;
     struct HYD_Partition_segment *segment;
     HYD_Status status = HYD_SUCCESS;
 
-    status = HYD_PMCD_pmi_proxy_get_params(argc, argv);
+    status = HYD_PMCD_pmi_proxy_get_params(argv);
     HYDU_ERR_POP(status, "bad parameters passed to the proxy\n");
 
-    if (HYD_PMCD_pmi_proxy_params.is_persistent) {
-        status = HYD_PMCD_pmi_pproxy_start();
-        HYDU_ERR_POP(status, "Error starting persistent PMI proxy\n");
-        goto fn_exit;
-    }
-
     status = HYDU_sock_listen(&HYD_PMCD_pmi_proxy_listenfd, NULL,
                               (uint16_t *) & HYD_PMCD_pmi_proxy_params.proxy_port);
     HYDU_ERR_POP(status, "unable to listen on socket\n");
@@ -120,172 +119,54 @@
      * local processes. That is, we can only have a single-level
      * hierarchy of proxies. */
 
-    HYD_PMCD_pmi_proxy_params.partition_proc_count = 0;
-    for (segment = HYD_PMCD_pmi_proxy_params.segment_list; segment; segment = segment->next)
-        HYD_PMCD_pmi_proxy_params.partition_proc_count += segment->proc_count;
+    /* Process launching only happens in the runtime case over here */
+    if (HYD_PMCD_pmi_proxy_params.proxy_type == HYD_PMCD_PMI_PROXY_RUNTIME) {
+        HYD_PMCD_pmi_proxy_params.out_upstream_fd = 1;
+        HYD_PMCD_pmi_proxy_params.err_upstream_fd = 2;
+        HYD_PMCD_pmi_proxy_params.in_upstream_fd = 0;
 
-    HYD_PMCD_pmi_proxy_params.exec_proc_count = 0;
-    for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next)
-        HYD_PMCD_pmi_proxy_params.exec_proc_count += exec->proc_count;
+        status = HYD_PMCD_pmi_proxy_launch_procs();
+        HYDU_ERR_POP(status, "unable to launch procs based on proxy handle info\n");
 
-    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.out, int *,
-                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
-    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.err, int *,
-                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
-    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.pid, int *,
-                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
-    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.exit_status, int *,
-                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+        /* Now wait for the processes to finish */
+        status = wait_for_procs_to_finish();
+        HYDU_ERR_POP(status, "error waiting for processes to finish\n");
 
-    /* Initialize the exit status */
-    for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
-        HYD_PMCD_pmi_proxy_params.exit_status[i] = -1;
-
-    /* For local spawning, set the global environment here itself */
-    status = HYDU_putenv_list(HYD_PMCD_pmi_proxy_params.global_env);
-    HYDU_ERR_POP(status, "putenv returned error\n");
-
-    status = HYDU_bind_init(HYD_PMCD_pmi_proxy_params.user_bind_map);
-    HYDU_ERR_POP(status, "unable to initialize process binding\n");
-
-    /* Spawn the processes */
-    process_id = 0;
-    for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next) {
-        for (i = 0; i < exec->proc_count; i++) {
-
-            pmi_id = ((process_id / HYD_PMCD_pmi_proxy_params.partition_proc_count) *
-                      HYD_PMCD_pmi_proxy_params.one_pass_count);
-            rem = (process_id % HYD_PMCD_pmi_proxy_params.partition_proc_count);
-
-            for (segment = HYD_PMCD_pmi_proxy_params.segment_list; segment;
-                 segment = segment->next) {
-                if (rem >= segment->proc_count)
-                    rem -= segment->proc_count;
-                else {
-                    pmi_id += segment->start_pid + rem;
-                    break;
-                }
-            }
-
-            str = HYDU_int_to_str(pmi_id);
-            status = HYDU_env_create(&env, "PMI_ID", str);
-            HYDU_ERR_POP(status, "unable to create env\n");
-            HYDU_FREE(str);
-            status = HYDU_putenv(env);
-            HYDU_ERR_POP(status, "putenv failed\n");
-
-            if (chdir(HYD_PMCD_pmi_proxy_params.wdir) < 0)
-                HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
-                                     "unable to change wdir (%s)\n", HYDU_strerror(errno));
-
-            for (j = 0, arg = 0; exec->exec[j]; j++)
-                client_args[arg++] = HYDU_strdup(exec->exec[j]);
-            client_args[arg++] = NULL;
-
-            core = HYDU_bind_get_core_id(process_id, HYD_PMCD_pmi_proxy_params.binding);
-            if (pmi_id == 0) {
-                status = HYDU_create_process(client_args, exec->prop_env,
-                                             &HYD_PMCD_pmi_proxy_params.in,
-                                             &HYD_PMCD_pmi_proxy_params.out[process_id],
-                                             &HYD_PMCD_pmi_proxy_params.err[process_id],
-                                             &HYD_PMCD_pmi_proxy_params.pid[process_id], core);
-
-                status = HYDU_sock_set_nonblock(HYD_PMCD_pmi_proxy_params.in);
-                HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
-
-                stdin_fd = 0;
-                status = HYDU_sock_set_nonblock(stdin_fd);
-                HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
-
-                HYD_PMCD_pmi_proxy_params.stdin_buf_offset = 0;
-                HYD_PMCD_pmi_proxy_params.stdin_buf_count = 0;
-                status =
-                    HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL,
-                                        HYD_PMCD_pmi_proxy_stdin_cb);
-                HYDU_ERR_POP(status, "unable to register fd\n");
-            }
-            else {
-                status = HYDU_create_process(client_args, exec->prop_env,
-                                             NULL,
-                                             &HYD_PMCD_pmi_proxy_params.out[process_id],
-                                             &HYD_PMCD_pmi_proxy_params.err[process_id],
-                                             &HYD_PMCD_pmi_proxy_params.pid[process_id], core);
-            }
-            HYDU_ERR_POP(status, "spawn process returned error\n");
-
-            process_id++;
-        }
+        ret_status = 0;
+        for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
+            ret_status |= HYD_PMCD_pmi_proxy_params.exit_status[i];
     }
+    else { /* Persistent mode */
+        do {
+            /* Wait for the processes to finish. If there are no
+             * processes, we will just wait blocking for the work to
+             * arrive. */
+            status = wait_for_procs_to_finish();
+            HYDU_ERR_POP(status, "error waiting for processes to finish\n");
 
-    /* Everything is spawned, now wait for I/O */
-    status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
-                                 HYD_PMCD_pmi_proxy_params.out,
-                                 HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stdout_cb);
-    HYDU_ERR_POP(status, "unable to register fd\n");
+            /* If processes had been launched and terminated, find the
+             * exit status, return it and cleanup everything. */
+            if (HYD_PMCD_pmi_proxy_params.procs_are_launched) {
+                ret_status = 0;
+                for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
+                    ret_status |= HYD_PMCD_pmi_proxy_params.exit_status[i];
 
-    status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
-                                 HYD_PMCD_pmi_proxy_params.err,
-                                 HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stderr_cb);
-    HYDU_ERR_POP(status, "unable to register fd\n");
+                /* Send the exit status upstream */
+                status = HYDU_sock_write(HYD_PMCD_pmi_proxy_params.control_fd, &ret_status,
+                                         sizeof(int));
+                HYDU_ERR_POP(status, "unable to return exit status upstream\n");
 
-    while (1) {
-        /* Wait for some event to occur */
-        status = HYD_DMX_wait_for_event(-1);
-        HYDU_ERR_POP(status, "demux engine error waiting for event\n");
+                status = HYD_DMX_deregister_fd(HYD_PMCD_pmi_proxy_params.control_fd);
+                HYDU_ERR_POP(status, "unable to deregister fd\n");
+                close(HYD_PMCD_pmi_proxy_params.control_fd);
 
-        /* Check to see if there's any open read socket left; if there
-         * are, we will just wait for more events. */
-        count = 0;
-        for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++) {
-            if (HYD_PMCD_pmi_proxy_params.out[i] != -1 ||
-                HYD_PMCD_pmi_proxy_params.err[i] != -1) {
-                count++;
-                break;
+                /* cleanup the params structure for the next job */
+                status = HYD_PMCD_pmi_proxy_cleanup_params();
+                HYDU_ERR_POP(status, "unable to cleanup params\n");
             }
-        }
-
-        /* We are done */
-        if (!count)
-            break;
+        } while (1);
     }
 
-    /* FIXME: If we did not break out yet, add a small usleep to yield
-     * CPU here. We can not just sleep for the remaining time, as the
-     * timeout value might be large and the application might exit
-     * much quicker. Note that the sched_yield() call is broken on
-     * newer linux kernel versions and should not be used. */
-    /* Once all the sockets are closed, wait for all the processes to
-     * finish. We poll here, but hopefully not for too long. */
-    do {
-        pid = waitpid(-1, &ret_status, WNOHANG);
-
-        /* Find the pid and mark it as complete. */
-        if (pid > 0)
-            for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
-                if (HYD_PMCD_pmi_proxy_params.pid[i] == pid)
-                    HYD_PMCD_pmi_proxy_params.exit_status[i] = WEXITSTATUS(ret_status);
-
-        /* Check how many more processes are pending */
-        count = 0;
-        for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++) {
-            if (HYD_PMCD_pmi_proxy_params.exit_status[i] == -1) {
-                count++;
-                break;
-            }
-        }
-
-        if (count == 0)
-            break;
-
-        /* Check if there are any messages from the launcher */
-        status = HYD_DMX_wait_for_event(0);
-        HYDU_ERR_POP(status, "demux engine error waiting for event\n");
-    } while (1);
-
-    ret_status = 0;
-    for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
-        ret_status |= HYD_PMCD_pmi_proxy_params.exit_status[i];
-
   fn_exit:
     if (status != HYD_SUCCESS)
         return -1;

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h	2009-03-30 03:57:55 UTC (rev 4222)
@@ -9,12 +9,19 @@
 
 #include "hydra_base.h"
 #include "hydra_utils.h"
+#include "pmi_common.h"
 
+typedef enum {
+    HYD_PMCD_PMI_PROXY_UNSET,
+    HYD_PMCD_PMI_PROXY_RUNTIME,
+    HYD_PMCD_PMI_PROXY_PERSISTENT
+} HYD_PMCD_pmi_proxy_type;
+
 struct HYD_PMCD_pmi_proxy_params {
     int debug;
 
     int proxy_port;
-    int is_persistent;
+    HYD_PMCD_pmi_proxy_type proxy_type;
     char *wdir;
     HYD_Binding binding;
     char *user_bind_map;
@@ -25,16 +32,22 @@
     int partition_proc_count;
     int exec_proc_count;
 
+    int procs_are_launched;
+
     /* Process segmentation information for this partition */
     struct HYD_Partition_segment *segment_list;
     struct HYD_Partition_exec *exec_list;
 
+    int out_upstream_fd;
+    int err_upstream_fd;
+    int in_upstream_fd;
+    int control_fd;
+
     int *pid;
     int *out;
     int *err;
     int *exit_status;
     int in;
-    int rproxy_connfd;
 
     int stdin_buf_offset;
     int stdin_buf_count;
@@ -44,17 +57,15 @@
 extern struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
 extern int HYD_PMCD_pmi_proxy_listenfd;
 
-HYD_Status HYD_PMCD_pmi_proxy_init_params(struct HYD_PMCD_pmi_proxy_params *proxy_params);
-HYD_Status HYD_PMCD_pmi_proxy_cleanup_params(struct HYD_PMCD_pmi_proxy_params *proxy_params);
-HYD_Status HYD_PMCD_pmi_proxy_get_params(int t_argc, char **t_argv);
-HYD_Status HYD_PMCD_pmi_proxy_get_next_keyvalp(char **bufp, int *buf_lenp, char **keyp,
-                                               int *key_lenp, char **valp, int *val_lenp,
-                                               char separator);
-HYD_Status HYD_PMCD_pmi_proxy_handle_cmd(int fd, char *cmd, int cmd_len);
-HYD_Status HYD_PMCD_pmi_proxy_handle_launch_cmd(int job_connfd, char *launch_cmd, int cmd_len);
+/* utils */
+HYD_Status HYD_PMCD_pmi_proxy_get_params(char **t_argv);
+HYD_Status HYD_PMCD_pmi_proxy_cleanup_params(void);
+HYD_Status HYD_PMCD_pmi_proxy_procinfo(int fd);
+HYD_Status HYD_PMCD_pmi_proxy_launch(void);
+void HYD_PMCD_pmi_proxy_killjob(void);
+
+/* callback */
 HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events, void *userp);
-HYD_Status HYD_PMCD_pmi_proxy_remote_cb(int fd, HYD_Event_t events, void *userp);
-HYD_Status HYD_PMCD_pmi_proxy_rstdout_cb(int fd, HYD_Event_t events, void *userp);
 HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events, void *userp);
 HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events, void *userp);
 HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events, void *userp);

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -8,15 +8,14 @@
 #include "hydra_utils.h"
 #include "pmi_proxy.h"
 #include "demux.h"
-#include "pmi_serv.h"
 
-extern struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
+struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
 int HYD_PMCD_pmi_proxy_listenfd;
 
 HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events, void *userp)
 {
-    int accept_fd, cmd_len;
-    char cmd[HYD_PMCD_MAX_CMD_LEN];
+    int accept_fd = -1, cmd_len;
+    enum HYD_PMCD_pmi_proxy_cmds cmd;
     HYD_Status status = HYD_SUCCESS;
 
     HYDU_FUNC_ENTER();
@@ -28,13 +27,13 @@
         status = HYDU_sock_accept(fd, &accept_fd);
         HYDU_ERR_POP(status, "accept error\n");
 
-        status =
-            HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_listen_cb);
+        status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, NULL,
+                                     HYD_PMCD_pmi_proxy_listen_cb);
         HYDU_ERR_POP(status, "unable to register fd\n");
     }
     else {      /* We got a command from mpiexec */
-        status = HYDU_sock_readline(fd, cmd, HYD_PMCD_MAX_CMD_LEN, &cmd_len);
-        HYDU_ERR_POP(status, "Error reading command from proxy");
+        status = HYDU_sock_read(fd, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds), &cmd_len);
+        HYDU_ERR_POP(status, "error reading command from launcher\n");
         if (cmd_len == 0) {
             /* The connection has closed */
             status = HYD_DMX_deregister_fd(fd);
@@ -42,82 +41,51 @@
             close(fd);
             goto fn_exit;
         }
-        status = HYD_PMCD_pmi_proxy_handle_cmd(fd, cmd, cmd_len);
-        HYDU_ERR_POP(status, "Error handling proxy command\n");
-    }
 
-  fn_exit:
-    HYDU_FUNC_EXIT();
-    return status;
-  fn_fail:
-    goto fn_exit;
-}
-
-HYD_Status HYD_PMCD_pmi_proxy_rstdout_cb(int fd, HYD_Event_t events, void *userp)
-{
-    int closed, i;
-    HYD_Status status = HYD_SUCCESS;
-    struct HYD_PMCD_pmi_proxy_params *proxy_params;
-
-    HYDU_FUNC_ENTER();
-    proxy_params = (struct HYD_PMCD_pmi_proxy_params *) userp;
-
-    status = HYDU_sock_stdout_cb(fd, events, proxy_params->rproxy_connfd, &closed);
-    HYDU_ERR_POP(status, "stdout callback error\n");
-
-    if (closed) {
-        int all_procs_exited = 1;
-        /* The process exited */
-        status = HYD_DMX_deregister_fd(fd);
-        HYDU_ERR_POP(status, "unable to deregister fd\n");
-
-        /* FIXME: This could be a perf killer if we have a lot of procs associated with
-         * the same job on a single proxy
-         */
-        for (i = 0; i < proxy_params->exec_proc_count; i++) {
-            int ret_status = 0;
-            if (proxy_params->out[i] == fd) {
-                waitpid(proxy_params->pid[i], &ret_status, WUNTRACED);
-                close(proxy_params->in);
-                proxy_params->out[i] = -1;
-                proxy_params->err[i] = -1;
-            }
-            if (proxy_params->out[i] != -1)
-                all_procs_exited = 0;
+        if (cmd == PROC_INFO) {
+            status = HYD_PMCD_pmi_proxy_procinfo(fd);
         }
-        if (all_procs_exited) {
-            close(proxy_params->rproxy_connfd);
-            status = HYD_DMX_deregister_fd(proxy_params->rproxy_connfd);
-            HYDU_ERR_POP(status, "Error deregistering remote job conn fd\n");
-            status = HYD_PMCD_pmi_proxy_cleanup_params(proxy_params);
-            HYDU_ERR_POP(status, "Error cleaning up proxy params\n");
+        else if (cmd == USE_AS_STDOUT) {
+            HYD_PMCD_pmi_proxy_params.out_upstream_fd = fd;
+            status = HYD_DMX_deregister_fd(fd);
+            HYDU_ERR_POP(status, "unable to deregister fd\n");
         }
-    }
+        else if (cmd == USE_AS_STDERR) {
+            HYD_PMCD_pmi_proxy_params.err_upstream_fd = fd;
+            status = HYD_DMX_deregister_fd(fd);
+            HYDU_ERR_POP(status, "unable to deregister fd\n");
+        }
+        else if (cmd == USE_AS_STDIN) {
+            HYD_PMCD_pmi_proxy_params.in_upstream_fd = fd;
+            status = HYD_DMX_deregister_fd(fd);
+            HYDU_ERR_POP(status, "unable to deregister fd\n");
+        }
+        else if (cmd == KILL_JOB) {
+            HYD_PMCD_pmi_proxy_killjob();
+            status = HYD_SUCCESS;
+        }
+        else if (cmd == PROXY_SHUTDOWN) {
+            /* FIXME: shutdown should be handled more cleanly. That
+             * is, check if there are other processes still running
+             * and kill them before exiting. */
+            exit(-1);
+        }
+        else {
+            status = HYD_INTERNAL_ERROR;
+        }
 
-  fn_exit:
-    HYDU_FUNC_EXIT();
-    return status;
+        HYDU_ERR_POP(status, "error handling proxy command\n");
 
-  fn_fail:
-    goto fn_exit;
-}
-
-HYD_Status HYD_PMCD_pmi_proxy_remote_cb(int fd, HYD_Event_t events, void *userp)
-{
-    int closed = 0, i;
-    HYD_Status status = HYD_SUCCESS;
-
-    HYDU_FUNC_ENTER();
-    /* FIXME: This cb should take care of the commands from mpiexec */
-
-    if (closed) {
-        /* The connection has closed */
-        status = HYD_DMX_deregister_fd(fd);
-        HYDU_ERR_POP(status, "unable to deregister fd\n");
-
-        for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
-            if (HYD_PMCD_pmi_proxy_params.out[i] == fd)
-                HYD_PMCD_pmi_proxy_params.out[i] = -1;
+        /* One of these commands can trigger the start of the
+         * application since they can arrive in any order. */
+        if ((cmd == PROC_INFO) || (cmd == USE_AS_STDOUT) || (cmd == USE_AS_STDERR) ||
+            (cmd == USE_AS_STDIN))
+            if ((HYD_PMCD_pmi_proxy_params.segment_list != NULL) &&
+                (HYD_PMCD_pmi_proxy_params.out_upstream_fd != -1) &&
+                (HYD_PMCD_pmi_proxy_params.err_upstream_fd != -1))
+                if ((HYD_PMCD_pmi_proxy_params.segment_list->start_pid != 0) ||
+                    (HYD_PMCD_pmi_proxy_params.in_upstream_fd != -1))
+                    HYD_PMCD_pmi_proxy_launch_procs();
     }
 
   fn_exit:
@@ -135,7 +103,8 @@
 
     HYDU_FUNC_ENTER();
 
-    status = HYDU_sock_stdout_cb(fd, events, 1, &closed);
+    status = HYDU_sock_stdout_cb(fd, events, HYD_PMCD_pmi_proxy_params.out_upstream_fd,
+                                 &closed);
     HYDU_ERR_POP(status, "stdout callback error\n");
 
     if (closed) {
@@ -164,7 +133,8 @@
 
     HYDU_FUNC_ENTER();
 
-    status = HYDU_sock_stdout_cb(fd, events, 2, &closed);
+    status = HYDU_sock_stdout_cb(fd, events, HYD_PMCD_pmi_proxy_params.err_upstream_fd,
+                                 &closed);
     HYDU_ERR_POP(status, "stdout callback error\n");
 
     if (closed) {
@@ -193,7 +163,9 @@
 
     HYDU_FUNC_ENTER();
 
+    /* FIXME: HYD_PMCD_pmi_proxy_params.in_upstream_fd needs to be passed in */
     status = HYDU_sock_stdin_cb(HYD_PMCD_pmi_proxy_params.in, events,
+                                HYD_PMCD_pmi_proxy_params.in_upstream_fd,
                                 HYD_PMCD_pmi_proxy_params.stdin_tmp_buf,
                                 &HYD_PMCD_pmi_proxy_params.stdin_buf_count,
                                 &HYD_PMCD_pmi_proxy_params.stdin_buf_offset, &closed);
@@ -205,8 +177,6 @@
         HYDU_ERR_POP(status, "unable to deregister fd\n");
 
         close(HYD_PMCD_pmi_proxy_params.in);
-        close(fd);
-
         HYD_PMCD_pmi_proxy_params.in = -1;
     }
 

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -5,338 +5,58 @@
  */
 
 #include "pmi_proxy.h"
-#include "pmi_serv.h"
 #include "demux.h"
 #include "hydra_utils.h"
 
 struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
 
-HYD_Status HYD_PMCD_pmi_proxy_get_next_keyvalp(char **bufp, int *buf_lenp, char **keyp,
-                                               int *key_lenp, char **valp, int *val_lenp,
-                                               char separator)
+static HYD_Status init_params()
 {
-    char *p = NULL;
-    int len = 0;
-    int klen = 0;
-    int vlen = 0;
-
     HYD_Status status = HYD_SUCCESS;
 
-    p = *bufp;
-    len = *buf_lenp;
+    HYD_PMCD_pmi_proxy_params.debug = 0;
 
-    while (len && isspace(*p)) {
-        p++;
-        len--;
-    }
-    if (len <= 0)
-        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Error reading keyval from command\n");
+    HYD_PMCD_pmi_proxy_params.proxy_port = -1;
+    HYD_PMCD_pmi_proxy_params.proxy_type = HYD_PMCD_PMI_PROXY_UNSET;
+    HYD_PMCD_pmi_proxy_params.wdir = NULL;
+    HYD_PMCD_pmi_proxy_params.binding = HYD_BIND_UNSET;
+    HYD_PMCD_pmi_proxy_params.user_bind_map = NULL;
 
-    *keyp = p;
-    klen = 0;
-    while (len && (*p != '=')) {
-        p++;
-        len--;
-        klen++;
-    }
-    if (key_lenp)
-        *key_lenp = klen;
-    p++;
-    len--;
-    if (len <= 0)
-        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Error reading keyval from command\n");
+    HYD_PMCD_pmi_proxy_params.global_env = NULL;
 
-    *valp = p;
-    vlen = 0;
-    /* FIXME: Allow escaping ';' */
-    while (len && (*p != separator)) {
-        p++;
-        len--;
-        vlen++;
-    }
-    if (val_lenp)
-        *val_lenp = vlen;
-    p++;
-    len--;
-    if (len < 0)
-        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Error reading keyval from command\n");
-
-    while (len && isspace(*p)) {
-        p++;
-        len--;
-    }
-    /* p now points to the next key or the end of string */
-    *bufp = p;
-    if (*p != '\0') {
-        /* Remaining length of buffer to be processed */
-        *buf_lenp = len;
-    }
-    else {
-        /* End of string - no more keyvals */
-        *buf_lenp = 0;
-    }
-
-  fn_exit:
-    return status;
-  fn_fail:
-    goto fn_exit;
-}
-
-HYD_Status HYD_PMCD_pmi_proxy_handle_launch_cmd(int job_connfd, char *launch_cmd, int cmd_len)
-{
-    char *key, *val;
-    int i = 0, key_len = 0, val_len = 0, core = 0;
-    struct HYD_Partition_exec *exec = NULL;
-    HYD_Env_t *env = NULL;
-    HYD_Status status = HYD_SUCCESS;
-
-    /* FIXME: We currently support only one job - We need a list of proxy params for multiple jobs */
-    status = HYD_PMCD_pmi_proxy_init_params(&HYD_PMCD_pmi_proxy_params);
-    HYDU_ERR_POP(status, "Error initializing proxy params\n");
-
-    HYD_PMCD_pmi_proxy_params.rproxy_connfd = job_connfd;
-
-    status = HYD_DMX_deregister_fd(job_connfd);
-    HYDU_ERR_POP(status, "Unable to deregister job conn fd\n");
-    status =
-        HYD_DMX_register_fd(1, &job_connfd, HYD_STDIN, (void *) &HYD_PMCD_pmi_proxy_params,
-                            HYD_PMCD_pmi_proxy_remote_cb);
-    HYDU_ERR_POP(status, "Unable to register job conn fd\n");
-
-    status = HYDU_alloc_partition_exec(&HYD_PMCD_pmi_proxy_params.exec_list);
-    HYDU_ERR_POP(status, "unable to allocate partition exec\n");
-
-    exec = HYD_PMCD_pmi_proxy_params.exec_list;
-
-    while (cmd_len > 0) {
-        status =
-            HYD_PMCD_pmi_proxy_get_next_keyvalp(&launch_cmd, &cmd_len, &key, &key_len, &val,
-                                                &val_len, HYD_PMCD_CMD_SEP_CHAR);
-        HYDU_ERR_POP(status, "Unable to get next key from launch command\n");
-
-        /* FIXME: Use pre-defined macros for keys */
-        if (!strncmp(key, "exec_name", key_len)) {
-            HYDU_MALLOC(exec->exec[0], char *, (val_len + 1), status);
-            HYDU_ERR_POP(status, "Error allocating memory for executable name\n");
-            HYDU_snprintf(exec->exec[0], val_len, "%s", val);
-            exec->exec[1] = NULL;
-        }
-        else if (!strncmp(key, "exec_cnt", key_len)) {
-            for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec->next; exec = exec->next);
-            exec->proc_count = atoi(val);
-        }
-        else if (!strncmp(key, "env", key_len)) {
-            char *env_str;
-            int env_str_len;
-
-            env_str = val;
-            env_str_len = val_len;
-            exec->prop_env = NULL;
-            while (env_str_len > 0) {
-                status =
-                    HYD_PMCD_pmi_proxy_get_next_keyvalp(&env_str, &env_str_len, &key, &key_len,
-                                                        &val, &val_len,
-                                                        HYD_PMCD_CMD_ENV_SEP_CHAR);
-                HYDU_ERR_POP(status,
-                             "Error getting next environment variable from launch command\n");
-
-                HYDU_MALLOC(env, HYD_Env_t *, sizeof(HYD_Env_t), status);
-                HYDU_ERR_POP(status,
-                             "Error allocating memory for environment variable in proxy params\n");
-
-                HYDU_MALLOC(env->env_name, char *, key_len + 1, status);
-                HYDU_ERR_POP(status,
-                             "Error allocating memory for environment variable in proxy params\n");
-                HYDU_snprintf(env->env_name, key_len + 1, "%s", key);
-
-                HYDU_MALLOC(env->env_value, char *, val_len + 1, status);
-                HYDU_ERR_POP(status,
-                             "Error allocating memory for environment variable in proxy params\n");
-                HYDU_snprintf(env->env_value, val_len + 1, "%s", val);
-                env->next = exec->prop_env;
-                exec->prop_env = env;
-            }
-        }
-        else {
-            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
-                                "Unrecognized key in launch command\n");
-        }
-
-        /* FIXME: Set these ... */
-        /* HYD_PMCD_pmi_proxy_params.wdir =
-         * HYD_PMCD_pmi_proxy_params.binding =
-         * HYD_PMCD_pmi_proxy_params.user_bind_map = ;
-         * HYDU_append_env_to_list(*env, &HYD_PMCD_pmi_proxy_params.global_env);
-         * HYD_PMCD_pmi_proxy_params.one_pass_count
-         * status = HYDU_alloc_partition_segment(&segment->next);
-         * segment->proc_count = ;
-         * segment->start_pid = ;
-         */
-    }
-
+    HYD_PMCD_pmi_proxy_params.one_pass_count = 0;
+    HYD_PMCD_pmi_proxy_params.partition_proc_count = 0;
     HYD_PMCD_pmi_proxy_params.exec_proc_count = 0;
-    for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next)
-        HYD_PMCD_pmi_proxy_params.exec_proc_count += exec->proc_count;
 
-    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.out, int *,
-                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
-    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.err, int *,
-                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
-    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.pid, int *,
-                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
-    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.exit_status, int *,
-                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+    HYD_PMCD_pmi_proxy_params.procs_are_launched = 0;
 
-    for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next) {
-        for (i = 0; i < exec->proc_count; i++) {
-            char *str = NULL;
-            core = 0;
-            env = NULL;
-            /* FIXME: Use the start pmi_id from launch command */
-            str = HYDU_int_to_str(i);
-            status = HYDU_env_create(&env, "PMI_ID", str);
-            HYDU_ERR_POP(status, "unable to create env\n");
-            status = HYDU_putenv(env);
-            HYDU_ERR_POP(status, "putenv failed\n");
-            status = HYDU_create_process(&exec->exec[0], exec->prop_env, NULL,
-                                         &HYD_PMCD_pmi_proxy_params.out[i],
-                                         &HYD_PMCD_pmi_proxy_params.err[i],
-                                         &HYD_PMCD_pmi_proxy_params.pid[i], core);
-            HYDU_ERR_POP(status, "Error launching process\n");
-            status =
-                HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_params.out[i], HYD_STDOUT,
-                                    (void *) &HYD_PMCD_pmi_proxy_params,
-                                    HYD_PMCD_pmi_proxy_rstdout_cb);
-            HYDU_ERR_POP(status, "Error registering process stdout\n");
-        }
-    }
+    HYD_PMCD_pmi_proxy_params.segment_list = NULL;
+    HYD_PMCD_pmi_proxy_params.exec_list = NULL;
 
-  fn_exit:
-    return status;
-  fn_fail:
-    goto fn_exit;
-}
+    HYD_PMCD_pmi_proxy_params.out_upstream_fd = -1;
+    HYD_PMCD_pmi_proxy_params.err_upstream_fd = -1;
+    HYD_PMCD_pmi_proxy_params.in_upstream_fd = -1;
+    HYD_PMCD_pmi_proxy_params.control_fd = -1;
 
-/* Handle proxy commands */
-HYD_Status HYD_PMCD_pmi_proxy_handle_cmd(int fd, char *cmd, int cmd_len)
-{
-    char *key = NULL;
-    char *cmd_name = NULL;
-    int i = 0, key_len = 0, cmd_name_len = 0;
-    char *cmdp = NULL;
-    HYD_Status status = HYD_SUCCESS;
+    HYD_PMCD_pmi_proxy_params.pid = NULL;
+    HYD_PMCD_pmi_proxy_params.out = NULL;
+    HYD_PMCD_pmi_proxy_params.err = NULL;
+    HYD_PMCD_pmi_proxy_params.exit_status = NULL;
+    HYD_PMCD_pmi_proxy_params.in = -1;
 
-    cmdp = cmd;
-    /* First key/val is the command name */
-    status = HYD_PMCD_pmi_proxy_get_next_keyvalp(&cmdp, &cmd_len, &key, &key_len, &cmd_name,
-                                                 &cmd_name_len, HYD_PMCD_CMD_SEP_CHAR);
-    HYDU_ERR_POP(status, "Error retreiving command name from command\n");
+    HYD_PMCD_pmi_proxy_params.stdin_buf_offset = 0;
+    HYD_PMCD_pmi_proxy_params.stdin_buf_count = 0;
+    HYD_PMCD_pmi_proxy_params.stdin_tmp_buf[0] = '\0';
 
-    if (!strncmp(cmd_name, HYD_PMCD_CMD_KILLALL_PROCS, key_len)) {
-        for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
-            if (HYD_PMCD_pmi_proxy_params.pid[i] != -1)
-                kill(HYD_PMCD_pmi_proxy_params.pid[i], SIGKILL);
-
-        status = HYD_DMX_deregister_fd(fd);
-        HYDU_ERR_POP(status, "unable to register fd\n");
-        close(fd);
-    }
-    else if (!strncmp(cmd_name, HYD_PMCD_CMD_LAUNCH_PROCS, key_len)) {
-        status = HYD_PMCD_pmi_proxy_handle_launch_cmd(fd, cmdp, cmd_len);
-        HYDU_ERR_POP(status, "Unable to handle launch command\n");
-    }
-    else if (!strncmp(cmd_name, HYD_PMCD_CMD_SHUTDOWN, key_len)) {
-        /* FIXME: Not a clean shutdown... Kill all procs before exiting */
-        status = HYD_DMX_deregister_fd(fd);
-        HYDU_ERR_POP(status, "unable to register fd\n");
-        close(fd);
-        exit(0);
-    }
-    else {
-        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
-                            "got unrecognized command from mpiexec\n");
-    }
-  fn_exit:
     return status;
-  fn_fail:
-    goto fn_exit;
 }
 
-/* Initialize proxy params */
-HYD_Status HYD_PMCD_pmi_proxy_init_params(struct HYD_PMCD_pmi_proxy_params *proxy_params)
+/* FIXME: This function performs minimal error checking as it is not
+ * supposed to be called by the user, but rather by the process
+ * management server. It will still be helpful for debugging to add
+ * some error checks. */
+static HYD_Status parse_params(char **t_argv)
 {
-    HYD_Status status = HYD_SUCCESS;
-    proxy_params->debug = 0;
-    proxy_params->proxy_port = -1;
-    proxy_params->is_persistent = 0;
-    proxy_params->wdir = NULL;
-    proxy_params->binding = HYD_BIND_UNSET;
-    proxy_params->user_bind_map = NULL;
-    proxy_params->global_env = NULL;
-    proxy_params->one_pass_count = 0;
-    proxy_params->partition_proc_count = 0;
-    proxy_params->exec_proc_count = 0;
-    proxy_params->segment_list = NULL;
-    proxy_params->exec_list = NULL;
-    proxy_params->pid = NULL;
-    proxy_params->out = NULL;
-    proxy_params->err = NULL;
-    proxy_params->exit_status = NULL;
-    proxy_params->in = -1;
-    proxy_params->rproxy_connfd = -1;
-    proxy_params->stdin_buf_offset = 0;
-    proxy_params->stdin_buf_count = 0;
-    proxy_params->stdin_tmp_buf[0] = '\0';
-    return status;
-}
-
-/* Cleanup proxy params after use */
-HYD_Status HYD_PMCD_pmi_proxy_cleanup_params(struct HYD_PMCD_pmi_proxy_params * proxy_params)
-{
-    HYD_Status status = HYD_SUCCESS;
-    if (proxy_params->wdir != NULL)
-        HYDU_FREE(proxy_params->wdir);
-    if (proxy_params->user_bind_map != NULL)
-        HYDU_FREE(proxy_params->user_bind_map);
-    if (proxy_params->global_env != NULL) {
-        HYD_Env_t *p, *q;
-        do {
-            p = proxy_params->global_env;
-            q = p->next;
-            HYDU_FREE(p);
-        } while (q);
-    }
-    if (proxy_params->segment_list != NULL) {
-        /* FIXME : incomplete */
-    }
-    if (proxy_params->exec_list != NULL) {
-        struct HYD_Partition_exec *p, *q;
-        do {
-            p = proxy_params->exec_list;
-            q = p->next;
-            HYDU_FREE(p);
-        } while (q);
-    }
-    if (proxy_params->pid != NULL)
-        HYDU_FREE(proxy_params->pid);
-    if (proxy_params->out != NULL)
-        HYDU_FREE(proxy_params->out);
-    if (proxy_params->err != NULL)
-        HYDU_FREE(proxy_params->err);
-    if (proxy_params->exit_status != NULL)
-        HYDU_FREE(proxy_params->exit_status);
-
-    status = HYD_PMCD_pmi_proxy_init_params(proxy_params);
-    HYDU_ERR_POP(status, "Error initializing proxy params\n");
-
-  fn_exit:
-    return status;
-  fn_fail:
-    goto fn_exit;
-}
-
-HYD_Status HYD_PMCD_pmi_proxy_get_params(int t_argc, char **t_argv)
-{
     char **argv = t_argv, *str;
     int arg, i, count;
     HYD_Env_t *env;
@@ -346,18 +66,12 @@
 
     HYDU_FUNC_ENTER();
 
-    status = HYD_PMCD_pmi_proxy_init_params(&HYD_PMCD_pmi_proxy_params);
-    HYDU_ERR_POP(status, "Error initializing proxy params\n");
-
-    while (*argv) {
-        ++argv;
-        if (*argv == NULL)
-            break;
-
+    while (++argv && *argv) {
         if (!strcmp(*argv, "--verbose")) {
             HYD_PMCD_pmi_proxy_params.debug = 1;
             continue;
         }
+
         /* Proxy port */
         if (!strcmp(*argv, "--proxy-port")) {
             argv++;
@@ -365,11 +79,6 @@
             continue;
         }
 
-        if (!strcmp(*argv, "--persistent")) {
-            HYD_PMCD_pmi_proxy_params.is_persistent = 1;
-            continue;
-        }
-
         /* Working directory */
         if (!strcmp(*argv, "--wdir")) {
             argv++;
@@ -508,6 +217,9 @@
         /* If we already touched the next --exec, step back */
         if (*argv && !strcmp(*argv, "--exec"))
             argv--;
+
+        if (!(*argv))
+            break;
     }
 
   fn_exit:
@@ -517,3 +229,313 @@
   fn_fail:
     goto fn_exit;
 }
+
+
+HYD_Status HYD_PMCD_pmi_proxy_get_params(char **t_argv)
+{
+    char **argv = t_argv;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    status = init_params();
+    HYDU_ERR_POP(status, "Error initializing proxy params\n");
+
+    /* For the persistent mode, the parameters are fairly
+     * straightforward. For the runtime mode, we call the parse_params()
+     * function to parse through argv and fill in the proxy handle. */
+    ++argv;
+    if (!strcmp(*argv, "--persistent-mode")) {
+        HYD_PMCD_pmi_proxy_params.proxy_type = HYD_PMCD_PMI_PROXY_PERSISTENT;
+
+        /* the next argument should be proxy port */
+        ++argv;
+        if (strcmp(*argv, "--proxy-port"))
+            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "incorrect proxy parameters\n");
+
+        ++argv;
+        HYD_PMCD_pmi_proxy_params.proxy_port = atoi(*argv);
+    }
+    else {
+        HYD_PMCD_pmi_proxy_params.proxy_type = HYD_PMCD_PMI_PROXY_RUNTIME;
+        status = parse_params(t_argv);
+        HYDU_ERR_POP(status, "error parsing proxy params\n");
+    }
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+HYD_Status HYD_PMCD_pmi_proxy_cleanup_params(void)
+{
+    struct HYD_Partition_segment *segment, *tsegment;
+    struct HYD_Partition_exec *exec, *texec;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    if (HYD_PMCD_pmi_proxy_params.wdir)
+        HYDU_FREE(HYD_PMCD_pmi_proxy_params.wdir);
+
+    if (HYD_PMCD_pmi_proxy_params.user_bind_map)
+        HYDU_FREE(HYD_PMCD_pmi_proxy_params.user_bind_map);
+
+    if (HYD_PMCD_pmi_proxy_params.global_env)
+        HYDU_env_free_list(HYD_PMCD_pmi_proxy_params.global_env);
+
+    if (HYD_PMCD_pmi_proxy_params.segment_list) {
+        segment = HYD_PMCD_pmi_proxy_params.segment_list;
+        while (segment) {
+            tsegment = segment->next;
+            if (segment->mapping) {
+                HYDU_free_strlist(segment->mapping);
+                HYDU_FREE(segment->mapping);
+            }
+            HYDU_FREE(segment);
+            segment = tsegment;
+        }
+    }
+
+    if (HYD_PMCD_pmi_proxy_params.exec_list) {
+        exec = HYD_PMCD_pmi_proxy_params.exec_list;
+        while (exec) {
+            texec = exec->next;
+            HYDU_free_strlist(exec->exec);
+            if (exec->prop_env)
+                HYDU_env_free(exec->prop_env);
+            HYDU_FREE(exec);
+            exec = texec;
+        }
+    }
+
+    if (HYD_PMCD_pmi_proxy_params.pid)
+        HYDU_FREE(HYD_PMCD_pmi_proxy_params.pid);
+
+    if (HYD_PMCD_pmi_proxy_params.out)
+        HYDU_FREE(HYD_PMCD_pmi_proxy_params.out);
+
+    if (HYD_PMCD_pmi_proxy_params.err)
+        HYDU_FREE(HYD_PMCD_pmi_proxy_params.err);
+
+    if (HYD_PMCD_pmi_proxy_params.exit_status)
+        HYDU_FREE(HYD_PMCD_pmi_proxy_params.exit_status);
+
+    /* Reinitialize all params to set everything to "NULL" or
+     * equivalent. */
+    status = init_params();
+    HYDU_ERR_POP(status, "unable to initialize params\n");
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+HYD_Status HYD_PMCD_pmi_proxy_procinfo(int fd)
+{
+    char **arglist;
+    int num_strings, str_len, recvd, i;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    /* Read information about the application to launch into a string
+     * array and call parse_params() to interpret it and load it into
+     * the proxy handle. */
+    status = HYDU_sock_read(fd, &num_strings, sizeof(int), &recvd);
+    HYDU_ERR_POP(status, "error reading data from upstream\n");
+
+    HYDU_MALLOC(arglist, char **, num_strings * sizeof(char *), status);
+
+    for (i = 0; i < num_strings; i++) {
+        status = HYDU_sock_read(fd, &str_len, sizeof(int), &recvd);
+        HYDU_ERR_POP(status, "error reading data from upstream\n");
+
+        HYDU_MALLOC(arglist[i], char *, str_len, status);
+
+        status = HYDU_sock_read(fd, arglist[i], str_len, &recvd);
+        HYDU_ERR_POP(status, "error reading data from upstream\n");
+    }
+    arglist[num_strings] = NULL;
+
+    /* Get the parser to fill in the proxy params structure. */
+    status = parse_params(arglist);
+    HYDU_ERR_POP(status, "unable to parse argument list\n");
+
+    /* Save this fd as we need to send back the exit status on
+     * this. */
+    HYD_PMCD_pmi_proxy_params.control_fd = fd;
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+HYD_Status HYD_PMCD_pmi_proxy_launch_procs()
+{
+    int i, j, arg, stdin_fd, process_id, core, pmi_id, rem;
+    char *str;
+    char *client_args[HYD_NUM_TMP_STRINGS];
+    HYD_Env_t *env;
+    struct HYD_Partition_segment *segment;
+    struct HYD_Partition_exec *exec;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    HYD_PMCD_pmi_proxy_params.partition_proc_count = 0;
+    for (segment = HYD_PMCD_pmi_proxy_params.segment_list; segment;
+         segment = segment->next)
+        HYD_PMCD_pmi_proxy_params.partition_proc_count += segment->proc_count;
+
+    HYD_PMCD_pmi_proxy_params.exec_proc_count = 0;
+    for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next)
+        HYD_PMCD_pmi_proxy_params.exec_proc_count += exec->proc_count;
+
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.out, int *,
+                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.err, int *,
+                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.pid, int *,
+                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.exit_status, int *,
+                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+
+    /* Initialize the exit status */
+    for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
+        HYD_PMCD_pmi_proxy_params.exit_status[i] = -1;
+
+    /* For local spawning, set the global environment here itself */
+    status = HYDU_putenv_list(HYD_PMCD_pmi_proxy_params.global_env);
+    HYDU_ERR_POP(status, "putenv returned error\n");
+
+    status = HYDU_bind_init(HYD_PMCD_pmi_proxy_params.user_bind_map);
+    HYDU_ERR_POP(status, "unable to initialize process binding\n");
+
+    /* Spawn the processes */
+    process_id = 0;
+    for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next) {
+        for (i = 0; i < exec->proc_count; i++) {
+
+            pmi_id = ((process_id / HYD_PMCD_pmi_proxy_params.partition_proc_count) *
+                      HYD_PMCD_pmi_proxy_params.one_pass_count);
+            rem = (process_id % HYD_PMCD_pmi_proxy_params.partition_proc_count);
+
+            for (segment = HYD_PMCD_pmi_proxy_params.segment_list; segment;
+                 segment = segment->next) {
+                if (rem >= segment->proc_count)
+                    rem -= segment->proc_count;
+                else {
+                    pmi_id += segment->start_pid + rem;
+                    break;
+                }
+            }
+
+            str = HYDU_int_to_str(pmi_id);
+            status = HYDU_env_create(&env, "PMI_ID", str);
+            HYDU_ERR_POP(status, "unable to create env\n");
+            HYDU_FREE(str);
+            status = HYDU_putenv(env);
+            HYDU_ERR_POP(status, "putenv failed\n");
+
+            if (chdir(HYD_PMCD_pmi_proxy_params.wdir) < 0)
+                HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
+                                     "unable to change wdir (%s)\n",
+                                     HYDU_strerror(errno));
+
+            for (j = 0, arg = 0; exec->exec[j]; j++)
+                client_args[arg++] = HYDU_strdup(exec->exec[j]);
+            client_args[arg++] = NULL;
+
+            core = HYDU_bind_get_core_id(process_id, HYD_PMCD_pmi_proxy_params.binding);
+            if (pmi_id == 0) {
+                status = HYDU_create_process(client_args, exec->prop_env,
+                                             &HYD_PMCD_pmi_proxy_params.in,
+                                             &HYD_PMCD_pmi_proxy_params.out[process_id],
+                                             &HYD_PMCD_pmi_proxy_params.err[process_id],
+                                             &HYD_PMCD_pmi_proxy_params.pid[process_id],
+                                             core);
+
+                status = HYDU_sock_set_nonblock(HYD_PMCD_pmi_proxy_params.in);
+                HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
+
+                stdin_fd = 0;
+                status = HYDU_sock_set_nonblock(stdin_fd);
+                HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
+
+                HYD_PMCD_pmi_proxy_params.stdin_buf_offset = 0;
+                HYD_PMCD_pmi_proxy_params.stdin_buf_count = 0;
+                status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL,
+                                             HYD_PMCD_pmi_proxy_stdin_cb);
+                HYDU_ERR_POP(status, "unable to register fd\n");
+            }
+            else {
+                status = HYDU_create_process(client_args, exec->prop_env,
+                                             NULL,
+                                             &HYD_PMCD_pmi_proxy_params.out[process_id],
+                                             &HYD_PMCD_pmi_proxy_params.err[process_id],
+                                             &HYD_PMCD_pmi_proxy_params.pid[process_id],
+                                             core);
+            }
+            HYDU_ERR_POP(status, "spawn process returned error\n");
+
+            process_id++;
+        }
+    }
+
+    /* Everything is spawned, register the required FDs  */
+    status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
+                                 HYD_PMCD_pmi_proxy_params.out,
+                                 HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stdout_cb);
+    HYDU_ERR_POP(status, "unable to register fd\n");
+
+    status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
+                                 HYD_PMCD_pmi_proxy_params.err,
+                                 HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stderr_cb);
+    HYDU_ERR_POP(status, "unable to register fd\n");
+
+    /* Indicate that the processes have been launched */
+    HYD_PMCD_pmi_proxy_params.procs_are_launched = 1;
+
+    HYDU_FUNC_EXIT();
+
+  fn_exit:
+    return status;
+  fn_fail:
+    goto fn_exit;
+}
+
+
+void HYD_PMCD_pmi_proxy_killjob(void)
+{
+    int i;
+
+    HYDU_FUNC_ENTER();
+
+    /* Send the kill signal to all processes */
+    for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++) {
+        if (HYD_PMCD_pmi_proxy_params.pid[i] != -1) {
+            kill(HYD_PMCD_pmi_proxy_params.pid[i], SIGTERM);
+            kill(HYD_PMCD_pmi_proxy_params.pid[i], SIGKILL);
+        }
+    }
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return;
+
+  fn_fail:
+    goto fn_exit;
+}

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h	2009-03-30 03:57:55 UTC (rev 4222)
@@ -7,20 +7,12 @@
 #ifndef PMI_SERV_H_INCLUDED
 #define PMI_SERV_H_INCLUDED
 
-/* The set of commands supported */
-#define HYD_PMCD_CMD_KILLALL_PROCS      "kill_all_procs"
-#define HYD_PMCD_CMD_KILLALL_PROXIES    "kill_all_proxies"
-#define HYD_PMCD_CMD_LAUNCH_PROCS       "launch_procs"
-#define HYD_PMCD_CMD_SHUTDOWN           "shutdown"
+#include "pmi_common.h"
 
-#define HYD_PMCD_CMD_SEP_CHAR   ';'
-#define HYD_PMCD_CMD_ENV_SEP_CHAR   ','
-
-#define HYD_PMCD_MAX_CMD_LEN    1024
-
 extern int HYD_PMCD_pmi_serv_listenfd;
 
 HYD_Status HYD_PMCD_pmi_serv_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_PMCD_pmi_serv_control_cb(int fd, HYD_Event_t events, void *userp);
 HYD_Status HYD_PMCD_pmi_serv_cleanup(void);
 void HYD_PMCD_pmi_serv_signal_cb(int signal);
 

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -14,7 +14,7 @@
 #include "pmi_serv.h"
 
 int HYD_PMCD_pmi_serv_listenfd;
-extern HYD_Handle handle;
+HYD_Handle handle;
 struct HYD_PMCD_pmi_handle *HYD_PMCD_pmi_handle_list;
 
 /*
@@ -137,11 +137,38 @@
 }
 
 
+HYD_Status HYD_PMCD_pmi_serv_control_cb(int fd, HYD_Event_t events, void *userp)
+{
+    struct HYD_Partition *partition;
+    int count;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    partition = (struct HYD_Partition *) userp;
+
+    status = HYDU_sock_read(fd, &partition->exit_status, sizeof(int), &count);
+    HYDU_ERR_POP(status, "unable to read status from proxy\n");
+
+    status = HYD_DMX_deregister_fd(fd);
+    HYDU_ERR_POP(status, "error deregistering fd\n");
+
+    close(fd);
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
 HYD_Status HYD_PMCD_pmi_serv_cleanup(void)
 {
     struct HYD_Partition *partition;
     int fd;
-    char cmd[HYD_PMCD_MAX_CMD_LEN];
+    enum HYD_PMCD_pmi_proxy_cmds cmd;
     HYD_Status status = HYD_SUCCESS, overall_status = HYD_SUCCESS;
 
     HYDU_FUNC_ENTER();
@@ -149,8 +176,6 @@
     /* FIXME: Instead of doing this from this process itself, fork a
      * bunch of processes to do this. */
     /* Connect to all proxies and send a KILL command */
-    HYDU_snprintf(cmd, HYD_PMCD_MAX_CMD_LEN, "%s=%s %c\n",
-                  "cmd", HYD_PMCD_CMD_KILLALL_PROCS, HYD_PMCD_CMD_SEP_CHAR);
     for (partition = handle.partition_list; partition; partition = partition->next) {
         status = HYDU_sock_connect(partition->name, handle.proxy_port, &fd);
         if (status != HYD_SUCCESS) {
@@ -159,7 +184,8 @@
             continue;   /* Move on to the next proxy */
         }
 
-        status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
+        cmd = KILL_JOB;
+        status = HYDU_sock_write(fd, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
         if (status != HYD_SUCCESS) {
             HYDU_Warn_printf("unable to send data to the proxy on %s\n", partition->name);
             overall_status = HYD_INTERNAL_ERROR;

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -30,12 +30,14 @@
     status = HYD_PMCD_pmi_finalize();
     HYDU_ERR_POP(status, "unable to finalize process manager utils\n");
 
-    /* We use BSC only for local proxies */
-    if (!handle.is_proxy_remote) {
+    if (handle.launch_mode == HYD_LAUNCH_RUNTIME) {
         status = HYD_BSCI_finalize();
         HYDU_ERR_POP(status, "unable to finalize bootstrap server\n");
     }
 
+    status = HYD_DMX_finalize();
+    HYDU_ERR_POP(status, "error returned from demux finalize\n");
+
   fn_exit:
     HYDU_FUNC_EXIT();
     return status;

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -13,18 +13,17 @@
 #include "pmi_serv.h"
 
 int HYD_PMCD_pmi_serv_listenfd;
-extern HYD_Handle handle;
+HYD_Handle handle;
 
-/* Local proxy is a proxy that is local to this process */
-static HYD_Status launch_procs_with_local_proxy(void)
+static HYD_Status fill_in_proxy_args(void)
 {
-    HYD_Status status = HYD_SUCCESS;
     int i, arg, process_id;
     char *path_str[HYD_NUM_TMP_STRINGS];
     HYD_Env_t *env;
     struct HYD_Partition *partition;
     struct HYD_Partition_exec *exec;
     struct HYD_Partition_segment *segment;
+    HYD_Status status = HYD_SUCCESS;
 
     handle.one_pass_count = 0;
     for (partition = handle.partition_list; partition; partition = partition->next)
@@ -114,6 +113,27 @@
         }
     }
 
+  fn_exit:
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+/* Local proxy is a proxy that is local to this process */
+static HYD_Status launch_procs_in_runtime_mode(void)
+{
+    int i, arg, process_id;
+    char *path_str[HYD_NUM_TMP_STRINGS];
+    HYD_Env_t *env;
+    struct HYD_Partition *partition;
+    struct HYD_Partition_exec *exec;
+    struct HYD_Partition_segment *segment;
+    HYD_Status status = HYD_SUCCESS;
+
+    status = fill_in_proxy_args();
+    HYDU_ERR_POP(status, "unable to fill in proxy arguments\n");
+
     /* Initialize the bootstrap server and ask it to launch the
      * processes. */
     status = HYD_BSCI_init(handle.bootstrap);
@@ -124,93 +144,196 @@
 
   fn_exit:
     return status;
+
   fn_fail:
     goto fn_exit;
 }
 
-/* Request remote proxies to shutdown */
-static HYD_Status shutdown_remote_proxies(void)
+static HYD_Status boot_proxies(void)
 {
-    char shutdown_proxies_cmd[HYD_PMCD_MAX_CMD_LEN];
-    struct HYD_Partition *partition = NULL;
-    int HYD_PMCD_pmi_proxy_connfd = -1;
     HYD_Status status = HYD_SUCCESS;
+    int i, arg;
+    char *path_str[HYD_NUM_TMP_STRINGS];
+    struct HYD_Partition *partition;
 
+    handle.one_pass_count = 0;
+    for (partition = handle.partition_list; partition; partition = partition->next)
+        handle.one_pass_count += partition->total_proc_count;
+
+    /* Create the arguments list for each proxy */
     for (partition = handle.partition_list; partition; partition = partition->next) {
-        status = HYDU_sock_connect(partition->name, handle.pproxy_port,
-                                   &HYD_PMCD_pmi_proxy_connfd);
-        HYDU_ERR_POP(status, "Error connecting to remote proxy");
 
-        /* Create shutdown command */
-        HYDU_snprintf(shutdown_proxies_cmd, HYD_PMCD_MAX_CMD_LEN,
-                      "%s=%s %c\n", "cmd", HYD_PMCD_CMD_SHUTDOWN, HYD_PMCD_CMD_SEP_CHAR);
+        arg = HYDU_strlist_lastidx(partition->proxy_args);
+        i = 0;
+        path_str[i++] = HYDU_strdup(handle.base_path);
+        path_str[i++] = HYDU_strdup("pmi_proxy");
+        path_str[i] = NULL;
+        status = HYDU_str_alloc_and_join(path_str, &partition->proxy_args[arg++]);
+        HYDU_ERR_POP(status, "unable to join strings\n");
+        HYDU_free_strlist(path_str);
 
-        status = HYDU_sock_writeline(HYD_PMCD_pmi_proxy_connfd, shutdown_proxies_cmd,
-                                     strlen(shutdown_proxies_cmd));
-        HYDU_ERR_POP(status, "Error writing the launch procs command\n");
+        partition->proxy_args[arg++] = HYDU_strdup("--persistent-mode");
+        partition->proxy_args[arg++] = HYDU_strdup("--proxy-port");
+        partition->proxy_args[arg++] = HYDU_int_to_str(handle.proxy_port);
+        partition->proxy_args[arg++] = NULL;
 
-        /* FIXME: Read result */
-        partition->out = HYD_PMCD_pmi_proxy_connfd;
-        partition->err = -1;
+        if (handle.debug) {
+            HYDU_Debug("Executable passed to the bootstrap: ");
+            HYDU_print_strlist(partition->proxy_args);
+        }
     }
 
+    /* Initialize the bootstrap server and ask it to launch the
+     * processes. */
+    status = HYD_BSCI_init(handle.bootstrap);
+    HYDU_ERR_POP(status, "bootstrap server initialization failed\n");
+
+    status = HYD_BSCI_launch_procs();
+    HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n");
+
   fn_exit:
     return status;
+
   fn_fail:
     goto fn_exit;
 }
 
-/* Remote proxy is a proxy external to this process */
-static HYD_Status launch_procs_with_remote_proxy(void)
+static HYD_Status shutdown_proxies(void)
 {
+    struct HYD_Partition *partition;
+    enum HYD_PMCD_pmi_proxy_cmds cmd;
+    int fd;
     HYD_Status status = HYD_SUCCESS;
-    char launch_procs_cmd[HYD_PMCD_MAX_CMD_LEN];
-    char env_list[HYD_PMCD_MAX_CMD_LEN];        /* FIXME: Wrong *MAX*... */
-    int env_list_len = 0;
-    char *p = NULL;
-    struct HYD_Partition *partition = NULL;
-    struct HYD_Partition_exec *exec = NULL;
-    struct HYD_Env *env = NULL;
-    int HYD_PMCD_pmi_proxy_connfd = -1;
 
     for (partition = handle.partition_list; partition; partition = partition->next) {
-        status = HYDU_sock_connect(partition->name, handle.pproxy_port,
-                                   &HYD_PMCD_pmi_proxy_connfd);
-        HYDU_ERR_POP(status, "Error connecting to remote proxy");
+        status = HYDU_sock_connect(partition->name, handle.proxy_port, &fd);
+        HYDU_ERR_POP(status, "unable to connect to proxy\n");
 
-        exec = partition->exec_list;
+        cmd = PROXY_SHUTDOWN;
+        status = HYDU_sock_write(fd, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+        HYDU_ERR_POP(status, "unable to write data to proxy\n");
 
-        /* FIXME: Create a util func for converting env list to a string */
-        env = handle.system_env;
-        p = env_list;
-        *p = '\0';
-        env_list_len = HYD_PMCD_MAX_CMD_LEN;
-        while (env) {
-            HYDU_snprintf(p, env_list_len, "%s=%s %c",
-                          env->env_name, env->env_value, HYD_PMCD_CMD_ENV_SEP_CHAR);
-            env_list_len -= strlen(p);
-            p += strlen(p);
-            env = env->next;
+        close(fd);
+    }
+
+  fn_exit:
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+static HYD_Status launch_procs_in_persistent_mode(void)
+{
+    struct HYD_Partition *partition;
+    enum HYD_PMCD_pmi_proxy_cmds cmd;
+    int i, list_len, arg_len, first_partition;
+    HYD_Status status = HYD_SUCCESS;
+
+    /*
+     * Here are the steps we will follow:
+     *
+     * 1. Put all the arguments to pass into a string list.
+     *
+     * 2. Connect to the proxy (this will be our primary control
+     *    socket).
+     *
+     * 3. Read this string list and write the following to the socket:
+     *    (a) The PROC_INFO command.
+     *    (b) Integer sized data with the number of arguments to
+     *        follow.
+     *    (c) For each argument to pass, first send an integer which
+     *        tells the proxy how many bytes are coming in that
+     *        argument.
+     *
+     * 4. Open two new sockets and connect them to the proxy.
+     *
+     * 5. On the first new socket, send USE_AS_STDOUT; on the second,
+     *    send USE_AS_STDERR.
+     *
+     * 6. For PMI_ID "0", open a separate socket and send the
+     *    USE_AS_STDIN command on it.
+     *
+     * 7. We need to figure out what to do with the LAUNCH_JOB
+     *    command; since it's going on a different socket, it might go
+     *    out-of-order. Maybe a state machine on the proxy to see if
+     *    it got all the information it needs to launch the job would
+     *    work.
+     */
+
+    status = fill_in_proxy_args();
+    HYDU_ERR_POP(status, "unable to fill in proxy arguments\n");
+
+    /* Though we don't use the bootstrap server right now, we still
+     * initialize it, as we need to query it for information
+     * sometimes. */
+    status = HYD_BSCI_init(handle.bootstrap);
+    HYDU_ERR_POP(status, "bootstrap server initialization failed\n");
+
+    first_partition = 1;
+    for (partition = handle.partition_list; partition; partition = partition->next) {
+        status = HYDU_sock_connect(partition->name, handle.proxy_port,
+                                   &partition->control_fd);
+        HYDU_ERR_POP(status, "unable to connect to proxy\n");
+
+        cmd = PROC_INFO;
+        status = HYDU_sock_write(partition->control_fd, &cmd,
+                                 sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+        HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+        /* Check how many arguments we have */
+        list_len = HYDU_strlist_lastidx(partition->proxy_args);
+        status = HYDU_sock_write(partition->control_fd, &list_len, sizeof(int));
+        HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+        /* Convert the string list to parseable data and send */
+        for (i = 0; partition->proxy_args[i]; i++) {
+            arg_len = strlen(partition->proxy_args[i]) + 1;
+
+            status = HYDU_sock_write(partition->control_fd, &arg_len, sizeof(int));
+            HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+            status = HYDU_sock_write(partition->control_fd, partition->proxy_args[i],
+                                     arg_len);
+            HYDU_ERR_POP(status, "unable to write data to proxy\n");
         }
-        /* Create launch command */
-        HYDU_snprintf(launch_procs_cmd, HYD_PMCD_MAX_CMD_LEN,
-                      "%s=%s %c %s=%s %c %s=%d %c %s=%s %c\n",
-                      "cmd", HYD_PMCD_CMD_LAUNCH_PROCS, HYD_PMCD_CMD_SEP_CHAR,
-                      "exec_name", exec->exec[0], HYD_PMCD_CMD_SEP_CHAR,
-                      "exec_cnt", exec->proc_count, HYD_PMCD_CMD_SEP_CHAR,
-                      "env", env_list, HYD_PMCD_CMD_SEP_CHAR);
 
-        status = HYDU_sock_writeline(HYD_PMCD_pmi_proxy_connfd, launch_procs_cmd,
-                                     strlen(launch_procs_cmd));
-        HYDU_ERR_POP(status, "Error writing the launch procs command\n");
+        /* Register the control socket with the demux engine */
+        status = HYD_DMX_register_fd(1, &partition->control_fd, HYD_STDOUT, partition,
+                                     HYD_PMCD_pmi_serv_control_cb);
 
-        /* FIXME: Read result */
-        partition->out = HYD_PMCD_pmi_proxy_connfd;
-        partition->err = -1;
+        /* Create an stdout socket */
+        status = HYDU_sock_connect(partition->name, handle.proxy_port, &partition->out);
+        HYDU_ERR_POP(status, "unable to connect to proxy\n");
+
+        cmd = USE_AS_STDOUT;
+        status = HYDU_sock_write(partition->out, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+        HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+        /* Create an stderr socket */
+        status = HYDU_sock_connect(partition->name, handle.proxy_port, &partition->err);
+        HYDU_ERR_POP(status, "unable to connect to proxy\n");
+
+        cmd = USE_AS_STDERR;
+        status = HYDU_sock_write(partition->err, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+        HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+        /* If rank 0 is here, create an stdin socket */
+        if (first_partition) {
+            status = HYDU_sock_connect(partition->name, handle.proxy_port, &handle.in);
+            HYDU_ERR_POP(status, "unable to connect to proxy\n");
+
+            cmd = USE_AS_STDIN;
+            status = HYDU_sock_write(handle.in, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+            HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+            first_partition = 0;
+        }
     }
 
   fn_exit:
     return status;
+
   fn_fail:
     goto fn_exit;
 }
@@ -271,7 +394,6 @@
                              "gethostname error (hostname: %s; errno: %d)\n", hostname, errno);
 
     sport = HYDU_int_to_str(port);
-
     HYDU_MALLOC(port_str, char *, strlen(hostname) + 1 + strlen(sport) + 1, status);
     HYDU_snprintf(port_str, strlen(hostname) + 1 + strlen(sport) + 1,
                   "%s:%s", hostname, sport);
@@ -292,20 +414,22 @@
     status = HYD_PMCD_pmi_create_pg();
     HYDU_ERR_POP(status, "unable to create process group\n");
 
-    if (handle.is_proxy_remote) {
-        if (handle.is_proxy_terminator) {
-            status = shutdown_remote_proxies();
-            HYDU_ERR_POP(status, "Error shutting down remote proxies\n");
-        }
-        else {
-            status = launch_procs_with_remote_proxy();
-            HYDU_ERR_POP(status, "Error launching procs with remote proxy\n");
-        }
+    if (handle.launch_mode == HYD_LAUNCH_RUNTIME) {
+        status = launch_procs_in_runtime_mode();
+        HYDU_ERR_POP(status, "error launching procs in runtime mode\n");
     }
-    else {
-        status = launch_procs_with_local_proxy();
-        HYDU_ERR_POP(status, "Error launching procs with local proxy\n");
+    else if (handle.launch_mode == HYD_LAUNCH_BOOT) {
+        status = boot_proxies();
+        HYDU_ERR_POP(status, "error booting proxies\n");
     }
+    else if (handle.launch_mode == HYD_LAUNCH_SHUTDOWN) {
+        status = shutdown_proxies();
+        HYDU_ERR_POP(status, "error shutting down proxies\n");
+    }
+    else if (handle.launch_mode == HYD_LAUNCH_PERSISTENT) {
+        status = launch_procs_in_persistent_mode();
+        HYDU_ERR_POP(status, "error launching procs in persistent mode\n");
+    }
 
   fn_exit:
     HYDU_FUNC_EXIT();
@@ -318,19 +442,65 @@
 
 HYD_Status HYD_PMCI_wait_for_completion(void)
 {
+    struct HYD_Partition *partition;
+    int sockets_open, all_procs_exited;
     HYD_Status status = HYD_SUCCESS;
 
     HYDU_FUNC_ENTER();
 
-    if (handle.is_proxy_remote) {
+    if ((handle.launch_mode == HYD_LAUNCH_BOOT) ||
+        (handle.launch_mode == HYD_LAUNCH_SHUTDOWN)) {
         status = HYD_SUCCESS;
     }
     else {
-        status = HYD_BSCI_wait_for_completion();
-        if (status != HYD_SUCCESS) {
-            status = HYD_PMCD_pmi_serv_cleanup();
-            HYDU_ERR_POP(status, "process manager cannot cleanup processes\n");
+        while (1) {
+            /* Wait for some event to occur */
+            status = HYD_DMX_wait_for_event(HYDU_time_left(handle.start, handle.timeout));
+            HYDU_ERR_POP(status, "error waiting for event\n");
+
+            /* Check to see if there's any open read socket left; if
+             * there are, we will just wait for more events. */
+            sockets_open = 0;
+            for (partition = handle.partition_list; partition; partition = partition->next) {
+                if (partition->out != -1 || partition->err != -1) {
+                    sockets_open++;
+                    break;
+                }
+            }
+
+            if (sockets_open && HYDU_time_left(handle.start, handle.timeout))
+                continue;
+
+            break;
         }
+
+        /* The bootstrap will wait for all processes to terminate */
+        if (handle.launch_mode == HYD_LAUNCH_RUNTIME) {
+            status = HYD_BSCI_wait_for_completion();
+            if (status != HYD_SUCCESS) {
+                status = HYD_PMCD_pmi_serv_cleanup();
+                HYDU_ERR_POP(status, "process manager cannot cleanup processes\n");
+            }
+        }
+        else if (handle.launch_mode == HYD_LAUNCH_PERSISTENT) {
+            do {
+                /* Check if the exit status has already arrived */
+                all_procs_exited = 1;
+                for (partition = handle.partition_list; partition; partition = partition->next) {
+                    if (partition->exit_status == -1) {
+                        all_procs_exited = 0;
+                        break;
+                    }
+                }
+
+                if (all_procs_exited)
+                    break;
+
+                /* If not, wait for some event to occur */
+                status = HYD_DMX_wait_for_event(HYDU_time_left(handle.start, handle.timeout));
+                HYDU_ERR_POP(status, "error waiting for event\n");
+            } while (1);
+        }
     }
 
   fn_exit:

Modified: mpich2/trunk/src/pm/hydra/utils/launch/allocate.c
===================================================================
--- mpich2/trunk/src/pm/hydra/utils/launch/allocate.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/utils/launch/allocate.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -22,6 +22,7 @@
     (*partition)->out = -1;
     (*partition)->err = -1;
     (*partition)->exit_status = -1;
+    (*partition)->control_fd = -1;
     (*partition)->proxy_args[0] = NULL;
 
     (*partition)->exec_list = NULL;

Modified: mpich2/trunk/src/pm/hydra/utils/sock/sock.c
===================================================================
--- mpich2/trunk/src/pm/hydra/utils/sock/sock.c	2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/utils/sock/sock.c	2009-03-30 03:57:55 UTC (rev 4222)
@@ -134,7 +134,7 @@
      * return an error, but only print a warning message. The upper
      * layer can decide what to do with the return status. */
     if (connect(*fd, (struct sockaddr *) &sa, sizeof(sa)) < 0) {
-        HYDU_Warn_printf("connect error (%s)\n", HYDU_strerror(errno));
+        HYDU_Error_printf("connect error (%s)\n", HYDU_strerror(errno));
         status = HYD_SOCK_ERROR;
         goto fn_fail;
     }
@@ -375,8 +375,8 @@
 }
 
 
-HYD_Status HYDU_sock_stdin_cb(int fd, HYD_Event_t events, char *buf, int *buf_count,
-                              int *buf_offset, int *closed)
+HYD_Status HYDU_sock_stdin_cb(int fd, HYD_Event_t events, int stdin_fd, char *buf,
+                              int *buf_count, int *buf_offset, int *closed)
 {
     int count;
     HYD_Status status = HYD_SUCCESS;
@@ -401,7 +401,7 @@
         }
 
         /* If we are still here, we need to refill our temporary buffer */
-        count = read(0, buf, HYD_TMPBUF_SIZE);
+        count = read(stdin_fd, buf, HYD_TMPBUF_SIZE);
         if (count < 0) {
             if (errno == EINTR || errno == EAGAIN) {
                 /* This call was interrupted or there was no data to read; just break out. */



More information about the mpich2-commits mailing list