[mpich2-commits] r4222 - in mpich2/trunk/src/pm/hydra: . bootstrap/fork bootstrap/slurm bootstrap/ssh control/consys include launcher/mpiexec launcher/utils pm/pmiserv utils/launch utils/sock
balaji at mcs.anl.gov
balaji at mcs.anl.gov
Sun Mar 29 22:57:55 CDT 2009
Author: balaji
Date: 2009-03-29 22:57:55 -0500 (Sun, 29 Mar 2009)
New Revision: 4222
Added:
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_common.h
Removed:
mpich2/trunk/src/pm/hydra/control/consys/consys_close.c
Modified:
mpich2/trunk/src/pm/hydra/README
mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c
mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c
mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c
mpich2/trunk/src/pm/hydra/control/consys/Makefile.sm
mpich2/trunk/src/pm/hydra/control/consys/consys_finalize.c
mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c
mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c
mpich2/trunk/src/pm/hydra/include/hydra.h
mpich2/trunk/src/pm/hydra/include/hydra_base.h
mpich2/trunk/src/pm/hydra/include/hydra_utils.h
mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c
mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c
mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c
mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c
mpich2/trunk/src/pm/hydra/utils/launch/allocate.c
mpich2/trunk/src/pm/hydra/utils/sock/sock.c
Log:
Merge the code paths for the runtime and persistent launch modes so
that we don't need to maintain duplicate code.
Modified: mpich2/trunk/src/pm/hydra/README
===================================================================
--- mpich2/trunk/src/pm/hydra/README 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/README 2009-03-30 03:57:55 UTC (rev 4222)
@@ -198,3 +198,24 @@
enable it, you should use the option --enable-x to mpiexec.
$ mpiexec --enable-x -f hosts -n 4 ./app
+
+
+Persistent-mode Proxies
+-----------------------
+
+Hydra also supports proxies that are launched once in persistent mode
+(e.g., by a system administrator) and then reused by many jobs. Boot
+the proxies, launch applications with them, and shut them down with:
+
+ $ mpiexec --boot-proxies
+
+ $ mpiexec --use-persistent -n 4 ./app1
+
+ $ mpiexec --use-persistent -n 4 ./app2
+
+ $ mpiexec --use-persistent -n 4 ./app3
+
+ $ mpiexec --shutdown-proxies
+
+Persistent mode can also be selected (instead of --use-persistent) by
+setting the environment variable HYDRA_LAUNCH_MODE=persistent.
Modified: mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/bootstrap/fork/fork_launch.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -43,10 +43,6 @@
for (arg = 0; client_arg[arg]; arg++)
HYDU_FREE(client_arg[arg]);
- /* For the remaining processes, set the stdin fd to -1 */
- if (process_id != 0)
- handle.in = -1;
-
process_id++;
}
Modified: mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/bootstrap/slurm/slurm_launch.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -54,10 +54,6 @@
for (arg = 0; client_arg[arg]; arg++)
HYDU_FREE(client_arg[arg]);
- /* For the remaining processes, set the stdin fd to -1 */
- if (process_id != 0)
- handle.in = -1;
-
process_id++;
}
Modified: mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -63,10 +63,6 @@
for (arg = 0; client_arg[arg]; arg++)
HYDU_FREE(client_arg[arg]);
- /* For the remaining processes, set the stdin fd to -1 */
- if (process_id != 0)
- handle.in = -1;
-
process_id++;
}
Modified: mpich2/trunk/src/pm/hydra/control/consys/Makefile.sm
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/Makefile.sm 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/control/consys/Makefile.sm 2009-03-30 03:57:55 UTC (rev 4222)
@@ -7,7 +7,7 @@
HYDRA_LIB_PATH = ../../lib
libhydra_a_DIR = ${HYDRA_LIB_PATH}
-libhydra_a_SOURCES = consys_launch.c consys_wait.c consys_close.c consys_finalize.c
+libhydra_a_SOURCES = consys_launch.c consys_wait.c consys_finalize.c
INCLUDES = -I${abs_srcdir}/../../include \
-I${abs_srcdir}/../../../../include \
-I../../include \
@@ -15,5 +15,4 @@
-I${abs_srcdir}/../../launcher/utils \
-I${abs_srcdir}/../include \
-I${abs_srcdir}/../utils \
- -I${abs_srcdir}/../../pm/include \
- -I${abs_srcdir}/../../demux
+ -I${abs_srcdir}/../../pm/include
Deleted: mpich2/trunk/src/pm/hydra/control/consys/consys_close.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_close.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_close.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -1,45 +0,0 @@
-/* -*- Mode: C; c-basic-offset:4 ; -*- */
-/*
- * (C) 2008 by Argonne National Laboratory.
- * See COPYRIGHT in top-level directory.
- */
-
-#include "hydra.h"
-#include "hydra_utils.h"
-#include "csi.h"
-#include "pmci.h"
-#include "demux.h"
-
-extern HYD_Handle handle;
-
-HYD_Status HYD_CSI_close_fd(int fd)
-{
- struct HYD_Partition *partition;
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- /* Deregister the FD with the demux engine and close it. */
- status = HYD_DMX_deregister_fd(fd);
- HYDU_ERR_SETANDJUMP1(status, status, "error deregistering fd %d\n", fd);
- close(fd);
-
- /* Find the FD in the handle and remove it. */
- for (partition = handle.partition_list; partition; partition = partition->next) {
- if (partition->out == fd) {
- partition->out = -1;
- goto fn_exit;
- }
- if (partition->err == fd) {
- partition->err = -1;
- goto fn_exit;
- }
- }
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_finalize.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_finalize.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_finalize.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -7,7 +7,6 @@
#include "hydra.h"
#include "csi.h"
#include "pmci.h"
-#include "demux.h"
HYD_Status HYD_CSI_finalize(void)
{
@@ -18,9 +17,6 @@
status = HYD_PMCI_finalize();
HYDU_ERR_POP(status, "error returned from PM finalize\n");
- status = HYD_DMX_finalize();
- HYDU_ERR_POP(status, "error returned from demux finalize\n");
-
fn_exit:
HYDU_FUNC_EXIT();
return status;
Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -8,7 +8,6 @@
#include "hydra_utils.h"
#include "csi.h"
#include "pmci.h"
-#include "demux.h"
extern HYD_Handle handle;
@@ -23,32 +22,6 @@
status = HYD_PMCI_launch_procs();
HYDU_ERR_POP(status, "PM returned error while launching processes\n");
- for (partition = handle.partition_list; partition; partition = partition->next) {
- status = HYD_DMX_register_fd(1, &partition->out, HYD_STDOUT, NULL, handle.stdout_cb);
- HYDU_ERR_POP(status, "demux returned error registering fd\n");
-
- if (partition->err != -1) {
- status =
- HYD_DMX_register_fd(1, &partition->err, HYD_STDOUT, NULL, handle.stderr_cb);
- HYDU_ERR_POP(status, "demux returned error registering fd\n");
- }
- }
-
- if (handle.in != -1) { /* Only process_id 0 */
- status = HYDU_sock_set_nonblock(handle.in);
- HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
-
- stdin_fd = 0;
- status = HYDU_sock_set_nonblock(stdin_fd);
- HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
-
- handle.stdin_buf_count = 0;
- handle.stdin_buf_offset = 0;
-
- status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL, handle.stdin_cb);
- HYDU_ERR_POP(status, "demux returned error registering fd\n");
- }
-
fn_exit:
HYDU_FUNC_EXIT();
return status;
Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -8,7 +8,6 @@
#include "csi.h"
#include "csiu.h"
#include "pmci.h"
-#include "demux.h"
extern HYD_Handle handle;
@@ -20,33 +19,11 @@
HYDU_FUNC_ENTER();
- while (1) {
- /* Wait for some event to occur */
- status = HYD_DMX_wait_for_event(HYDU_time_left(handle.start, handle.timeout));
- HYDU_ERR_POP(status, "error waiting for event\n");
+ /* Make sure all the processes have terminated. The process
+ * manager control device will take care of that. */
+ status = HYD_PMCI_wait_for_completion();
+ HYDU_ERR_POP(status, "error waiting for completion\n");
- /* Check to see if there's any open read socket left; if there
- * are, we will just wait for more events. */
- sockets_open = 0;
- for (partition = handle.partition_list; partition; partition = partition->next) {
- if (partition->out != -1 || partition->err != -1) {
- sockets_open++;
- break;
- }
- }
-
- if (sockets_open && HYDU_time_left(handle.start, handle.timeout))
- continue;
-
- /* Make sure all the processes have terminated. The process
- * manager control device will take care of that. */
- status = HYD_PMCI_wait_for_completion();
- HYDU_ERR_POP(status, "error waiting for completion\n");
-
- /* We are done */
- break;
- }
-
fn_exit:
HYDU_FUNC_EXIT();
return status;
Modified: mpich2/trunk/src/pm/hydra/include/hydra.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra.h 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/include/hydra.h 2009-03-30 03:57:55 UTC (rev 4222)
@@ -14,16 +14,9 @@
struct HYD_Handle_ {
char *base_path;
int proxy_port;
- /* The persistent proxy is different from the centralized proxy
- * and hence needs its own port - pproxy_port */
- int pproxy_port;
+ HYD_Launch_mode_t launch_mode;
+
char *bootstrap;
- /* FIXME: We should define a proxy type instead of all these
- * flags... proxy_type = PROXY_LAUNCHER | PROXY_TERMINATOR
- */
- int is_proxy_launcher;
- int is_proxy_terminator;
- int is_proxy_remote;
HYD_Binding binding;
char *user_bind_map;
@@ -66,7 +59,4 @@
extern HYD_Handle handle;
-#define HYD_PROXY_NAME "pmi_proxy"
-#define HYD_PPROXY_PORT 8677
-
#endif /* HYDRA_H_INCLUDED */
Modified: mpich2/trunk/src/pm/hydra/include/hydra_base.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra_base.h 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/include/hydra_base.h 2009-03-30 03:57:55 UTC (rev 4222)
@@ -80,6 +80,18 @@
} HYD_Status;
+/* Proxy type */
+typedef enum {
+ HYD_LAUNCH_UNSET,
+ HYD_LAUNCH_RUNTIME,
+
+ /* For persistent proxies */
+ HYD_LAUNCH_BOOT,
+ HYD_LAUNCH_SHUTDOWN,
+ HYD_LAUNCH_PERSISTENT
+} HYD_Launch_mode_t;
+
+
/* Environment information */
typedef struct HYD_Env {
char *env_name;
@@ -139,6 +151,7 @@
int out;
int err;
int exit_status;
+ int control_fd;
char *proxy_args[HYD_NUM_TMP_STRINGS]; /* Full argument list */
struct HYD_Partition *next;
Modified: mpich2/trunk/src/pm/hydra/include/hydra_utils.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra_utils.h 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/include/hydra_utils.h 2009-03-30 03:57:55 UTC (rev 4222)
@@ -217,8 +217,8 @@
HYD_Status HYDU_sock_set_nonblock(int fd);
HYD_Status HYDU_sock_set_cloexec(int fd);
HYD_Status HYDU_sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed);
-HYD_Status HYDU_sock_stdin_cb(int fd, HYD_Event_t events, char *buf, int *buf_count,
- int *buf_offset, int *closed);
+HYD_Status HYDU_sock_stdin_cb(int fd, HYD_Event_t events, int stdin_fd, char *buf,
+ int *buf_count, int *buf_offset, int *closed);
/* Memory utilities */
Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -11,6 +11,39 @@
extern HYD_Handle handle;
+static HYD_Status close_fd(int fd)
+{
+ struct HYD_Partition *partition;
+ HYD_Status status = HYD_SUCCESS;
+
+ HYDU_FUNC_ENTER();
+
+ /* Deregister the FD with the demux engine and close it. */
+ status = HYD_DMX_deregister_fd(fd);
+ HYDU_ERR_SETANDJUMP1(status, status, "error deregistering fd %d\n", fd);
+ close(fd);
+
+ /* Find the FD in the handle and remove it. */
+ for (partition = handle.partition_list; partition; partition = partition->next) {
+ if (partition->out == fd) {
+ partition->out = -1;
+ goto fn_exit;
+ }
+ if (partition->err == fd) {
+ partition->err = -1;
+ goto fn_exit;
+ }
+ }
+
+fn_exit:
+ HYDU_FUNC_EXIT();
+ return status;
+
+fn_fail:
+ goto fn_exit;
+}
+
+
HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events, void *userp)
{
int closed;
@@ -24,7 +57,7 @@
fd, HYDU_strerror(errno));
if (closed) {
- status = HYD_CSI_close_fd(fd);
+ status = close_fd(fd);
HYDU_ERR_SETANDJUMP2(status, status, "socket close error on fd %d: %s\n",
fd, HYDU_strerror(errno));
goto fn_exit;
@@ -51,7 +84,7 @@
fd, HYDU_strerror(errno))
if (closed) {
- status = HYD_CSI_close_fd(fd);
+ status = close_fd(fd);
HYDU_ERR_SETANDJUMP2(status, status, "socket close error on fd %d (%s)\n",
fd, HYDU_strerror(errno));
goto fn_exit;
@@ -73,12 +106,12 @@
HYDU_FUNC_ENTER();
- status = HYDU_sock_stdin_cb(handle.in, events, handle.stdin_tmp_buf,
+ status = HYDU_sock_stdin_cb(handle.in, events, 0, handle.stdin_tmp_buf,
&handle.stdin_buf_count, &handle.stdin_buf_offset, &closed);
HYDU_ERR_POP(status, "stdin callback error\n");
if (closed) {
- status = HYD_CSI_close_fd(fd);
+ status = close_fd(fd);
HYDU_ERR_SETANDJUMP2(status, status, "socket close error on fd %d (errno: %d)\n",
fd, errno);
Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -18,13 +18,6 @@
printf("Usage: ./mpiexec [global opts] [exec1 local opts] : [exec2 local opts] : ...\n\n");
printf("Global Options (passed to all executables):\n");
- printf("\t--verbose [Verbose mode]\n");
- printf("\t--version [Version information]\n");
- printf("\t--enable-x/--disable-x [Enable or disable X forwarding]\n");
- printf("\t--proxy-port [Port on which proxies can listen]\n");
- printf("\t--bootstrap [Bootstrap server to use]\n");
- printf("\t--binding [Process binding]");
-
printf("\t-genv {name} {value} [Environment variable name and value]\n");
printf("\t-genvlist {env1,env2,...} [Environment variable list to pass]\n");
printf("\t-genvnone [Do not pass any environment variables]\n");
@@ -40,10 +33,20 @@
printf("\t-envlist {env1,env2,...} [Environment variable list to pass]\n");
printf("\t-envnone [Do not pass any environment variables]\n");
printf("\t-envall [Pass all environment variables (default)]\n");
- printf
- ("\t{exec_name} {args} [Name of the executable to run and its arguments]\n");
+ printf("\t{exec_name} {args} [Executable name to run and arguments]\n");
printf("\n");
+
+ printf("Hydra specific options (treated as global):\n");
+ printf("\t--verbose [Verbose mode]\n");
+ printf("\t--version [Version information]\n");
+ printf("\t--enable-x/--disable-x [Enable or disable X forwarding]\n");
+ printf("\t--proxy-port [Port on which proxies can listen]\n");
+ printf("\t--bootstrap [Bootstrap server to use]\n");
+ printf("\t--binding [Process binding]");
+ printf("\t--boot-proxies [Boot proxies to run in persistent mode]\n");
+ printf("\t--shutdown-proxies [Shutdown persistent mode proxies]\n");
+ printf("\t--use-persistent [Use persistent mode proxies to launch]\n");
}
@@ -51,7 +54,7 @@
{
struct HYD_Partition *partition;
int exit_status = 0;
- int timeout;
+ int timeout, stdin_fd;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
@@ -79,10 +82,6 @@
if (handle.debug)
HYD_LCHU_print_params();
- handle.stdout_cb = HYD_LCHI_stdout_cb;
- handle.stderr_cb = HYD_LCHI_stderr_cb;
- handle.stdin_cb = HYD_LCHI_stdin_cb;
-
HYDU_time_set(&handle.start, NULL); /* NULL implies right now */
if (getenv("MPIEXEC_TIMEOUT"))
timeout = atoi(getenv("MPIEXEC_TIMEOUT"));
@@ -95,6 +94,44 @@
status = HYD_CSI_launch_procs();
HYDU_ERR_POP(status, "control system error launching processes\n");
+ /* During shutdown, no processes are launched, so there is nothing
+ * to wait for. If the launch command didn't return an error, we
+ * are OK; just return a success. */
+ /* FIXME: We are assuming a working model for the process manager
+ * here. We need to get how many processes the PM has launched
+ * instead of assuming this. For example, it is possible to have a
+ * PM implementation that launches separate "new" proxies on a
+ * different port and kills the original proxies using them. */
+ if (handle.launch_mode == HYD_LAUNCH_SHUTDOWN) {
+ exit_status = 0;
+ goto fn_exit;
+ }
+
+ /* Setup stdout/stderr/stdin handlers */
+ for (partition = handle.partition_list; partition; partition = partition->next) {
+ status = HYD_DMX_register_fd(1, &partition->out, HYD_STDOUT, NULL,
+ HYD_LCHI_stdout_cb);
+ HYDU_ERR_POP(status, "demux returned error registering fd\n");
+
+ status = HYD_DMX_register_fd(1, &partition->err, HYD_STDOUT, NULL,
+ HYD_LCHI_stderr_cb);
+ HYDU_ERR_POP(status, "demux returned error registering fd\n");
+ }
+
+ status = HYDU_sock_set_nonblock(handle.in);
+ HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
+
+ stdin_fd = 0;
+ status = HYDU_sock_set_nonblock(stdin_fd);
+ HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
+
+ handle.stdin_buf_count = 0;
+ handle.stdin_buf_offset = 0;
+
+ status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL, HYD_LCHI_stdin_cb);
+ HYDU_ERR_POP(status, "demux returned error registering fd\n");
+
+
/* Wait for their completion */
status = HYD_CSI_wait_for_completion();
HYDU_ERR_POP(status, "control system error waiting for completion\n");
Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -70,27 +70,28 @@
}
if (!strcmp(*argv, "--boot-proxies")) {
- /* FIXME: Prevent usage of incompatible params */
- handle.bootstrap = HYDU_strdup("ssh");
- handle.is_proxy_launcher = 1;
- handle.prop = HYD_ENV_PROP_ALL;
+ HYDU_ERR_CHKANDJUMP(status, handle.launch_mode != HYD_LAUNCH_UNSET,
+ HYD_INTERNAL_ERROR, "duplicate launch mode\n");
+ handle.launch_mode = HYD_LAUNCH_BOOT;
continue;
}
- if (!strcmp(*argv, "--remote-proxy")) {
- /* FIXME: We should get rid of this option eventually.
- * This should be the default case. The centralized
- * version should use an option like "--local-proxy"
- */
- handle.is_proxy_remote = 1;
- handle.prop = HYD_ENV_PROP_ALL;
+ if (!strcmp(*argv, "--shutdown-proxies")) {
+ HYDU_ERR_CHKANDJUMP(status, handle.launch_mode != HYD_LAUNCH_UNSET,
+ HYD_INTERNAL_ERROR, "duplicate launch mode\n");
+ handle.launch_mode = HYD_LAUNCH_SHUTDOWN;
continue;
}
- if (!strcmp(*argv, "--shutdown-proxies")) {
- handle.is_proxy_remote = 1;
- handle.is_proxy_terminator = 1;
- handle.prop = HYD_ENV_PROP_ALL;
+ if (!strcmp(*argv, "--use-persistent") || !strcmp(*argv, "--use-runtime")) {
+ HYDU_ERR_CHKANDJUMP(status, handle.launch_mode != HYD_LAUNCH_UNSET,
+ HYD_INTERNAL_ERROR, "duplicate launch mode\n");
+
+ if (!strcmp(*argv, "--use-persistent"))
+ handle.launch_mode = HYD_LAUNCH_PERSISTENT;
+ else
+ handle.launch_mode = HYD_LAUNCH_RUNTIME;
+
continue;
}
@@ -262,17 +263,6 @@
continue;
}
- if (!strcmp(str[0], "--pproxy-port")) {
- if (!str[1]) {
- INCREMENT_ARGV(status);
- str[1] = *argv;
- }
- HYDU_ERR_CHKANDJUMP(status, handle.pproxy_port != -1, HYD_INTERNAL_ERROR,
- "duplicate persistent proxy port\n");
- handle.pproxy_port = atoi(str[1]);
- continue;
- }
-
if (*argv[0] == '-')
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "unrecognized argument\n");
@@ -299,46 +289,15 @@
break;
continue;
-
}
- /* In the case of the proxy launcher, aka --boot-proxies, there is no executable
- * specified */
- if (handle.is_proxy_launcher || handle.is_proxy_terminator) {
- status = HYD_LCHU_get_current_exec_info(&exec_info);
- HYDU_ERR_POP(status, "get_current_exec_info returned error\n");
-
- exec_info->exec[0] = HYDU_strdup(HYD_PROXY_NAME " --persistent");
- exec_info->exec[1] = NULL;
- if (exec_info->exec_proc_count == 0)
- exec_info->exec_proc_count = 1;
-
- env_name = HYDU_strdup("HYD_PROXY_PORT");
- env_value = HYDU_int_to_str(handle.pproxy_port);
-
- status = HYDU_env_create(&env, env_name, env_value);
- HYDU_ERR_POP(status, "unable to create env struct\n");
-
- HYDU_append_env_to_list(*env, &exec_info->user_env);
- }
-
-
+ /* First set all the variables that do not depend on the launch mode */
tmp = getenv("MPIEXEC_DEBUG");
if (handle.debug == -1 && tmp)
handle.debug = atoi(tmp) ? 1 : 0;
if (handle.debug == -1)
handle.debug = 0;
- if (handle.exec_info_list == NULL)
- HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "no local options set\n");
-
- if (handle.wdir == NULL) {
- HYDU_MALLOC(handle.wdir, char *, HYDRA_MAX_PATH, status);
- if (getcwd(handle.wdir, HYDRA_MAX_PATH) == NULL)
- HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
- "allocated space is too small for absolute path\n");
- }
-
tmp = getenv("HYDRA_BOOTSTRAP");
if (handle.bootstrap == NULL && tmp)
handle.bootstrap = HYDU_strdup(tmp);
@@ -351,43 +310,82 @@
if (handle.host_file == NULL)
handle.host_file = HYDU_strdup("HYDRA_USE_LOCALHOST");
+ if (handle.proxy_port == -1)
+ handle.proxy_port = HYD_DEFAULT_PROXY_PORT;
+
+ tmp = getenv("HYDRA_LAUNCH_MODE");
+ if (handle.launch_mode == HYD_LAUNCH_UNSET && tmp) {
+ if (!strcmp(tmp, "persistent"))
+ handle.launch_mode = HYD_LAUNCH_PERSISTENT;
+ else if (!strcmp(tmp, "runtime"))
+ handle.launch_mode = HYD_LAUNCH_RUNTIME;
+ }
+ if (handle.launch_mode == HYD_LAUNCH_UNSET)
+ handle.launch_mode = HYD_LAUNCH_RUNTIME;
+
+ /* Get the base path for the proxy */
+ if (handle.wdir == NULL) {
+ HYDU_MALLOC(handle.wdir, char *, HYDRA_MAX_PATH, status);
+ if (getcwd(handle.wdir, HYDRA_MAX_PATH) == NULL)
+ HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
+ "allocated space is too small for absolute path\n");
+ }
status = HYDU_get_base_path(progname, handle.wdir, &handle.base_path);
HYDU_ERR_POP(status, "unable to get base path\n");
- tmp = getenv("HYDRA_BINDING");
- if (handle.binding == HYD_BIND_UNSET && tmp)
- handle.binding = !strcmp(tmp, "none") ? HYD_BIND_NONE :
- !strcmp(tmp, "rr") ? HYD_BIND_RR :
- !strcmp(tmp, "buddy") ? HYD_BIND_BUDDY :
- !strcmp(tmp, "pack") ? HYD_BIND_PACK : HYD_BIND_USER;
- if (handle.binding == HYD_BIND_UNSET)
- handle.binding = HYD_BIND_NONE;
+ /* Proxy setup or teardown */
+ if ((handle.launch_mode == HYD_LAUNCH_BOOT) ||
+ (handle.launch_mode == HYD_LAUNCH_SHUTDOWN)) {
- /* Check environment for setting the global environment */
- tmp = getenv("HYDRA_ENV");
- if (handle.prop == HYD_ENV_PROP_UNSET && tmp)
- handle.prop = !strcmp(tmp, "all") ? HYD_ENV_PROP_ALL : HYD_ENV_PROP_NONE;
+ /* NULL out variables we don't care about */
+ HYDU_ERR_CHKANDJUMP(status, handle.prop != HYD_ENV_PROP_UNSET, HYD_INTERNAL_ERROR,
+ "environment setting in proxy launch mode\n");
+ handle.prop = HYD_ENV_PROP_NONE;
- /* Make sure local executable is set */
- local_env_set = 0;
- for (exec_info = handle.exec_info_list; exec_info; exec_info = exec_info->next) {
- if (exec_info->exec[0] == NULL)
- HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "no executable specified\n");
+ HYDU_ERR_CHKANDJUMP(status, handle.binding != HYD_BIND_UNSET, HYD_INTERNAL_ERROR,
+ "binding setting in proxy launch mode\n");
+ handle.binding = HYD_BIND_UNSET;
- if (exec_info->exec_proc_count == 0)
- exec_info->exec_proc_count = 1;
-
- if (exec_info->prop != HYD_ENV_PROP_UNSET)
- local_env_set = 1;
+ HYDU_ERR_CHKANDJUMP(status, handle.exec_info_list, HYD_INTERNAL_ERROR,
+ "executables specified in proxy launch mode\n");
}
+ else { /* Application launch */
+ if (handle.exec_info_list == NULL)
+ HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "no local options set\n");
- /* If no global or local environment is set, use the default */
- if ((handle.prop == HYD_ENV_PROP_UNSET) && (local_env_set == 0))
- handle.prop = HYD_ENV_PROP_ALL;
+ /* Check environment for setting binding */
+ tmp = getenv("HYDRA_BINDING");
+ if (handle.binding == HYD_BIND_UNSET && tmp)
+ handle.binding = !strcmp(tmp, "none") ? HYD_BIND_NONE :
+ !strcmp(tmp, "rr") ? HYD_BIND_RR :
+ !strcmp(tmp, "buddy") ? HYD_BIND_BUDDY :
+ !strcmp(tmp, "pack") ? HYD_BIND_PACK : HYD_BIND_USER;
+ if (handle.binding == HYD_BIND_UNSET)
+ handle.binding = HYD_BIND_NONE;
- if (handle.proxy_port == -1)
- handle.proxy_port = HYD_DEFAULT_PROXY_PORT;
+ /* Check environment for setting the global environment */
+ tmp = getenv("HYDRA_ENV");
+ if (handle.prop == HYD_ENV_PROP_UNSET && tmp)
+ handle.prop = !strcmp(tmp, "all") ? HYD_ENV_PROP_ALL : HYD_ENV_PROP_NONE;
+ /* Make sure local executable is set */
+ local_env_set = 0;
+ for (exec_info = handle.exec_info_list; exec_info; exec_info = exec_info->next) {
+ if (exec_info->exec[0] == NULL)
+ HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "no executable specified\n");
+
+ if (exec_info->exec_proc_count == 0)
+ exec_info->exec_proc_count = 1;
+
+ if (exec_info->prop != HYD_ENV_PROP_UNSET)
+ local_env_set = 1;
+ }
+
+ /* If no global or local environment is set, use the default */
+ if ((handle.prop == HYD_ENV_PROP_UNSET) && (local_env_set == 0))
+ handle.prop = HYD_ENV_PROP_ALL;
+ }
+
fn_exit:
for (i = 0; i < 4; i++)
if (str[i])
Modified: mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -14,11 +14,9 @@
{
handle.base_path = NULL;
handle.proxy_port = -1;
- handle.pproxy_port = HYD_PPROXY_PORT;
+ handle.launch_mode = HYD_LAUNCH_UNSET;
+
handle.bootstrap = NULL;
- handle.is_proxy_launcher = 0;
- handle.is_proxy_terminator = 0;
- handle.is_proxy_remote = 0;
handle.binding = HYD_BIND_UNSET;
handle.user_bind_map = NULL;
Added: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_common.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_common.h (rev 0)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_common.h 2009-03-30 03:57:55 UTC (rev 4222)
@@ -0,0 +1,20 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ * (C) 2008 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+#ifndef PMI_COMMON_H_INCLUDED
+#define PMI_COMMON_H_INCLUDED
+
+/* The set of commands supported */
+enum HYD_PMCD_pmi_proxy_cmds {
+ PROC_INFO,
+ KILL_JOB,
+ PROXY_SHUTDOWN,
+ USE_AS_STDOUT,
+ USE_AS_STDERR,
+ USE_AS_STDIN
+};
+
+#endif /* PMI_COMMON_H_INCLUDED */
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -8,7 +8,7 @@
#include "pmi_handle.h"
#include "pmi_handle_v1.h"
-extern HYD_Handle handle;
+HYD_Handle handle;
HYD_PMCD_pmi_pg_t *pg_list = NULL;
struct HYD_PMCD_pmi_handle *HYD_PMCD_pmi_v1;
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v1.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -11,7 +11,7 @@
#include "pmi_handle.h"
#include "pmi_handle_v1.h"
-extern HYD_Handle handle;
+HYD_Handle handle;
HYD_PMCD_pmi_pg_t *pg_list;
/* TODO: abort, create_kvs, destroy_kvs, getbyidx, spawn */
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -12,97 +12,96 @@
struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
int HYD_PMCD_pmi_proxy_listenfd;
-static HYD_Status HYD_PMCD_pmi_pproxy_start(void)
+static HYD_Status wait_for_procs_to_finish(void)
{
- /* If this function exits... its always an error */
- HYD_Status status = HYD_INTERNAL_ERROR;
- int ret = 0;
- pid_t proc_id = -1;
- struct rlimit rl;
+ int i, out_count, err_count, count, pid, ret_status;
+ HYD_Status status = HYD_SUCCESS;
- umask(0);
+ while (1) {
+ /* Wait for some event to occur */
+ status = HYD_DMX_wait_for_event(-1);
+ HYDU_ERR_POP(status, "demux engine error waiting for event\n");
- /* Get the limit of fds */
- ret = getrlimit(RLIMIT_NOFILE, &rl);
- if (ret == -1)
- HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "getrlimit() failed (%s)\n",
- HYDU_strerror(errno));
+ /* Check to see if there's any open read socket left; if there
+ * are, we will just wait for more events. */
+ out_count = 0;
+ err_count = 0;
+ for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++) {
+ if (HYD_PMCD_pmi_proxy_params.out[i] != -1)
+ out_count++;
+ if (HYD_PMCD_pmi_proxy_params.err[i] != -1)
+ err_count++;
- proc_id = fork();
- if (proc_id > 0) {
- /* Ignore exit from child proc - persistent pmi proxy */
- status = HYDU_set_signal(SIGCHLD, SIG_IGN);
- HYDU_ERR_POP(status, "Setting SIGCHLD handler to SIG_IGN failed\n");
+ if (out_count && err_count)
+ break;
+ }
- /* Parent process exits */
- if (!HYD_PMCD_pmi_proxy_params.debug)
- exit(0);
+ if (HYD_PMCD_pmi_proxy_params.procs_are_launched) {
+ if (out_count == 0)
+ close(HYD_PMCD_pmi_proxy_params.out_upstream_fd);
+
+ if (err_count == 0)
+ close(HYD_PMCD_pmi_proxy_params.err_upstream_fd);
+
+ /* We are done */
+ if (!out_count && !err_count)
+ break;
+ }
}
- else if (proc_id == 0) {
- /* Child proc continues */
- int i;
- pid_t spid;
- spid = setsid();
- if (spid == -1)
- HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "setsid() failed(%s)\n",
- HYDU_strerror(errno));
- if (!HYD_PMCD_pmi_proxy_params.debug)
- for (i = 0; i < rl.rlim_max; i++)
- close(i);
- /* FIXME: dup(0,1,2) to "/dev/null" */
+ /* FIXME: If we did not break out yet, add a small usleep to yield
+ * CPU here. We can not just sleep for the remaining time, as the
+ * timeout value might be large and the application might exit
+ * much quicker. Note that the sched_yield() call is broken on
+ * newer linux kernel versions and should not be used. */
+ /* Once all the sockets are closed, wait for all the processes to
+ * finish. We poll here, but hopefully not for too long. */
+ do {
+ if (HYD_PMCD_pmi_proxy_params.procs_are_launched == 0)
+ break;
- if (getenv("HYD_PROXY_PORT"))
- HYD_PMCD_pmi_proxy_params.proxy_port = atoi(getenv("HYD_PROXY_PORT"));
- else
- HYD_PMCD_pmi_proxy_params.proxy_port = -1;
+ pid = waitpid(-1, &ret_status, WNOHANG);
- status = HYDU_sock_listen(&HYD_PMCD_pmi_proxy_listenfd, NULL,
- (uint16_t *) & HYD_PMCD_pmi_proxy_params.proxy_port);
- HYDU_ERR_POP(status, "unable to listen on socket\n");
+ /* Find the pid and mark it as complete. */
+ if (pid > 0)
+ for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
+ if (HYD_PMCD_pmi_proxy_params.pid[i] == pid)
+ HYD_PMCD_pmi_proxy_params.exit_status[i] = WEXITSTATUS(ret_status);
- /* Register the listening socket with the demux engine */
- status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_listenfd, HYD_STDOUT, NULL,
- HYD_PMCD_pmi_proxy_listen_cb);
- HYDU_ERR_POP(status, "unable to register fd\n");
- }
- else {
- HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "fork() failed (%s) \n",
- HYDU_strerror(errno));
- }
+ /* Check how many more processes are pending */
+ count = 0;
+ for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++) {
+ if (HYD_PMCD_pmi_proxy_params.exit_status[i] == -1) {
+ count++;
+ break;
+ }
+ }
+ if (count == 0)
+ break;
- while (1) {
- status = HYD_DMX_wait_for_event(-1);
+ /* Check if there are any messages from the launcher */
+ status = HYD_DMX_wait_for_event(0);
HYDU_ERR_POP(status, "demux engine error waiting for event\n");
- }
+ } while (1);
fn_exit:
return status;
+
fn_fail:
goto fn_exit;
}
int main(int argc, char **argv)
{
- int i, j, arg, count, pid, ret_status;
- int stdin_fd, process_id, core, pmi_id, rem;
- char *str;
- char *client_args[HYD_NUM_TMP_STRINGS];
- HYD_Env_t *env;
+ int i, ret_status;
struct HYD_Partition_exec *exec;
struct HYD_Partition_segment *segment;
HYD_Status status = HYD_SUCCESS;
- status = HYD_PMCD_pmi_proxy_get_params(argc, argv);
+ status = HYD_PMCD_pmi_proxy_get_params(argv);
HYDU_ERR_POP(status, "bad parameters passed to the proxy\n");
- if (HYD_PMCD_pmi_proxy_params.is_persistent) {
- status = HYD_PMCD_pmi_pproxy_start();
- HYDU_ERR_POP(status, "Error starting persistent PMI proxy\n");
- goto fn_exit;
- }
-
status = HYDU_sock_listen(&HYD_PMCD_pmi_proxy_listenfd, NULL,
(uint16_t *) & HYD_PMCD_pmi_proxy_params.proxy_port);
HYDU_ERR_POP(status, "unable to listen on socket\n");
@@ -120,172 +119,54 @@
* local processes. That is, we can only have a single-level
* hierarchy of proxies. */
- HYD_PMCD_pmi_proxy_params.partition_proc_count = 0;
- for (segment = HYD_PMCD_pmi_proxy_params.segment_list; segment; segment = segment->next)
- HYD_PMCD_pmi_proxy_params.partition_proc_count += segment->proc_count;
+ /* Process launching only happens in the runtime case over here */
+ if (HYD_PMCD_pmi_proxy_params.proxy_type == HYD_PMCD_PMI_PROXY_RUNTIME) {
+ HYD_PMCD_pmi_proxy_params.out_upstream_fd = 1;
+ HYD_PMCD_pmi_proxy_params.err_upstream_fd = 2;
+ HYD_PMCD_pmi_proxy_params.in_upstream_fd = 0;
- HYD_PMCD_pmi_proxy_params.exec_proc_count = 0;
- for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next)
- HYD_PMCD_pmi_proxy_params.exec_proc_count += exec->proc_count;
+ status = HYD_PMCD_pmi_proxy_launch_procs();
+ HYDU_ERR_POP(status, "unable to launch procs based on proxy handle info\n");
- HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.out, int *,
- HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
- HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.err, int *,
- HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
- HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.pid, int *,
- HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
- HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.exit_status, int *,
- HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+ /* Now wait for the processes to finish */
+ status = wait_for_procs_to_finish();
+ HYDU_ERR_POP(status, "error waiting for processes to finish\n");
- /* Initialize the exit status */
- for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
- HYD_PMCD_pmi_proxy_params.exit_status[i] = -1;
-
- /* For local spawning, set the global environment here itself */
- status = HYDU_putenv_list(HYD_PMCD_pmi_proxy_params.global_env);
- HYDU_ERR_POP(status, "putenv returned error\n");
-
- status = HYDU_bind_init(HYD_PMCD_pmi_proxy_params.user_bind_map);
- HYDU_ERR_POP(status, "unable to initialize process binding\n");
-
- /* Spawn the processes */
- process_id = 0;
- for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next) {
- for (i = 0; i < exec->proc_count; i++) {
-
- pmi_id = ((process_id / HYD_PMCD_pmi_proxy_params.partition_proc_count) *
- HYD_PMCD_pmi_proxy_params.one_pass_count);
- rem = (process_id % HYD_PMCD_pmi_proxy_params.partition_proc_count);
-
- for (segment = HYD_PMCD_pmi_proxy_params.segment_list; segment;
- segment = segment->next) {
- if (rem >= segment->proc_count)
- rem -= segment->proc_count;
- else {
- pmi_id += segment->start_pid + rem;
- break;
- }
- }
-
- str = HYDU_int_to_str(pmi_id);
- status = HYDU_env_create(&env, "PMI_ID", str);
- HYDU_ERR_POP(status, "unable to create env\n");
- HYDU_FREE(str);
- status = HYDU_putenv(env);
- HYDU_ERR_POP(status, "putenv failed\n");
-
- if (chdir(HYD_PMCD_pmi_proxy_params.wdir) < 0)
- HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
- "unable to change wdir (%s)\n", HYDU_strerror(errno));
-
- for (j = 0, arg = 0; exec->exec[j]; j++)
- client_args[arg++] = HYDU_strdup(exec->exec[j]);
- client_args[arg++] = NULL;
-
- core = HYDU_bind_get_core_id(process_id, HYD_PMCD_pmi_proxy_params.binding);
- if (pmi_id == 0) {
- status = HYDU_create_process(client_args, exec->prop_env,
- &HYD_PMCD_pmi_proxy_params.in,
- &HYD_PMCD_pmi_proxy_params.out[process_id],
- &HYD_PMCD_pmi_proxy_params.err[process_id],
- &HYD_PMCD_pmi_proxy_params.pid[process_id], core);
-
- status = HYDU_sock_set_nonblock(HYD_PMCD_pmi_proxy_params.in);
- HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
-
- stdin_fd = 0;
- status = HYDU_sock_set_nonblock(stdin_fd);
- HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
-
- HYD_PMCD_pmi_proxy_params.stdin_buf_offset = 0;
- HYD_PMCD_pmi_proxy_params.stdin_buf_count = 0;
- status =
- HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL,
- HYD_PMCD_pmi_proxy_stdin_cb);
- HYDU_ERR_POP(status, "unable to register fd\n");
- }
- else {
- status = HYDU_create_process(client_args, exec->prop_env,
- NULL,
- &HYD_PMCD_pmi_proxy_params.out[process_id],
- &HYD_PMCD_pmi_proxy_params.err[process_id],
- &HYD_PMCD_pmi_proxy_params.pid[process_id], core);
- }
- HYDU_ERR_POP(status, "spawn process returned error\n");
-
- process_id++;
- }
+ ret_status = 0;
+ for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
+ ret_status |= HYD_PMCD_pmi_proxy_params.exit_status[i];
}
+ else { /* Persistent mode */
+ do {
+ /* Wait for the processes to finish. If there are no
+ * processes, we will just wait blocking for the work to
+ * arrive. */
+ status = wait_for_procs_to_finish();
+ HYDU_ERR_POP(status, "error waiting for processes to finish\n");
- /* Everything is spawned, now wait for I/O */
- status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
- HYD_PMCD_pmi_proxy_params.out,
- HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stdout_cb);
- HYDU_ERR_POP(status, "unable to register fd\n");
+ /* If processes had been launched and terminated, find the
+ * exit status, return it and cleanup everything. */
+ if (HYD_PMCD_pmi_proxy_params.procs_are_launched) {
+ ret_status = 0;
+ for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
+ ret_status |= HYD_PMCD_pmi_proxy_params.exit_status[i];
- status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
- HYD_PMCD_pmi_proxy_params.err,
- HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stderr_cb);
- HYDU_ERR_POP(status, "unable to register fd\n");
+ /* Send the exit status upstream */
+ status = HYDU_sock_write(HYD_PMCD_pmi_proxy_params.control_fd, &ret_status,
+ sizeof(int));
+ HYDU_ERR_POP(status, "unable to return exit status upstream\n");
- while (1) {
- /* Wait for some event to occur */
- status = HYD_DMX_wait_for_event(-1);
- HYDU_ERR_POP(status, "demux engine error waiting for event\n");
+ status = HYD_DMX_deregister_fd(HYD_PMCD_pmi_proxy_params.control_fd);
+ HYDU_ERR_POP(status, "unable to deregister fd\n");
+ close(HYD_PMCD_pmi_proxy_params.control_fd);
- /* Check to see if there's any open read socket left; if there
- * are, we will just wait for more events. */
- count = 0;
- for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++) {
- if (HYD_PMCD_pmi_proxy_params.out[i] != -1 ||
- HYD_PMCD_pmi_proxy_params.err[i] != -1) {
- count++;
- break;
+ /* cleanup the params structure for the next job */
+ status = HYD_PMCD_pmi_proxy_cleanup_params();
+ HYDU_ERR_POP(status, "unable to cleanup params\n");
}
- }
-
- /* We are done */
- if (!count)
- break;
+ } while (1);
}
- /* FIXME: If we did not break out yet, add a small usleep to yield
- * CPU here. We can not just sleep for the remaining time, as the
- * timeout value might be large and the application might exit
- * much quicker. Note that the sched_yield() call is broken on
- * newer linux kernel versions and should not be used. */
- /* Once all the sockets are closed, wait for all the processes to
- * finish. We poll here, but hopefully not for too long. */
- do {
- pid = waitpid(-1, &ret_status, WNOHANG);
-
- /* Find the pid and mark it as complete. */
- if (pid > 0)
- for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
- if (HYD_PMCD_pmi_proxy_params.pid[i] == pid)
- HYD_PMCD_pmi_proxy_params.exit_status[i] = WEXITSTATUS(ret_status);
-
- /* Check how many more processes are pending */
- count = 0;
- for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++) {
- if (HYD_PMCD_pmi_proxy_params.exit_status[i] == -1) {
- count++;
- break;
- }
- }
-
- if (count == 0)
- break;
-
- /* Check if there are any messages from the launcher */
- status = HYD_DMX_wait_for_event(0);
- HYDU_ERR_POP(status, "demux engine error waiting for event\n");
- } while (1);
-
- ret_status = 0;
- for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
- ret_status |= HYD_PMCD_pmi_proxy_params.exit_status[i];
-
fn_exit:
if (status != HYD_SUCCESS)
return -1;
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h 2009-03-30 03:57:55 UTC (rev 4222)
@@ -9,12 +9,19 @@
#include "hydra_base.h"
#include "hydra_utils.h"
+#include "pmi_common.h"
+typedef enum {
+ HYD_PMCD_PMI_PROXY_UNSET, /* initial value set by init_params() before argv is parsed */
+ HYD_PMCD_PMI_PROXY_RUNTIME, /* per-job proxy: the job description is taken from argv */
+ HYD_PMCD_PMI_PROXY_PERSISTENT /* started with --persistent-mode; blocks waiting for work */
+} HYD_PMCD_pmi_proxy_type;
+
struct HYD_PMCD_pmi_proxy_params {
int debug;
int proxy_port;
- int is_persistent;
+ HYD_PMCD_pmi_proxy_type proxy_type;
char *wdir;
HYD_Binding binding;
char *user_bind_map;
@@ -25,16 +32,22 @@
int partition_proc_count;
int exec_proc_count;
+ int procs_are_launched;
+
/* Process segmentation information for this partition */
struct HYD_Partition_segment *segment_list;
struct HYD_Partition_exec *exec_list;
+ int out_upstream_fd;
+ int err_upstream_fd;
+ int in_upstream_fd;
+ int control_fd;
+
int *pid;
int *out;
int *err;
int *exit_status;
int in;
- int rproxy_connfd;
int stdin_buf_offset;
int stdin_buf_count;
@@ -44,17 +57,15 @@
extern struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
extern int HYD_PMCD_pmi_proxy_listenfd;
-HYD_Status HYD_PMCD_pmi_proxy_init_params(struct HYD_PMCD_pmi_proxy_params *proxy_params);
-HYD_Status HYD_PMCD_pmi_proxy_cleanup_params(struct HYD_PMCD_pmi_proxy_params *proxy_params);
-HYD_Status HYD_PMCD_pmi_proxy_get_params(int t_argc, char **t_argv);
-HYD_Status HYD_PMCD_pmi_proxy_get_next_keyvalp(char **bufp, int *buf_lenp, char **keyp,
- int *key_lenp, char **valp, int *val_lenp,
- char separator);
-HYD_Status HYD_PMCD_pmi_proxy_handle_cmd(int fd, char *cmd, int cmd_len);
-HYD_Status HYD_PMCD_pmi_proxy_handle_launch_cmd(int job_connfd, char *launch_cmd, int cmd_len);
+/* utils */
+HYD_Status HYD_PMCD_pmi_proxy_get_params(char **t_argv); /* init params, then parse argv */
+HYD_Status HYD_PMCD_pmi_proxy_cleanup_params(void); /* free per-job state before the next job */
+HYD_Status HYD_PMCD_pmi_proxy_procinfo(int fd); /* handle a PROC_INFO command read from fd */
+HYD_Status HYD_PMCD_pmi_proxy_launch_procs(void); /* name used by main() and the listen callback */
+void HYD_PMCD_pmi_proxy_killjob(void); /* handle a KILL_JOB command */
+
+/* callback */
HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events, void *userp);
-HYD_Status HYD_PMCD_pmi_proxy_remote_cb(int fd, HYD_Event_t events, void *userp);
-HYD_Status HYD_PMCD_pmi_proxy_rstdout_cb(int fd, HYD_Event_t events, void *userp);
HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events, void *userp);
HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events, void *userp);
HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events, void *userp);
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -8,15 +8,14 @@
#include "hydra_utils.h"
#include "pmi_proxy.h"
#include "demux.h"
-#include "pmi_serv.h"
-extern struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
+extern struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params; /* defined once, in pmi_proxy_utils.c */
int HYD_PMCD_pmi_proxy_listenfd;
HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events, void *userp)
{
- int accept_fd, cmd_len;
- char cmd[HYD_PMCD_MAX_CMD_LEN];
+ int accept_fd = -1, cmd_len;
+ enum HYD_PMCD_pmi_proxy_cmds cmd;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
@@ -28,13 +27,13 @@
status = HYDU_sock_accept(fd, &accept_fd);
HYDU_ERR_POP(status, "accept error\n");
- status =
- HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_listen_cb);
+ status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, NULL,
+ HYD_PMCD_pmi_proxy_listen_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
}
else { /* We got a command from mpiexec */
- status = HYDU_sock_readline(fd, cmd, HYD_PMCD_MAX_CMD_LEN, &cmd_len);
- HYDU_ERR_POP(status, "Error reading command from proxy");
+ status = HYDU_sock_read(fd, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds), &cmd_len);
+ HYDU_ERR_POP(status, "error reading command from launcher\n");
if (cmd_len == 0) {
/* The connection has closed */
status = HYD_DMX_deregister_fd(fd);
@@ -42,82 +41,51 @@
close(fd);
goto fn_exit;
}
- status = HYD_PMCD_pmi_proxy_handle_cmd(fd, cmd, cmd_len);
- HYDU_ERR_POP(status, "Error handling proxy command\n");
- }
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
- fn_fail:
- goto fn_exit;
-}
-
-HYD_Status HYD_PMCD_pmi_proxy_rstdout_cb(int fd, HYD_Event_t events, void *userp)
-{
- int closed, i;
- HYD_Status status = HYD_SUCCESS;
- struct HYD_PMCD_pmi_proxy_params *proxy_params;
-
- HYDU_FUNC_ENTER();
- proxy_params = (struct HYD_PMCD_pmi_proxy_params *) userp;
-
- status = HYDU_sock_stdout_cb(fd, events, proxy_params->rproxy_connfd, &closed);
- HYDU_ERR_POP(status, "stdout callback error\n");
-
- if (closed) {
- int all_procs_exited = 1;
- /* The process exited */
- status = HYD_DMX_deregister_fd(fd);
- HYDU_ERR_POP(status, "unable to deregister fd\n");
-
- /* FIXME: This could be a perf killer if we have a lot of procs associated with
- * the same job on a single proxy
- */
- for (i = 0; i < proxy_params->exec_proc_count; i++) {
- int ret_status = 0;
- if (proxy_params->out[i] == fd) {
- waitpid(proxy_params->pid[i], &ret_status, WUNTRACED);
- close(proxy_params->in);
- proxy_params->out[i] = -1;
- proxy_params->err[i] = -1;
- }
- if (proxy_params->out[i] != -1)
- all_procs_exited = 0;
+ if (cmd == PROC_INFO) {
+ status = HYD_PMCD_pmi_proxy_procinfo(fd);
}
- if (all_procs_exited) {
- close(proxy_params->rproxy_connfd);
- status = HYD_DMX_deregister_fd(proxy_params->rproxy_connfd);
- HYDU_ERR_POP(status, "Error deregistering remote job conn fd\n");
- status = HYD_PMCD_pmi_proxy_cleanup_params(proxy_params);
- HYDU_ERR_POP(status, "Error cleaning up proxy params\n");
+ else if (cmd == USE_AS_STDOUT) {
+ HYD_PMCD_pmi_proxy_params.out_upstream_fd = fd;
+ status = HYD_DMX_deregister_fd(fd);
+ HYDU_ERR_POP(status, "unable to deregister fd\n");
}
- }
+ else if (cmd == USE_AS_STDERR) {
+ HYD_PMCD_pmi_proxy_params.err_upstream_fd = fd;
+ status = HYD_DMX_deregister_fd(fd);
+ HYDU_ERR_POP(status, "unable to deregister fd\n");
+ }
+ else if (cmd == USE_AS_STDIN) {
+ HYD_PMCD_pmi_proxy_params.in_upstream_fd = fd;
+ status = HYD_DMX_deregister_fd(fd);
+ HYDU_ERR_POP(status, "unable to deregister fd\n");
+ }
+ else if (cmd == KILL_JOB) {
+ HYD_PMCD_pmi_proxy_killjob();
+ status = HYD_SUCCESS;
+ }
+ else if (cmd == PROXY_SHUTDOWN) {
+ /* FIXME: shutdown should be handled more cleanly. That
+ * is, check if there are other processes still running
+ * and kill them before exiting. */
+ exit(-1);
+ }
+ else {
+ status = HYD_INTERNAL_ERROR;
+ }
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
+ HYDU_ERR_POP(status, "error handling proxy command\n");
- fn_fail:
- goto fn_exit;
-}
-
-HYD_Status HYD_PMCD_pmi_proxy_remote_cb(int fd, HYD_Event_t events, void *userp)
-{
- int closed = 0, i;
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
- /* FIXME: This cb should take care of the commands from mpiexec */
-
- if (closed) {
- /* The connection has closed */
- status = HYD_DMX_deregister_fd(fd);
- HYDU_ERR_POP(status, "unable to deregister fd\n");
-
- for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
- if (HYD_PMCD_pmi_proxy_params.out[i] == fd)
- HYD_PMCD_pmi_proxy_params.out[i] = -1;
+ /* One of these commands can trigger the start of the
+ * application since they can arrive in any order. */
+ if ((cmd == PROC_INFO) || (cmd == USE_AS_STDOUT) || (cmd == USE_AS_STDERR) ||
+ (cmd == USE_AS_STDIN))
+ if ((HYD_PMCD_pmi_proxy_params.segment_list != NULL) &&
+ (HYD_PMCD_pmi_proxy_params.out_upstream_fd != -1) &&
+ (HYD_PMCD_pmi_proxy_params.err_upstream_fd != -1))
+ if ((HYD_PMCD_pmi_proxy_params.segment_list->start_pid != 0) ||
+ (HYD_PMCD_pmi_proxy_params.in_upstream_fd != -1))
+ HYD_PMCD_pmi_proxy_launch_procs();
}
fn_exit:
@@ -135,7 +103,8 @@
HYDU_FUNC_ENTER();
- status = HYDU_sock_stdout_cb(fd, events, 1, &closed);
+ status = HYDU_sock_stdout_cb(fd, events, HYD_PMCD_pmi_proxy_params.out_upstream_fd,
+ &closed);
HYDU_ERR_POP(status, "stdout callback error\n");
if (closed) {
@@ -164,7 +133,8 @@
HYDU_FUNC_ENTER();
- status = HYDU_sock_stdout_cb(fd, events, 2, &closed);
+ status = HYDU_sock_stdout_cb(fd, events, HYD_PMCD_pmi_proxy_params.err_upstream_fd,
+ &closed);
HYDU_ERR_POP(status, "stdout callback error\n");
if (closed) {
@@ -193,7 +163,9 @@
HYDU_FUNC_ENTER();
+ /* FIXME: HYD_PMCD_pmi_proxy_params.in_upstream_fd needs to be passed in */
status = HYDU_sock_stdin_cb(HYD_PMCD_pmi_proxy_params.in, events,
+ HYD_PMCD_pmi_proxy_params.in_upstream_fd,
HYD_PMCD_pmi_proxy_params.stdin_tmp_buf,
&HYD_PMCD_pmi_proxy_params.stdin_buf_count,
&HYD_PMCD_pmi_proxy_params.stdin_buf_offset, &closed);
@@ -205,8 +177,6 @@
HYDU_ERR_POP(status, "unable to deregister fd\n");
close(HYD_PMCD_pmi_proxy_params.in);
- close(fd);
-
HYD_PMCD_pmi_proxy_params.in = -1;
}
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -5,338 +5,58 @@
*/
#include "pmi_proxy.h"
-#include "pmi_serv.h"
#include "demux.h"
#include "hydra_utils.h"
struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
-HYD_Status HYD_PMCD_pmi_proxy_get_next_keyvalp(char **bufp, int *buf_lenp, char **keyp,
- int *key_lenp, char **valp, int *val_lenp,
- char separator)
+static HYD_Status init_params()
{
- char *p = NULL;
- int len = 0;
- int klen = 0;
- int vlen = 0;
-
HYD_Status status = HYD_SUCCESS;
- p = *bufp;
- len = *buf_lenp;
+ HYD_PMCD_pmi_proxy_params.debug = 0;
- while (len && isspace(*p)) {
- p++;
- len--;
- }
- if (len <= 0)
- HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Error reading keyval from command\n");
+ HYD_PMCD_pmi_proxy_params.proxy_port = -1;
+ HYD_PMCD_pmi_proxy_params.proxy_type = HYD_PMCD_PMI_PROXY_UNSET;
+ HYD_PMCD_pmi_proxy_params.wdir = NULL;
+ HYD_PMCD_pmi_proxy_params.binding = HYD_BIND_UNSET;
+ HYD_PMCD_pmi_proxy_params.user_bind_map = NULL;
- *keyp = p;
- klen = 0;
- while (len && (*p != '=')) {
- p++;
- len--;
- klen++;
- }
- if (key_lenp)
- *key_lenp = klen;
- p++;
- len--;
- if (len <= 0)
- HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Error reading keyval from command\n");
+ HYD_PMCD_pmi_proxy_params.global_env = NULL;
- *valp = p;
- vlen = 0;
- /* FIXME: Allow escaping ';' */
- while (len && (*p != separator)) {
- p++;
- len--;
- vlen++;
- }
- if (val_lenp)
- *val_lenp = vlen;
- p++;
- len--;
- if (len < 0)
- HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "Error reading keyval from command\n");
-
- while (len && isspace(*p)) {
- p++;
- len--;
- }
- /* p now points to the next key or the end of string */
- *bufp = p;
- if (*p != '\0') {
- /* Remaining length of buffer to be processed */
- *buf_lenp = len;
- }
- else {
- /* End of string - no more keyvals */
- *buf_lenp = 0;
- }
-
- fn_exit:
- return status;
- fn_fail:
- goto fn_exit;
-}
-
-HYD_Status HYD_PMCD_pmi_proxy_handle_launch_cmd(int job_connfd, char *launch_cmd, int cmd_len)
-{
- char *key, *val;
- int i = 0, key_len = 0, val_len = 0, core = 0;
- struct HYD_Partition_exec *exec = NULL;
- HYD_Env_t *env = NULL;
- HYD_Status status = HYD_SUCCESS;
-
- /* FIXME: We currently support only one job - We need a list of proxy params for multiple jobs */
- status = HYD_PMCD_pmi_proxy_init_params(&HYD_PMCD_pmi_proxy_params);
- HYDU_ERR_POP(status, "Error initializing proxy params\n");
-
- HYD_PMCD_pmi_proxy_params.rproxy_connfd = job_connfd;
-
- status = HYD_DMX_deregister_fd(job_connfd);
- HYDU_ERR_POP(status, "Unable to deregister job conn fd\n");
- status =
- HYD_DMX_register_fd(1, &job_connfd, HYD_STDIN, (void *) &HYD_PMCD_pmi_proxy_params,
- HYD_PMCD_pmi_proxy_remote_cb);
- HYDU_ERR_POP(status, "Unable to register job conn fd\n");
-
- status = HYDU_alloc_partition_exec(&HYD_PMCD_pmi_proxy_params.exec_list);
- HYDU_ERR_POP(status, "unable to allocate partition exec\n");
-
- exec = HYD_PMCD_pmi_proxy_params.exec_list;
-
- while (cmd_len > 0) {
- status =
- HYD_PMCD_pmi_proxy_get_next_keyvalp(&launch_cmd, &cmd_len, &key, &key_len, &val,
- &val_len, HYD_PMCD_CMD_SEP_CHAR);
- HYDU_ERR_POP(status, "Unable to get next key from launch command\n");
-
- /* FIXME: Use pre-defined macros for keys */
- if (!strncmp(key, "exec_name", key_len)) {
- HYDU_MALLOC(exec->exec[0], char *, (val_len + 1), status);
- HYDU_ERR_POP(status, "Error allocating memory for executable name\n");
- HYDU_snprintf(exec->exec[0], val_len, "%s", val);
- exec->exec[1] = NULL;
- }
- else if (!strncmp(key, "exec_cnt", key_len)) {
- for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec->next; exec = exec->next);
- exec->proc_count = atoi(val);
- }
- else if (!strncmp(key, "env", key_len)) {
- char *env_str;
- int env_str_len;
-
- env_str = val;
- env_str_len = val_len;
- exec->prop_env = NULL;
- while (env_str_len > 0) {
- status =
- HYD_PMCD_pmi_proxy_get_next_keyvalp(&env_str, &env_str_len, &key, &key_len,
- &val, &val_len,
- HYD_PMCD_CMD_ENV_SEP_CHAR);
- HYDU_ERR_POP(status,
- "Error getting next environment variable from launch command\n");
-
- HYDU_MALLOC(env, HYD_Env_t *, sizeof(HYD_Env_t), status);
- HYDU_ERR_POP(status,
- "Error allocating memory for environment variable in proxy params\n");
-
- HYDU_MALLOC(env->env_name, char *, key_len + 1, status);
- HYDU_ERR_POP(status,
- "Error allocating memory for environment variable in proxy params\n");
- HYDU_snprintf(env->env_name, key_len + 1, "%s", key);
-
- HYDU_MALLOC(env->env_value, char *, val_len + 1, status);
- HYDU_ERR_POP(status,
- "Error allocating memory for environment variable in proxy params\n");
- HYDU_snprintf(env->env_value, val_len + 1, "%s", val);
- env->next = exec->prop_env;
- exec->prop_env = env;
- }
- }
- else {
- HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
- "Unrecognized key in launch command\n");
- }
-
- /* FIXME: Set these ... */
- /* HYD_PMCD_pmi_proxy_params.wdir =
- * HYD_PMCD_pmi_proxy_params.binding =
- * HYD_PMCD_pmi_proxy_params.user_bind_map = ;
- * HYDU_append_env_to_list(*env, &HYD_PMCD_pmi_proxy_params.global_env);
- * HYD_PMCD_pmi_proxy_params.one_pass_count
- * status = HYDU_alloc_partition_segment(&segment->next);
- * segment->proc_count = ;
- * segment->start_pid = ;
- */
- }
-
+ HYD_PMCD_pmi_proxy_params.one_pass_count = 0;
+ HYD_PMCD_pmi_proxy_params.partition_proc_count = 0;
HYD_PMCD_pmi_proxy_params.exec_proc_count = 0;
- for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next)
- HYD_PMCD_pmi_proxy_params.exec_proc_count += exec->proc_count;
- HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.out, int *,
- HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
- HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.err, int *,
- HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
- HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.pid, int *,
- HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
- HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.exit_status, int *,
- HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+ HYD_PMCD_pmi_proxy_params.procs_are_launched = 0;
- for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next) {
- for (i = 0; i < exec->proc_count; i++) {
- char *str = NULL;
- core = 0;
- env = NULL;
- /* FIXME: Use the start pmi_id from launch command */
- str = HYDU_int_to_str(i);
- status = HYDU_env_create(&env, "PMI_ID", str);
- HYDU_ERR_POP(status, "unable to create env\n");
- status = HYDU_putenv(env);
- HYDU_ERR_POP(status, "putenv failed\n");
- status = HYDU_create_process(&exec->exec[0], exec->prop_env, NULL,
- &HYD_PMCD_pmi_proxy_params.out[i],
- &HYD_PMCD_pmi_proxy_params.err[i],
- &HYD_PMCD_pmi_proxy_params.pid[i], core);
- HYDU_ERR_POP(status, "Error launching process\n");
- status =
- HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_params.out[i], HYD_STDOUT,
- (void *) &HYD_PMCD_pmi_proxy_params,
- HYD_PMCD_pmi_proxy_rstdout_cb);
- HYDU_ERR_POP(status, "Error registering process stdout\n");
- }
- }
+ HYD_PMCD_pmi_proxy_params.segment_list = NULL;
+ HYD_PMCD_pmi_proxy_params.exec_list = NULL;
- fn_exit:
- return status;
- fn_fail:
- goto fn_exit;
-}
+ HYD_PMCD_pmi_proxy_params.out_upstream_fd = -1;
+ HYD_PMCD_pmi_proxy_params.err_upstream_fd = -1;
+ HYD_PMCD_pmi_proxy_params.in_upstream_fd = -1;
+ HYD_PMCD_pmi_proxy_params.control_fd = -1;
-/* Handle proxy commands */
-HYD_Status HYD_PMCD_pmi_proxy_handle_cmd(int fd, char *cmd, int cmd_len)
-{
- char *key = NULL;
- char *cmd_name = NULL;
- int i = 0, key_len = 0, cmd_name_len = 0;
- char *cmdp = NULL;
- HYD_Status status = HYD_SUCCESS;
+ HYD_PMCD_pmi_proxy_params.pid = NULL;
+ HYD_PMCD_pmi_proxy_params.out = NULL;
+ HYD_PMCD_pmi_proxy_params.err = NULL;
+ HYD_PMCD_pmi_proxy_params.exit_status = NULL;
+ HYD_PMCD_pmi_proxy_params.in = -1;
- cmdp = cmd;
- /* First key/val is the command name */
- status = HYD_PMCD_pmi_proxy_get_next_keyvalp(&cmdp, &cmd_len, &key, &key_len, &cmd_name,
- &cmd_name_len, HYD_PMCD_CMD_SEP_CHAR);
- HYDU_ERR_POP(status, "Error retreiving command name from command\n");
+ HYD_PMCD_pmi_proxy_params.stdin_buf_offset = 0;
+ HYD_PMCD_pmi_proxy_params.stdin_buf_count = 0;
+ HYD_PMCD_pmi_proxy_params.stdin_tmp_buf[0] = '\0';
- if (!strncmp(cmd_name, HYD_PMCD_CMD_KILLALL_PROCS, key_len)) {
- for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
- if (HYD_PMCD_pmi_proxy_params.pid[i] != -1)
- kill(HYD_PMCD_pmi_proxy_params.pid[i], SIGKILL);
-
- status = HYD_DMX_deregister_fd(fd);
- HYDU_ERR_POP(status, "unable to register fd\n");
- close(fd);
- }
- else if (!strncmp(cmd_name, HYD_PMCD_CMD_LAUNCH_PROCS, key_len)) {
- status = HYD_PMCD_pmi_proxy_handle_launch_cmd(fd, cmdp, cmd_len);
- HYDU_ERR_POP(status, "Unable to handle launch command\n");
- }
- else if (!strncmp(cmd_name, HYD_PMCD_CMD_SHUTDOWN, key_len)) {
- /* FIXME: Not a clean shutdown... Kill all procs before exiting */
- status = HYD_DMX_deregister_fd(fd);
- HYDU_ERR_POP(status, "unable to register fd\n");
- close(fd);
- exit(0);
- }
- else {
- HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
- "got unrecognized command from mpiexec\n");
- }
- fn_exit:
return status;
- fn_fail:
- goto fn_exit;
}
-/* Initialize proxy params */
-HYD_Status HYD_PMCD_pmi_proxy_init_params(struct HYD_PMCD_pmi_proxy_params *proxy_params)
+/* FIXME: This function performs minimal error checking as it is not
+ * supposed to be called by the user, but rather by the process
+ * management server. It will still be helpful for debugging to add
+ * some error checks. */
+static HYD_Status parse_params(char **t_argv)
{
- HYD_Status status = HYD_SUCCESS;
- proxy_params->debug = 0;
- proxy_params->proxy_port = -1;
- proxy_params->is_persistent = 0;
- proxy_params->wdir = NULL;
- proxy_params->binding = HYD_BIND_UNSET;
- proxy_params->user_bind_map = NULL;
- proxy_params->global_env = NULL;
- proxy_params->one_pass_count = 0;
- proxy_params->partition_proc_count = 0;
- proxy_params->exec_proc_count = 0;
- proxy_params->segment_list = NULL;
- proxy_params->exec_list = NULL;
- proxy_params->pid = NULL;
- proxy_params->out = NULL;
- proxy_params->err = NULL;
- proxy_params->exit_status = NULL;
- proxy_params->in = -1;
- proxy_params->rproxy_connfd = -1;
- proxy_params->stdin_buf_offset = 0;
- proxy_params->stdin_buf_count = 0;
- proxy_params->stdin_tmp_buf[0] = '\0';
- return status;
-}
-
-/* Cleanup proxy params after use */
-HYD_Status HYD_PMCD_pmi_proxy_cleanup_params(struct HYD_PMCD_pmi_proxy_params * proxy_params)
-{
- HYD_Status status = HYD_SUCCESS;
- if (proxy_params->wdir != NULL)
- HYDU_FREE(proxy_params->wdir);
- if (proxy_params->user_bind_map != NULL)
- HYDU_FREE(proxy_params->user_bind_map);
- if (proxy_params->global_env != NULL) {
- HYD_Env_t *p, *q;
- do {
- p = proxy_params->global_env;
- q = p->next;
- HYDU_FREE(p);
- } while (q);
- }
- if (proxy_params->segment_list != NULL) {
- /* FIXME : incomplete */
- }
- if (proxy_params->exec_list != NULL) {
- struct HYD_Partition_exec *p, *q;
- do {
- p = proxy_params->exec_list;
- q = p->next;
- HYDU_FREE(p);
- } while (q);
- }
- if (proxy_params->pid != NULL)
- HYDU_FREE(proxy_params->pid);
- if (proxy_params->out != NULL)
- HYDU_FREE(proxy_params->out);
- if (proxy_params->err != NULL)
- HYDU_FREE(proxy_params->err);
- if (proxy_params->exit_status != NULL)
- HYDU_FREE(proxy_params->exit_status);
-
- status = HYD_PMCD_pmi_proxy_init_params(proxy_params);
- HYDU_ERR_POP(status, "Error initializing proxy params\n");
-
- fn_exit:
- return status;
- fn_fail:
- goto fn_exit;
-}
-
-HYD_Status HYD_PMCD_pmi_proxy_get_params(int t_argc, char **t_argv)
-{
char **argv = t_argv, *str;
int arg, i, count;
HYD_Env_t *env;
@@ -346,18 +66,12 @@
HYDU_FUNC_ENTER();
- status = HYD_PMCD_pmi_proxy_init_params(&HYD_PMCD_pmi_proxy_params);
- HYDU_ERR_POP(status, "Error initializing proxy params\n");
-
- while (*argv) {
- ++argv;
- if (*argv == NULL)
- break;
-
+ while (++argv && *argv) {
if (!strcmp(*argv, "--verbose")) {
HYD_PMCD_pmi_proxy_params.debug = 1;
continue;
}
+
/* Proxy port */
if (!strcmp(*argv, "--proxy-port")) {
argv++;
@@ -365,11 +79,6 @@
continue;
}
- if (!strcmp(*argv, "--persistent")) {
- HYD_PMCD_pmi_proxy_params.is_persistent = 1;
- continue;
- }
-
/* Working directory */
if (!strcmp(*argv, "--wdir")) {
argv++;
@@ -508,6 +217,9 @@
/* If we already touched the next --exec, step back */
if (*argv && !strcmp(*argv, "--exec"))
argv--;
+
+ if (!(*argv))
+ break;
}
fn_exit:
@@ -517,3 +229,313 @@
fn_fail:
goto fn_exit;
}
+
+
+/* Initialize the proxy parameters and fill them in from the proxy's
+ * command line. t_argv is the NULL-terminated argument vector; its first
+ * element (the program name) is skipped. */
+HYD_Status HYD_PMCD_pmi_proxy_get_params(char **t_argv)
+{
+    char **argv = t_argv;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    status = init_params();
+    HYDU_ERR_POP(status, "Error initializing proxy params\n");
+
+    /* For the persistent mode, the parameters are fairly
+     * straightforward: "--persistent-mode --proxy-port <port>". For the
+     * runtime mode, we call the parse_params() function to parse through
+     * argv and fill in the proxy handle.
+     *
+     * Every argv element is checked for NULL before it is handed to
+     * strcmp()/atoi(): a proxy started without arguments, or with a
+     * truncated persistent-mode command line, must report an error
+     * instead of dereferencing the argv terminator. */
+    ++argv;
+    if (*argv != NULL && !strcmp(*argv, "--persistent-mode")) {
+        HYD_PMCD_pmi_proxy_params.proxy_type = HYD_PMCD_PMI_PROXY_PERSISTENT;
+
+        /* the next argument should be proxy port */
+        ++argv;
+        if (*argv == NULL || strcmp(*argv, "--proxy-port"))
+            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "incorrect proxy parameters\n");
+
+        ++argv;
+        if (*argv == NULL)
+            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "incorrect proxy parameters\n");
+        HYD_PMCD_pmi_proxy_params.proxy_port = atoi(*argv);
+    }
+    else {
+        /* Also reached when no arguments were given at all; parse_params()
+         * stops at the NULL terminator in that case. */
+        HYD_PMCD_pmi_proxy_params.proxy_type = HYD_PMCD_PMI_PROXY_RUNTIME;
+        status = parse_params(t_argv);
+        HYDU_ERR_POP(status, "error parsing proxy params\n");
+    }
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/* Release everything owned by HYD_PMCD_pmi_proxy_params, then call
+ * init_params() so every field is back to its "empty" value. */
+HYD_Status HYD_PMCD_pmi_proxy_cleanup_params(void)
+{
+    struct HYD_Partition_segment *seg, *next_seg;
+    struct HYD_Partition_exec *ex, *next_ex;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    /* Working directory and user binding map strings */
+    if (HYD_PMCD_pmi_proxy_params.wdir)
+        HYDU_FREE(HYD_PMCD_pmi_proxy_params.wdir);
+    if (HYD_PMCD_pmi_proxy_params.user_bind_map)
+        HYDU_FREE(HYD_PMCD_pmi_proxy_params.user_bind_map);
+
+    /* Global environment list */
+    if (HYD_PMCD_pmi_proxy_params.global_env)
+        HYDU_env_free_list(HYD_PMCD_pmi_proxy_params.global_env);
+
+    /* Segment list, including each segment's mapping string list; an
+     * empty list simply skips the loop. */
+    for (seg = HYD_PMCD_pmi_proxy_params.segment_list; seg != NULL; seg = next_seg) {
+        next_seg = seg->next;
+        if (seg->mapping) {
+            HYDU_free_strlist(seg->mapping);
+            HYDU_FREE(seg->mapping);
+        }
+        HYDU_FREE(seg);
+    }
+
+    /* Executable list: argument strings, propagated environment, node.
+     * NOTE(review): prop_env is released with HYDU_env_free() while
+     * global_env uses HYDU_env_free_list(); confirm prop_env is never a
+     * multi-entry list. */
+    for (ex = HYD_PMCD_pmi_proxy_params.exec_list; ex != NULL; ex = next_ex) {
+        next_ex = ex->next;
+        HYDU_free_strlist(ex->exec);
+        if (ex->prop_env)
+            HYDU_env_free(ex->prop_env);
+        HYDU_FREE(ex);
+    }
+
+    /* Per-process bookkeeping arrays */
+    if (HYD_PMCD_pmi_proxy_params.pid)
+        HYDU_FREE(HYD_PMCD_pmi_proxy_params.pid);
+    if (HYD_PMCD_pmi_proxy_params.out)
+        HYDU_FREE(HYD_PMCD_pmi_proxy_params.out);
+    if (HYD_PMCD_pmi_proxy_params.err)
+        HYDU_FREE(HYD_PMCD_pmi_proxy_params.err);
+    if (HYD_PMCD_pmi_proxy_params.exit_status)
+        HYDU_FREE(HYD_PMCD_pmi_proxy_params.exit_status);
+
+    /* Reset all fields so the structure can be filled in again */
+    status = init_params();
+    HYDU_ERR_POP(status, "unable to initialize params\n");
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/* Receive the description of the application to launch from upstream on
+ * socket fd and load it into the proxy handle via parse_params().
+ *
+ * Wire format (as read below): an int holding the number of strings, then
+ * for each string an int byte count (including the terminating NUL)
+ * followed by that many bytes. fd is remembered as control_fd because the
+ * exit status is sent back on it. */
+HYD_Status HYD_PMCD_pmi_proxy_procinfo(int fd)
+{
+    char **arglist;
+    int num_strings, str_len, recvd, i;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    /* Read information about the application to launch into a string
+     * array and call parse_params() to interpret it and load it into
+     * the proxy handle. */
+    status = HYDU_sock_read(fd, &num_strings, sizeof(int), &recvd);
+    HYDU_ERR_POP(status, "error reading data from upstream\n");
+
+    if (num_strings < 0)
+        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
+                            "invalid argument count received from upstream\n");
+
+    /* One extra slot for the NULL terminator that parse_params() relies
+     * on; size_t arithmetic avoids signed overflow in the "+ 1". */
+    HYDU_MALLOC(arglist, char **, ((size_t) num_strings + 1) * sizeof(char *), status);
+
+    for (i = 0; i < num_strings; i++) {
+        status = HYDU_sock_read(fd, &str_len, sizeof(int), &recvd);
+        HYDU_ERR_POP(status, "error reading data from upstream\n");
+
+        /* The sender always includes the terminating NUL, so a valid
+         * length is at least 1. */
+        if (str_len <= 0)
+            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
+                                "invalid argument length received from upstream\n");
+
+        HYDU_MALLOC(arglist[i], char *, str_len, status);
+
+        status = HYDU_sock_read(fd, arglist[i], str_len, &recvd);
+        HYDU_ERR_POP(status, "error reading data from upstream\n");
+
+        /* Never trust the stream to be terminated correctly */
+        arglist[i][str_len - 1] = '\0';
+    }
+    arglist[num_strings] = NULL;
+
+    /* Get the parser to fill in the proxy params structure.
+     * NOTE(review): arglist and its strings are not freed here; confirm
+     * whether parse_params() keeps references to them before freeing. */
+    status = parse_params(arglist);
+    HYDU_ERR_POP(status, "unable to parse argument list\n");
+
+    /* Save this fd as we need to send back the exit status on
+     * this. */
+    HYD_PMCD_pmi_proxy_params.control_fd = fd;
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/* Spawn every process described by the exec list. For each one, set
+ * PMI_ID in the environment. Register the resulting stdout/stderr (and,
+ * for PMI_ID 0, stdin) file descriptors with the demux engine. */
+HYD_Status HYD_PMCD_pmi_proxy_launch_procs(void)
+{
+    int i, j, arg, stdin_fd, process_id, core, pmi_id, rem;
+    int *in_fd;
+    char *str;
+    char *client_args[HYD_NUM_TMP_STRINGS];
+    HYD_Env_t *env;
+    struct HYD_Partition_segment *segment;
+    struct HYD_Partition_exec *exec;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    HYD_PMCD_pmi_proxy_params.partition_proc_count = 0;
+    for (segment = HYD_PMCD_pmi_proxy_params.segment_list; segment;
+         segment = segment->next)
+        HYD_PMCD_pmi_proxy_params.partition_proc_count += segment->proc_count;
+
+    HYD_PMCD_pmi_proxy_params.exec_proc_count = 0;
+    for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next)
+        HYD_PMCD_pmi_proxy_params.exec_proc_count += exec->proc_count;
+
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.out, int *,
+                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.err, int *,
+                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.pid, int *,
+                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.exit_status, int *,
+                HYD_PMCD_pmi_proxy_params.exec_proc_count * sizeof(int), status);
+
+    /* Initialize the exit status, and mark every pid slot as "not
+     * spawned". A launch that fails part-way therefore never leaves
+     * uninitialized pids behind for HYD_PMCD_pmi_proxy_killjob() to
+     * signal. */
+    for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++) {
+        HYD_PMCD_pmi_proxy_params.exit_status[i] = -1;
+        HYD_PMCD_pmi_proxy_params.pid[i] = -1;
+    }
+
+    /* The PMI_ID computation below divides by partition_proc_count;
+     * refuse to launch if no segment assigns processes to this proxy. */
+    if (HYD_PMCD_pmi_proxy_params.exec_proc_count > 0 &&
+        HYD_PMCD_pmi_proxy_params.partition_proc_count <= 0)
+        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
+                            "no process segments assigned to this proxy\n");
+
+    /* For local spawning, set the global environment here itself */
+    status = HYDU_putenv_list(HYD_PMCD_pmi_proxy_params.global_env);
+    HYDU_ERR_POP(status, "putenv returned error\n");
+
+    status = HYDU_bind_init(HYD_PMCD_pmi_proxy_params.user_bind_map);
+    HYDU_ERR_POP(status, "unable to initialize process binding\n");
+
+    /* Spawn the processes */
+    process_id = 0;
+    for (exec = HYD_PMCD_pmi_proxy_params.exec_list; exec; exec = exec->next) {
+        for (i = 0; i < exec->proc_count; i++) {
+
+            /* Map the local process index to its global PMI_ID. Each full
+             * pass over this partition's segments advances by
+             * one_pass_count. The remainder selects a segment and an
+             * offset from that segment's start_pid. */
+            pmi_id = ((process_id / HYD_PMCD_pmi_proxy_params.partition_proc_count) *
+                      HYD_PMCD_pmi_proxy_params.one_pass_count);
+            rem = (process_id % HYD_PMCD_pmi_proxy_params.partition_proc_count);
+
+            for (segment = HYD_PMCD_pmi_proxy_params.segment_list; segment;
+                 segment = segment->next) {
+                if (rem >= segment->proc_count)
+                    rem -= segment->proc_count;
+                else {
+                    pmi_id += segment->start_pid + rem;
+                    break;
+                }
+            }
+
+            str = HYDU_int_to_str(pmi_id);
+            status = HYDU_env_create(&env, "PMI_ID", str);
+            HYDU_ERR_POP(status, "unable to create env\n");
+            HYDU_FREE(str);
+            status = HYDU_putenv(env);
+            HYDU_ERR_POP(status, "putenv failed\n");
+
+            if (chdir(HYD_PMCD_pmi_proxy_params.wdir) < 0)
+                HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
+                                     "unable to change wdir (%s)\n",
+                                     HYDU_strerror(errno));
+
+            /* Build a NULL-terminated argument vector; the last slot of
+             * client_args is reserved for the terminator. */
+            for (j = 0, arg = 0; exec->exec[j]; j++) {
+                if (arg >= HYD_NUM_TMP_STRINGS - 1) {
+                    client_args[arg] = NULL;
+                    HYDU_free_strlist(client_args);
+                    HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
+                                        "too many arguments for executable\n");
+                }
+                client_args[arg++] = HYDU_strdup(exec->exec[j]);
+            }
+            client_args[arg++] = NULL;
+
+            /* Only the process with PMI_ID 0 gets a stdin pipe */
+            in_fd = (pmi_id == 0) ? &HYD_PMCD_pmi_proxy_params.in : NULL;
+
+            core = HYDU_bind_get_core_id(process_id, HYD_PMCD_pmi_proxy_params.binding);
+            status = HYDU_create_process(client_args, exec->prop_env, in_fd,
+                                         &HYD_PMCD_pmi_proxy_params.out[process_id],
+                                         &HYD_PMCD_pmi_proxy_params.err[process_id],
+                                         &HYD_PMCD_pmi_proxy_params.pid[process_id],
+                                         core);
+            /* The strdup'd arguments are only needed for the spawn itself.
+             * NOTE(review): assumes HYDU_create_process() keeps no
+             * reference to client_args after it returns. */
+            HYDU_free_strlist(client_args);
+            HYDU_ERR_POP(status, "spawn process returned error\n");
+
+            if (pmi_id == 0) {
+                status = HYDU_sock_set_nonblock(HYD_PMCD_pmi_proxy_params.in);
+                HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
+
+                stdin_fd = 0;
+                status = HYDU_sock_set_nonblock(stdin_fd);
+                HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
+
+                HYD_PMCD_pmi_proxy_params.stdin_buf_offset = 0;
+                HYD_PMCD_pmi_proxy_params.stdin_buf_count = 0;
+                status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL,
+                                             HYD_PMCD_pmi_proxy_stdin_cb);
+                HYDU_ERR_POP(status, "unable to register fd\n");
+            }
+
+            process_id++;
+        }
+    }
+
+    /* Everything is spawned, register the required FDs */
+    status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
+                                 HYD_PMCD_pmi_proxy_params.out,
+                                 HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stdout_cb);
+    HYDU_ERR_POP(status, "unable to register fd\n");
+
+    /* NOTE(review): the stderr fds are registered with HYD_STDOUT as the
+     * event type, same as stdout; confirm this is intended. */
+    status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
+                                 HYD_PMCD_pmi_proxy_params.err,
+                                 HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stderr_cb);
+    HYDU_ERR_POP(status, "unable to register fd\n");
+
+    /* Indicate that the processes have been launched */
+    HYD_PMCD_pmi_proxy_params.procs_are_launched = 1;
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/* Send SIGTERM and then SIGKILL to every process this proxy spawned. */
+void HYD_PMCD_pmi_proxy_killjob(void)
+{
+    int i;
+
+    HYDU_FUNC_ENTER();
+
+    /* pid stays NULL until HYD_PMCD_pmi_proxy_launch_procs() allocates it.
+     * A kill request that arrives before that, or after a failed
+     * allocation, has nothing to signal. */
+    if (HYD_PMCD_pmi_proxy_params.pid == NULL)
+        goto fn_exit;
+
+    /* Only signal real child pids. kill() with pid 0 targets our own
+     * process group and pid -1 targets every process we may signal, so
+     * unused (-1) or otherwise non-positive entries are skipped. */
+    for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++) {
+        if (HYD_PMCD_pmi_proxy_params.pid[i] > 0) {
+            kill(HYD_PMCD_pmi_proxy_params.pid[i], SIGTERM);
+            kill(HYD_PMCD_pmi_proxy_params.pid[i], SIGKILL);
+        }
+    }
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return;
+}
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h 2009-03-30 03:57:55 UTC (rev 4222)
@@ -7,20 +7,12 @@
#ifndef PMI_SERV_H_INCLUDED
#define PMI_SERV_H_INCLUDED
-/* The set of commands supported */
-#define HYD_PMCD_CMD_KILLALL_PROCS "kill_all_procs"
-#define HYD_PMCD_CMD_KILLALL_PROXIES "kill_all_proxies"
-#define HYD_PMCD_CMD_LAUNCH_PROCS "launch_procs"
-#define HYD_PMCD_CMD_SHUTDOWN "shutdown"
+#include "pmi_common.h"
-#define HYD_PMCD_CMD_SEP_CHAR ';'
-#define HYD_PMCD_CMD_ENV_SEP_CHAR ','
-
-#define HYD_PMCD_MAX_CMD_LEN 1024
-
extern int HYD_PMCD_pmi_serv_listenfd;
HYD_Status HYD_PMCD_pmi_serv_cb(int fd, HYD_Event_t events, void *userp);
+HYD_Status HYD_PMCD_pmi_serv_control_cb(int fd, HYD_Event_t events, void *userp);
HYD_Status HYD_PMCD_pmi_serv_cleanup(void);
void HYD_PMCD_pmi_serv_signal_cb(int signal);
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -14,7 +14,7 @@
#include "pmi_serv.h"
int HYD_PMCD_pmi_serv_listenfd;
-extern HYD_Handle handle;
+HYD_Handle handle;
struct HYD_PMCD_pmi_handle *HYD_PMCD_pmi_handle_list;
/*
@@ -137,11 +137,38 @@
}
+/* Demux callback for a proxy's control socket. It reads one int (the exit
+ * status) into the partition passed as userp, then deregisters and
+ * closes the socket. */
+HYD_Status HYD_PMCD_pmi_serv_control_cb(int fd, HYD_Event_t events, void *userp)
+{
+    struct HYD_Partition *partition = (struct HYD_Partition *) userp;
+    int nbytes;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    status = HYDU_sock_read(fd, &partition->exit_status, sizeof(int), &nbytes);
+    HYDU_ERR_POP(status, "unable to read status from proxy\n");
+
+    /* Once the status has been read the socket is no longer needed:
+     * stop polling it and release it. */
+    status = HYD_DMX_deregister_fd(fd);
+    HYDU_ERR_POP(status, "error deregistering fd\n");
+    close(fd);
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
HYD_Status HYD_PMCD_pmi_serv_cleanup(void)
{
struct HYD_Partition *partition;
int fd;
- char cmd[HYD_PMCD_MAX_CMD_LEN];
+ enum HYD_PMCD_pmi_proxy_cmds cmd;
HYD_Status status = HYD_SUCCESS, overall_status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
@@ -149,8 +176,6 @@
/* FIXME: Instead of doing this from this process itself, fork a
* bunch of processes to do this. */
/* Connect to all proxies and send a KILL command */
- HYDU_snprintf(cmd, HYD_PMCD_MAX_CMD_LEN, "%s=%s %c\n",
- "cmd", HYD_PMCD_CMD_KILLALL_PROCS, HYD_PMCD_CMD_SEP_CHAR);
for (partition = handle.partition_list; partition; partition = partition->next) {
status = HYDU_sock_connect(partition->name, handle.proxy_port, &fd);
if (status != HYD_SUCCESS) {
@@ -159,7 +184,8 @@
continue; /* Move on to the next proxy */
}
- status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
+ cmd = KILL_JOB;
+ status = HYDU_sock_write(fd, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
if (status != HYD_SUCCESS) {
HYDU_Warn_printf("unable to send data to the proxy on %s\n", partition->name);
overall_status = HYD_INTERNAL_ERROR;
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -30,12 +30,14 @@
status = HYD_PMCD_pmi_finalize();
HYDU_ERR_POP(status, "unable to finalize process manager utils\n");
- /* We use BSC only for local proxies */
- if (!handle.is_proxy_remote) {
+ if (handle.launch_mode == HYD_LAUNCH_RUNTIME) {
status = HYD_BSCI_finalize();
HYDU_ERR_POP(status, "unable to finalize bootstrap server\n");
}
+ status = HYD_DMX_finalize();
+ HYDU_ERR_POP(status, "error returned from demux finalize\n");
+
fn_exit:
HYDU_FUNC_EXIT();
return status;
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -13,18 +13,17 @@
#include "pmi_serv.h"
int HYD_PMCD_pmi_serv_listenfd;
-extern HYD_Handle handle;
+HYD_Handle handle;
-/* Local proxy is a proxy that is local to this process */
-static HYD_Status launch_procs_with_local_proxy(void)
+static HYD_Status fill_in_proxy_args(void)
{
- HYD_Status status = HYD_SUCCESS;
int i, arg, process_id;
char *path_str[HYD_NUM_TMP_STRINGS];
HYD_Env_t *env;
struct HYD_Partition *partition;
struct HYD_Partition_exec *exec;
struct HYD_Partition_segment *segment;
+ HYD_Status status = HYD_SUCCESS;
handle.one_pass_count = 0;
for (partition = handle.partition_list; partition; partition = partition->next)
@@ -114,6 +113,27 @@
}
}
+ fn_exit:
+ return status;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+/* Runtime mode: fill in the proxy arguments, then initialize the bootstrap
+ * server and ask it to launch the processes. */
+static HYD_Status launch_procs_in_runtime_mode(void)
+{
+ int i, arg, process_id;
+ char *path_str[HYD_NUM_TMP_STRINGS];
+ HYD_Env_t *env;
+ struct HYD_Partition *partition;
+ struct HYD_Partition_exec *exec;
+ struct HYD_Partition_segment *segment;
+ HYD_Status status = HYD_SUCCESS;
+
+ status = fill_in_proxy_args();
+ HYDU_ERR_POP(status, "unable to fill in proxy arguments\n");
+
/* Initialize the bootstrap server and ask it to launch the
* processes. */
status = HYD_BSCI_init(handle.bootstrap);
@@ -124,93 +144,196 @@
fn_exit:
return status;
+
fn_fail:
goto fn_exit;
}
-/* Request remote proxies to shutdown */
-static HYD_Status shutdown_remote_proxies(void)
+/* Boot mode: append the persistent-proxy command line to each partition's
+ * proxy_args, then initialize the bootstrap server and ask it to launch
+ * them. The command line is the joined handle.base_path and "pmi_proxy",
+ * followed by "--persistent-mode --proxy-port <handle.proxy_port>". */
+static HYD_Status boot_proxies(void)
{
- char shutdown_proxies_cmd[HYD_PMCD_MAX_CMD_LEN];
- struct HYD_Partition *partition = NULL;
- int HYD_PMCD_pmi_proxy_connfd = -1;
HYD_Status status = HYD_SUCCESS;
+ int i, arg;
+ char *path_str[HYD_NUM_TMP_STRINGS];
+ struct HYD_Partition *partition;
+ /* Total number of processes over all partitions */
+ handle.one_pass_count = 0;
+ for (partition = handle.partition_list; partition; partition = partition->next)
+ handle.one_pass_count += partition->total_proc_count;
+
+ /* Create the arguments list for each proxy */
for (partition = handle.partition_list; partition; partition = partition->next) {
- status = HYDU_sock_connect(partition->name, handle.pproxy_port,
- &HYD_PMCD_pmi_proxy_connfd);
- HYDU_ERR_POP(status, "Error connecting to remote proxy");
- /* Create shutdown command */
- HYDU_snprintf(shutdown_proxies_cmd, HYD_PMCD_MAX_CMD_LEN,
- "%s=%s %c\n", "cmd", HYD_PMCD_CMD_SHUTDOWN, HYD_PMCD_CMD_SEP_CHAR);
+ /* New arguments are appended after any existing entries */
+ arg = HYDU_strlist_lastidx(partition->proxy_args);
+ /* First new argument: the proxy executable path */
+ i = 0;
+ path_str[i++] = HYDU_strdup(handle.base_path);
+ path_str[i++] = HYDU_strdup("pmi_proxy");
+ path_str[i] = NULL;
+ /* NOTE(review): path_str is not freed if the join below fails */
+ status = HYDU_str_alloc_and_join(path_str, &partition->proxy_args[arg++]);
+ HYDU_ERR_POP(status, "unable to join strings\n");
+ HYDU_free_strlist(path_str);
- status = HYDU_sock_writeline(HYD_PMCD_pmi_proxy_connfd, shutdown_proxies_cmd,
- strlen(shutdown_proxies_cmd));
- HYDU_ERR_POP(status, "Error writing the launch procs command\n");
+ partition->proxy_args[arg++] = HYDU_strdup("--persistent-mode");
+ partition->proxy_args[arg++] = HYDU_strdup("--proxy-port");
+ partition->proxy_args[arg++] = HYDU_int_to_str(handle.proxy_port);
+ partition->proxy_args[arg++] = NULL;
- /* FIXME: Read result */
- partition->out = HYD_PMCD_pmi_proxy_connfd;
- partition->err = -1;
+ if (handle.debug) {
+ HYDU_Debug("Executable passed to the bootstrap: ");
+ HYDU_print_strlist(partition->proxy_args);
+ }
}
+ /* Initialize the bootstrap server and ask it to launch the
+ * processes. */
+ status = HYD_BSCI_init(handle.bootstrap);
+ HYDU_ERR_POP(status, "bootstrap server initialization failed\n");
+
+ status = HYD_BSCI_launch_procs();
+ HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n");
+
fn_exit:
return status;
+
fn_fail:
goto fn_exit;
}
-/* Remote proxy is a proxy external to this process */
-static HYD_Status launch_procs_with_remote_proxy(void)
+/* Shutdown mode: connect to the proxy of every partition and send it the
+ * PROXY_SHUTDOWN command. Stops at the first proxy that cannot be reached
+ * or written to. */
+static HYD_Status shutdown_proxies(void)
{
+    struct HYD_Partition *partition;
+    enum HYD_PMCD_pmi_proxy_cmds cmd;
+    int fd;
HYD_Status status = HYD_SUCCESS;
- char launch_procs_cmd[HYD_PMCD_MAX_CMD_LEN];
- char env_list[HYD_PMCD_MAX_CMD_LEN]; /* FIXME: Wrong *MAX*... */
- int env_list_len = 0;
- char *p = NULL;
- struct HYD_Partition *partition = NULL;
- struct HYD_Partition_exec *exec = NULL;
- struct HYD_Env *env = NULL;
- int HYD_PMCD_pmi_proxy_connfd = -1;
for (partition = handle.partition_list; partition; partition = partition->next) {
- status = HYDU_sock_connect(partition->name, handle.pproxy_port,
- &HYD_PMCD_pmi_proxy_connfd);
- HYDU_ERR_POP(status, "Error connecting to remote proxy");
+        status = HYDU_sock_connect(partition->name, handle.proxy_port, &fd);
+        HYDU_ERR_POP(status, "unable to connect to proxy\n");
- exec = partition->exec_list;
+        cmd = PROXY_SHUTDOWN;
+        status = HYDU_sock_write(fd, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+
+        /* Close before checking the write status so the socket is not
+         * leaked when the write fails and we jump to fn_fail. */
+        close(fd);
+        HYDU_ERR_POP(status, "unable to write data to proxy\n");
- /* FIXME: Create a util func for converting env list to a string */
- env = handle.system_env;
- p = env_list;
- *p = '\0';
- env_list_len = HYD_PMCD_MAX_CMD_LEN;
- while (env) {
- HYDU_snprintf(p, env_list_len, "%s=%s %c",
- env->env_name, env->env_value, HYD_PMCD_CMD_ENV_SEP_CHAR);
- env_list_len -= strlen(p);
- p += strlen(p);
- env = env->next;
+    }
+
+  fn_exit:
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+/* Persistent mode: the proxies are expected to be running already
+ * (presumably started earlier in HYD_LAUNCH_BOOT mode). For each
+ * partition this function does four things:
+ *   - connects a control socket;
+ *   - sends the PROC_INFO argument list on it;
+ *   - registers that socket with the demux engine;
+ *   - opens the stdout/stderr sockets (plus stdin for the first
+ *     partition). */
+static HYD_Status launch_procs_in_persistent_mode(void)
+{
+    struct HYD_Partition *partition;
+    enum HYD_PMCD_pmi_proxy_cmds cmd;
+    int i, list_len, arg_len, first_partition;
+    HYD_Status status = HYD_SUCCESS;
+
+    /*
+     * Here are the steps we will follow:
+     *
+     * 1. Put all the arguments to pass in to a string list.
+     *
+     * 2. Connect to the proxy (this will be our primary control
+     *    socket).
+     *
+     * 3. Read this string list and write the following to the socket:
+     *    (a) The PROC_INFO command.
+     *    (b) Integer sized data with the number of arguments to
+     *        follow.
+     *    (c) For each argument to pass, first send an integer which
+     *        tells the proxy how many bytes are coming in that
+     *        argument.
+     *
+     * 4. Open two new sockets and connect them to the proxy.
+     *
+     * 5. On the first new socket, send USE_AS_STDOUT and the second
+     *    send USE_AS_STDERR.
+     *
+     * 6. For PMI_ID "0", open a separate socket and send the
+     *    USE_AS_STDIN command on it.
+     *
+     * 7. We need to figure out what to do with the LAUNCH_JOB
+     *    command; since it's going on a different socket, it might go
+     *    out-of-order. Maybe a state machine on the proxy to see if
+     *    it got all the information it needs to launch the job would
+     *    work.
+     */
+
+    status = fill_in_proxy_args();
+    HYDU_ERR_POP(status, "unable to fill in proxy arguments\n");
+
+    /* Though we don't use the bootstrap server right now, we still
+     * initialize it, as we need to query it for information
+     * sometimes. */
+    status = HYD_BSCI_init(handle.bootstrap);
+    HYDU_ERR_POP(status, "bootstrap server initialization failed\n");
+
+    first_partition = 1;
+    for (partition = handle.partition_list; partition; partition = partition->next) {
+        status = HYDU_sock_connect(partition->name, handle.proxy_port,
+                                   &partition->control_fd);
+        HYDU_ERR_POP(status, "unable to connect to proxy\n");
+
+        cmd = PROC_INFO;
+        status = HYDU_sock_write(partition->control_fd, &cmd,
+                                 sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+        HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+        /* Check how many arguments we have */
+        list_len = HYDU_strlist_lastidx(partition->proxy_args);
+        status = HYDU_sock_write(partition->control_fd, &list_len, sizeof(int));
+        HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+        /* Convert the string list to parseable data and send; each
+         * length includes the terminating NUL. */
+        for (i = 0; partition->proxy_args[i]; i++) {
+            arg_len = strlen(partition->proxy_args[i]) + 1;
+
+            status = HYDU_sock_write(partition->control_fd, &arg_len, sizeof(int));
+            HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+            status = HYDU_sock_write(partition->control_fd, partition->proxy_args[i],
+                                     arg_len);
+            HYDU_ERR_POP(status, "unable to write data to proxy\n");
}
- /* Create launch command */
- HYDU_snprintf(launch_procs_cmd, HYD_PMCD_MAX_CMD_LEN,
- "%s=%s %c %s=%s %c %s=%d %c %s=%s %c\n",
- "cmd", HYD_PMCD_CMD_LAUNCH_PROCS, HYD_PMCD_CMD_SEP_CHAR,
- "exec_name", exec->exec[0], HYD_PMCD_CMD_SEP_CHAR,
- "exec_cnt", exec->proc_count, HYD_PMCD_CMD_SEP_CHAR,
- "env", env_list, HYD_PMCD_CMD_SEP_CHAR);
- status = HYDU_sock_writeline(HYD_PMCD_pmi_proxy_connfd, launch_procs_cmd,
- strlen(launch_procs_cmd));
- HYDU_ERR_POP(status, "Error writing the launch procs command\n");
+        /* Register the control socket with the demux engine; the proxy's
+         * exit status arrives on it and is handled by
+         * HYD_PMCD_pmi_serv_control_cb. */
+        status = HYD_DMX_register_fd(1, &partition->control_fd, HYD_STDOUT, partition,
+                                     HYD_PMCD_pmi_serv_control_cb);
+        HYDU_ERR_POP(status, "unable to register fd\n");
- /* FIXME: Read result */
- partition->out = HYD_PMCD_pmi_proxy_connfd;
- partition->err = -1;
+        /* Create an stdout socket */
+        status = HYDU_sock_connect(partition->name, handle.proxy_port, &partition->out);
+        HYDU_ERR_POP(status, "unable to connect to proxy\n");
+
+        cmd = USE_AS_STDOUT;
+        status = HYDU_sock_write(partition->out, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+        HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+        /* Create an stderr socket */
+        status = HYDU_sock_connect(partition->name, handle.proxy_port, &partition->err);
+        HYDU_ERR_POP(status, "unable to connect to proxy\n");
+
+        cmd = USE_AS_STDERR;
+        status = HYDU_sock_write(partition->err, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+        HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+        /* The first partition is assumed to host rank 0, so it gets the
+         * stdin socket. */
+        if (first_partition) {
+            status = HYDU_sock_connect(partition->name, handle.proxy_port, &handle.in);
+            HYDU_ERR_POP(status, "unable to connect to proxy\n");
+
+            cmd = USE_AS_STDIN;
+            status = HYDU_sock_write(handle.in, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+            HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+            first_partition = 0;
+        }
+    }
}
fn_exit:
return status;
+
fn_fail:
goto fn_exit;
}
@@ -271,7 +394,6 @@
"gethostname error (hostname: %s; errno: %d)\n", hostname, errno);
sport = HYDU_int_to_str(port);
-
HYDU_MALLOC(port_str, char *, strlen(hostname) + 1 + strlen(sport) + 1, status);
HYDU_snprintf(port_str, strlen(hostname) + 1 + strlen(sport) + 1,
"%s:%s", hostname, sport);
@@ -292,20 +414,22 @@
status = HYD_PMCD_pmi_create_pg();
HYDU_ERR_POP(status, "unable to create process group\n");
- if (handle.is_proxy_remote) {
- if (handle.is_proxy_terminator) {
- status = shutdown_remote_proxies();
- HYDU_ERR_POP(status, "Error shutting down remote proxies\n");
- }
- else {
- status = launch_procs_with_remote_proxy();
- HYDU_ERR_POP(status, "Error launching procs with remote proxy\n");
- }
+ if (handle.launch_mode == HYD_LAUNCH_RUNTIME) {
+ status = launch_procs_in_runtime_mode();
+ HYDU_ERR_POP(status, "error launching procs in runtime mode\n");
}
- else {
- status = launch_procs_with_local_proxy();
- HYDU_ERR_POP(status, "Error launching procs with local proxy\n");
+ else if (handle.launch_mode == HYD_LAUNCH_BOOT) {
+ status = boot_proxies();
+ HYDU_ERR_POP(status, "error booting proxies\n");
}
+ else if (handle.launch_mode == HYD_LAUNCH_SHUTDOWN) {
+ status = shutdown_proxies();
+ HYDU_ERR_POP(status, "error shutting down proxies\n");
+ }
+ else if (handle.launch_mode == HYD_LAUNCH_PERSISTENT) {
+ status = launch_procs_in_persistent_mode();
+ HYDU_ERR_POP(status, "error launching procs in persistent mode\n");
+ }
fn_exit:
HYDU_FUNC_EXIT();
@@ -318,19 +442,65 @@
HYD_Status HYD_PMCI_wait_for_completion(void)
{
+ struct HYD_Partition *partition;
+ int sockets_open, all_procs_exited;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
- if (handle.is_proxy_remote) {
+ if ((handle.launch_mode == HYD_LAUNCH_BOOT) ||
+ (handle.launch_mode == HYD_LAUNCH_SHUTDOWN)) {
status = HYD_SUCCESS;
}
else {
- status = HYD_BSCI_wait_for_completion();
- if (status != HYD_SUCCESS) {
- status = HYD_PMCD_pmi_serv_cleanup();
- HYDU_ERR_POP(status, "process manager cannot cleanup processes\n");
+ while (1) {
+ /* Wait for some event to occur */
+ status = HYD_DMX_wait_for_event(HYDU_time_left(handle.start, handle.timeout));
+ HYDU_ERR_POP(status, "error waiting for event\n");
+
+ /* Check to see if there's any open read socket left; if
+ * there are, we will just wait for more events. */
+ sockets_open = 0;
+ for (partition = handle.partition_list; partition; partition = partition->next) {
+ if (partition->out != -1 || partition->err != -1) {
+ sockets_open++;
+ break;
+ }
+ }
+
+ if (sockets_open && HYDU_time_left(handle.start, handle.timeout))
+ continue;
+
+ break;
}
+
+ /* The bootstrap will wait for all processes to terminate */
+ if (handle.launch_mode == HYD_LAUNCH_RUNTIME) {
+ status = HYD_BSCI_wait_for_completion();
+ if (status != HYD_SUCCESS) {
+ status = HYD_PMCD_pmi_serv_cleanup();
+ HYDU_ERR_POP(status, "process manager cannot cleanup processes\n");
+ }
+ }
+ else if (handle.launch_mode == HYD_LAUNCH_PERSISTENT) {
+ do {
+ /* Check if the exit status has already arrived */
+ all_procs_exited = 1;
+ for (partition = handle.partition_list; partition; partition = partition->next) {
+ if (partition->exit_status == -1) {
+ all_procs_exited = 0;
+ break;
+ }
+ }
+
+ if (all_procs_exited)
+ break;
+
+ /* If not, wait for some event to occur */
+ status = HYD_DMX_wait_for_event(HYDU_time_left(handle.start, handle.timeout));
+ HYDU_ERR_POP(status, "error waiting for event\n");
+ } while (1);
+ }
}
fn_exit:
Modified: mpich2/trunk/src/pm/hydra/utils/launch/allocate.c
===================================================================
--- mpich2/trunk/src/pm/hydra/utils/launch/allocate.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/utils/launch/allocate.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -22,6 +22,7 @@
(*partition)->out = -1;
(*partition)->err = -1;
(*partition)->exit_status = -1;
+ (*partition)->control_fd = -1;
(*partition)->proxy_args[0] = NULL;
(*partition)->exec_list = NULL;
Modified: mpich2/trunk/src/pm/hydra/utils/sock/sock.c
===================================================================
--- mpich2/trunk/src/pm/hydra/utils/sock/sock.c 2009-03-28 21:38:11 UTC (rev 4221)
+++ mpich2/trunk/src/pm/hydra/utils/sock/sock.c 2009-03-30 03:57:55 UTC (rev 4222)
@@ -134,7 +134,7 @@
* return an error, but only print a warning message. The upper
* layer can decide what to do with the return status. */
if (connect(*fd, (struct sockaddr *) &sa, sizeof(sa)) < 0) {
- HYDU_Warn_printf("connect error (%s)\n", HYDU_strerror(errno));
+ HYDU_Error_printf("connect error (%s)\n", HYDU_strerror(errno));
status = HYD_SOCK_ERROR;
goto fn_fail;
}
@@ -375,8 +375,8 @@
}
-HYD_Status HYDU_sock_stdin_cb(int fd, HYD_Event_t events, char *buf, int *buf_count,
- int *buf_offset, int *closed)
+HYD_Status HYDU_sock_stdin_cb(int fd, HYD_Event_t events, int stdin_fd, char *buf,
+ int *buf_count, int *buf_offset, int *closed)
{
int count;
HYD_Status status = HYD_SUCCESS;
@@ -401,7 +401,7 @@
}
/* If we are still here, we need to refill our temporary buffer */
- count = read(0, buf, HYD_TMPBUF_SIZE);
+ count = read(stdin_fd, buf, HYD_TMPBUF_SIZE);
if (count < 0) {
if (errno == EINTR || errno == EAGAIN) {
/* This call was interrupted or there was no data to read; just break out. */
More information about the mpich2-commits
mailing list