[mpich2-commits] r4213 - in mpich2/trunk/src/pm/hydra: bootstrap/fork bootstrap/slurm bootstrap/ssh bootstrap/utils control/consys demux include launcher/mpiexec launcher/utils pm/pmiserv utils/env

jayesh at mcs.anl.gov jayesh at mcs.anl.gov
Fri Mar 27 13:11:52 CDT 2009


Author: jayesh
Date: 2009-03-27 13:11:52 -0500 (Fri, 27 Mar 2009)
New Revision: 4213

Modified:
   mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c
   mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c
   mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c
   mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_wait.c
   mpich2/trunk/src/pm/hydra/control/consys/consys_close.c
   mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c
   mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c
   mpich2/trunk/src/pm/hydra/demux/demux.c
   mpich2/trunk/src/pm/hydra/demux/demux.h
   mpich2/trunk/src/pm/hydra/include/hydra.h
   mpich2/trunk/src/pm/hydra/include/hydra_base.h
   mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c
   mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c
   mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.h
   mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c
   mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c
   mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c
   mpich2/trunk/src/pm/hydra/utils/env/env.c
Log:
# Initial cut of distributed proxies support in hydra
  - launch/shutdown proxies using job launcher
  - launch jobs using the standalone proxy
# DMX engine can now handle user contexts. The user registers the context when registering the fd & the DMX engine provides the context in the callback.
# Limitations (will be fixed soon...)
  - Code is a bit hackish... FIXMEs should cover a lot of them
  - Works only on localhost - debugging multiple hosts
  - Does not support MPMD
  - Supports only one job at a time
  - Need to provide complete path to executable - to be fixed soon


Modified: mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -9,7 +9,7 @@
 #include "bscu.h"
 #include "fork.h"
 
-HYD_Handle handle;
+extern HYD_Handle handle;
 
 HYD_Status HYD_BSCD_fork_launch_procs(void)
 {

Modified: mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -9,7 +9,7 @@
 #include "bscu.h"
 #include "slurm.h"
 
-HYD_Handle handle;
+extern HYD_Handle handle;
 
 HYD_Status HYD_BSCD_slurm_launch_procs(void)
 {

Modified: mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -9,7 +9,7 @@
 #include "bscu.h"
 #include "ssh.h"
 
-HYD_Handle handle;
+extern HYD_Handle handle;
 
 /*
  * HYD_BSCD_ssh_launch_procs: For each process, we create an

Modified: mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_wait.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_wait.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_wait.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -8,7 +8,7 @@
 #include "hydra_utils.h"
 #include "bscu.h"
 
-HYD_Handle handle;
+extern HYD_Handle handle;
 
 /*
  * HYD_BSCU_wait_for_completion: We first wait for communication

Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_close.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_close.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_close.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -10,7 +10,7 @@
 #include "pmci.h"
 #include "demux.h"
 
-HYD_Handle handle;
+extern HYD_Handle handle;
 
 HYD_Status HYD_CSI_close_fd(int fd)
 {

Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -10,7 +10,7 @@
 #include "pmci.h"
 #include "demux.h"
 
-HYD_Handle handle;
+extern HYD_Handle handle;
 
 HYD_Status HYD_CSI_launch_procs(void)
 {
@@ -24,11 +24,13 @@
     HYDU_ERR_POP(status, "PM returned error while launching processes\n");
 
     for (partition = handle.partition_list; partition; partition = partition->next) {
-        status = HYD_DMX_register_fd(1, &partition->out, HYD_STDOUT, handle.stdout_cb);
+        status = HYD_DMX_register_fd(1, &partition->out, HYD_STDOUT, NULL, handle.stdout_cb);
         HYDU_ERR_POP(status, "demux returned error registering fd\n");
 
-        status = HYD_DMX_register_fd(1, &partition->err, HYD_STDOUT, handle.stderr_cb);
-        HYDU_ERR_POP(status, "demux returned error registering fd\n");
+        if(partition->err != -1) {
+            status = HYD_DMX_register_fd(1, &partition->err, HYD_STDOUT, NULL, handle.stderr_cb);
+            HYDU_ERR_POP(status, "demux returned error registering fd\n");
+        }
     }
 
     if (handle.in != -1) {      /* Only process_id 0 */
@@ -42,7 +44,7 @@
         handle.stdin_buf_count = 0;
         handle.stdin_buf_offset = 0;
 
-        status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, handle.stdin_cb);
+        status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL, handle.stdin_cb);
         HYDU_ERR_POP(status, "demux returned error registering fd\n");
     }
 

Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -10,7 +10,7 @@
 #include "pmci.h"
 #include "demux.h"
 
-HYD_Handle handle;
+extern HYD_Handle handle;
 
 HYD_Status HYD_CSI_wait_for_completion(void)
 {

Modified: mpich2/trunk/src/pm/hydra/demux/demux.c
===================================================================
--- mpich2/trunk/src/pm/hydra/demux/demux.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/demux/demux.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -14,15 +14,16 @@
     int num_fds;
     int *fd;
     HYD_Event_t events;
-     HYD_Status(*callback) (int fd, HYD_Event_t events);
+    void *userp;
+     HYD_Status(*callback) (int fd, HYD_Event_t events, void *userp);
 
     struct HYD_DMXI_callback *next;
 } HYD_DMXI_callback_t;
 
 static HYD_DMXI_callback_t *cb_list = NULL;
 
-HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events,
-                               HYD_Status(*callback) (int fd, HYD_Event_t events))
+HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events, void *userp,
+                               HYD_Status(*callback) (int fd, HYD_Event_t events, void *userp))
 {
     HYD_DMXI_callback_t *cb_element, *run;
     int i;
@@ -39,6 +40,7 @@
     HYDU_MALLOC(cb_element->fd, int *, num_fds * sizeof(int), status);
     memcpy(cb_element->fd, fd, num_fds * sizeof(int));
     cb_element->events = events;
+    cb_element->userp = userp;
     cb_element->callback = callback;
     cb_element->next = NULL;
 
@@ -158,7 +160,7 @@
                 if (pollfds[i].revents & POLLIN)
                     events |= HYD_STDOUT;
 
-                status = run->callback(pollfds[i].fd, events);
+                status = run->callback(pollfds[i].fd, events, run->userp);
                 HYDU_ERR_POP(status, "callback returned error status\n");
             }
 

Modified: mpich2/trunk/src/pm/hydra/demux/demux.h
===================================================================
--- mpich2/trunk/src/pm/hydra/demux/demux.h	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/demux/demux.h	2009-03-27 18:11:52 UTC (rev 4213)
@@ -9,8 +9,8 @@
 
 #include "hydra.h"
 
-HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events,
-                               HYD_Status(*callback) (int fd, HYD_Event_t events));
+HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events, void *userp,
+                               HYD_Status(*callback) (int fd, HYD_Event_t events, void *userp));
 HYD_Status HYD_DMX_deregister_fd(int fd);
 HYD_Status HYD_DMX_wait_for_event(int time);
 HYD_Status HYD_DMX_finalize(void);

