[mpich2-commits] r4213 - in mpich2/trunk/src/pm/hydra: bootstrap/fork bootstrap/slurm bootstrap/ssh bootstrap/utils control/consys demux include launcher/mpiexec launcher/utils pm/pmiserv utils/env
jayesh at mcs.anl.gov
jayesh at mcs.anl.gov
Fri Mar 27 13:11:52 CDT 2009
Author: jayesh
Date: 2009-03-27 13:11:52 -0500 (Fri, 27 Mar 2009)
New Revision: 4213
Modified:
mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c
mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c
mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c
mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_wait.c
mpich2/trunk/src/pm/hydra/control/consys/consys_close.c
mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c
mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c
mpich2/trunk/src/pm/hydra/demux/demux.c
mpich2/trunk/src/pm/hydra/demux/demux.h
mpich2/trunk/src/pm/hydra/include/hydra.h
mpich2/trunk/src/pm/hydra/include/hydra_base.h
mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c
mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c
mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.h
mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c
mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c
mpich2/trunk/src/pm/hydra/utils/env/env.c
Log:
# Initial cut of distributed proxies support in hydra
- launch/shutdown proxies using job launcher
- launch jobs using the standalone proxy
# DMX engine can now handle user contexts. The user registers the context when registering the fd & the DMX engine provides the context in the callback.
# Limitations (will be fixed soon...)
- Code is a bit hackish... FIXMEs should cover a lot of them
- Works only on localhost - debugging multiple hosts
- Does not support MPMD
- Supports only one job at a time
- Need to provide complete path to executablea - to be fixed soon
Modified: mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -9,7 +9,7 @@
#include "bscu.h"
#include "fork.h"
-HYD_Handle handle;
+extern HYD_Handle handle;
HYD_Status HYD_BSCD_fork_launch_procs(void)
{
Modified: mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -9,7 +9,7 @@
#include "bscu.h"
#include "slurm.h"
-HYD_Handle handle;
+extern HYD_Handle handle;
HYD_Status HYD_BSCD_slurm_launch_procs(void)
{
Modified: mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -9,7 +9,7 @@
#include "bscu.h"
#include "ssh.h"
-HYD_Handle handle;
+extern HYD_Handle handle;
/*
* HYD_BSCD_ssh_launch_procs: For each process, we create an
Modified: mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_wait.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_wait.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_wait.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -8,7 +8,7 @@
#include "hydra_utils.h"
#include "bscu.h"
-HYD_Handle handle;
+extern HYD_Handle handle;
/*
* HYD_BSCU_wait_for_completion: We first wait for communication
Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_close.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_close.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_close.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -10,7 +10,7 @@
#include "pmci.h"
#include "demux.h"
-HYD_Handle handle;
+extern HYD_Handle handle;
HYD_Status HYD_CSI_close_fd(int fd)
{
Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -10,7 +10,7 @@
#include "pmci.h"
#include "demux.h"
-HYD_Handle handle;
+extern HYD_Handle handle;
HYD_Status HYD_CSI_launch_procs(void)
{
@@ -24,11 +24,13 @@
HYDU_ERR_POP(status, "PM returned error while launching processes\n");
for (partition = handle.partition_list; partition; partition = partition->next) {
- status = HYD_DMX_register_fd(1, &partition->out, HYD_STDOUT, handle.stdout_cb);
+ status = HYD_DMX_register_fd(1, &partition->out, HYD_STDOUT, NULL, handle.stdout_cb);
HYDU_ERR_POP(status, "demux returned error registering fd\n");
- status = HYD_DMX_register_fd(1, &partition->err, HYD_STDOUT, handle.stderr_cb);
- HYDU_ERR_POP(status, "demux returned error registering fd\n");
+ if(partition->err != -1) {
+ status = HYD_DMX_register_fd(1, &partition->err, HYD_STDOUT, NULL, handle.stderr_cb);
+ HYDU_ERR_POP(status, "demux returned error registering fd\n");
+ }
}
if (handle.in != -1) { /* Only process_id 0 */
@@ -42,7 +44,7 @@
handle.stdin_buf_count = 0;
handle.stdin_buf_offset = 0;
- status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, handle.stdin_cb);
+ status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL, handle.stdin_cb);
HYDU_ERR_POP(status, "demux returned error registering fd\n");
}
Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -10,7 +10,7 @@
#include "pmci.h"
#include "demux.h"
-HYD_Handle handle;
+extern HYD_Handle handle;
HYD_Status HYD_CSI_wait_for_completion(void)
{
Modified: mpich2/trunk/src/pm/hydra/demux/demux.c
===================================================================
--- mpich2/trunk/src/pm/hydra/demux/demux.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/demux/demux.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -14,15 +14,16 @@
int num_fds;
int *fd;
HYD_Event_t events;
- HYD_Status(*callback) (int fd, HYD_Event_t events);
+ void *userp;
+ HYD_Status(*callback) (int fd, HYD_Event_t events, void *userp);
struct HYD_DMXI_callback *next;
} HYD_DMXI_callback_t;
static HYD_DMXI_callback_t *cb_list = NULL;
-HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events,
- HYD_Status(*callback) (int fd, HYD_Event_t events))
+HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events, void *userp,
+ HYD_Status(*callback) (int fd, HYD_Event_t events, void *userp))
{
HYD_DMXI_callback_t *cb_element, *run;
int i;
@@ -39,6 +40,7 @@
HYDU_MALLOC(cb_element->fd, int *, num_fds * sizeof(int), status);
memcpy(cb_element->fd, fd, num_fds * sizeof(int));
cb_element->events = events;
+ cb_element->userp = userp;
cb_element->callback = callback;
cb_element->next = NULL;
@@ -158,7 +160,7 @@
if (pollfds[i].revents & POLLIN)
events |= HYD_STDOUT;
- status = run->callback(pollfds[i].fd, events);
+ status = run->callback(pollfds[i].fd, events, run->userp);
HYDU_ERR_POP(status, "callback returned error status\n");
}
Modified: mpich2/trunk/src/pm/hydra/demux/demux.h
===================================================================
--- mpich2/trunk/src/pm/hydra/demux/demux.h 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/demux/demux.h 2009-03-27 18:11:52 UTC (rev 4213)
@@ -9,8 +9,8 @@
#include "hydra.h"
-HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events,
- HYD_Status(*callback) (int fd, HYD_Event_t events));
+HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events, void *userp,
+ HYD_Status(*callback) (int fd, HYD_Event_t events, void *userp));
HYD_Status HYD_DMX_deregister_fd(int fd);
HYD_Status HYD_DMX_wait_for_event(int time);
HYD_Status HYD_DMX_finalize(void);
Modified: mpich2/trunk/src/pm/hydra/include/hydra.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra.h 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/include/hydra.h 2009-03-27 18:11:52 UTC (rev 4213)
@@ -14,7 +14,16 @@
struct HYD_Handle_ {
char *base_path;
int proxy_port;
+ /* The persistent proxy is different from the centralized proxy
+ * and hence needs its own port - pproxy_port */
+ int pproxy_port;
char *bootstrap;
+ /* FIXME: We should define a proxy type instead of all these
+ * flags... proxy_type = PROXY_LAUNCHER | PROXY_TERMINATOR
+ */
+ int is_proxy_launcher;
+ int is_proxy_terminator;
+ int is_proxy_remote;
HYD_Binding binding;
char *user_bind_map;
@@ -31,9 +40,9 @@
HYD_Env_t *prop_env;
int in;
- HYD_Status(*stdin_cb) (int fd, HYD_Event_t events);
- HYD_Status(*stdout_cb) (int fd, HYD_Event_t events);
- HYD_Status(*stderr_cb) (int fd, HYD_Event_t events);
+ HYD_Status(*stdin_cb) (int fd, HYD_Event_t events, void *userp);
+ HYD_Status(*stdout_cb) (int fd, HYD_Event_t events, void *userp);
+ HYD_Status(*stderr_cb) (int fd, HYD_Event_t events, void *userp);
/* Start time and timeout. These are filled in by the launcher,
* but are utilized by the demux engine and the boot-strap server
@@ -57,4 +66,7 @@
extern HYD_Handle handle;
+#define HYD_PROXY_NAME "pmi_proxy"
+#define HYD_PPROXY_PORT 8677
+
#endif /* HYDRA_H_INCLUDED */
Modified: mpich2/trunk/src/pm/hydra/include/hydra_base.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra_base.h 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/include/hydra_base.h 2009-03-27 18:11:52 UTC (rev 4213)
@@ -34,6 +34,10 @@
#include <sys/types.h>
#endif /* HAVE_SYS_TYPES_H */
+#if defined HAVE_SYS_STAT_H
+#include <sys/stat.h>
+#endif /* HAVE_SYS_STAT_H */
+
#include <errno.h>
#if defined HAVE_GETTIMEOFDAY
Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -9,9 +9,9 @@
#include "mpiexec.h"
#include "csi.h"
-HYD_Handle handle;
+extern HYD_Handle handle;
-HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events, void *userp)
{
int closed;
HYD_Status status = HYD_SUCCESS;
@@ -38,8 +38,7 @@
goto fn_exit;
}
-
-HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events, void *userp)
{
int closed;
HYD_Status status = HYD_SUCCESS;
@@ -67,7 +66,7 @@
}
-HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events, void *userp)
{
int closed;
HYD_Status status = HYD_SUCCESS;
Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -10,7 +10,7 @@
#include "lchu.h"
#include "csi.h"
-HYD_Handle handle;
+extern HYD_Handle handle;
static void usage(void)
{
Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.h
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.h 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.h 2009-03-27 18:11:52 UTC (rev 4213)
@@ -10,8 +10,8 @@
#include "hydra.h"
HYD_Status HYD_LCHI_get_parameters(char **t_argv);
-HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events);
+HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events, void *userp);
#endif /* MPIEXEC_H_INCLUDED */
Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -11,7 +11,7 @@
#define HYDRA_MAX_PATH 4096
-HYD_Handle handle;
+extern HYD_Handle handle;
#define INCREMENT_ARGV(status) \
{ \
@@ -33,7 +33,7 @@
{
int i, local_env_set;
char **argv = t_argv, *tmp;
- char *env_name, *env_value, *str[4] = { NULL }, *progname = *argv;
+ char *env_name, *env_value, *str[4] = { NULL, NULL, NULL, NULL }, *progname = *argv;
HYD_Env_t *env;
struct HYD_Exec_info *exec_info;
HYD_Status status = HYD_SUCCESS;
@@ -68,7 +68,32 @@
handle.enablex = !strcmp(*argv, "--enable-x");
continue;
}
+
+ if(!strcmp(*argv, "--boot-proxies")) {
+ /* FIXME: Prevent usage of incompatible params */
+ handle.bootstrap = HYDU_strdup("ssh");
+ handle.is_proxy_launcher = 1;
+ handle.prop = HYD_ENV_PROP_ALL;
+ continue;
+ }
+ if(!strcmp(*argv, "--remote-proxy")) {
+ /* FIXME: We should get rid of this option eventually.
+ * This should be the default case. The centralized
+ * version should use an option like "--local-proxy"
+ */
+ handle.is_proxy_remote = 1;
+ handle.prop = HYD_ENV_PROP_ALL;
+ continue;
+ }
+
+ if(!strcmp(*argv, "--shutdown-proxies")) {
+ handle.is_proxy_remote = 1;
+ handle.is_proxy_terminator = 1;
+ handle.prop = HYD_ENV_PROP_ALL;
+ continue;
+ }
+
if (!strcmp(*argv, "-genvall")) {
HYDU_ERR_CHKANDJUMP(status, handle.prop != HYD_ENV_PROP_UNSET,
HYD_INTERNAL_ERROR, "duplicate prop setting\n");
@@ -237,12 +262,24 @@
continue;
}
+ if (!strcmp(str[0], "--pproxy-port")) {
+ if (!str[1]) {
+ INCREMENT_ARGV(status);
+ str[1] = *argv;
+ }
+ HYDU_ERR_CHKANDJUMP(status, handle.pproxy_port != -1, HYD_INTERNAL_ERROR,
+ "duplicate persistent proxy port\n");
+ handle.pproxy_port = atoi(str[1]);
+ continue;
+ }
+
if (*argv[0] == '-')
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "unrecognized argument\n");
status = HYD_LCHU_get_current_exec_info(&exec_info);
HYDU_ERR_POP(status, "get_current_exec_info returned error\n");
+ /* End of Job launcher option handling */
/* Read the executable till you hit the end of a ":" */
do {
if (!strcmp(*argv, ":")) { /* Next executable */
@@ -262,8 +299,32 @@
break;
continue;
+
}
+ /* In the case of the proxy launcher, aka --boot-proxies, there is no executable
+ * specified */
+ if(handle.is_proxy_launcher || handle.is_proxy_terminator) {
+ if(handle.exec_info_list != NULL)
+ HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
+ "No executable should be specified when booting proxies\n");
+ status = HYD_LCHU_get_current_exec_info(&exec_info);
+ HYDU_ERR_POP(status, "get_current_exec_info returned error\n");
+
+ exec_info->exec[0] = HYDU_strdup(HYD_PROXY_NAME" --persistent");
+ exec_info->exec[1] = NULL;
+ exec_info->exec_proc_count = 1;
+
+ env_name = HYDU_strdup("HYD_PROXY_PORT");
+ env_value = HYDU_int_to_str(handle.pproxy_port);
+
+ status = HYDU_env_create(&env, env_name, env_value);
+ HYDU_ERR_POP(status, "unable to create env struct\n");
+
+ HYDU_append_env_to_list(*env, &exec_info->user_env);
+ }
+
+
tmp = getenv("MPIEXEC_DEBUG");
if (handle.debug == -1 && tmp)
handle.debug = atoi(tmp) ? 1 : 0;
Modified: mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -14,7 +14,11 @@
{
handle.base_path = NULL;
handle.proxy_port = -1;
+ handle.pproxy_port = HYD_PPROXY_PORT;
handle.bootstrap = NULL;
+ handle.is_proxy_launcher = 0;
+ handle.is_proxy_terminator = 0;
+ handle.is_proxy_remote = 0;
handle.binding = HYD_BIND_UNSET;
handle.user_bind_map = NULL;
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -8,7 +8,7 @@
#include "pmi_handle.h"
#include "pmi_handle_v1.h"
-HYD_Handle handle;
+extern HYD_Handle handle;
HYD_PMCD_pmi_pg_t *pg_list = NULL;
struct HYD_PMCD_pmi_handle *HYD_PMCD_pmi_v1;
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -11,7 +11,7 @@
#include "pmi_handle.h"
#include "pmi_handle_v1.h"
-HYD_Handle handle;
+extern HYD_Handle handle;
HYD_PMCD_pmi_pg_t *pg_list;
/* TODO: abort, create_kvs, destroy_kvs, getbyidx, spawn */
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -12,6 +12,76 @@
struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
int HYD_PMCD_pmi_proxy_listenfd;
+static HYD_Status HYD_PMCD_pmi_pproxy_start(void ) {
+ /* If this function exits... its always an error */
+ HYD_Status status = HYD_INTERNAL_ERROR;
+ int ret = 0;
+ pid_t proc_id = -1;
+ struct rlimit rl;
+
+ umask(0);
+
+ /* Get the limit of fds */
+ ret = getrlimit(RLIMIT_NOFILE, &rl);
+ if(ret == -1)
+ HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "getrlimit() failed (%s)\n",
+ HYDU_strerror(errno));
+
+ proc_id = fork();
+ if(proc_id > 0 ) {
+ /* Ignore exit from child proc - persistent pmi proxy */
+ status = HYDU_set_signal(SIGCHLD, SIG_IGN);
+ HYDU_ERR_POP(status, "Setting SIGCHLD handler to SIG_IGN failed\n");
+
+ /* Parent process exits */
+ if(!HYD_PMCD_pmi_proxy_params.debug)
+ exit(0);
+ }
+ else if(proc_id == 0 ) {
+ /* Child proc continues */
+ int i;
+ pid_t spid;
+ spid = setsid();
+ if(spid == -1)
+ HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "setsid() failed(%s)\n",
+ HYDU_strerror(errno));
+
+ if(!HYD_PMCD_pmi_proxy_params.debug)
+ for(i=0; i<rl.rlim_max; i++)
+ close(i);
+ /* FIXME: dup(0,1,2) to "/dev/null" */
+
+ if(getenv("HYD_PROXY_PORT"))
+ HYD_PMCD_pmi_proxy_params.proxy_port = atoi(getenv("HYD_PROXY_PORT"));
+ else
+ HYD_PMCD_pmi_proxy_params.proxy_port = -1;
+
+ status = HYDU_sock_listen(&HYD_PMCD_pmi_proxy_listenfd, NULL,
+ (uint16_t *) & HYD_PMCD_pmi_proxy_params.proxy_port);
+ HYDU_ERR_POP(status, "unable to listen on socket\n");
+
+ /* Register the listening socket with the demux engine */
+ status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_listenfd, HYD_STDOUT, NULL,
+ HYD_PMCD_pmi_proxy_listen_cb);
+ HYDU_ERR_POP(status, "unable to register fd\n");
+ }
+ else {
+ HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "fork() failed (%s) \n",
+ HYDU_strerror(errno));
+ }
+
+
+ while(1) {
+ status = HYD_DMX_wait_for_event(-1);
+ HYDU_ERR_POP(status, "demux engine error waiting for event\n");
+ }
+
+fn_exit:
+ return status;
+fn_fail:
+ goto fn_exit;
+}
+
int main(int argc, char **argv)
{
int i, j, arg, count, pid, ret_status;
@@ -26,12 +96,18 @@
status = HYD_PMCD_pmi_proxy_get_params(argc, argv);
HYDU_ERR_POP(status, "bad parameters passed to the proxy\n");
+ if(HYD_PMCD_pmi_proxy_params.is_persistent) {
+ status = HYD_PMCD_pmi_pproxy_start();
+ HYDU_ERR_POP(status, "Error starting persistent PMI proxy\n");
+ goto fn_exit;
+ }
+
status = HYDU_sock_listen(&HYD_PMCD_pmi_proxy_listenfd, NULL,
(uint16_t *) & HYD_PMCD_pmi_proxy_params.proxy_port);
HYDU_ERR_POP(status, "unable to listen on socket\n");
/* Register the listening socket with the demux engine */
- status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_listenfd, HYD_STDOUT,
+ status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_listenfd, HYD_STDOUT, NULL,
HYD_PMCD_pmi_proxy_listen_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
@@ -123,7 +199,7 @@
HYD_PMCD_pmi_proxy_params.stdin_buf_offset = 0;
HYD_PMCD_pmi_proxy_params.stdin_buf_count = 0;
status =
- HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, HYD_PMCD_pmi_proxy_stdin_cb);
+ HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL, HYD_PMCD_pmi_proxy_stdin_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
}
else {
@@ -142,12 +218,12 @@
/* Everything is spawned, now wait for I/O */
status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
HYD_PMCD_pmi_proxy_params.out,
- HYD_STDOUT, HYD_PMCD_pmi_proxy_stdout_cb);
+ HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stdout_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
HYD_PMCD_pmi_proxy_params.err,
- HYD_STDOUT, HYD_PMCD_pmi_proxy_stderr_cb);
+ HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stderr_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
while (1) {
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h 2009-03-27 18:11:52 UTC (rev 4213)
@@ -11,7 +11,10 @@
#include "hydra_utils.h"
struct HYD_PMCD_pmi_proxy_params {
+ int debug;
+
int proxy_port;
+ int is_persistent;
char *wdir;
HYD_Binding binding;
char *user_bind_map;
@@ -31,6 +34,7 @@
int *err;
int *exit_status;
int in;
+ int rproxy_connfd;
int stdin_buf_offset;
int stdin_buf_count;
@@ -40,10 +44,18 @@
extern struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
extern int HYD_PMCD_pmi_proxy_listenfd;
+HYD_Status HYD_PMCD_pmi_proxy_init_params(struct HYD_PMCD_pmi_proxy_params *proxy_params);
+HYD_Status HYD_PMCD_pmi_proxy_cleanup_params(struct HYD_PMCD_pmi_proxy_params *proxy_params);
HYD_Status HYD_PMCD_pmi_proxy_get_params(int t_argc, char **t_argv);
-HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events);
+HYD_Status HYD_PMCD_pmi_proxy_get_next_keyvalp(char **bufp, int *buf_lenp, char **keyp,
+ int *key_lenp, char **valp, int *val_lenp, char separator);
+HYD_Status HYD_PMCD_pmi_proxy_handle_cmd(int fd, char *cmd, int cmd_len);
+HYD_Status HYD_PMCD_pmi_proxy_handle_launch_cmd(int job_connfd, char *launch_cmd, int cmd_len);
+HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_PMCD_pmi_proxy_remote_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_PMCD_pmi_proxy_rstdout_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events, void *userp);
#endif /* PMI_PROXY_H_INCLUDED */
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -10,13 +10,13 @@
#include "demux.h"
#include "pmi_serv.h"
-struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
+extern struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
int HYD_PMCD_pmi_proxy_listenfd;
-HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events, void *userp)
{
- int accept_fd, count, i;
- enum HYD_PMCD_pmi_proxy_cmds cmd;
+ int accept_fd, cmd_len;
+ char cmd[HYD_PMCD_MAX_CMD_LEN];
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
@@ -28,39 +28,67 @@
status = HYDU_sock_accept(fd, &accept_fd);
HYDU_ERR_POP(status, "accept error\n");
- status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, HYD_PMCD_pmi_proxy_listen_cb);
+ status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_listen_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
}
else { /* We got a command from mpiexec */
- count = read(fd, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
- if (count < 0) {
- HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n",
- fd, HYDU_strerror(errno));
- }
- else if (count == 0) {
+ status = HYDU_sock_readline(fd, cmd, HYD_PMCD_MAX_CMD_LEN, &cmd_len);
+ HYDU_ERR_POP(status, "Error reading command from proxy");
+ if (cmd_len == 0) {
/* The connection has closed */
status = HYD_DMX_deregister_fd(fd);
HYDU_ERR_POP(status, "unable to deregister fd\n");
close(fd);
goto fn_exit;
}
+ status = HYD_PMCD_pmi_proxy_handle_cmd(fd, cmd, cmd_len);
+ HYDU_ERR_POP(status, "Error handling proxy command\n");
+ }
- if (cmd == KILLALL_PROCS) { /* Got the killall command */
- for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
- if (HYD_PMCD_pmi_proxy_params.pid[i] != -1)
- {
- kill(HYD_PMCD_pmi_proxy_params.pid[i], SIGTERM);
- kill(HYD_PMCD_pmi_proxy_params.pid[i], SIGKILL);
- }
-
+ fn_exit:
+ HYDU_FUNC_EXIT();
+ return status;
+ fn_fail:
+ goto fn_exit;
+}
- status = HYD_DMX_deregister_fd(fd);
- HYDU_ERR_POP(status, "unable to register fd\n");
- close(fd);
+HYD_Status HYD_PMCD_pmi_proxy_rstdout_cb(int fd, HYD_Event_t events, void *userp)
+{
+ int closed, i;
+ HYD_Status status = HYD_SUCCESS;
+ struct HYD_PMCD_pmi_proxy_params *proxy_params;
+
+ HYDU_FUNC_ENTER();
+ proxy_params = (struct HYD_PMCD_pmi_proxy_params *)userp;
+
+ status = HYDU_sock_stdout_cb(fd, events, proxy_params->rproxy_connfd, &closed);
+ HYDU_ERR_POP(status, "stdout callback error\n");
+
+ if (closed) {
+ int all_procs_exited = 1;
+ /* The process exited */
+ status = HYD_DMX_deregister_fd(fd);
+ HYDU_ERR_POP(status, "unable to deregister fd\n");
+
+ /* FIXME: This could be a perf killer if we have a lot of procs associated with
+ * the same job on a single proxy
+ */
+ for (i = 0; i < proxy_params->exec_proc_count; i++) {
+ int ret_status = 0;
+ if (proxy_params->out[i] == fd) {
+ waitpid(proxy_params->pid[i], &ret_status, WUNTRACED);
+ close(proxy_params->in);
+ proxy_params->out[i] = -1;
+ proxy_params->err[i] = -1;
+ }
+ if (proxy_params->out[i] != -1) all_procs_exited = 0;
}
- else {
- HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
- "got unrecognized command from mpiexec\n");
+ if(all_procs_exited) {
+ close(proxy_params->rproxy_connfd);
+ status = HYD_DMX_deregister_fd(proxy_params->rproxy_connfd);
+ HYDU_ERR_POP(status, "Error deregistering remote job conn fd\n");
+ status = HYD_PMCD_pmi_proxy_cleanup_params(proxy_params);
+ HYDU_ERR_POP(status, "Error cleaning up proxy params\n");
}
}
@@ -72,8 +100,33 @@
goto fn_exit;
}
+HYD_Status HYD_PMCD_pmi_proxy_remote_cb(int fd, HYD_Event_t events, void *userp)
+{
+ int closed=0, i;
+ HYD_Status status = HYD_SUCCESS;
-HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events)
+ HYDU_FUNC_ENTER();
+ /* FIXME: This cb should take care of the commands from mpiexec */
+
+ if (closed) {
+ /* The connection has closed */
+ status = HYD_DMX_deregister_fd(fd);
+ HYDU_ERR_POP(status, "unable to deregister fd\n");
+
+ for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
+ if (HYD_PMCD_pmi_proxy_params.out[i] == fd)
+ HYD_PMCD_pmi_proxy_params.out[i] = -1;
+ }
+
+ fn_exit:
+ HYDU_FUNC_EXIT();
+ return status;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events, void *userp)
{
int closed, i;
HYD_Status status = HYD_SUCCESS;
@@ -102,7 +155,7 @@
}
-HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events, void *userp)
{
int closed, i;
HYD_Status status = HYD_SUCCESS;
@@ -131,7 +184,7 @@
}
-HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events, void *userp)
{
int closed;
HYD_Status status = HYD_SUCCESS;
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -5,10 +5,307 @@
*/
#include "pmi_proxy.h"
+#include "pmi_serv.h"
+#include "demux.h"
#include "hydra_utils.h"
struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
+HYD_Status HYD_PMCD_pmi_proxy_get_next_keyvalp(char **bufp, int *buf_lenp, char **keyp,
+ int *key_lenp, char **valp, int *val_lenp, char separator)
+{
+ char *p = NULL;
+ int len = 0;
+ int klen = 0;
+ int vlen = 0;
+
+ HYD_Status status = HYD_SUCCESS;
+
+ p = *bufp;
+ len = *buf_lenp;
+
+ while(len && isspace(*p)) { p++; len--; }
+ if(len <= 0)
+ HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Error reading keyval from command\n");
+
+ *keyp = p;
+ klen = 0;
+ while(len && (*p != '=')) {
+ p++; len--;
+ klen++;
+ }
+ if(key_lenp) *key_lenp = klen;
+ p++; len--;
+ if(len <= 0)
+ HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Error reading keyval from command\n");
+
+ *valp = p;
+ vlen = 0;
+ /* FIXME: Allow escaping ';' */
+ while(len && (*p != separator)) {
+ p++; len--;
+ vlen++;
+ }
+ if(val_lenp) *val_lenp = vlen;
+ p++; len--;
+ if(len < 0)
+ HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Error reading keyval from command\n");
+
+ while(len && isspace(*p)) { p++; len--; }
+ /* p now points to the next key or the end of string */
+ *bufp = p;
+ if(*p != '\0') {
+ /* Remaining length of buffer to be processed */
+ *buf_lenp = len;
+ }
+ else{
+ /* End of string - no more keyvals */
+ *buf_lenp = 0;
+ }
+
+ fn_exit:
+ return status;
+ fn_fail:
+ goto fn_exit;
+}
+
+HYD_Status HYD_PMCD_pmi_proxy_handle_launch_cmd(int job_connfd, char *launch_cmd, int cmd_len)
+{
+ char *key, *val;
+ int i=0, key_len=0, val_len = 0, core = 0;
+ struct HYD_Partition_exec *exec = NULL;
+ HYD_Env_t *env=NULL;
+ HYD_Status status = HYD_SUCCESS;
+
+ /* FIXME: We currently support only one job - We need a list of proxy params for multiple jobs */
+ status = HYD_PMCD_pmi_proxy_init_params(&HYD_PMCD_pmi_proxy_params);
+ HYDU_ERR_POP(status, "Error initializing proxy params\n");
+
+ HYD_PMCD_pmi_proxy_params.rproxy_connfd = job_connfd;
+
+ status = HYD_DMX_deregister_fd(job_connfd);
+ HYDU_ERR_POP(status, "Unable to deregister job conn fd\n");
+ status = HYD_DMX_register_fd(1, &job_connfd, HYD_STDIN, (void *)&HYD_PMCD_pmi_proxy_params, HYD_PMCD_pmi_proxy_remote_cb);
+ HYDU_ERR_POP(status, "Unable to register job conn fd\n");
+
+ status = HYDU_alloc_partition_exec(&HYD_PMCD_pmi_proxy_params.exec_list);
+ HYDU_ERR_POP(status, "unable to allocate partition exec\n");
+
+ exec = HYD_PMCD_pmi_proxy_params.exec_list;
+
+ while(cmd_len > 0){
+ status = HYD_PMCD_pmi_proxy_get_next_keyvalp(&launch_cmd, &cmd_len, &key, &key_len, &val, &val_len, HYD_PMCD_CMD_SEP_CHAR);
+ HYDU_ERR_POP(status, "Unable to get next key from launch command\n");
+
+ /* FIXME: Use pre-defined macros for keys */
+ if(!strncmp(key, "exec_name", key_len)) {
+ HYDU_MALLOC(exec->exec[0], char *, (val_len + 1), status);
+ HYDU_ERR_POP(status, "Error allocating memory for executable name\n");
+ HYDU_snprintf(exec->exec[0], val_len, "%s", val);
+ exec->exec[1] = NULL;
+ }
+ else if(!strncmp(key, "exec_cnt", key_len)) {
+ for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec->next; exec = exec->next);
+ exec->proc_count = atoi(val);
+ }
+ else if(!strncmp(key, "env", key_len)) {
+ char *env_str;
+ int env_str_len;
+
+ env_str = val; env_str_len = val_len;
+ exec->prop_env = NULL;
+ while(env_str_len > 0) {
+ status = HYD_PMCD_pmi_proxy_get_next_keyvalp(&env_str, &env_str_len, &key, &key_len, &val, &val_len, HYD_PMCD_CMD_ENV_SEP_CHAR);
+ HYDU_ERR_POP(status, "Error getting next environment variable from launch command\n");
+
+ HYDU_MALLOC(env, HYD_Env_t *, sizeof(HYD_Env_t), status);
+ HYDU_ERR_POP(status, "Error allocating memory for environment variable in proxy params\n");
+
+ HYDU_MALLOC(env->env_name, char *, key_len+1, status);
+ HYDU_ERR_POP(status, "Error allocating memory for environment variable in proxy params\n");
+ HYDU_snprintf(env->env_name, key_len+1, "%s", key);
+
+ HYDU_MALLOC(env->env_value, char *, val_len+1, status);
+ HYDU_ERR_POP(status, "Error allocating memory for environment variable in proxy params\n");
+ HYDU_snprintf(env->env_value, val_len+1, "%s", val);
+ env->next = exec->prop_env;
+ exec->prop_env = env;
+ }
+ }
+ else{
+ HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Unrecognized key in launch command\n");
+ }
+
+ /* FIXME: Set these ... */
+ /* HYD_PMCD_pmi_proxy_params.wdir =
+ HYD_PMCD_pmi_proxy_params.binding =
+ HYD_PMCD_pmi_proxy_params.user_bind_map = ;
+ HYDU_append_env_to_list(*env, &HYD_PMCD_pmi_proxy_params.global_env);
+ HYD_PMCD_pmi_proxy_params.one_pass_count
+ status = HYDU_alloc_partition_segment(&segment->next);
+ segment->proc_count = ;
+ segment->start_pid = ;
+ */
+ }
+
+ HYD_PMCD_pmi_proxy_params.exec_proc_count = 0;
+ for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next)
+ HYD_PMCD_pmi_proxy_params.exec_proc_count += exec->proc_count;
+
+ HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.out, int *,
+ HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+ HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.err, int *,
+ HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+ HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.pid, int *,
+ HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+ HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.exit_status, int *,
+ HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+
+ for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next) {
+ for (i = 0; i < exec->proc_count; i++) {
+ char *str = NULL;
+ core = 0;
+ env = NULL;
+ /* FIXME: Use the start pmi_id from launch command */
+ str = HYDU_int_to_str(i);
+ status = HYDU_env_create(&env, "PMI_ID", str);
+ HYDU_ERR_POP(status, "unable to create env\n");
+ status = HYDU_putenv(env);
+ HYDU_ERR_POP(status, "putenv failed\n");
+ status = HYDU_create_process(&exec->exec[0], exec->prop_env, NULL,
+ &HYD_PMCD_pmi_proxy_params.out[i],
+ &HYD_PMCD_pmi_proxy_params.err[i],
+ &HYD_PMCD_pmi_proxy_params.pid[i], core);
+ HYDU_ERR_POP(status, "Error launching process\n");
+ status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_params.out[i], HYD_STDOUT, (void *)&HYD_PMCD_pmi_proxy_params, HYD_PMCD_pmi_proxy_rstdout_cb);
+ HYDU_ERR_POP(status, "Error registering process stdout\n");
+ }
+ }
+
+ fn_exit:
+ return status;
+ fn_fail:
+ goto fn_exit;
+}
+
+/* Handle proxy commands */
+HYD_Status HYD_PMCD_pmi_proxy_handle_cmd(int fd, char *cmd, int cmd_len)
+{
+ char *key = NULL;
+ char *cmd_name = NULL;
+ int i=0, key_len = 0, cmd_name_len = 0;
+ char *cmdp = NULL;
+ HYD_Status status = HYD_SUCCESS;
+
+ cmdp = cmd;
+ /* First key/val is the command name */
+ status = HYD_PMCD_pmi_proxy_get_next_keyvalp(&cmdp, &cmd_len, &key, &key_len, &cmd_name,
+ &cmd_name_len, HYD_PMCD_CMD_SEP_CHAR );
+ HYDU_ERR_POP(status, "Error retreiving command name from command\n");
+
+ if (!strncmp(cmd_name, HYD_PMCD_CMD_KILLALL_PROCS, key_len)) {
+ for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
+ if (HYD_PMCD_pmi_proxy_params.pid[i] != -1)
+ kill(HYD_PMCD_pmi_proxy_params.pid[i], SIGKILL);
+
+ status = HYD_DMX_deregister_fd(fd);
+ HYDU_ERR_POP(status, "unable to register fd\n");
+ close(fd);
+ }
+ else if(!strncmp(cmd_name, HYD_PMCD_CMD_LAUNCH_PROCS, key_len)) {
+ status = HYD_PMCD_pmi_proxy_handle_launch_cmd(fd, cmdp, cmd_len);
+ HYDU_ERR_POP(status, "Unable to handle launch command\n");
+ }
+ else if(!strncmp(cmd_name, HYD_PMCD_CMD_SHUTDOWN, key_len)) {
+ /* FIXME: Not a clean shutdown... Kill all procs before exiting */
+ status = HYD_DMX_deregister_fd(fd);
+ HYDU_ERR_POP(status, "unable to register fd\n");
+ close(fd);
+ exit(0);
+ }
+ else {
+ HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
+ "got unrecognized command from mpiexec\n");
+ }
+ fn_exit:
+ return status;
+ fn_fail:
+ goto fn_exit;
+}
+
+/* Initialize proxy params */
+HYD_Status HYD_PMCD_pmi_proxy_init_params(struct HYD_PMCD_pmi_proxy_params *proxy_params)
+{
+ HYD_Status status = HYD_SUCCESS;
+ proxy_params->debug = 0;
+ proxy_params->proxy_port = -1;
+ proxy_params->is_persistent = 0;
+ proxy_params->wdir = NULL;
+ proxy_params->binding = HYD_BIND_UNSET;
+ proxy_params->user_bind_map = NULL;
+ proxy_params->global_env = NULL;
+ proxy_params->one_pass_count = 0;
+ proxy_params->partition_proc_count = 0;
+ proxy_params->exec_proc_count = 0;
+ proxy_params->segment_list = NULL;
+ proxy_params->exec_list = NULL;
+ proxy_params->pid = NULL;
+ proxy_params->out = NULL;
+ proxy_params->err = NULL;
+ proxy_params->exit_status = NULL;
+ proxy_params->in = -1;
+ proxy_params->rproxy_connfd = -1;
+ proxy_params->stdin_buf_offset = 0;
+ proxy_params->stdin_buf_count = 0;
+ proxy_params->stdin_tmp_buf[0] = '\0';
+ return status;
+}
+
+/* Cleanup proxy params after use */
+HYD_Status HYD_PMCD_pmi_proxy_cleanup_params(struct HYD_PMCD_pmi_proxy_params *proxy_params)
+{
+ HYD_Status status = HYD_SUCCESS;
+ if(proxy_params->wdir != NULL)
+ HYDU_FREE(proxy_params->wdir);
+ if(proxy_params->user_bind_map != NULL)
+ HYDU_FREE(proxy_params->user_bind_map);
+ if(proxy_params->global_env != NULL){
+ HYD_Env_t *p, *q;
+ do {
+ p = proxy_params->global_env;
+ q = p->next;
+ HYDU_FREE(p);
+ }while(q);
+ }
+ if(proxy_params->segment_list != NULL) {
+ /* FIXME : incomplete */
+ }
+ if(proxy_params->exec_list != NULL) {
+ struct HYD_Partition_exec *p, *q;
+ do{
+ p = proxy_params->exec_list;
+ q = p->next;
+ HYDU_FREE(p);
+ }while(q);
+ }
+ if(proxy_params->pid != NULL)
+ HYDU_FREE(proxy_params->pid);
+ if(proxy_params->out != NULL)
+ HYDU_FREE(proxy_params->out);
+ if(proxy_params->err != NULL)
+ HYDU_FREE(proxy_params->err);
+ if(proxy_params->exit_status != NULL)
+ HYDU_FREE(proxy_params->exit_status);
+
+ status = HYD_PMCD_pmi_proxy_init_params(proxy_params);
+ HYDU_ERR_POP(status, "Error initializing proxy params\n");
+
+ fn_exit:
+ return status;
+ fn_fail:
+ goto fn_exit;
+}
+
HYD_Status HYD_PMCD_pmi_proxy_get_params(int t_argc, char **t_argv)
{
char **argv = t_argv, *str;
@@ -20,15 +317,18 @@
HYDU_FUNC_ENTER();
- HYD_PMCD_pmi_proxy_params.exec_list = NULL;
- HYD_PMCD_pmi_proxy_params.segment_list = NULL;
- HYD_PMCD_pmi_proxy_params.global_env = NULL;
+ status = HYD_PMCD_pmi_proxy_init_params(&HYD_PMCD_pmi_proxy_params);
+ HYDU_ERR_POP(status, "Error initializing proxy params\n");
while (*argv) {
++argv;
if (*argv == NULL)
break;
+ if(!strcmp(*argv, "--verbose")) {
+ HYD_PMCD_pmi_proxy_params.debug = 1;
+ continue;
+ }
/* Proxy port */
if (!strcmp(*argv, "--proxy-port")) {
argv++;
@@ -36,6 +336,11 @@
continue;
}
+ if(!strcmp(*argv, "--persistent")) {
+ HYD_PMCD_pmi_proxy_params.is_persistent = 1;
+ continue;
+ }
+
/* Working directory */
if (!strcmp(*argv, "--wdir")) {
argv++;
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h 2009-03-27 18:11:52 UTC (rev 4213)
@@ -7,14 +7,20 @@
#ifndef PMI_SERV_H_INCLUDED
#define PMI_SERV_H_INCLUDED
-/* Currently we only have one command */
-enum HYD_PMCD_pmi_proxy_cmds {
- KILLALL_PROCS
-};
+/* The set of commands supported */
+#define HYD_PMCD_CMD_KILLALL_PROCS "kill_all_procs"
+#define HYD_PMCD_CMD_KILLALL_PROXIES "kill_all_proxies"
+#define HYD_PMCD_CMD_LAUNCH_PROCS "launch_procs"
+#define HYD_PMCD_CMD_SHUTDOWN "shutdown"
+#define HYD_PMCD_CMD_SEP_CHAR ';'
+#define HYD_PMCD_CMD_ENV_SEP_CHAR ','
+
+#define HYD_PMCD_MAX_CMD_LEN 1024
+
extern int HYD_PMCD_pmi_serv_listenfd;
-HYD_Status HYD_PMCD_pmi_serv_cb(int fd, HYD_Event_t events);
+HYD_Status HYD_PMCD_pmi_serv_cb(int fd, HYD_Event_t events, void *userp);
HYD_Status HYD_PMCD_pmi_serv_cleanup(void);
void HYD_PMCD_pmi_serv_signal_cb(int signal);
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -14,7 +14,7 @@
#include "pmi_serv.h"
int HYD_PMCD_pmi_serv_listenfd;
-HYD_Handle handle;
+extern HYD_Handle handle;
struct HYD_PMCD_pmi_handle *HYD_PMCD_pmi_handle_list;
/*
@@ -27,7 +27,7 @@
* 2. The client sends us a "cmd" or "mcmd" string which means that a
* single or multi-line command is to follow.
*/
-HYD_Status HYD_PMCD_pmi_serv_cb(int fd, HYD_Event_t events)
+HYD_Status HYD_PMCD_pmi_serv_cb(int fd, HYD_Event_t events, void *userp)
{
int accept_fd, linelen, i;
char *buf = NULL, *cmd, *args[HYD_NUM_TMP_STRINGS];
@@ -41,7 +41,7 @@
status = HYDU_sock_accept(fd, &accept_fd);
HYDU_ERR_POP(status, "accept error\n");
- status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, HYD_PMCD_pmi_serv_cb);
+ status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, NULL, HYD_PMCD_pmi_serv_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
}
else {
@@ -141,7 +141,7 @@
{
struct HYD_Partition *partition;
int fd;
- enum HYD_PMCD_pmi_proxy_cmds cmd;
+ char cmd[HYD_PMCD_MAX_CMD_LEN];
HYD_Status status = HYD_SUCCESS, overall_status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
@@ -149,7 +149,8 @@
/* FIXME: Instead of doing this from this process itself, fork a
* bunch of processes to do this. */
/* Connect to all proxies and send a KILL command */
- cmd = KILLALL_PROCS;
+ HYDU_snprintf(cmd, HYD_PMCD_MAX_CMD_LEN, "%s=%s %c\n",
+ "cmd", HYD_PMCD_CMD_KILLALL_PROCS, HYD_PMCD_CMD_SEP_CHAR);
for (partition = handle.partition_list; partition; partition = partition->next) {
status = HYDU_sock_connect(partition->name, handle.proxy_port, &fd);
if (status != HYD_SUCCESS) {
@@ -158,7 +159,7 @@
continue; /* Move on to the next proxy */
}
- status = HYDU_sock_write(fd, &cmd, sizeof(cmd));
+ status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
if (status != HYD_SUCCESS) {
HYDU_Warn_printf("unable to send data to the proxy on %s\n", partition->name);
overall_status = HYD_INTERNAL_ERROR;
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -30,8 +30,11 @@
status = HYD_PMCD_pmi_finalize();
HYDU_ERR_POP(status, "unable to finalize process manager utils\n");
- status = HYD_BSCI_finalize();
- HYDU_ERR_POP(status, "unable to finalize bootstrap server\n");
+ /* We use BSC only for local proxies */
+ if(!handle.is_proxy_remote) {
+ status = HYD_BSCI_finalize();
+ HYDU_ERR_POP(status, "unable to finalize bootstrap server\n");
+ }
fn_exit:
HYDU_FUNC_EXIT();
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -13,90 +13,19 @@
#include "pmi_serv.h"
int HYD_PMCD_pmi_serv_listenfd;
-HYD_Handle handle;
+extern HYD_Handle handle;
-/*
- * HYD_PMCI_launch_procs: Here are the steps we follow:
- *
- * 1. Find what all ports the user wants to allow and listen on one of
- * those ports.
- *
- * 2. Create a call-back function to accept connections and register
- * the listening socket with the demux engine.
- *
- * 3. Create a port string out of this hostname and port and add it to
- * the environment list under the variable "PMI_PORT".
- *
- * 4. Create an environment variable for PMI_ID. This is an
- * auto-incrementing variable; the bootstrap server will take care of
- * adding the process ID to the start value.
- *
- * 5. Create a process info setup and ask the bootstrap server to
- * launch the processes.
- */
-HYD_Status HYD_PMCI_launch_procs(void)
+/* Local proxy is a proxy that is local to this process */
+static HYD_Status launch_procs_with_local_proxy(void )
{
- char *port_range, *port_str, *sport;
- uint16_t port;
+ HYD_Status status = HYD_SUCCESS;
int i, arg, process_id;
- char hostname[MAX_HOSTNAME_LEN];
+ char *path_str[HYD_NUM_TMP_STRINGS];
HYD_Env_t *env;
- char *path_str[HYD_NUM_TMP_STRINGS];
struct HYD_Partition *partition;
struct HYD_Partition_exec *exec;
struct HYD_Partition_segment *segment;
- HYD_Status status = HYD_SUCCESS;
- HYDU_FUNC_ENTER();
-
- status = HYDU_set_common_signals(HYD_PMCD_pmi_serv_signal_cb);
- HYDU_ERR_POP(status, "unable to set signal\n");
-
- /* Check if the user wants us to use a port within a certain
- * range. */
- port_range = getenv("MPIEXEC_PORTRANGE");
- if (!port_range)
- port_range = getenv("MPIEXEC_PORT_RANGE");
- if (!port_range)
- port_range = getenv("MPICH_PORT_RANGE");
-
- /* Listen on a port in the port range */
- port = 0;
- status = HYDU_sock_listen(&HYD_PMCD_pmi_serv_listenfd, port_range, &port);
- HYDU_ERR_POP(status, "unable to listen on port\n");
-
- /* Register the listening socket with the demux engine */
- status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_serv_listenfd, HYD_STDOUT,
- HYD_PMCD_pmi_serv_cb);
- HYDU_ERR_POP(status, "unable to register fd\n");
-
- /* Create a port string for MPI processes to use to connect to */
- if (gethostname(hostname, MAX_HOSTNAME_LEN) < 0)
- HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR,
- "gethostname error (hostname: %s; errno: %d)\n", hostname, errno);
-
- sport = HYDU_int_to_str(port);
-
- HYDU_MALLOC(port_str, char *, strlen(hostname) + 1 + strlen(sport) + 1, status);
- HYDU_snprintf(port_str, strlen(hostname) + 1 + strlen(sport) + 1,
- "%s:%s", hostname, sport);
- HYDU_FREE(sport);
- HYDU_Debug("Process manager listening on PMI port %s\n", port_str);
-
- status = HYDU_env_create(&env, "PMI_PORT", port_str);
- HYDU_ERR_POP(status, "unable to create env\n");
-
- status = HYDU_append_env_to_list(*env, &handle.system_env);
- HYDU_ERR_POP(status, "unable to add env to list\n");
-
- HYDU_env_free(env);
- HYDU_FREE(port_str);
-
- /* Create a process group for the MPI processes in this
- * comm_world */
- status = HYD_PMCD_pmi_create_pg();
- HYDU_ERR_POP(status, "unable to create process group\n");
-
handle.one_pass_count = 0;
for (partition = handle.partition_list; partition; partition = partition->next)
handle.one_pass_count += partition->total_proc_count;
@@ -194,6 +123,192 @@
HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n");
fn_exit:
+ return status;
+ fn_fail:
+ goto fn_exit;
+}
+
+/* Request remote proxies to shutdown */
+static HYD_Status shutdown_remote_proxies(void )
+{
+ char shutdown_proxies_cmd[HYD_PMCD_MAX_CMD_LEN];
+ struct HYD_Partition *partition=NULL;
+ int HYD_PMCD_pmi_proxy_connfd=-1;
+ HYD_Status status = HYD_SUCCESS;
+
+ for (partition = handle.partition_list; partition; partition = partition->next) {
+ status = HYDU_sock_connect(partition->name, handle.pproxy_port,
+ &HYD_PMCD_pmi_proxy_connfd);
+ HYDU_ERR_POP(status, "Error connecting to remote proxy");
+
+ /* Create shutdown command */
+ HYDU_snprintf(shutdown_proxies_cmd, HYD_PMCD_MAX_CMD_LEN,
+ "%s=%s %c\n",
+ "cmd", HYD_PMCD_CMD_SHUTDOWN, HYD_PMCD_CMD_SEP_CHAR);
+
+ status = HYDU_sock_writeline(HYD_PMCD_pmi_proxy_connfd, shutdown_proxies_cmd,
+ strlen(shutdown_proxies_cmd));
+ HYDU_ERR_POP(status, "Error writing the launch procs command\n");
+
+ /* FIXME: Read result */
+ partition->out = HYD_PMCD_pmi_proxy_connfd;
+ partition->err = -1;
+ }
+
+ fn_exit:
+ return status;
+ fn_fail:
+ goto fn_exit;
+}
+
+/* Remote proxy is a proxy external to this process */
+static HYD_Status launch_procs_with_remote_proxy(void )
+{
+ HYD_Status status = HYD_SUCCESS;
+ char launch_procs_cmd[HYD_PMCD_MAX_CMD_LEN];
+ char env_list[HYD_PMCD_MAX_CMD_LEN]; /* FIXME: Wrong *MAX*... */
+ int env_list_len = 0;
+ char *p = NULL;
+ struct HYD_Partition *partition=NULL;
+ struct HYD_Partition_exec *exec=NULL;
+ struct HYD_Env *env=NULL;
+ int HYD_PMCD_pmi_proxy_connfd=-1;
+
+ for (partition = handle.partition_list; partition; partition = partition->next) {
+ status = HYDU_sock_connect(partition->name, handle.pproxy_port,
+ &HYD_PMCD_pmi_proxy_connfd);
+ HYDU_ERR_POP(status, "Error connecting to remote proxy");
+
+ exec = partition->exec_list;
+
+ /* FIXME: Create a util func for converting env list to a string */
+ env = handle.system_env;
+ p = env_list;
+ *p = '\0';
+ env_list_len = HYD_PMCD_MAX_CMD_LEN;
+ while(env){
+ HYDU_snprintf(p, env_list_len,"%s=%s %c",
+ env->env_name, env->env_value, HYD_PMCD_CMD_ENV_SEP_CHAR);
+ env_list_len -= strlen(p);
+ p += strlen(p);
+ env = env->next;
+ }
+ /* Create launch command */
+ HYDU_snprintf(launch_procs_cmd, HYD_PMCD_MAX_CMD_LEN,
+ "%s=%s %c %s=%s %c %s=%d %c %s=%s %c\n",
+ "cmd", HYD_PMCD_CMD_LAUNCH_PROCS, HYD_PMCD_CMD_SEP_CHAR,
+ "exec_name", exec->exec[0], HYD_PMCD_CMD_SEP_CHAR,
+ "exec_cnt", exec->proc_count, HYD_PMCD_CMD_SEP_CHAR,
+ "env", env_list, HYD_PMCD_CMD_SEP_CHAR);
+
+ status = HYDU_sock_writeline(HYD_PMCD_pmi_proxy_connfd, launch_procs_cmd,
+ strlen(launch_procs_cmd));
+ HYDU_ERR_POP(status, "Error writing the launch procs command\n");
+
+ /* FIXME: Read result */
+ partition->out = HYD_PMCD_pmi_proxy_connfd;
+ partition->err = -1;
+ }
+
+ fn_exit:
+ return status;
+ fn_fail:
+ goto fn_exit;
+}
+
+/*
+ * HYD_PMCI_launch_procs: Here are the steps we follow:
+ *
+ * 1. Find what all ports the user wants to allow and listen on one of
+ * those ports.
+ *
+ * 2. Create a call-back function to accept connections and register
+ * the listening socket with the demux engine.
+ *
+ * 3. Create a port string out of this hostname and port and add it to
+ * the environment list under the variable "PMI_PORT".
+ *
+ * 4. Create an environment variable for PMI_ID. This is an
+ * auto-incrementing variable; the bootstrap server will take care of
+ * adding the process ID to the start value.
+ *
+ * 5. Create a process info setup and ask the bootstrap server to
+ * launch the processes.
+ */
+HYD_Status HYD_PMCI_launch_procs(void)
+{
+ char *port_range, *port_str, *sport;
+ uint16_t port;
+ char hostname[MAX_HOSTNAME_LEN];
+ HYD_Env_t *env;
+ HYD_Status status = HYD_SUCCESS;
+
+ HYDU_FUNC_ENTER();
+
+ status = HYDU_set_common_signals(HYD_PMCD_pmi_serv_signal_cb);
+ HYDU_ERR_POP(status, "unable to set signal\n");
+
+ /* Check if the user wants us to use a port within a certain
+ * range. */
+ port_range = getenv("MPIEXEC_PORTRANGE");
+ if (!port_range)
+ port_range = getenv("MPIEXEC_PORT_RANGE");
+ if (!port_range)
+ port_range = getenv("MPICH_PORT_RANGE");
+
+ /* Listen on a port in the port range */
+ port = 0;
+ status = HYDU_sock_listen(&HYD_PMCD_pmi_serv_listenfd, port_range, &port);
+ HYDU_ERR_POP(status, "unable to listen on port\n");
+
+ /* Register the listening socket with the demux engine */
+ status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_serv_listenfd, HYD_STDOUT, NULL,
+ HYD_PMCD_pmi_serv_cb);
+ HYDU_ERR_POP(status, "unable to register fd\n");
+
+ /* Create a port string for MPI processes to use to connect to */
+ if (gethostname(hostname, MAX_HOSTNAME_LEN) < 0)
+ HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR,
+ "gethostname error (hostname: %s; errno: %d)\n", hostname, errno);
+
+ sport = HYDU_int_to_str(port);
+
+ HYDU_MALLOC(port_str, char *, strlen(hostname) + 1 + strlen(sport) + 1, status);
+ HYDU_snprintf(port_str, strlen(hostname) + 1 + strlen(sport) + 1,
+ "%s:%s", hostname, sport);
+ HYDU_FREE(sport);
+ HYDU_Debug("Process manager listening on PMI port %s\n", port_str);
+
+ status = HYDU_env_create(&env, "PMI_PORT", port_str);
+ HYDU_ERR_POP(status, "unable to create env\n");
+
+ status = HYDU_append_env_to_list(*env, &handle.system_env);
+ HYDU_ERR_POP(status, "unable to add env to list\n");
+
+ HYDU_env_free(env);
+ HYDU_FREE(port_str);
+
+ /* Create a process group for the MPI processes in this
+ * comm_world */
+ status = HYD_PMCD_pmi_create_pg();
+ HYDU_ERR_POP(status, "unable to create process group\n");
+
+ if(handle.is_proxy_remote) {
+ if(handle.is_proxy_terminator) {
+ status = shutdown_remote_proxies();
+ HYDU_ERR_POP(status, "Error shutting down remote proxies\n");
+ }
+ else{
+ status = launch_procs_with_remote_proxy();
+ HYDU_ERR_POP(status, "Error launching procs with remote proxy\n");
+ }
+ }
+ else {
+ status = launch_procs_with_local_proxy();
+ HYDU_ERR_POP(status, "Error launching procs with local proxy\n");
+ }
+
+ fn_exit:
HYDU_FUNC_EXIT();
return status;
@@ -208,11 +323,16 @@
HYDU_FUNC_ENTER();
- status = HYD_BSCI_wait_for_completion();
- if (status != HYD_SUCCESS) {
- status = HYD_PMCD_pmi_serv_cleanup();
- HYDU_ERR_POP(status, "process manager cannot cleanup processes\n");
+ if(handle.is_proxy_remote) {
+ status = HYD_SUCCESS;
}
+ else{
+ status = HYD_BSCI_wait_for_completion();
+ if (status != HYD_SUCCESS) {
+ status = HYD_PMCD_pmi_serv_cleanup();
+ HYDU_ERR_POP(status, "process manager cannot cleanup processes\n");
+ }
+ }
fn_exit:
HYDU_FUNC_EXIT();
Modified: mpich2/trunk/src/pm/hydra/utils/env/env.c
===================================================================
--- mpich2/trunk/src/pm/hydra/utils/env/env.c 2009-03-27 18:02:56 UTC (rev 4212)
+++ mpich2/trunk/src/pm/hydra/utils/env/env.c 2009-03-27 18:11:52 UTC (rev 4213)
@@ -100,7 +100,7 @@
env = env_list;
while (env) {
status = env_to_str(env, &str_list[i++]);
- HYDU_ERR_POP(status, "env_to_str returned error\n");
+ HYDU_ERR_POP(status, "HYDU_env_to_str returned error\n");
env = env->next;
}
str_list[i++] = NULL;
More information about the mpich2-commits
mailing list