Modified: mpich2/trunk/src/pm/hydra/include/hydra.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra.h	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/include/hydra.h	2009-03-27 18:11:52 UTC (rev 4213)
@@ -14,7 +14,16 @@
 struct HYD_Handle_ {
     char *base_path;
     int proxy_port;
+    /* The persistent proxy is different from the centralized proxy
+     * and hence needs its own port - pproxy_port */
+    int pproxy_port;
     char *bootstrap;
+    /* FIXME: We should define a proxy type instead of all these 
+     * flags... proxy_type = PROXY_LAUNCHER | PROXY_TERMINATOR
+     */
+    int is_proxy_launcher;
+    int is_proxy_terminator;
+    int is_proxy_remote;
     HYD_Binding binding;
     char *user_bind_map;
 
@@ -31,9 +40,9 @@
     HYD_Env_t *prop_env;
 
     int in;
-     HYD_Status(*stdin_cb) (int fd, HYD_Event_t events);
-     HYD_Status(*stdout_cb) (int fd, HYD_Event_t events);
-     HYD_Status(*stderr_cb) (int fd, HYD_Event_t events);
+     HYD_Status(*stdin_cb) (int fd, HYD_Event_t events, void *userp);
+     HYD_Status(*stdout_cb) (int fd, HYD_Event_t events, void *userp);
+     HYD_Status(*stderr_cb) (int fd, HYD_Event_t events, void *userp);
 
     /* Start time and timeout. These are filled in by the launcher,
      * but are utilized by the demux engine and the boot-strap server
@@ -57,4 +66,7 @@
 
 extern HYD_Handle handle;
 
+#define HYD_PROXY_NAME "pmi_proxy"
+#define HYD_PPROXY_PORT 8677
+
 #endif /* HYDRA_H_INCLUDED */

Modified: mpich2/trunk/src/pm/hydra/include/hydra_base.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra_base.h	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/include/hydra_base.h	2009-03-27 18:11:52 UTC (rev 4213)
@@ -34,6 +34,10 @@
 #include <sys/types.h>
 #endif /* HAVE_SYS_TYPES_H */
 
+#if defined HAVE_SYS_STAT_H
+#include <sys/stat.h>
+#endif /* HAVE_SYS_STAT_H */
+
 #include <errno.h>
 
 #if defined HAVE_GETTIMEOFDAY

Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -9,9 +9,9 @@
 #include "mpiexec.h"
 #include "csi.h"
 
-HYD_Handle handle;
+extern HYD_Handle handle;
 
-HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events, void *userp)
 {
     int closed;
     HYD_Status status = HYD_SUCCESS;
@@ -38,8 +38,7 @@
     goto fn_exit;
 }
 
-
-HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events, void *userp)
 {
     int closed;
     HYD_Status status = HYD_SUCCESS;
@@ -67,7 +66,7 @@
 }
 
 
-HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events, void *userp)
 {
     int closed;
     HYD_Status status = HYD_SUCCESS;

Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -10,7 +10,7 @@
 #include "lchu.h"
 #include "csi.h"
 
-HYD_Handle handle;
+extern HYD_Handle handle;
 
 static void usage(void)
 {

Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.h
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.h	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.h	2009-03-27 18:11:52 UTC (rev 4213)
@@ -10,8 +10,8 @@
 #include "hydra.h"
 
 HYD_Status HYD_LCHI_get_parameters(char **t_argv);
-HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events);
+HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events, void *userp);
 
 #endif /* MPIEXEC_H_INCLUDED */

Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -11,7 +11,7 @@
 
 #define HYDRA_MAX_PATH 4096
 
-HYD_Handle handle;
+extern HYD_Handle handle;
 
 #define INCREMENT_ARGV(status)             \
     {                                      \
@@ -33,7 +33,7 @@
 {
     int i, local_env_set;
     char **argv = t_argv, *tmp;
-    char *env_name, *env_value, *str[4] = { NULL }, *progname = *argv;
+    char *env_name, *env_value, *str[4] = { NULL, NULL, NULL, NULL }, *progname = *argv;
     HYD_Env_t *env;
     struct HYD_Exec_info *exec_info;
     HYD_Status status = HYD_SUCCESS;
@@ -68,7 +68,32 @@
             handle.enablex = !strcmp(*argv, "--enable-x");
             continue;
         }
+        
+        if(!strcmp(*argv, "--boot-proxies")) {
+            /* FIXME: Prevent usage of incompatible params */
+            handle.bootstrap = HYDU_strdup("ssh");
+            handle.is_proxy_launcher = 1;
+            handle.prop = HYD_ENV_PROP_ALL;
+            continue;
+        }
 
+        if(!strcmp(*argv, "--remote-proxy")) {
+            /* FIXME: We should get rid of this option eventually.
+             * This should be the default case. The centralized
+             * version should use an option like "--local-proxy"
+             */
+            handle.is_proxy_remote = 1;
+            handle.prop = HYD_ENV_PROP_ALL;
+            continue;
+        }
+
+        if(!strcmp(*argv, "--shutdown-proxies")) {
+            handle.is_proxy_remote = 1;
+            handle.is_proxy_terminator = 1;
+            handle.prop = HYD_ENV_PROP_ALL;
+            continue;
+        }
+
         if (!strcmp(*argv, "-genvall")) {
             HYDU_ERR_CHKANDJUMP(status, handle.prop != HYD_ENV_PROP_UNSET,
                                 HYD_INTERNAL_ERROR, "duplicate prop setting\n");
@@ -237,12 +262,24 @@
             continue;
         }
 
+        if (!strcmp(str[0], "--pproxy-port")) {
+            if (!str[1]) {
+                INCREMENT_ARGV(status);
+                str[1] = *argv;
+            }
+            HYDU_ERR_CHKANDJUMP(status, handle.pproxy_port != -1, HYD_INTERNAL_ERROR,
+                                "duplicate persistent proxy port\n");
+            handle.pproxy_port = atoi(str[1]);
+            continue;
+        }
+
         if (*argv[0] == '-')
             HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "unrecognized argument\n");
 
         status = HYD_LCHU_get_current_exec_info(&exec_info);
         HYDU_ERR_POP(status, "get_current_exec_info returned error\n");
 
+        /* End of Job launcher option handling */
         /* Read the executable till you hit the end of a ":" */
         do {
             if (!strcmp(*argv, ":")) {  /* Next executable */
@@ -262,8 +299,32 @@
             break;
 
         continue;
+
     }
+    /* In the case of the proxy launcher, aka --boot-proxies, there is no executable
+     * specified */
+    if(handle.is_proxy_launcher || handle.is_proxy_terminator) {
+        if(handle.exec_info_list != NULL)
+            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
+                "No executable should be specified when booting proxies\n");
 
+        status = HYD_LCHU_get_current_exec_info(&exec_info);
+        HYDU_ERR_POP(status, "get_current_exec_info returned error\n");
+
+        exec_info->exec[0] = HYDU_strdup(HYD_PROXY_NAME" --persistent");
+        exec_info->exec[1] = NULL;
+        exec_info->exec_proc_count = 1;
+
+        env_name = HYDU_strdup("HYD_PROXY_PORT");
+        env_value = HYDU_int_to_str(handle.pproxy_port);
+
+        status = HYDU_env_create(&env, env_name, env_value);
+        HYDU_ERR_POP(status, "unable to create env struct\n");
+
+        HYDU_append_env_to_list(*env, &exec_info->user_env);
+    }
+
+
     tmp = getenv("MPIEXEC_DEBUG");
     if (handle.debug == -1 && tmp)
         handle.debug = atoi(tmp) ? 1 : 0;

Modified: mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -14,7 +14,11 @@
 {
     handle.base_path = NULL;
     handle.proxy_port = -1;
+    handle.pproxy_port = HYD_PPROXY_PORT;
     handle.bootstrap = NULL;
+    handle.is_proxy_launcher = 0;
+    handle.is_proxy_terminator = 0;
+    handle.is_proxy_remote = 0;
     handle.binding = HYD_BIND_UNSET;
     handle.user_bind_map = NULL;
 

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -8,7 +8,7 @@
 #include "pmi_handle.h"
 #include "pmi_handle_v1.h"
 
-HYD_Handle handle;
+extern HYD_Handle handle;
 HYD_PMCD_pmi_pg_t *pg_list = NULL;
 
 struct HYD_PMCD_pmi_handle *HYD_PMCD_pmi_v1;

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -11,7 +11,7 @@
 #include "pmi_handle.h"
 #include "pmi_handle_v1.h"
 
-HYD_Handle handle;
+extern HYD_Handle handle;
 HYD_PMCD_pmi_pg_t *pg_list;
 
 /* TODO: abort, create_kvs, destroy_kvs, getbyidx, spawn */

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -12,6 +12,76 @@
 struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
 int HYD_PMCD_pmi_proxy_listenfd;
 
+static HYD_Status HYD_PMCD_pmi_pproxy_start(void ) {
+    /* If this function exits... its always an error */
+    HYD_Status status = HYD_INTERNAL_ERROR;
+    int ret = 0;
+    pid_t proc_id = -1;
+    struct rlimit rl;
+
+    umask(0);
+
+    /* Get the limit of fds */
+    ret = getrlimit(RLIMIT_NOFILE, &rl);
+    if(ret == -1)
+        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "getrlimit() failed (%s)\n",
+                                HYDU_strerror(errno));
+
+    proc_id = fork();
+    if(proc_id > 0 ) {
+        /* Ignore exit from child proc - persistent pmi proxy */
+        status = HYDU_set_signal(SIGCHLD, SIG_IGN);
+        HYDU_ERR_POP(status, "Setting SIGCHLD handler to SIG_IGN failed\n");
+        
+        /* Parent process exits */
+        if(!HYD_PMCD_pmi_proxy_params.debug)
+            exit(0);
+    }
+    else if(proc_id == 0 ) {
+        /* Child proc continues */
+        int i;
+        pid_t spid;
+        spid = setsid();
+        if(spid == -1)
+            HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "setsid() failed(%s)\n",
+                                    HYDU_strerror(errno));
+
+        if(!HYD_PMCD_pmi_proxy_params.debug)
+        for(i=0; i<rl.rlim_max; i++)
+            close(i);
+        /* FIXME: dup(0,1,2) to "/dev/null" */
+
+        if(getenv("HYD_PROXY_PORT"))
+            HYD_PMCD_pmi_proxy_params.proxy_port = atoi(getenv("HYD_PROXY_PORT"));
+        else
+            HYD_PMCD_pmi_proxy_params.proxy_port = -1;
+
+        status = HYDU_sock_listen(&HYD_PMCD_pmi_proxy_listenfd, NULL,
+                              (uint16_t *) & HYD_PMCD_pmi_proxy_params.proxy_port);
+        HYDU_ERR_POP(status, "unable to listen on socket\n");
+
+        /* Register the listening socket with the demux engine */
+        status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_listenfd, HYD_STDOUT, NULL,
+                                 HYD_PMCD_pmi_proxy_listen_cb);
+        HYDU_ERR_POP(status, "unable to register fd\n");
+    }
+    else {
+        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "fork() failed (%s) \n",
+                                HYDU_strerror(errno));
+    }
+
+
+    while(1) {
+        status = HYD_DMX_wait_for_event(-1);
+        HYDU_ERR_POP(status, "demux engine error waiting for event\n");
+    }
+
+fn_exit:
+    return status;
+fn_fail:
+    goto fn_exit;
+}
+
 int main(int argc, char **argv)
 {
     int i, j, arg, count, pid, ret_status;
@@ -26,12 +96,18 @@
     status = HYD_PMCD_pmi_proxy_get_params(argc, argv);
     HYDU_ERR_POP(status, "bad parameters passed to the proxy\n");
 
+    if(HYD_PMCD_pmi_proxy_params.is_persistent) {
+        status = HYD_PMCD_pmi_pproxy_start();
+        HYDU_ERR_POP(status, "Error starting persistent PMI proxy\n");
+        goto fn_exit;
+    }
+
     status = HYDU_sock_listen(&HYD_PMCD_pmi_proxy_listenfd, NULL,
                               (uint16_t *) & HYD_PMCD_pmi_proxy_params.proxy_port);
     HYDU_ERR_POP(status, "unable to listen on socket\n");
 
     /* Register the listening socket with the demux engine */
-    status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_listenfd, HYD_STDOUT,
+    status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_listenfd, HYD_STDOUT, NULL,
                                  HYD_PMCD_pmi_proxy_listen_cb);
     HYDU_ERR_POP(status, "unable to register fd\n");
 
@@ -123,7 +199,7 @@
                 HYD_PMCD_pmi_proxy_params.stdin_buf_offset = 0;
                 HYD_PMCD_pmi_proxy_params.stdin_buf_count = 0;
                 status =
-                    HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, HYD_PMCD_pmi_proxy_stdin_cb);
+                    HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL, HYD_PMCD_pmi_proxy_stdin_cb);
                 HYDU_ERR_POP(status, "unable to register fd\n");
             }
             else {
@@ -142,12 +218,12 @@
     /* Everything is spawned, now wait for I/O */
     status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
                                  HYD_PMCD_pmi_proxy_params.out,
-                                 HYD_STDOUT, HYD_PMCD_pmi_proxy_stdout_cb);
+                                 HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stdout_cb);
     HYDU_ERR_POP(status, "unable to register fd\n");
 
     status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
                                  HYD_PMCD_pmi_proxy_params.err,
-                                 HYD_STDOUT, HYD_PMCD_pmi_proxy_stderr_cb);
+                                 HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stderr_cb);
     HYDU_ERR_POP(status, "unable to register fd\n");
 
     while (1) {

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h	2009-03-27 18:11:52 UTC (rev 4213)
@@ -11,7 +11,10 @@
 #include "hydra_utils.h"
 
 struct HYD_PMCD_pmi_proxy_params {
+    int debug;
+
     int proxy_port;
+    int is_persistent;
     char *wdir;
     HYD_Binding binding;
     char *user_bind_map;
@@ -31,6 +34,7 @@
     int *err;
     int *exit_status;
     int in;
+    int rproxy_connfd;
 
     int stdin_buf_offset;
     int stdin_buf_count;
@@ -40,10 +44,18 @@
 extern struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
 extern int HYD_PMCD_pmi_proxy_listenfd;
 
+HYD_Status HYD_PMCD_pmi_proxy_init_params(struct HYD_PMCD_pmi_proxy_params *proxy_params);
+HYD_Status HYD_PMCD_pmi_proxy_cleanup_params(struct HYD_PMCD_pmi_proxy_params *proxy_params);
 HYD_Status HYD_PMCD_pmi_proxy_get_params(int t_argc, char **t_argv);
-HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events);
+HYD_Status HYD_PMCD_pmi_proxy_get_next_keyvalp(char **bufp, int *buf_lenp, char **keyp,
+                                    int *key_lenp, char **valp, int *val_lenp, char separator); 
+HYD_Status HYD_PMCD_pmi_proxy_handle_cmd(int fd, char *cmd, int cmd_len);
+HYD_Status HYD_PMCD_pmi_proxy_handle_launch_cmd(int job_connfd, char *launch_cmd, int cmd_len);
+HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_PMCD_pmi_proxy_remote_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_PMCD_pmi_proxy_rstdout_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events, void *userp);
 
 #endif /* PMI_PROXY_H_INCLUDED */

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -10,13 +10,13 @@
 #include "demux.h"
 #include "pmi_serv.h"
 
-struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
+extern struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
 int HYD_PMCD_pmi_proxy_listenfd;
 
-HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events, void *userp)
 {
-    int accept_fd, count, i;
-    enum HYD_PMCD_pmi_proxy_cmds cmd;
+    int accept_fd, cmd_len;
+    char cmd[HYD_PMCD_MAX_CMD_LEN];
     HYD_Status status = HYD_SUCCESS;
 
     HYDU_FUNC_ENTER();
@@ -28,39 +28,67 @@
         status = HYDU_sock_accept(fd, &accept_fd);
         HYDU_ERR_POP(status, "accept error\n");
 
-        status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, HYD_PMCD_pmi_proxy_listen_cb);
+        status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_listen_cb);
         HYDU_ERR_POP(status, "unable to register fd\n");
     }
     else {      /* We got a command from mpiexec */
-        count = read(fd, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
-        if (count < 0) {
-            HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n",
-                                 fd, HYDU_strerror(errno));
-        }
-        else if (count == 0) {
+        status = HYDU_sock_readline(fd, cmd, HYD_PMCD_MAX_CMD_LEN, &cmd_len);
+        HYDU_ERR_POP(status, "Error reading command from proxy");
+        if (cmd_len == 0) {
             /* The connection has closed */
             status = HYD_DMX_deregister_fd(fd);
             HYDU_ERR_POP(status, "unable to deregister fd\n");
             close(fd);
             goto fn_exit;
         }
+        status = HYD_PMCD_pmi_proxy_handle_cmd(fd, cmd, cmd_len);
+        HYDU_ERR_POP(status, "Error handling proxy command\n");
+    }
 
-        if (cmd == KILLALL_PROCS) {     /* Got the killall command */
-            for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
-                if (HYD_PMCD_pmi_proxy_params.pid[i] != -1)
-                {
-                    kill(HYD_PMCD_pmi_proxy_params.pid[i], SIGTERM);
-                    kill(HYD_PMCD_pmi_proxy_params.pid[i], SIGKILL);
-                }
-            
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+  fn_fail:
+    goto fn_exit;
+}
 
-            status = HYD_DMX_deregister_fd(fd);
-            HYDU_ERR_POP(status, "unable to register fd\n");
-            close(fd);
+HYD_Status HYD_PMCD_pmi_proxy_rstdout_cb(int fd, HYD_Event_t events, void *userp)
+{
+    int closed, i;
+    HYD_Status status = HYD_SUCCESS;
+    struct HYD_PMCD_pmi_proxy_params *proxy_params;
+
+    HYDU_FUNC_ENTER();
+    proxy_params = (struct HYD_PMCD_pmi_proxy_params *)userp;
+
+    status = HYDU_sock_stdout_cb(fd, events, proxy_params->rproxy_connfd, &closed);
+    HYDU_ERR_POP(status, "stdout callback error\n");
+
+    if (closed) {
+        int all_procs_exited = 1;
+        /* The process exited */
+        status = HYD_DMX_deregister_fd(fd);
+        HYDU_ERR_POP(status, "unable to deregister fd\n");
+
+        /* FIXME: This could be a perf killer if we have a lot of procs associated with
+         * the same job on a single proxy
+         */
+        for (i = 0; i < proxy_params->exec_proc_count; i++) {
+            int ret_status = 0;
+            if (proxy_params->out[i] == fd) {
+                waitpid(proxy_params->pid[i], &ret_status, WUNTRACED);
+                close(proxy_params->in);
+                proxy_params->out[i] = -1;
+                proxy_params->err[i] = -1;
+            }
+            if (proxy_params->out[i] != -1) all_procs_exited = 0;
         }
-        else {
-            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
-                                "got unrecognized command from mpiexec\n");
+        if(all_procs_exited) {
+            close(proxy_params->rproxy_connfd);
+            status = HYD_DMX_deregister_fd(proxy_params->rproxy_connfd);
+            HYDU_ERR_POP(status, "Error deregistering remote job conn fd\n");
+            status = HYD_PMCD_pmi_proxy_cleanup_params(proxy_params);
+            HYDU_ERR_POP(status, "Error cleaning up proxy params\n");
         }
     }
 
@@ -72,8 +100,33 @@
     goto fn_exit;
 }
 
+HYD_Status HYD_PMCD_pmi_proxy_remote_cb(int fd, HYD_Event_t events, void *userp)
+{
+    int closed=0, i;
+    HYD_Status status = HYD_SUCCESS;
 
-HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events)
+    HYDU_FUNC_ENTER();
+    /* FIXME: This cb should take care of the commands from mpiexec */
+
+    if (closed) {
+        /* The connection has closed */
+        status = HYD_DMX_deregister_fd(fd);
+        HYDU_ERR_POP(status, "unable to deregister fd\n");
+
+        for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
+            if (HYD_PMCD_pmi_proxy_params.out[i] == fd)
+                HYD_PMCD_pmi_proxy_params.out[i] = -1;
+    }
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events, void *userp)
 {
     int closed, i;
     HYD_Status status = HYD_SUCCESS;
@@ -102,7 +155,7 @@
 }
 
 
-HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events, void *userp)
 {
     int closed, i;
     HYD_Status status = HYD_SUCCESS;
@@ -131,7 +184,7 @@
 }
 
 
-HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events, void *userp)
 {
     int closed;
     HYD_Status status = HYD_SUCCESS;

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -5,10 +5,307 @@
  */
 
 #include "pmi_proxy.h"
+#include "pmi_serv.h"
+#include "demux.h"
 #include "hydra_utils.h"
 
 struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
 
+/* Parse one "key=value<separator>" token. *bufp / *buf_lenp are the cursor and the
+ * remaining byte count; both are advanced past the token and surrounding whitespace
+ * (*buf_lenp becomes 0 at end of string). *keyp / *valp point into the buffer (no copy,
+ * not NUL-terminated); key_lenp / val_lenp may be NULL. Fails with HYD_INTERNAL_ERROR
+ * when the buffer is empty, '=' is missing, or the value lacks its separator. */
+HYD_Status HYD_PMCD_pmi_proxy_get_next_keyvalp(char **bufp, int *buf_lenp, char **keyp,
+                                    int *key_lenp, char **valp, int *val_lenp, char separator)
+{
+    char *p = NULL;
+    int len = 0;
+    int klen = 0;
+    int vlen = 0;
+
+    HYD_Status status = HYD_SUCCESS;
+
+    p = *bufp;
+    len = *buf_lenp;
+
+    /* <ctype.h> classifiers take an unsigned char value; plain char may be negative */
+    while(len > 0 && isspace((unsigned char) *p)) { p++; len--; }
+    if(len <= 0)
+        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Error reading keyval from command\n");
+
+    *keyp = p;
+    while(len > 0 && (*p != '=')) {
+        p++; len--;
+        klen++;
+    }
+    if(key_lenp) *key_lenp = klen;
+    /* Need the '=' plus at least one more byte; check before stepping past the end */
+    if(len <= 1)
+        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Error reading keyval from command\n");
+    p++; len--;
+
+    *valp = p;
+    /* FIXME: Allow escaping ';' */
+    while(len > 0 && (*p != separator)) {
+        p++; len--;
+        vlen++;
+    }
+    if(val_lenp) *val_lenp = vlen;
+    /* len == 0 here means the value ran to the end without its separator */
+    if(len <= 0)
+        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Error reading keyval from command\n");
+    p++; len--;
+
+    while(len > 0 && isspace((unsigned char) *p)) { p++; len--; }
+    /* p now points to the next key or the end of string. Dereference it only while
+     * bytes remain so an unterminated buffer is never read past its end. */
+    *bufp = p;
+    *buf_lenp = (len > 0 && *p != '\0') ? len : 0;
+
+  fn_exit:
+    return status;
+  fn_fail:
+    goto fn_exit;
+}
+
+/* Return nonzero when the key_len-byte token at key spells exactly name */
+static int proxy_key_is(const char *key, int key_len, const char *name)
+{
+    return key_len == (int) strlen(name) && !strncmp(key, name, key_len);
+}
+
+/* Handle a launch command: reset the global proxy params, re-register job_connfd with
+ * the remote callback, parse the "exec_name", "exec_cnt" and "env" pairs found in
+ * launch_cmd (cmd_len bytes), then spawn the processes and register each stdout fd. */
+HYD_Status HYD_PMCD_pmi_proxy_handle_launch_cmd(int job_connfd, char *launch_cmd, int cmd_len)
+{
+    char *key, *val;
+    int i=0, key_len=0, val_len = 0, core = 0, proc_idx = 0;
+    struct HYD_Partition_exec *exec = NULL;
+    HYD_Env_t *env=NULL;
+    HYD_Status status = HYD_SUCCESS;
+
+    /* FIXME: We currently support only one job - We need a list of proxy params for multiple jobs */
+    status = HYD_PMCD_pmi_proxy_init_params(&HYD_PMCD_pmi_proxy_params);
+    HYDU_ERR_POP(status, "Error initializing proxy params\n");
+
+    HYD_PMCD_pmi_proxy_params.rproxy_connfd = job_connfd;
+    status = HYD_DMX_deregister_fd(job_connfd);
+    HYDU_ERR_POP(status, "Unable to deregister job conn fd\n");
+    status = HYD_DMX_register_fd(1, &job_connfd, HYD_STDIN, (void *)&HYD_PMCD_pmi_proxy_params, HYD_PMCD_pmi_proxy_remote_cb);
+    HYDU_ERR_POP(status, "Unable to register job conn fd\n");
+
+    status = HYDU_alloc_partition_exec(&HYD_PMCD_pmi_proxy_params.exec_list);
+    HYDU_ERR_POP(status, "unable to allocate partition exec\n");
+    exec = HYD_PMCD_pmi_proxy_params.exec_list;
+
+    while(cmd_len > 0){
+        status = HYD_PMCD_pmi_proxy_get_next_keyvalp(&launch_cmd, &cmd_len, &key, &key_len, &val, &val_len, HYD_PMCD_CMD_SEP_CHAR);
+        HYDU_ERR_POP(status, "Unable to get next key from launch command\n");
+
+        /* FIXME: Use pre-defined macros for keys */
+        /* Keys must match in full; strncmp() over key_len alone accepts any prefix */
+        if(proxy_key_is(key, key_len, "exec_name")) {
+            /* Trim the blank the sender leaves before the separator explicitly, rather
+             * than through an undersized snprintf that would also clip unpadded names */
+            while(val_len > 0 && isspace((unsigned char) val[val_len - 1])) val_len--;
+            HYDU_MALLOC(exec->exec[0], char *, (val_len + 1), status);
+            HYDU_ERR_POP(status, "Error allocating memory for executable name\n");
+            HYDU_snprintf(exec->exec[0], val_len + 1, "%s", val);
+            exec->exec[1] = NULL;
+        }
+        else if(proxy_key_is(key, key_len, "exec_cnt")) {
+            /* The count applies to the last exec in the list */
+            for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec->next; exec = exec->next)
+                ;
+            exec->proc_count = atoi(val);
+        }
+        else if(proxy_key_is(key, key_len, "env")) {
+            char *env_str = val;
+            int env_str_len = val_len;
+
+            exec->prop_env = NULL;
+            while(env_str_len > 0) {
+                status = HYD_PMCD_pmi_proxy_get_next_keyvalp(&env_str, &env_str_len, &key, &key_len, &val, &val_len, HYD_PMCD_CMD_ENV_SEP_CHAR);
+                HYDU_ERR_POP(status, "Error getting next environment variable from launch command\n");
+                HYDU_MALLOC(env, HYD_Env_t *, sizeof(HYD_Env_t), status);
+                HYDU_ERR_POP(status, "Error allocating memory for environment variable in proxy params\n");
+                HYDU_MALLOC(env->env_name, char *, key_len+1, status);
+                HYDU_ERR_POP(status, "Error allocating memory for environment variable in proxy params\n");
+                HYDU_snprintf(env->env_name, key_len+1, "%s", key);
+                HYDU_MALLOC(env->env_value, char *, val_len+1, status);
+                HYDU_ERR_POP(status, "Error allocating memory for environment variable in proxy params\n");
+                HYDU_snprintf(env->env_value, val_len+1, "%s", val);
+                env->next = exec->prop_env;
+                exec->prop_env = env;
+            }
+        }
+        else{
+            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Unrecognized key in launch command\n");
+        }
+
+        /* FIXME: Set these: wdir, binding, user_bind_map, global_env (via
+         * HYDU_append_env_to_list), one_pass_count, and the partition segments
+         * (HYDU_alloc_partition_segment, proc_count, start_pid) */
+    }
+
+    HYD_PMCD_pmi_proxy_params.exec_proc_count = 0;
+    for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next)
+        HYD_PMCD_pmi_proxy_params.exec_proc_count += exec->proc_count;
+
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.out, int *, HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.err, int *, HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.pid, int *, HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.exit_status, int *, HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+
+    /* proc_idx runs across all execs so every process gets its own array slot and PMI_ID */
+    for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next) {
+        for (i = 0; i < exec->proc_count; i++, proc_idx++) {
+            char *str = NULL;
+            core = 0;
+            env = NULL;
+            /* FIXME: Use the start pmi_id from launch command */
+            str = HYDU_int_to_str(proc_idx);
+            status = HYDU_env_create(&env, "PMI_ID", str);
+            HYDU_ERR_POP(status, "unable to create env\n");
+            status = HYDU_putenv(env);
+            HYDU_FREE(str);     /* the id string is no longer needed once putenv has run */
+            HYDU_ERR_POP(status, "putenv failed\n");
+            status = HYDU_create_process(&exec->exec[0], exec->prop_env, NULL,
+                                             &HYD_PMCD_pmi_proxy_params.out[proc_idx],
+                                             &HYD_PMCD_pmi_proxy_params.err[proc_idx],
+                                             &HYD_PMCD_pmi_proxy_params.pid[proc_idx], core);
+            HYDU_ERR_POP(status, "Error launching process\n");
+            status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_params.out[proc_idx], HYD_STDOUT, (void *)&HYD_PMCD_pmi_proxy_params, HYD_PMCD_pmi_proxy_rstdout_cb);
+            HYDU_ERR_POP(status, "Error registering process stdout\n");
+        }
+    }
+
+  fn_exit:
+    return status;
+  fn_fail:
+    goto fn_exit;
+}
+
+/* Handle proxy commands: the value of the first key=value pair in cmd (cmd_len bytes)
+ * selects kill_all_procs, launch_procs or shutdown; any other name is an error. */
+HYD_Status HYD_PMCD_pmi_proxy_handle_cmd(int fd, char *cmd, int cmd_len)
+{
+    char *key = NULL, *cmd_name = NULL, *cmdp = cmd;
+    int i=0, key_len = 0, cmd_name_len = 0;
+    HYD_Status status = HYD_SUCCESS;
+
+    /* First key/val is the command name */
+    status = HYD_PMCD_pmi_proxy_get_next_keyvalp(&cmdp, &cmd_len, &key, &key_len, &cmd_name,
+                                                    &cmd_name_len, HYD_PMCD_CMD_SEP_CHAR );
+    HYDU_ERR_POP(status, "Error retrieving command name from command\n");
+
+    /* Compare over the value's own length (key_len is only the length of the key),
+     * after dropping the blank padding the sender leaves before the separator */
+    while(cmd_name_len > 0 && isspace((unsigned char) cmd_name[cmd_name_len - 1])) cmd_name_len--;
+
+    if (cmd_name_len == (int) strlen(HYD_PMCD_CMD_KILLALL_PROCS) && !strncmp(cmd_name, HYD_PMCD_CMD_KILLALL_PROCS, cmd_name_len)) {
+        for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
+            if (HYD_PMCD_pmi_proxy_params.pid[i] != -1)
+                kill(HYD_PMCD_pmi_proxy_params.pid[i], SIGKILL);
+        status = HYD_DMX_deregister_fd(fd);
+        HYDU_ERR_POP(status, "unable to deregister fd\n");
+        close(fd);
+    }
+    else if(cmd_name_len == (int) strlen(HYD_PMCD_CMD_LAUNCH_PROCS) && !strncmp(cmd_name, HYD_PMCD_CMD_LAUNCH_PROCS, cmd_name_len)) {
+        status = HYD_PMCD_pmi_proxy_handle_launch_cmd(fd, cmdp, cmd_len);
+        HYDU_ERR_POP(status, "Unable to handle launch command\n");
+    }
+    else if(cmd_name_len == (int) strlen(HYD_PMCD_CMD_SHUTDOWN) && !strncmp(cmd_name, HYD_PMCD_CMD_SHUTDOWN, cmd_name_len)) {
+        /* FIXME: Not a clean shutdown... Kill all procs before exiting */
+        status = HYD_DMX_deregister_fd(fd);
+        HYDU_ERR_POP(status, "unable to deregister fd\n");
+        close(fd);
+        exit(0);
+    }
+    else {
+        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "got unrecognized command from mpiexec\n");
+    }
+  fn_exit:
+    return status;
+  fn_fail:
+    goto fn_exit;
+}
+
+/* Reset every field of proxy_params to its initial (unset) value */
+HYD_Status HYD_PMCD_pmi_proxy_init_params(struct HYD_PMCD_pmi_proxy_params *proxy_params)
+{
+    /* Flags, binding and process counts */
+    proxy_params->debug = 0;
+    proxy_params->is_persistent = 0;
+    proxy_params->binding = HYD_BIND_UNSET;
+    proxy_params->one_pass_count = proxy_params->partition_proc_count = proxy_params->exec_proc_count = 0;
+    /* Port and descriptors start at -1 */
+    proxy_params->proxy_port = -1;
+    proxy_params->in = -1;
+    proxy_params->rproxy_connfd = -1;
+    /* No strings, lists or per-process arrays are attached yet */
+    proxy_params->wdir = NULL;
+    proxy_params->user_bind_map = NULL;
+    proxy_params->global_env = NULL;
+    proxy_params->segment_list = NULL;
+    proxy_params->exec_list = NULL;
+    proxy_params->pid = NULL;
+    proxy_params->out = NULL;
+    proxy_params->err = NULL;
+    proxy_params->exit_status = NULL;
+    /* stdin buffer bookkeeping is cleared */
+    proxy_params->stdin_buf_offset = proxy_params->stdin_buf_count = 0;
+    proxy_params->stdin_tmp_buf[0] = '\0';
+    return HYD_SUCCESS;
+}
+
+/* Cleanup proxy params after use: free the strings, list nodes and per-process arrays
+ * attached to proxy_params, then reset every field through the init routine. */
+HYD_Status HYD_PMCD_pmi_proxy_cleanup_params(struct HYD_PMCD_pmi_proxy_params *proxy_params)
+{
+    HYD_Status status = HYD_SUCCESS;
+    HYD_Env_t *env, *next_env;
+    struct HYD_Partition_exec *exec, *next_exec;
+
+    if(proxy_params->wdir != NULL)
+        HYDU_FREE(proxy_params->wdir);
+    if(proxy_params->user_bind_map != NULL)
+        HYDU_FREE(proxy_params->user_bind_map);
+    /* Walk each list with a saved next pointer; re-reading the list head after
+     * freeing it would touch freed memory and never advance */
+    for (env = proxy_params->global_env; env; env = next_env) {
+        next_env = env->next;
+        HYDU_FREE(env);
+    }
+    if(proxy_params->segment_list != NULL) {
+        /* FIXME : incomplete */
+    }
+    for (exec = proxy_params->exec_list; exec; exec = next_exec) {
+        next_exec = exec->next;
+        HYDU_FREE(exec);
+    }
+    /* NOTE(review): only the list nodes are freed; the strings hanging off them
+     * (env_name, env_value, exec->exec[], prop_env) look leaked - confirm ownership */
+    if(proxy_params->pid != NULL)
+        HYDU_FREE(proxy_params->pid);
+    if(proxy_params->out != NULL)
+        HYDU_FREE(proxy_params->out);
+    if(proxy_params->err != NULL)
+        HYDU_FREE(proxy_params->err);
+    if(proxy_params->exit_status != NULL)
+        HYDU_FREE(proxy_params->exit_status);
+
+    status = HYD_PMCD_pmi_proxy_init_params(proxy_params);
+    HYDU_ERR_POP(status, "Error initializing proxy params\n");
+
+  fn_exit:
+    return status;
+  fn_fail:
+    goto fn_exit;
+}
+
 HYD_Status HYD_PMCD_pmi_proxy_get_params(int t_argc, char **t_argv)
 {
     char **argv = t_argv, *str;
@@ -20,15 +317,18 @@
 
     HYDU_FUNC_ENTER();
 
-    HYD_PMCD_pmi_proxy_params.exec_list = NULL;
-    HYD_PMCD_pmi_proxy_params.segment_list = NULL;
-    HYD_PMCD_pmi_proxy_params.global_env = NULL;
+    status = HYD_PMCD_pmi_proxy_init_params(&HYD_PMCD_pmi_proxy_params);
+    HYDU_ERR_POP(status, "Error initializing proxy params\n");
 
     while (*argv) {
         ++argv;
         if (*argv == NULL)
             break;
 
+        if(!strcmp(*argv, "--verbose")) {
+            HYD_PMCD_pmi_proxy_params.debug = 1;
+            continue;
+        }
         /* Proxy port */
         if (!strcmp(*argv, "--proxy-port")) {
             argv++;
@@ -36,6 +336,11 @@
             continue;
         }
 
+        if(!strcmp(*argv, "--persistent")) {
+            HYD_PMCD_pmi_proxy_params.is_persistent = 1;
+            continue;
+        }
+
         /* Working directory */
         if (!strcmp(*argv, "--wdir")) {
             argv++;

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h	2009-03-27 18:11:52 UTC (rev 4213)
@@ -7,14 +7,20 @@
 #ifndef PMI_SERV_H_INCLUDED
 #define PMI_SERV_H_INCLUDED
 
-/* Currently we only have one command */
-enum HYD_PMCD_pmi_proxy_cmds {
-    KILLALL_PROCS
-};
+/* The set of commands supported */
+#define HYD_PMCD_CMD_KILLALL_PROCS      "kill_all_procs"
+#define HYD_PMCD_CMD_KILLALL_PROXIES    "kill_all_proxies"
+#define HYD_PMCD_CMD_LAUNCH_PROCS       "launch_procs"
+#define HYD_PMCD_CMD_SHUTDOWN           "shutdown"
 
+#define HYD_PMCD_CMD_SEP_CHAR   ';'
+#define HYD_PMCD_CMD_ENV_SEP_CHAR   ','
+
+#define HYD_PMCD_MAX_CMD_LEN    1024
+
 extern int HYD_PMCD_pmi_serv_listenfd;
 
-HYD_Status HYD_PMCD_pmi_serv_cb(int fd, HYD_Event_t events);
+HYD_Status HYD_PMCD_pmi_serv_cb(int fd, HYD_Event_t events, void *userp);
 HYD_Status HYD_PMCD_pmi_serv_cleanup(void);
 void HYD_PMCD_pmi_serv_signal_cb(int signal);
 

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -14,7 +14,7 @@
 #include "pmi_serv.h"
 
 int HYD_PMCD_pmi_serv_listenfd;
-HYD_Handle handle;
+extern HYD_Handle handle;
 struct HYD_PMCD_pmi_handle *HYD_PMCD_pmi_handle_list;
 
 /*
@@ -27,7 +27,7 @@
  * 2. The client sends us a "cmd" or "mcmd" string which means that a
  * single or multi-line command is to follow.
  */
-HYD_Status HYD_PMCD_pmi_serv_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_PMCD_pmi_serv_cb(int fd, HYD_Event_t events, void *userp)
 {
     int accept_fd, linelen, i;
     char *buf = NULL, *cmd, *args[HYD_NUM_TMP_STRINGS];
@@ -41,7 +41,7 @@
         status = HYDU_sock_accept(fd, &accept_fd);
         HYDU_ERR_POP(status, "accept error\n");
 
-        status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, HYD_PMCD_pmi_serv_cb);
+        status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, NULL, HYD_PMCD_pmi_serv_cb);
         HYDU_ERR_POP(status, "unable to register fd\n");
     }
     else {
@@ -141,7 +141,7 @@
 {
     struct HYD_Partition *partition;
     int fd;
-    enum HYD_PMCD_pmi_proxy_cmds cmd;
+    char cmd[HYD_PMCD_MAX_CMD_LEN];
     HYD_Status status = HYD_SUCCESS, overall_status = HYD_SUCCESS;
 
     HYDU_FUNC_ENTER();
@@ -149,7 +149,8 @@
     /* FIXME: Instead of doing this from this process itself, fork a
      * bunch of processes to do this. */
     /* Connect to all proxies and send a KILL command */
-    cmd = KILLALL_PROCS;
+    HYDU_snprintf(cmd, HYD_PMCD_MAX_CMD_LEN, "%s=%s %c\n",
+        "cmd", HYD_PMCD_CMD_KILLALL_PROCS, HYD_PMCD_CMD_SEP_CHAR);
     for (partition = handle.partition_list; partition; partition = partition->next) {
         status = HYDU_sock_connect(partition->name, handle.proxy_port, &fd);
         if (status != HYD_SUCCESS) {
@@ -158,7 +159,7 @@
             continue;   /* Move on to the next proxy */
         }
 
-        status = HYDU_sock_write(fd, &cmd, sizeof(cmd));
+        status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
         if (status != HYD_SUCCESS) {
             HYDU_Warn_printf("unable to send data to the proxy on %s\n", partition->name);
             overall_status = HYD_INTERNAL_ERROR;

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -30,8 +30,11 @@
     status = HYD_PMCD_pmi_finalize();
     HYDU_ERR_POP(status, "unable to finalize process manager utils\n");
 
-    status = HYD_BSCI_finalize();
-    HYDU_ERR_POP(status, "unable to finalize bootstrap server\n");
+    /* We use BSC only for local proxies */
+    if(!handle.is_proxy_remote) {
+        status = HYD_BSCI_finalize();
+        HYDU_ERR_POP(status, "unable to finalize bootstrap server\n");
+    }
 
   fn_exit:
     HYDU_FUNC_EXIT();

Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -13,90 +13,19 @@
 #include "pmi_serv.h"
 
 int HYD_PMCD_pmi_serv_listenfd;
-HYD_Handle handle;
+extern HYD_Handle handle;
 
-/*
- * HYD_PMCI_launch_procs: Here are the steps we follow:
- *
- * 1. Find what all ports the user wants to allow and listen on one of
- * those ports.
- *
- * 2. Create a call-back function to accept connections and register
- * the listening socket with the demux engine.
- *
- * 3. Create a port string out of this hostname and port and add it to
- * the environment list under the variable "PMI_PORT".
- *
- * 4. Create an environment variable for PMI_ID. This is an
- * auto-incrementing variable; the bootstrap server will take care of
- * adding the process ID to the start value.
- *
- * 5. Create a process info setup and ask the bootstrap server to
- * launch the processes.
- */
-HYD_Status HYD_PMCI_launch_procs(void)
+/* Local proxy is a proxy that is local to this process */
+static HYD_Status launch_procs_with_local_proxy(void ) 
 {
-    char *port_range, *port_str, *sport;
-    uint16_t port;
+    HYD_Status status = HYD_SUCCESS;
     int i, arg, process_id;
-    char hostname[MAX_HOSTNAME_LEN];
+    char *path_str[HYD_NUM_TMP_STRINGS];
     HYD_Env_t *env;
-    char *path_str[HYD_NUM_TMP_STRINGS];
     struct HYD_Partition *partition;
     struct HYD_Partition_exec *exec;
     struct HYD_Partition_segment *segment;
-    HYD_Status status = HYD_SUCCESS;
 
-    HYDU_FUNC_ENTER();
-
-    status = HYDU_set_common_signals(HYD_PMCD_pmi_serv_signal_cb);
-    HYDU_ERR_POP(status, "unable to set signal\n");
-
-    /* Check if the user wants us to use a port within a certain
-     * range. */
-    port_range = getenv("MPIEXEC_PORTRANGE");
-    if (!port_range)
-        port_range = getenv("MPIEXEC_PORT_RANGE");
-    if (!port_range)
-        port_range = getenv("MPICH_PORT_RANGE");
-
-    /* Listen on a port in the port range */
-    port = 0;
-    status = HYDU_sock_listen(&HYD_PMCD_pmi_serv_listenfd, port_range, &port);
-    HYDU_ERR_POP(status, "unable to listen on port\n");
-
-    /* Register the listening socket with the demux engine */
-    status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_serv_listenfd, HYD_STDOUT,
-                                 HYD_PMCD_pmi_serv_cb);
-    HYDU_ERR_POP(status, "unable to register fd\n");
-
-    /* Create a port string for MPI processes to use to connect to */
-    if (gethostname(hostname, MAX_HOSTNAME_LEN) < 0)
-        HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR,
-                             "gethostname error (hostname: %s; errno: %d)\n", hostname, errno);
-
-    sport = HYDU_int_to_str(port);
-
-    HYDU_MALLOC(port_str, char *, strlen(hostname) + 1 + strlen(sport) + 1, status);
-    HYDU_snprintf(port_str, strlen(hostname) + 1 + strlen(sport) + 1,
-                  "%s:%s", hostname, sport);
-    HYDU_FREE(sport);
-    HYDU_Debug("Process manager listening on PMI port %s\n", port_str);
-
-    status = HYDU_env_create(&env, "PMI_PORT", port_str);
-    HYDU_ERR_POP(status, "unable to create env\n");
-
-    status = HYDU_append_env_to_list(*env, &handle.system_env);
-    HYDU_ERR_POP(status, "unable to add env to list\n");
-
-    HYDU_env_free(env);
-    HYDU_FREE(port_str);
-
-    /* Create a process group for the MPI processes in this
-     * comm_world */
-    status = HYD_PMCD_pmi_create_pg();
-    HYDU_ERR_POP(status, "unable to create process group\n");
-
     handle.one_pass_count = 0;
     for (partition = handle.partition_list; partition; partition = partition->next)
         handle.one_pass_count += partition->total_proc_count;
@@ -194,6 +123,192 @@
     HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n");
 
   fn_exit:
+    return status;
+  fn_fail:
+    goto fn_exit;
+}
+
+/* Request remote proxies to shutdown: connect to handle.pproxy_port on every partition
+ * and send the shutdown command; the connection fd is kept in partition->out. */
+static HYD_Status shutdown_remote_proxies(void )
+{
+    char shutdown_proxies_cmd[HYD_PMCD_MAX_CMD_LEN];
+    struct HYD_Partition *partition=NULL;
+    int HYD_PMCD_pmi_proxy_connfd=-1;
+    HYD_Status status = HYD_SUCCESS;
+
+    /* The command is identical for every proxy, so build it once */
+    HYDU_snprintf(shutdown_proxies_cmd, HYD_PMCD_MAX_CMD_LEN,
+        "%s=%s %c\n",
+        "cmd", HYD_PMCD_CMD_SHUTDOWN, HYD_PMCD_CMD_SEP_CHAR);
+
+    for (partition = handle.partition_list; partition; partition = partition->next) {
+        status = HYDU_sock_connect(partition->name, handle.pproxy_port,
+                                    &HYD_PMCD_pmi_proxy_connfd);
+        HYDU_ERR_POP(status, "Error connecting to remote proxy\n");
+        status = HYDU_sock_writeline(HYD_PMCD_pmi_proxy_connfd, shutdown_proxies_cmd,
+                                        strlen(shutdown_proxies_cmd));
+        HYDU_ERR_POP(status, "Error writing the shutdown command\n");
+
+        /* FIXME: Read result */
+        partition->out = HYD_PMCD_pmi_proxy_connfd;
+        partition->err = -1;
+    }
+
+  fn_exit:
+    return status;
+  fn_fail:
+    goto fn_exit;
+}
+
+/* Remote proxy is a proxy external to this process. For every partition: connect, send a
+ * launch command for the first exec in its list plus handle.system_env, keep the fd in ->out. */
+static HYD_Status launch_procs_with_remote_proxy(void )
+{
+    HYD_Status status = HYD_SUCCESS;
+    char launch_procs_cmd[HYD_PMCD_MAX_CMD_LEN];
+    char env_list[HYD_PMCD_MAX_CMD_LEN]; /* FIXME: Wrong *MAX*... */
+    int env_list_len = 0, used = 0, HYD_PMCD_pmi_proxy_connfd=-1;
+    char *p = NULL;
+    struct HYD_Partition *partition=NULL;
+    struct HYD_Partition_exec *exec=NULL;
+    struct HYD_Env *env=NULL;
+
+    for (partition = handle.partition_list; partition; partition = partition->next) {
+        status = HYDU_sock_connect(partition->name, handle.pproxy_port, &HYD_PMCD_pmi_proxy_connfd);
+        HYDU_ERR_POP(status, "Error connecting to remote proxy\n");
+        exec = partition->exec_list;
+        /* FIXME: Create a util func for converting env list to a string */
+        env = handle.system_env;
+        p = env_list;
+        *p = '\0';
+        env_list_len = HYD_PMCD_MAX_CMD_LEN;
+        while(env){
+            HYDU_snprintf(p, env_list_len,"%s=%s %c", env->env_name, env->env_value, HYD_PMCD_CMD_ENV_SEP_CHAR);
+            /* A completely filled buffer means this entry did not fit; fail, don't truncate */
+            used = (int) strlen(p);
+            if(used + 1 >= env_list_len)
+                HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Environment list too long for launch command\n");
+            env_list_len -= used;
+            p += used;
+            env = env->next;
+        }
+        /* Create launch command */
+        HYDU_snprintf(launch_procs_cmd, HYD_PMCD_MAX_CMD_LEN,
+            "%s=%s %c %s=%s %c %s=%d %c %s=%s %c\n",
+            "cmd", HYD_PMCD_CMD_LAUNCH_PROCS, HYD_PMCD_CMD_SEP_CHAR, "exec_name", exec->exec[0], HYD_PMCD_CMD_SEP_CHAR,
+            "exec_cnt", exec->proc_count, HYD_PMCD_CMD_SEP_CHAR,
+            "env", env_list, HYD_PMCD_CMD_SEP_CHAR);
+        /* The format ends in '\n', so a missing one means the command was truncated */
+        used = (int) strlen(launch_procs_cmd);
+        if(used == 0 || launch_procs_cmd[used - 1] != '\n')
+            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Launch command too long\n");
+        status = HYDU_sock_writeline(HYD_PMCD_pmi_proxy_connfd, launch_procs_cmd, used);
+        HYDU_ERR_POP(status, "Error writing the launch procs command\n");
+        /* FIXME: Read result */
+        partition->out = HYD_PMCD_pmi_proxy_connfd;
+        partition->err = -1;
+    }
+
+  fn_exit:
+    return status;
+  fn_fail:
+    goto fn_exit;
+}
+
+/*
+ * HYD_PMCI_launch_procs: Here are the steps we follow:
+ *
+ * 1. Find what all ports the user wants to allow and listen on one of
+ * those ports.
+ *
+ * 2. Create a call-back function to accept connections and register
+ * the listening socket with the demux engine.
+ *
+ * 3. Create a port string out of this hostname and port and add it to
+ * the environment list under the variable "PMI_PORT".
+ *
+ * 4. Create an environment variable for PMI_ID. This is an
+ * auto-incrementing variable; the bootstrap server will take care of
+ * adding the process ID to the start value.
+ *
+ * 5. Create a process info setup and ask the bootstrap server to
+ * launch the processes.
+ */
+HYD_Status HYD_PMCI_launch_procs(void)
+{
+    char *port_range, *port_str, *sport;
+    uint16_t port;
+    char hostname[MAX_HOSTNAME_LEN];
+    HYD_Env_t *env;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    status = HYDU_set_common_signals(HYD_PMCD_pmi_serv_signal_cb);
+    HYDU_ERR_POP(status, "unable to set signal\n");
+
+    /* Check if the user wants us to use a port within a certain
+     * range. */
+    port_range = getenv("MPIEXEC_PORTRANGE");
+    if (!port_range)
+        port_range = getenv("MPIEXEC_PORT_RANGE");
+    if (!port_range)
+        port_range = getenv("MPICH_PORT_RANGE");
+
+    /* Listen on a port in the port range */
+    port = 0;
+    status = HYDU_sock_listen(&HYD_PMCD_pmi_serv_listenfd, port_range, &port);
+    HYDU_ERR_POP(status, "unable to listen on port\n");
+
+    /* Register the listening socket with the demux engine */
+    status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_serv_listenfd, HYD_STDOUT, NULL,
+                                 HYD_PMCD_pmi_serv_cb);
+    HYDU_ERR_POP(status, "unable to register fd\n");
+
+    /* Create a port string for MPI processes to use to connect to */
+    if (gethostname(hostname, MAX_HOSTNAME_LEN) < 0)
+        HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR,
+                             "gethostname error (hostname: %s; errno: %d)\n", hostname, errno);
+
+    sport = HYDU_int_to_str(port);
+
+    HYDU_MALLOC(port_str, char *, strlen(hostname) + 1 + strlen(sport) + 1, status);
+    HYDU_snprintf(port_str, strlen(hostname) + 1 + strlen(sport) + 1,
+                  "%s:%s", hostname, sport);
+    HYDU_FREE(sport);
+    HYDU_Debug("Process manager listening on PMI port %s\n", port_str);
+
+    status = HYDU_env_create(&env, "PMI_PORT", port_str);
+    HYDU_ERR_POP(status, "unable to create env\n");
+
+    status = HYDU_append_env_to_list(*env, &handle.system_env);
+    HYDU_ERR_POP(status, "unable to add env to list\n");
+
+    HYDU_env_free(env);
+    HYDU_FREE(port_str);
+
+    /* Create a process group for the MPI processes in this
+     * comm_world */
+    status = HYD_PMCD_pmi_create_pg();
+    HYDU_ERR_POP(status, "unable to create process group\n");
+
+    if(handle.is_proxy_remote) {
+        if(handle.is_proxy_terminator) {
+            status = shutdown_remote_proxies();
+            HYDU_ERR_POP(status, "Error shutting down remote proxies\n");
+        }
+        else{
+            status = launch_procs_with_remote_proxy();
+            HYDU_ERR_POP(status, "Error launching procs with remote proxy\n");
+        }
+    }
+    else {
+        status = launch_procs_with_local_proxy();
+        HYDU_ERR_POP(status, "Error launching procs with local proxy\n");
+    }
+
+  fn_exit:
     HYDU_FUNC_EXIT();
     return status;
 
@@ -208,11 +323,16 @@
 
     HYDU_FUNC_ENTER();
 
-    status = HYD_BSCI_wait_for_completion();
-    if (status != HYD_SUCCESS) {
-        status = HYD_PMCD_pmi_serv_cleanup();
-        HYDU_ERR_POP(status, "process manager cannot cleanup processes\n");
+    if(handle.is_proxy_remote) {
+        status = HYD_SUCCESS;
     }
+    else{
+        status = HYD_BSCI_wait_for_completion();
+        if (status != HYD_SUCCESS) {
+            status = HYD_PMCD_pmi_serv_cleanup();
+            HYDU_ERR_POP(status, "process manager cannot cleanup processes\n");
+        }
+    }
 
   fn_exit:
     HYDU_FUNC_EXIT();

Modified: mpich2/trunk/src/pm/hydra/utils/env/env.c
===================================================================
--- mpich2/trunk/src/pm/hydra/utils/env/env.c	2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/utils/env/env.c	2009-03-27 18:11:52 UTC (rev 4213)
@@ -100,7 +100,7 @@
     env = env_list;
     while (env) {
         status = env_to_str(env, &str_list[i++]);
-        HYDU_ERR_POP(status, "env_to_str returned error\n");
+        HYDU_ERR_POP(status, "HYDU_env_to_str returned error\n");
         env = env->next;
     }
     str_list[i++] = NULL;



More information about the mpich2-commits mailing list