[mpich2-commits] r3992 - in mpich2/trunk/src/pm/hydra: bootstrap/ssh bootstrap/utils control/consys control/utils demux include launcher/mpiexec launcher/utils pm/central pm/utils utils/env utils/launch utils/sock
balaji at mcs.anl.gov
balaji at mcs.anl.gov
Mon Mar 9 23:26:16 CDT 2009
Author: balaji
Date: 2009-03-09 23:26:16 -0500 (Mon, 09 Mar 2009)
New Revision: 3992
Modified:
mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c
mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_signal.c
mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_wait.c
mpich2/trunk/src/pm/hydra/control/consys/consys_close.c
mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c
mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c
mpich2/trunk/src/pm/hydra/control/utils/csiu.c
mpich2/trunk/src/pm/hydra/demux/demux.c
mpich2/trunk/src/pm/hydra/demux/demux.h
mpich2/trunk/src/pm/hydra/include/hydra.h
mpich2/trunk/src/pm/hydra/include/hydra_env.h
mpich2/trunk/src/pm/hydra/include/hydra_launch.h
mpich2/trunk/src/pm/hydra/include/hydra_sock.h
mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c
mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c
mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c
mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c
mpich2/trunk/src/pm/hydra/pm/central/central_cb.c
mpich2/trunk/src/pm/hydra/pm/central/central_finalize.c
mpich2/trunk/src/pm/hydra/pm/central/central_launch.c
mpich2/trunk/src/pm/hydra/pm/central/proxy.c
mpich2/trunk/src/pm/hydra/pm/central/proxy.h
mpich2/trunk/src/pm/hydra/pm/central/proxy_cb.c
mpich2/trunk/src/pm/hydra/pm/central/proxy_utils.c
mpich2/trunk/src/pm/hydra/pm/utils/pmi.c
mpich2/trunk/src/pm/hydra/utils/env/env.c
mpich2/trunk/src/pm/hydra/utils/launch/allocate.c
mpich2/trunk/src/pm/hydra/utils/launch/args.c
mpich2/trunk/src/pm/hydra/utils/launch/launch.c
mpich2/trunk/src/pm/hydra/utils/sock/sock.c
Log:
First working draft of the proxy. The proxy still doesn't handle
stdin. Also, the interface between the proxy and the bootstrap server
is broken, so a proxy has no way to launch more proxies. Thus, only a
one-level hierarchy of proxies is supported right now.
Modified: mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/bootstrap/ssh/ssh_launch.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -31,8 +31,7 @@
status = HYD_BSCU_Set_common_signals(HYD_BSCU_Signal_handler);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("signal utils returned error when trying to set signal\n");
+ HYDU_Error_printf("signal utils returned error when trying to set signal\n");
goto fn_fail;
}
@@ -41,10 +40,8 @@
* they want launched. Without this functionality, the proxy
* cannot use this and will have to perfom its own launch. */
process_id = 0;
- for (proc_params = handle.proc_params; proc_params;
- proc_params = proc_params->next) {
- for (partition = proc_params->partition; partition;
- partition = partition->next) {
+ for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
+ for (partition = proc_params->partition; partition; partition = partition->next) {
if (partition->group_rank) /* Only rank 0 is spawned */
continue;
@@ -72,11 +69,9 @@
status =
HYDU_Create_process(client_arg,
(process_id == 0 ? &handle.in : NULL),
- &partition->out, &partition->err,
- &partition->pid);
+ &partition->out, &partition->err, &partition->pid);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("bootstrap spawn process returned error\n");
+ HYDU_Error_printf("bootstrap spawn process returned error\n");
goto fn_fail;
}
@@ -110,10 +105,8 @@
HYDU_FUNC_ENTER();
- for (proc_params = handle.proc_params; proc_params;
- proc_params = proc_params->next) {
- for (partition = proc_params->partition; partition;
- partition = partition->next) {
+ for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
+ for (partition = proc_params->partition; partition; partition = partition->next) {
/* Setup the executable arguments */
arg = 0;
client_arg[arg++] = MPIU_Strdup("/usr/bin/ssh");
@@ -135,11 +128,9 @@
client_arg[arg++] = MPIU_Strdup(execname);
client_arg[arg++] = NULL;
- status =
- HYDU_Create_process(client_arg, NULL, NULL, NULL, NULL);
+ status = HYDU_Create_process(client_arg, NULL, NULL, NULL, NULL);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("bootstrap spawn process returned error\n");
+ HYDU_Error_printf("bootstrap spawn process returned error\n");
goto fn_fail;
}
Modified: mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_signal.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_signal.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_signal.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -17,30 +17,26 @@
status = HYDU_Set_signal(SIGINT, handler);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("signal utils returned error when trying to set SIGINT signal\n");
+ HYDU_Error_printf("signal utils returned error when trying to set SIGINT signal\n");
goto fn_fail;
}
status = HYDU_Set_signal(SIGQUIT, handler);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("signal utils returned error when trying to set SIGQUIT signal\n");
+ HYDU_Error_printf("signal utils returned error when trying to set SIGQUIT signal\n");
goto fn_fail;
}
status = HYDU_Set_signal(SIGTERM, handler);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("signal utils returned error when trying to set SIGTERM signal\n");
+ HYDU_Error_printf("signal utils returned error when trying to set SIGTERM signal\n");
goto fn_fail;
}
#if defined SIGSTOP
status = HYDU_Set_signal(SIGSTOP, handler);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("signal utils returned error when trying to set SIGSTOP signal\n");
+ HYDU_Error_printf("signal utils returned error when trying to set SIGSTOP signal\n");
goto fn_fail;
}
#endif /* SIGSTOP */
@@ -48,8 +44,7 @@
#if defined SIGCONT
status = HYDU_Set_signal(SIGCONT, handler);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("signal utils returned error when trying to set SIGCONT signal\n");
+ HYDU_Error_printf("signal utils returned error when trying to set SIGCONT signal\n");
goto fn_fail;
}
#endif /* SIGCONT */
Modified: mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_wait.c
===================================================================
--- mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_wait.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/bootstrap/utils/bscu_wait.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -27,10 +27,8 @@
HYDU_FUNC_ENTER();
not_completed = 0;
- for (proc_params = handle.proc_params; proc_params;
- proc_params = proc_params->next)
- for (partition = proc_params->partition; partition;
- partition = partition->next)
+ for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next)
+ for (partition = proc_params->partition; partition; partition = partition->next)
if (partition->exit_status == -1)
not_completed++;
Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_close.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_close.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_close.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -25,19 +25,15 @@
/* Deregister the FD with the demux engine and close it. */
status = HYD_DMX_Deregister_fd(fd);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when deregistering fd: %d\n",
- fd);
+ HYDU_Error_printf("demux engine returned error when deregistering fd: %d\n", fd);
goto fn_fail;
}
close(fd);
/* Find the FD in the handle and remove it. */
- for (proc_params = handle.proc_params; proc_params;
- proc_params = proc_params->next) {
- for (partition = proc_params->partition; partition;
- partition = partition->next) {
+ for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
+ for (partition = proc_params->partition; partition; partition = partition->next) {
if (partition->out == fd) {
partition->out = -1;
goto fn_exit;
Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_launch.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -23,30 +23,23 @@
status = HYD_PMCI_Launch_procs();
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("process manager returned error when launching processes\n");
+ HYDU_Error_printf("process manager returned error when launching processes\n");
goto fn_fail;
}
- for (proc_params = handle.proc_params; proc_params;
- proc_params = proc_params->next) {
- for (partition = proc_params->partition; partition;
- partition = partition->next) {
+ for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
+ for (partition = proc_params->partition; partition; partition = partition->next) {
status =
- HYD_DMX_Register_fd(1, &partition->out, HYD_STDOUT,
- proc_params->stdout_cb);
+ HYD_DMX_Register_fd(1, &partition->out, HYD_STDOUT, proc_params->stdout_cb);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when registering fd\n");
+ HYDU_Error_printf("demux engine returned error when registering fd\n");
goto fn_fail;
}
status =
- HYD_DMX_Register_fd(1, &partition->err, HYD_STDOUT,
- proc_params->stderr_cb);
+ HYD_DMX_Register_fd(1, &partition->err, HYD_STDOUT, proc_params->stderr_cb);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when registering fd\n");
+ HYDU_Error_printf("demux engine returned error when registering fd\n");
goto fn_fail;
}
}
@@ -71,11 +64,9 @@
handle.stdin_buf_count = 0;
handle.stdin_buf_offset = 0;
- status =
- HYD_DMX_Register_fd(1, &stdin_fd, HYD_STDOUT, handle.stdin_cb);
+ status = HYD_DMX_Register_fd(1, &stdin_fd, HYD_STDOUT, handle.stdin_cb);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when registering fd\n");
+ HYDU_Error_printf("demux engine returned error when registering fd\n");
goto fn_fail;
}
}
Modified: mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/control/consys/consys_wait.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -25,18 +25,15 @@
/* Wait for some event to occur */
status = HYD_DMX_Wait_for_event();
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when waiting for event\n");
+ HYDU_Error_printf("demux engine returned error when waiting for event\n");
goto fn_fail;
}
/* Check to see if there's any open read socket left; if there
* are, we will just wait for more events. */
sockets_open = 0;
- for (proc_params = handle.proc_params; proc_params;
- proc_params = proc_params->next) {
- for (partition = proc_params->partition; partition;
- partition = partition->next) {
+ for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
+ for (partition = proc_params->partition; partition; partition = partition->next) {
if (partition->out != -1 || partition->err != -1) {
sockets_open++;
break;
@@ -53,8 +50,7 @@
* control device will take care of that. */
status = HYD_BSCI_Wait_for_completion();
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("bootstrap server returned error when waiting for completion\n");
+ HYDU_Error_printf("bootstrap server returned error when waiting for completion\n");
goto fn_fail;
}
Modified: mpich2/trunk/src/pm/hydra/control/utils/csiu.c
===================================================================
--- mpich2/trunk/src/pm/hydra/control/utils/csiu.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/control/utils/csiu.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -21,9 +21,7 @@
}
else {
gettimeofday(&now, NULL);
- time_left =
- (1000 *
- (handle.timeout.tv_sec - now.tv_sec + handle.start.tv_sec));
+ time_left = (1000 * (handle.timeout.tv_sec - now.tv_sec + handle.start.tv_sec));
if (time_left < 0)
time_left = 0;
}
Modified: mpich2/trunk/src/pm/hydra/demux/demux.c
===================================================================
--- mpich2/trunk/src/pm/hydra/demux/demux.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/demux/demux.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -44,8 +44,7 @@
}
HYD_Status HYD_DMX_Register_fd(int num_fds, int *fd, HYD_Event_t events,
- HYD_Status(*callback) (int fd,
- HYD_Event_t events))
+ HYD_Status(*callback) (int fd, HYD_Event_t events))
{
HYD_DMXI_Callback_t *cb_element, *run;
int i;
@@ -61,8 +60,7 @@
}
}
- HYDU_MALLOC(cb_element, HYD_DMXI_Callback_t *,
- sizeof(HYD_DMXI_Callback_t), status);
+ HYDU_MALLOC(cb_element, HYD_DMXI_Callback_t *, sizeof(HYD_DMXI_Callback_t), status);
cb_element->num_fds = num_fds;
HYDU_MALLOC(cb_element->fd, int *, num_fds * sizeof(int), status);
memcpy(cb_element->fd, fd, num_fds * sizeof(int));
@@ -134,8 +132,7 @@
HYDU_FUNC_ENTER();
- HYDU_MALLOC(pollfds, struct pollfd *,
- num_cb_fds * sizeof(struct pollfd), status);
+ HYDU_MALLOC(pollfds, struct pollfd *, num_cb_fds * sizeof(struct pollfd), status);
run = cb_list;
i = 0;
@@ -188,8 +185,7 @@
status = run->callback(pollfds[i].fd, events);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf("callback returned error status\n",
- errno);
+ HYDU_Error_printf("callback returned error status\n", errno);
goto fn_fail;
}
}
Modified: mpich2/trunk/src/pm/hydra/demux/demux.h
===================================================================
--- mpich2/trunk/src/pm/hydra/demux/demux.h 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/demux/demux.h 2009-03-10 04:26:16 UTC (rev 3992)
@@ -10,8 +10,7 @@
#include "hydra.h"
HYD_Status HYD_DMX_Register_fd(int num_fds, int *fd, HYD_Event_t events,
- HYD_Status(*callback) (int fd,
- HYD_Event_t events));
+ HYD_Status(*callback) (int fd, HYD_Event_t events));
HYD_Status HYD_DMX_Deregister_fd(int fd);
HYD_Status HYD_DMX_Wait_for_event(void);
HYD_Status HYD_DMX_Finalize(void);
Modified: mpich2/trunk/src/pm/hydra/include/hydra.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra.h 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/include/hydra.h 2009-03-10 04:26:16 UTC (rev 3992)
@@ -52,11 +52,11 @@
#endif /* MANUAL_EXTERN_ENVIRON */
typedef enum {
+ HYD_SUCCESS = 0,
HYD_NO_MEM,
HYD_SOCK_ERROR,
HYD_INVALID_PARAM,
- HYD_INTERNAL_ERROR,
- HYD_SUCCESS = 0
+ HYD_INTERNAL_ERROR
} HYD_Status;
#define HYD_STDOUT (1)
Modified: mpich2/trunk/src/pm/hydra/include/hydra_env.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra_env.h 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/include/hydra_env.h 2009-03-10 04:26:16 UTC (rev 3992)
@@ -12,13 +12,13 @@
HYD_Status HYDU_Env_global_list(HYD_Env_t ** env_list);
char *HYDU_Env_type_str(HYD_Env_type_t type);
HYD_Env_t *HYDU_Env_dup(HYD_Env_t env);
-HYD_Env_t *HYDU_Env_found_in_list(HYD_Env_t * env_list, HYD_Env_t * env);
+HYD_Env_t *HYDU_Env_found_in_list(HYD_Env_t * env_list, HYD_Env_t env);
HYD_Status HYDU_Env_add_to_list(HYD_Env_t ** env_list, HYD_Env_t env);
HYD_Env_t *HYDU_Env_listdup(HYD_Env_t * env);
HYD_Status HYDU_Env_create(HYD_Env_t ** env, char *env_name,
- char *env_value, HYD_Env_type_t env_type,
- int start);
+ char *env_value, HYD_Env_type_t env_type, int start);
HYD_Status HYDU_Env_free(HYD_Env_t * env);
HYD_Status HYDU_Env_free_list(HYD_Env_t * env);
+HYD_Status HYDU_Env_putenv(HYD_Env_t env);
#endif /* HYDRA_ENV_H_INCLUDED */
Modified: mpich2/trunk/src/pm/hydra/include/hydra_launch.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra_launch.h 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/include/hydra_launch.h 2009-03-10 04:26:16 UTC (rev 3992)
@@ -9,12 +9,11 @@
#include "hydra.h"
-HYD_Status HYDU_Append_env(HYD_Env_t * env_list, char **client_arg,
- int id);
+HYD_Status HYDU_Append_env(HYD_Env_t * env_list, char **client_arg, int id);
HYD_Status HYDU_Append_exec(char **exec, char **client_arg);
-HYD_Status HYDU_Append_wdir(char **client_arg);
+HYD_Status HYDU_Append_wdir(char **client_arg, char *wdir);
HYD_Status HYDU_Allocate_Partition(struct HYD_Partition_list **partition);
-HYD_Status HYDU_Create_process(char **client_arg, int *in, int *out,
- int *err, int *pid);
+HYD_Status HYDU_Create_process(char **client_arg, int *in, int *out, int *err, int *pid);
+HYD_Status HYDU_Dump_args(char **args);
#endif /* HYDRA_LAUNCH_H_INCLUDED */
Modified: mpich2/trunk/src/pm/hydra/include/hydra_sock.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra_sock.h 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/include/hydra_sock.h 2009-03-10 04:26:16 UTC (rev 3992)
@@ -19,8 +19,7 @@
#define size_t unsigned int
#endif /* size_t */
-HYD_Status HYDU_Sock_listen(int *listen_fd, char *port_range,
- uint16_t * port);
+HYD_Status HYDU_Sock_listen(int *listen_fd, char *port_range, uint16_t * port);
HYD_Status HYDU_Sock_connect(const char *host, uint16_t port, int *fd);
HYD_Status HYDU_Sock_accept(int listen_fd, int *fd);
HYD_Status HYDU_Sock_readline(int fd, char *buf, int maxlen, int *linelen);
@@ -29,5 +28,8 @@
HYD_Status HYDU_Sock_write(int fd, char *buf, int maxsize);
HYD_Status HYDU_Sock_set_nonblock(int fd);
HYD_Status HYDU_Sock_set_cloexec(int fd);
+HYD_Status HYDU_Sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed);
+HYD_Status HYDU_Sock_stdin_cb(int fd, HYD_Event_t events, char *buf, int *buf_count,
+ int *buf_offset, int *closed);
#endif /* HYDRA_SOCKS_H_INCLUDED */
Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/callback.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -13,45 +13,27 @@
HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events)
{
- int count;
- char buf[HYD_TMPBUF_SIZE];
+ int closed;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
- if (events & HYD_STDIN) {
- HYDU_Error_printf("stdout handler got an stdin event: %d\n",
- events);
- status = HYD_INTERNAL_ERROR;
+ /* Write output to fd 1 */
+ status = HYDU_Sock_stdout_cb(fd, events, 1, &closed);
+ if (status != HYD_SUCCESS) {
+ HYDU_Error_printf("socket stdout callback error on fd: %d (errno: %d)\n", fd, errno);
goto fn_fail;
}
- count = read(fd, buf, HYD_TMPBUF_SIZE);
- if (count < 0) {
- HYDU_Error_printf("socket read error on fd: %d (errno: %d)\n", fd,
- errno);
- status = HYD_SOCK_ERROR;
- goto fn_fail;
- }
- else if (count == 0) {
- /* The connection has closed */
+ if (closed) {
status = HYD_CSI_Close_fd(fd);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf("socket close error on fd: %d (errno: %d)\n",
- fd, errno);
+ HYDU_Error_printf("socket close error on fd: %d (errno: %d)\n", fd, errno);
goto fn_fail;
}
goto fn_exit;
}
- count = write(1, buf, count);
- if (count < 0) {
- HYDU_Error_printf("socket write error on fd: %d (errno: %d)\n", fd,
- errno);
- status = HYD_SOCK_ERROR;
- goto fn_fail;
- }
-
fn_exit:
HYDU_FUNC_EXIT();
return status;
@@ -63,45 +45,28 @@
HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events)
{
- int count;
+ int count, closed;
char buf[HYD_TMPBUF_SIZE];
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
- if (events & HYD_STDIN) {
- HYDU_Error_printf("stderr handler got an stdin event: %d\n",
- events);
- status = HYD_INTERNAL_ERROR;
+ /* Write output to fd 2 */
+ status = HYDU_Sock_stdout_cb(fd, events, 2, &closed);
+ if (status != HYD_SUCCESS) {
+ HYDU_Error_printf("socket stdout callback error on fd: %d (errno: %d)\n", fd, errno);
goto fn_fail;
}
- count = read(fd, buf, HYD_TMPBUF_SIZE);
- if (count < 0) {
- HYDU_Error_printf("socket read error on fd: %d (errno: %d)\n", fd,
- errno);
- status = HYD_SOCK_ERROR;
- goto fn_fail;
- }
- else if (count == 0) {
- /* The connection has closed */
+ if (closed) {
status = HYD_CSI_Close_fd(fd);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf("socket close error on fd: %d (errno: %d)\n",
- fd, errno);
+ HYDU_Error_printf("socket close error on fd: %d (errno: %d)\n", fd, errno);
goto fn_fail;
}
goto fn_exit;
}
- count = write(2, buf, count);
- if (count < 0) {
- HYDU_Error_printf("socket write error on fd: %d (errno: %d)\n", fd,
- errno);
- status = HYD_SOCK_ERROR;
- goto fn_fail;
- }
-
fn_exit:
HYDU_FUNC_EXIT();
return status;
@@ -113,64 +78,25 @@
HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events)
{
- int count;
+ int count, closed;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
- if (events & HYD_STDIN) {
- HYDU_Error_printf
- ("stdin handler got a writeable event on local stdin: %d\n",
- events);
- status = HYD_INTERNAL_ERROR;
+ status = HYDU_Sock_stdin_cb(fd, events, handle.stdin_tmp_buf,
+ &handle.stdin_buf_count, &handle.stdin_buf_offset, &closed);
+ if (status != HYD_SUCCESS) {
+ HYDU_Error_printf("sock stdin callback returned an error\n");
+ status = HYD_SOCK_ERROR;
goto fn_fail;
}
- while (1) {
- /* If we already have buffered data, send it out */
- if (handle.stdin_buf_count) {
- count =
- write(handle.in,
- handle.stdin_tmp_buf + handle.stdin_buf_offset,
- handle.stdin_buf_count);
- if (count < 0) {
- /* We can't get an EAGAIN as we just got out of poll */
- HYDU_Error_printf
- ("socket write error on fd: %d (errno: %d)\n",
- handle.in, errno);
- status = HYD_SOCK_ERROR;
- goto fn_fail;
- }
- handle.stdin_buf_offset += count;
- handle.stdin_buf_count -= count;
- break;
- }
-
- /* If we are still here, we need to refill our temporary buffer */
- count = read(0, handle.stdin_tmp_buf, HYD_TMPBUF_SIZE);
- if (count < 0) {
- if (errno == EINTR || errno == EAGAIN) {
- /* This call was interrupted or there was no data to read; just break out. */
- break;
- }
-
- HYDU_Error_printf("socket read error on fd: %d (errno: %d)\n",
- fd, errno);
- status = HYD_SOCK_ERROR;
+ if (closed) {
+ status = HYD_CSI_Close_fd(fd);
+ if (status != HYD_SUCCESS) {
+ HYDU_Error_printf("socket close error on fd: %d (errno: %d)\n", fd, errno);
goto fn_fail;
}
- else if (count == 0) {
- /* The connection has closed */
- status = HYD_CSI_Close_fd(fd);
- if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("socket close error on fd: %d (errno: %d)\n", fd,
- errno);
- goto fn_fail;
- }
- break;
- }
- handle.stdin_buf_count += count;
}
fn_exit:
Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/mpiexec.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -17,37 +17,26 @@
static void usage(void)
{
printf("\n");
- printf
- ("Usage: ./mpiexec [global opts] [exec1 local opts] : [exec2 local opts] : ...\n\n");
+ printf("Usage: ./mpiexec [global opts] [exec1 local opts] : [exec2 local opts] : ...\n\n");
printf("Global Options (passed to all executables):\n");
printf("\t-v/-vv/-vvv [Verbose level]\n");
- printf
- ("\t--enable-x/--disable-x [Enable or disable X forwarding]\n");
- printf
- ("\t-genv {name} {value} [Environment variable name and value]\n");
- printf
- ("\t-genvlist {env1,env2,...} [Environment variable list to pass]\n");
- printf
- ("\t-genvnone [Do not pass any environment variables]\n");
- printf
- ("\t-genvall [Pass all environment variables (default)]\n");
+ printf("\t--enable-x/--disable-x [Enable or disable X forwarding]\n");
+ printf("\t-genv {name} {value} [Environment variable name and value]\n");
+ printf("\t-genvlist {env1,env2,...} [Environment variable list to pass]\n");
+ printf("\t-genvnone [Do not pass any environment variables]\n");
+ printf("\t-genvall [Pass all environment variables (default)]\n");
printf("\n");
printf("Local Options (passed to individual executables):\n");
printf("\t-n/-np {value} [Number of processes]\n");
+ printf("\t-f {name} [File containing the host names]\n");
+ printf("\t-env {name} {value} [Environment variable name and value]\n");
+ printf("\t-envlist {env1,env2,...} [Environment variable list to pass]\n");
+ printf("\t-envnone [Do not pass any environment variables]\n");
+ printf("\t-envall [Pass all environment variables (default)]\n");
printf
- ("\t-f {name} [File containing the host names]\n");
- printf
- ("\t-env {name} {value} [Environment variable name and value]\n");
- printf
- ("\t-envlist {env1,env2,...} [Environment variable list to pass]\n");
- printf
- ("\t-envnone [Do not pass any environment variables]\n");
- printf
- ("\t-envall [Pass all environment variables (default)]\n");
- printf
("\t{exec_name} {args} [Name of the executable to run and its arguments]\n");
printf("\n");
@@ -110,8 +99,7 @@
/* Launch the processes */
status = HYD_CSI_Launch_procs();
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("control system returned error when launching processes\n");
+ HYDU_Error_printf("control system returned error when launching processes\n");
goto fn_fail;
}
@@ -125,10 +113,8 @@
/* Check for the exit status for all the processes */
exit_status = 0;
- for (proc_params = handle.proc_params; proc_params;
- proc_params = proc_params->next)
- for (partition = proc_params->partition; partition;
- partition = partition->next)
+ for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next)
+ for (partition = proc_params->partition; partition; partition = partition->next)
exit_status |= partition->exit_status;
/* Call finalize functions for lower layers to cleanup their resources */
Modified: mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/launcher/mpiexec/utils.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -40,8 +40,7 @@
HYDU_FUNC_ENTER();
- HYDU_MALLOC(proc_params, struct HYD_Proc_params *,
- sizeof(struct HYD_Proc_params), status);
+ HYDU_MALLOC(proc_params, struct HYD_Proc_params *, sizeof(struct HYD_Proc_params), status);
proc_params->exec_proc_count = 0;
proc_params->partition = NULL;
@@ -129,23 +128,20 @@
while (--argc && ++argv) {
/* Help options */
- if (!strcmp(*argv, "-h") || !strcmp(*argv, "--help") ||
- !strcmp(*argv, "-help")) {
+ if (!strcmp(*argv, "-h") || !strcmp(*argv, "--help") || !strcmp(*argv, "-help")) {
/* Just return from this function; the main code will show the usage */
status = HYD_INTERNAL_ERROR;
goto fn_fail;
}
/* Check what debug level is requested */
- if (!strcmp(*argv, "-v") || !strcmp(*argv, "-vv") ||
- !strcmp(*argv, "-vvv")) {
+ if (!strcmp(*argv, "-v") || !strcmp(*argv, "-vv") || !strcmp(*argv, "-vvv")) {
CHECK_LOCAL_PARAM_START(local_params_started, status);
/* Debug level already set */
if (handle.debug != -1) {
HYDU_Error_printf
- ("Duplicate debug level setting; previously set to %d\n",
- handle.debug);
+ ("Duplicate debug level setting; previously set to %d\n", handle.debug);
status = HYD_INTERNAL_ERROR;
goto fn_fail;
}
@@ -167,8 +163,7 @@
/* X forwarding already enabled or disabled */
if (handle.enablex != -1) {
HYDU_Error_printf
- ("Duplicate enable-x setting; previously set to %d\n",
- handle.enablex);
+ ("Duplicate enable-x setting; previously set to %d\n", handle.enablex);
status = HYD_INTERNAL_ERROR;
goto fn_fail;
}
@@ -185,8 +180,7 @@
/* propagation already set */
if (handle.prop != HYD_ENV_PROP_UNSET) {
HYDU_Error_printf
- ("Duplicate propagation setting; previously set to %d\n",
- handle.prop);
+ ("Duplicate propagation setting; previously set to %d\n", handle.prop);
status = HYD_INTERNAL_ERROR;
goto fn_fail;
}
@@ -205,9 +199,7 @@
if (env_name == NULL)
break;
- status =
- HYDU_Env_create(&env, env_name, NULL,
- HYD_ENV_STATIC, 0);
+ status = HYDU_Env_create(&env, env_name, NULL, HYD_ENV_STATIC, 0);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("unable to create env struct\n");
goto fn_fail;
@@ -230,8 +222,7 @@
status = get_current_proc_params(&proc_params);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("get_current_proc_params returned error\n");
+ HYDU_Error_printf("get_current_proc_params returned error\n");
goto fn_fail;
}
@@ -258,16 +249,13 @@
if (env_name == NULL)
break;
- status =
- HYDU_Env_create(&env, env_name, NULL,
- HYD_ENV_STATIC, 0);
+ status = HYDU_Env_create(&env, env_name, NULL, HYD_ENV_STATIC, 0);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("unable to create env struct\n");
goto fn_fail;
}
- status =
- HYDU_Env_add_to_list(&proc_params->prop_env, *env);
+ status = HYDU_Env_add_to_list(&proc_params->prop_env, *env);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("unable to add env to list\n");
goto fn_fail;
@@ -288,9 +276,7 @@
CHECK_NEXT_ARG_VALID(status);
env_value = MPIU_Strdup(*argv);
- status =
- HYDU_Env_create(&env, env_name, env_value, HYD_ENV_STATIC,
- 0);
+ status = HYDU_Env_create(&env, env_name, env_value, HYD_ENV_STATIC, 0);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("unable to create env struct\n");
goto fn_fail;
@@ -303,8 +289,7 @@
status = get_current_proc_params(&proc_params);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("get_current_proc_params returned error\n");
+ HYDU_Error_printf("get_current_proc_params returned error\n");
goto fn_fail;
}
@@ -328,8 +313,7 @@
status = get_current_proc_params(&proc_params);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("get_current_proc_params returned error\n");
+ HYDU_Error_printf("get_current_proc_params returned error\n");
goto fn_fail;
}
@@ -368,8 +352,7 @@
if (!strcmp(*argv, ":")) { /* Next executable */
status = allocate_proc_params(&proc_params->next);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("allocate_proc_params returned error\n");
+ HYDU_Error_printf("allocate_proc_params returned error\n");
goto fn_fail;
}
break;
@@ -403,8 +386,7 @@
if (handle.wdir == NULL) {
HYDU_MALLOC(handle.wdir, char *, HYDRA_MAX_PATH, status);
if (getcwd(handle.wdir, HYDRA_MAX_PATH) == NULL) {
- HYDU_Error_printf
- ("allocated space is too small for absolute path\n");
+ HYDU_Error_printf("allocated space is too small for absolute path\n");
status = HYD_INTERNAL_ERROR;
goto fn_fail;
}
Modified: mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c
===================================================================
--- mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/launcher/utils/lchu.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -23,8 +23,7 @@
if (strcmp(handle.host_file, "HYDRA_USE_LOCALHOST")) {
fp = fopen(handle.host_file, "r");
if (fp == NULL) {
- HYDU_Error_printf("unable to open host file %s\n",
- handle.host_file);
+ HYDU_Error_printf("unable to open host file %s\n", handle.host_file);
status = HYD_INTERNAL_ERROR;
goto fn_fail;
}
@@ -35,16 +34,14 @@
if (!strcmp(handle.host_file, "HYDRA_USE_LOCALHOST")) {
HYDU_Allocate_Partition(&proc_params->partition);
proc_params->partition->name = MPIU_Strdup("localhost");
- proc_params->partition->proc_count =
- proc_params->exec_proc_count;
+ proc_params->partition->proc_count = proc_params->exec_proc_count;
total_procs = proc_params->exec_proc_count;
}
else {
total_procs = 0;
while (!feof(fp)) {
if ((fscanf(fp, "%s", line) < 0) && errno) {
- HYDU_Error_printf
- ("unable to read input line (errno: %d)\n", errno);
+ HYDU_Error_printf("unable to read input line (errno: %d)\n", errno);
status = HYD_INTERNAL_ERROR;
goto fn_fail;
}
@@ -55,10 +52,8 @@
procs = strtok(NULL, ":");
num_procs = procs ? atoi(procs) : 1;
- if (num_procs >
- (proc_params->exec_proc_count - total_procs))
- num_procs =
- (proc_params->exec_proc_count - total_procs);
+ if (num_procs > (proc_params->exec_proc_count - total_procs))
+ num_procs = (proc_params->exec_proc_count - total_procs);
if (!proc_params->partition) {
HYDU_Allocate_Partition(&proc_params->partition);
@@ -85,8 +80,7 @@
}
if (proc_params) {
- HYDU_Error_printf("Not enough number of hosts in host file: %s\n",
- handle.host_file);
+ HYDU_Error_printf("Not enough number of hosts in host file: %s\n", handle.host_file);
status = HYD_INTERNAL_ERROR;
goto fn_fail;
}
@@ -111,8 +105,7 @@
HYDU_FUNC_ENTER();
- for (proc_params = handle.proc_params; proc_params;
- proc_params = proc_params->next) {
+ for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
for (partition = proc_params->partition; partition;) {
HYDU_FREE(partition->name);
if (partition->mapping) {
@@ -158,8 +151,7 @@
if (proc_params->prop == HYD_ENV_PROP_ALL) {
proc_params->prop_env = HYDU_Env_listdup(handle.global_env);
for (env = proc_params->user_env; env; env = env->next) {
- status =
- HYDU_Env_add_to_list(&proc_params->prop_env, *env);
+ status = HYDU_Env_add_to_list(&proc_params->prop_env, *env);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("unable to add env to list\n");
goto fn_fail;
@@ -210,10 +202,8 @@
HYDU_FUNC_ENTER();
- for (proc_params = handle.proc_params; proc_params;
- proc_params = proc_params->next) {
- for (partition = proc_params->partition; partition;
- partition = partition->next) {
+ for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
+ for (partition = proc_params->partition; partition; partition = partition->next) {
HYDU_FREE(partition->out);
HYDU_FREE(partition->err);
}
Modified: mpich2/trunk/src/pm/hydra/pm/central/central_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/central_cb.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/pm/central/central_cb.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -55,25 +55,20 @@
if (fd == HYD_PMCD_Central_listenfd) { /* Someone is trying to connect to us */
status = HYDU_Sock_accept(fd, &accept_fd);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("sock utils returned error when accepting connection\n");
+ HYDU_Error_printf("sock utils returned error when accepting connection\n");
goto fn_fail;
}
- status =
- HYD_DMX_Register_fd(1, &accept_fd, HYD_STDOUT,
- HYD_PMCD_Central_cb);
+ status = HYD_DMX_Register_fd(1, &accept_fd, HYD_STDOUT, HYD_PMCD_Central_cb);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when registering fd\n");
+ HYDU_Error_printf("demux engine returned error when registering fd\n");
goto fn_fail;
}
}
else {
status = HYDU_Sock_readline(fd, buf, HYD_TMPBUF_SIZE, &linelen);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("sock utils returned error when reading PMI line\n");
+ HYDU_Error_printf("sock utils returned error when reading PMI line\n");
goto fn_fail;
}
@@ -142,9 +137,7 @@
}
else {
/* We don't understand the command */
- HYDU_Error_printf
- ("Unrecognized PMI command: %s; cleaning up processes\n",
- cmd);
+ HYDU_Error_printf("Unrecognized PMI command: %s; cleaning up processes\n", cmd);
/* Cleanup all the processes and return. We don't need to
* check the return status since we are anyway returning
Modified: mpich2/trunk/src/pm/hydra/pm/central/central_finalize.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/central_finalize.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/pm/central/central_finalize.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -23,8 +23,7 @@
* it. */
status = HYD_DMX_Deregister_fd(HYD_PMCD_Central_listenfd);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when deregistering fd\n");
+ HYDU_Error_printf("demux engine returned error when deregistering fd\n");
goto fn_fail;
}
Modified: mpich2/trunk/src/pm/hydra/pm/central/central_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/central_launch.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/pm/central/central_launch.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -37,9 +37,9 @@
*/
HYD_Status HYD_PMCI_Launch_procs(void)
{
- char *port_range, *port_str, *sport;
+ char *port_range, *port_str, *sport, *str;
uint16_t low_port, high_port, port;
- int one = 1, i, arg;
+ int one = 1, i, j, arg;
int num_procs, process_id, group_id;
char hostname[MAX_HOSTNAME_LEN];
struct sockaddr_in sa;
@@ -59,8 +59,7 @@
port_range = getenv("MPICH_PORT_RANGE");
/* Listen on a port in the port range */
- status =
- HYDU_Sock_listen(&HYD_PMCD_Central_listenfd, port_range, &port);
+ status = HYDU_Sock_listen(&HYD_PMCD_Central_listenfd, port_range, &port);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("sock utils returned listen error\n");
goto fn_fail;
@@ -68,31 +67,26 @@
/* Register the listening socket with the demux engine */
status =
- HYD_DMX_Register_fd(1, &HYD_PMCD_Central_listenfd, HYD_STDOUT,
- HYD_PMCD_Central_cb);
+ HYD_DMX_Register_fd(1, &HYD_PMCD_Central_listenfd, HYD_STDOUT, HYD_PMCD_Central_cb);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error for registering fd\n");
+ HYDU_Error_printf("demux engine returned error for registering fd\n");
goto fn_fail;
}
/* Create a port string for MPI processes to use to connect to */
if (gethostname(hostname, MAX_HOSTNAME_LEN) < 0) {
- HYDU_Error_printf("gethostname error (hostname: %s; errno: %d)\n",
- hostname, errno);
+ HYDU_Error_printf("gethostname error (hostname: %s; errno: %d)\n", hostname, errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
HYDU_Int_to_str(port, sport, status);
- HYDU_MALLOC(port_str, char *, strlen(hostname) + 1 + strlen(sport) + 1,
- status);
+ HYDU_MALLOC(port_str, char *, strlen(hostname) + 1 + strlen(sport) + 1, status);
MPIU_Snprintf(port_str, strlen(hostname) + 1 + strlen(sport) + 1,
"%s:%s", hostname, sport);
HYDU_FREE(sport);
- status =
- HYDU_Env_create(&env, "PMI_PORT", port_str, HYD_ENV_STATIC, 0);
+ status = HYDU_Env_create(&env, "PMI_PORT", port_str, HYD_ENV_STATIC, 0);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("unable to create env\n");
goto fn_fail;
@@ -103,6 +97,7 @@
goto fn_fail;
}
HYDU_Env_free(env);
+ HYDU_FREE(port_str);
status = HYDU_Env_create(&env, "PMI_ID", NULL, HYD_ENV_AUTOINC, 0);
if (status != HYD_SUCCESS) {
@@ -115,54 +110,24 @@
goto fn_fail;
}
HYDU_Env_free(env);
- HYDU_FREE(port_str);
/* Create a process group for the MPI processes in this
* comm_world */
status = HYD_PMCU_Create_pg();
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("pm utils returned error creating process group\n");
+ HYDU_Error_printf("pm utils returned error creating process group\n");
goto fn_fail;
}
- /* FIXME: Temporary hack for testing till the proxy is in shape to
- * be used -- we just break the partition list to multiple
- * segments, one for each process and call the application
- * executable directly. */
- for (proc_params = handle.proc_params; proc_params;
- proc_params = proc_params->next) {
- group_id = 0;
- for (partition = proc_params->partition; partition;) {
- next_partition = partition->next; /* Keep track of the next partition */
+ /* Create the arguments list for each proxy */
+ process_id = 0;
+ group_id = 0;
+ for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
+ for (partition = proc_params->partition; partition; partition = partition->next) {
partition->group_id = group_id++;
partition->group_rank = 0;
- run = partition;
- for (process_id = 1; process_id < partition->proc_count;
- process_id++) {
- HYDU_Allocate_Partition(&run->next);
- run = run->next;
-
- run->name = MPIU_Strdup(partition->name);
- run->proc_count = 1;
- run->group_id = group_id++;
- run->group_rank = 0;
- }
-
- partition->proc_count = 1;
- run->next = next_partition;
- partition = next_partition;
- }
- }
-
- /* Create the arguments list for each proxy */
- process_id = 0;
- for (proc_params = handle.proc_params; proc_params;
- proc_params = proc_params->next) {
- for (partition = proc_params->partition; partition;
- partition = partition->next) {
/* Setup the executable arguments */
arg = 0;
partition->args[arg++] = MPIU_Strdup("sh");
@@ -170,18 +135,51 @@
partition->args[arg++] = MPIU_Strdup("\"");
partition->args[arg++] = NULL;
- HYDU_Append_env(handle.system_env, partition->args,
- process_id);
- HYDU_Append_env(proc_params->prop_env, partition->args,
- process_id);
- HYDU_Append_wdir(partition->args);
+ /* Pass the entire environment here; the proxy will cherry
+ * pick from this. */
+ HYDU_Append_env(handle.system_env, partition->args, process_id);
+ HYDU_Append_env(proc_params->prop_env, partition->args, process_id);
+ HYDU_Append_wdir(partition->args, handle.wdir);
+
+ for (arg = 0; partition->args[arg]; arg++);
+ partition->args[arg++] = MPIU_Strdup("proxy");
+
+ HYDU_Int_to_str(partition->proc_count, str, status);
+ partition->args[arg++] = MPIU_Strdup("--proc-count");
+ partition->args[arg++] = MPIU_Strdup(str);
+
+ partition->args[arg++] = MPIU_Strdup("--partition");
+ partition->args[arg++] = MPIU_Strdup(partition->name);
+ partition->args[arg++] = MPIU_Strdup(str);
+ HYDU_FREE(str);
+
+ HYDU_Int_to_str(process_id, str, status);
+ partition->args[arg++] = MPIU_Strdup("--pmi-id");
+ partition->args[arg++] = MPIU_Strdup(str);
+ HYDU_FREE(str);
+
+ j = 0;
+ for (env = handle.system_env; env; env = env->next)
+ j++;
+ for (env = proc_params->prop_env; env; env = env->next)
+ j++;
+ HYDU_Int_to_str(j, str, status);
+ partition->args[arg++] = MPIU_Strdup("--environment");
+ partition->args[arg++] = MPIU_Strdup(str);
+ for (env = handle.system_env; env; env = env->next)
+ partition->args[arg++] = MPIU_Strdup(env->env_name);
+ for (env = proc_params->prop_env; env; env = env->next)
+ partition->args[arg++] = MPIU_Strdup(env->env_name);
+ HYDU_FREE(str);
+
+ partition->args[arg] = NULL;
HYDU_Append_exec(proc_params->exec, partition->args);
for (arg = 0; partition->args[arg]; arg++);
partition->args[arg++] = MPIU_Strdup("\"");
partition->args[arg++] = NULL;
- process_id++;
+ process_id += partition->proc_count;
}
}
@@ -189,8 +187,7 @@
* processes. */
status = HYD_BSCI_Launch_procs();
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("bootstrap server returned error launching processes\n");
+ HYDU_Error_printf("bootstrap server returned error launching processes\n");
goto fn_fail;
}
Modified: mpich2/trunk/src/pm/hydra/pm/central/proxy.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/proxy.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/pm/central/proxy.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -15,9 +15,11 @@
int main(int argc, char **argv)
{
- int i, high_port, low_port, port;
+ int i, j, arg, high_port, low_port, port, sockets_open;
int HYD_Proxy_listenfd;
- char *port_range, *port_str;
+ char *port_range, *port_str, *str;
+ HYD_Env_t *env, pmi;
+ char *client_args[HYD_EXEC_ARGS];
HYD_Status status = HYD_SUCCESS;
status = HYD_Proxy_get_params(argc, argv);
@@ -42,12 +44,9 @@
}
/* Register the listening socket with the demux engine */
- status =
- HYD_DMX_Register_fd(1, &HYD_Proxy_listenfd, HYD_STDOUT,
- HYD_Proxy_listen_cb);
+ status = HYD_DMX_Register_fd(1, &HYD_Proxy_listenfd, HYD_STDOUT, HYD_Proxy_listen_cb);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error for registering fd\n");
+ HYDU_Error_printf("demux engine returned error for registering fd\n");
goto fn_fail;
}
@@ -63,22 +62,48 @@
HYD_Proxy_params.proc_count * sizeof(int), status);
HYDU_MALLOC(HYD_Proxy_params.err, int *,
HYD_Proxy_params.proc_count * sizeof(int), status);
+ HYDU_MALLOC(HYD_Proxy_params.pid, int *,
+ HYD_Proxy_params.proc_count * sizeof(int), status);
/* Spawn the processes */
for (i = 0; i < HYD_Proxy_params.proc_count; i++) {
+
+ pmi.env_name = MPIU_Strdup("PMI_ID");
+ HYDU_Int_to_str(HYD_Proxy_params.pmi_id + i, str, status);
+ pmi.env_value = MPIU_Strdup(str);
+ pmi.env_type = HYD_ENV_STATIC;
+ pmi.start_val = 0;
+ pmi.next = NULL;
+
+ /* Update the PMI_ID value with this one */
+ status = HYDU_Env_add_to_list(&HYD_Proxy_params.env_list, pmi);
+ if (status != HYD_SUCCESS) {
+ HYDU_Error_printf("unable to add env to list\n");
+ goto fn_fail;
+ }
+
+ /* Set the environment with the current values */
+ for (env = HYD_Proxy_params.env_list; env; env = env->next) {
+ status = HYDU_Env_putenv(*env);
+ if (status != HYD_SUCCESS) {
+ HYDU_Error_printf("unable to putenv\n");
+ goto fn_fail;
+ }
+ }
+
+ for (j = 0, arg = 0; HYD_Proxy_params.args[j]; j++)
+ client_args[arg++] = MPIU_Strdup(HYD_Proxy_params.args[j]);
+ client_args[arg++] = NULL;
+
if ((i & HYD_Proxy_params.pmi_id) == 0) {
- status =
- HYDU_Create_process(HYD_Proxy_params.args,
- &HYD_Proxy_params.in,
- &HYD_Proxy_params.out[i],
- &HYD_Proxy_params.err[i],
- &HYD_Proxy_params.pid[i]);
+ status = HYDU_Create_process(client_args, &HYD_Proxy_params.in,
+ &HYD_Proxy_params.out[i], &HYD_Proxy_params.err[i],
+ &HYD_Proxy_params.pid[i]);
}
else {
- status = HYDU_Create_process(HYD_Proxy_params.args, NULL,
+ status = HYDU_Create_process(client_args, NULL,
&HYD_Proxy_params.out[i],
- &HYD_Proxy_params.err[i],
- &HYD_Proxy_params.pid[i]);
+ &HYD_Proxy_params.err[i], &HYD_Proxy_params.pid[i]);
}
if (status != HYD_SUCCESS) {
HYDU_Error_printf("spawn process returned error\n");
@@ -87,22 +112,16 @@
}
/* Everything is spawned, now wait for I/O */
- status =
- HYD_DMX_Register_fd(HYD_Proxy_params.proc_count,
- HYD_Proxy_params.out, HYD_STDOUT,
- HYD_Proxy_stdout_cb);
+ status = HYD_DMX_Register_fd(HYD_Proxy_params.proc_count, HYD_Proxy_params.out,
+ HYD_STDOUT, HYD_Proxy_stdout_cb);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when registering fd\n");
+ HYDU_Error_printf("demux engine returned error when registering fd\n");
goto fn_fail;
}
- status =
- HYD_DMX_Register_fd(HYD_Proxy_params.proc_count,
- HYD_Proxy_params.err, HYD_STDOUT,
- HYD_Proxy_stderr_cb);
+ status = HYD_DMX_Register_fd(HYD_Proxy_params.proc_count, HYD_Proxy_params.err,
+ HYD_STDOUT, HYD_Proxy_stderr_cb);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when registering fd\n");
+ HYDU_Error_printf("demux engine returned error when registering fd\n");
goto fn_fail;
}
@@ -110,6 +129,29 @@
HYD_Proxy_params.stdin_buf_offset = 0;
HYD_Proxy_params.stdin_buf_count = 0;
+ while (1) {
+ /* Wait for some event to occur */
+ status = HYD_DMX_Wait_for_event();
+ if (status != HYD_SUCCESS) {
+ HYDU_Error_printf("demux engine returned error when waiting for event\n");
+ goto fn_fail;
+ }
+
+ /* Check to see if there's any open read socket left; if there
+ * are, we will just wait for more events. */
+ sockets_open = 0;
+ for (i = 0; i < HYD_Proxy_params.proc_count; i++) {
+ if (HYD_Proxy_params.out[i] != -1 || HYD_Proxy_params.err[i] != -1) {
+ sockets_open++;
+ break;
+ }
+ }
+
+ /* We are done */
+ if (!sockets_open)
+ break;
+ }
+
fn_exit:
return status;
Modified: mpich2/trunk/src/pm/hydra/pm/central/proxy.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/proxy.h 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/pm/central/proxy.h 2009-03-10 04:26:16 UTC (rev 3992)
@@ -11,6 +11,7 @@
struct HYD_Proxy_params {
HYD_Env_t *global_env;
+ HYD_Env_t *env_list;
int proc_count;
int pmi_id;
char *args[HYD_EXEC_ARGS];
Modified: mpich2/trunk/src/pm/hydra/pm/central/proxy_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/proxy_cb.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/pm/central/proxy_cb.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -12,9 +12,6 @@
struct HYD_Proxy_params HYD_Proxy_params;
int HYD_Proxy_listenfd;
-/* FIXME: A lot of this content is copied from the mpiexec
- * callback. This needs to be moved to utility functions instead. */
-
HYD_Status HYD_Proxy_listen_cb(int fd, HYD_Event_t events)
{
int accept_fd, count, i;
@@ -24,8 +21,7 @@
HYDU_FUNC_ENTER();
if (events & HYD_STDIN) {
- HYDU_Error_printf("stdout handler got an stdin event: %d\n",
- events);
+ HYDU_Error_printf("stdout handler got an stdin event: %d\n", events);
status = HYD_INTERNAL_ERROR;
goto fn_fail;
}
@@ -33,25 +29,20 @@
if (fd == HYD_Proxy_listenfd) { /* mpiexec is trying to connect */
status = HYDU_Sock_accept(fd, &accept_fd);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("sock utils returned error when accepting connection\n");
+ HYDU_Error_printf("sock utils returned error when accepting connection\n");
goto fn_fail;
}
- status =
- HYD_DMX_Register_fd(1, &accept_fd, HYD_STDOUT,
- HYD_Proxy_listen_cb);
+ status = HYD_DMX_Register_fd(1, &accept_fd, HYD_STDOUT, HYD_Proxy_listen_cb);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when registering fd\n");
+ HYDU_Error_printf("demux engine returned error when registering fd\n");
goto fn_fail;
}
}
else { /* We got a command from mpiexec */
count = read(fd, &cmd, HYD_TMPBUF_SIZE);
if (count < 0) {
- HYDU_Error_printf("socket read error on fd: %d (errno: %d)\n",
- fd, errno);
+ HYDU_Error_printf("socket read error on fd: %d (errno: %d)\n", fd, errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
@@ -59,8 +50,7 @@
/* The connection has closed */
status = HYD_DMX_Deregister_fd(fd);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when deregistering fd\n");
+ HYDU_Error_printf("demux engine returned error when deregistering fd\n");
goto fn_fail;
}
goto fn_exit;
@@ -88,43 +78,28 @@
HYD_Status HYD_Proxy_stdout_cb(int fd, HYD_Event_t events)
{
- int count;
- char buf[HYD_TMPBUF_SIZE];
+ int closed, i;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
- if (events & HYD_STDIN) {
- HYDU_Error_printf("stdout handler got an stdin event: %d\n",
- events);
- status = HYD_INTERNAL_ERROR;
+ status = HYDU_Sock_stdout_cb(fd, events, 1, &closed);
+ if (status != HYD_SUCCESS) {
+ HYDU_Error_printf("sock stdout callback error\n");
goto fn_fail;
}
- count = read(fd, buf, HYD_TMPBUF_SIZE);
- if (count < 0) {
- HYDU_Error_printf("socket read error on fd: %d (errno: %d)\n", fd,
- errno);
- status = HYD_SOCK_ERROR;
- goto fn_fail;
- }
- else if (count == 0) {
+ if (closed) {
/* The connection has closed */
status = HYD_DMX_Deregister_fd(fd);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when deregistering fd\n");
+ HYDU_Error_printf("demux engine returned error when deregistering fd\n");
goto fn_fail;
}
- goto fn_exit;
- }
- count = write(1, buf, count);
- if (count < 0) {
- HYDU_Error_printf("socket write error on fd: %d (errno: %d)\n", fd,
- errno);
- status = HYD_SOCK_ERROR;
- goto fn_fail;
+ for (i = 0; i < HYD_Proxy_params.proc_count; i++)
+ if (HYD_Proxy_params.out[i] == fd)
+ HYD_Proxy_params.out[i] = -1;
}
fn_exit:
@@ -138,43 +113,28 @@
HYD_Status HYD_Proxy_stderr_cb(int fd, HYD_Event_t events)
{
- int count;
- char buf[HYD_TMPBUF_SIZE];
+ int closed, i;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
- if (events & HYD_STDIN) {
- HYDU_Error_printf("stderr handler got an stdin event: %d\n",
- events);
- status = HYD_INTERNAL_ERROR;
+ status = HYDU_Sock_stdout_cb(fd, events, 2, &closed);
+ if (status != HYD_SUCCESS) {
+ HYDU_Error_printf("sock stdout callback error\n");
goto fn_fail;
}
- count = read(fd, buf, HYD_TMPBUF_SIZE);
- if (count < 0) {
- HYDU_Error_printf("socket read error on fd: %d (errno: %d)\n", fd,
- errno);
- status = HYD_SOCK_ERROR;
- goto fn_fail;
- }
- else if (count == 0) {
+ if (closed) {
/* The connection has closed */
status = HYD_DMX_Deregister_fd(fd);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when deregistering fd\n");
+ HYDU_Error_printf("demux engine returned error when deregistering fd\n");
goto fn_fail;
}
- goto fn_exit;
- }
- count = write(2, buf, count);
- if (count < 0) {
- HYDU_Error_printf("socket write error on fd: %d (errno: %d)\n", fd,
- errno);
- status = HYD_SOCK_ERROR;
- goto fn_fail;
+ for (i = 0; i < HYD_Proxy_params.proc_count; i++)
+ if (HYD_Proxy_params.err[i] == fd)
+ HYD_Proxy_params.err[i] = -1;
}
fn_exit:
@@ -188,64 +148,29 @@
HYD_Status HYD_Proxy_stdin_cb(int fd, HYD_Event_t events)
{
- int count;
+ int closed;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
- if (events & HYD_STDIN) {
- HYDU_Error_printf
- ("stdin handler got a writeable event on local stdin: %d\n",
- events);
- status = HYD_INTERNAL_ERROR;
+ status = HYDU_Sock_stdin_cb(fd, events, HYD_Proxy_params.stdin_tmp_buf,
+ &HYD_Proxy_params.stdin_buf_count,
+ &HYD_Proxy_params.stdin_buf_count, &closed);
+ if (status != HYD_SUCCESS) {
+ HYDU_Error_printf("sock stdin callback error\n");
goto fn_fail;
}
- while (1) {
- /* If we already have buffered data, send it out */
- if (HYD_Proxy_params.stdin_buf_count) {
- count =
- write(HYD_Proxy_params.in,
- HYD_Proxy_params.stdin_tmp_buf +
- HYD_Proxy_params.stdin_buf_offset,
- HYD_Proxy_params.stdin_buf_count);
- if (count < 0) {
- /* We can't get an EAGAIN as we just got out of poll */
- HYDU_Error_printf
- ("socket write error on fd: %d (errno: %d)\n",
- HYD_Proxy_params.in, errno);
- status = HYD_SOCK_ERROR;
- goto fn_fail;
- }
- HYD_Proxy_params.stdin_buf_offset += count;
- HYD_Proxy_params.stdin_buf_count -= count;
- break;
- }
-
- /* If we are still here, we need to refill our temporary buffer */
- count = read(0, HYD_Proxy_params.stdin_tmp_buf, HYD_TMPBUF_SIZE);
- if (count < 0) {
- if (errno == EINTR || errno == EAGAIN) {
- /* This call was interrupted or there was no data to read; just break out. */
- break;
- }
-
- HYDU_Error_printf("socket read error on fd: %d (errno: %d)\n",
- fd, errno);
- status = HYD_SOCK_ERROR;
+ if (closed) {
+ /* The connection has closed */
+ status = HYD_DMX_Deregister_fd(fd);
+ if (status != HYD_SUCCESS) {
+ HYDU_Error_printf("demux engine returned error when deregistering fd\n");
goto fn_fail;
}
- else if (count == 0) {
- /* The connection has closed */
- status = HYD_DMX_Deregister_fd(fd);
- if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("demux engine returned error when deregistering fd\n");
- goto fn_fail;
- }
- break;
- }
- HYD_Proxy_params.stdin_buf_count += count;
+
+ if (HYD_Proxy_params.in == fd)
+ HYD_Proxy_params.in = -1;
}
fn_exit:
Modified: mpich2/trunk/src/pm/hydra/pm/central/proxy_utils.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/proxy_utils.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/pm/central/proxy_utils.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -13,13 +13,15 @@
{
int argc = t_argc;
char **argv = t_argv;
- int app_params = 0, arg;
+ int arg, i, count;
struct HYD_Partition_list *partition, *run;
+ HYD_Env_t *found, env;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
HYD_Proxy_params.global_env = NULL;
+ HYD_Proxy_params.env_list = NULL;
HYD_Proxy_params.partition = NULL;
status = HYDU_Env_global_list(&HYD_Proxy_params.global_env);
@@ -28,7 +30,6 @@
goto fn_fail;
}
- app_params = 0;
while (--argc && ++argv) {
/* Process count */
@@ -59,18 +60,48 @@
if (!HYD_Proxy_params.partition)
HYD_Proxy_params.partition = partition;
else {
- for (run = HYD_Proxy_params.partition; run->next;
- run = run->next);
+ for (run = HYD_Proxy_params.partition; run->next; run = run->next);
run->next = partition;
}
continue;
}
+ /* Environment information is passed as a list of names; we
+ * need to find the values from our environment. */
+ if (!strcmp(*argv, "--environment")) {
+ argv++;
+ count = atoi(*argv);
+ for (i = 0; i < count; i++) {
+ argv++;
+
+ /* First find the environment variable */
+ env.env_name = MPIU_Strdup(*argv);
+ found = HYDU_Env_found_in_list(HYD_Proxy_params.global_env, env);
+ if (!found) {
+ HYDU_Error_printf("Unable to find requested env: %s\n", env.env_name);
+ status = HYD_INTERNAL_ERROR;
+ goto fn_fail;
+ }
+
+ /* Now add it to the env_list */
+ status = HYDU_Env_add_to_list(&HYD_Proxy_params.env_list, *found);
+ if (status != HYD_SUCCESS) {
+ HYDU_Error_printf("Unable to add env to list\n");
+ goto fn_fail;
+ }
+ }
+ continue;
+ }
+
/* Fall through case is application parameters. Load
* everything into the args variable. */
- for (arg = 0; argv;)
- HYD_Proxy_params.args[arg++] = MPIU_Strdup(argv++);
+ for (arg = 0; *argv;) {
+ HYD_Proxy_params.args[arg++] = MPIU_Strdup(*argv);
+ ++argv;
+ --argc;
+ }
HYD_Proxy_params.args[arg++] = NULL;
+ break;
}
fn_exit:
Modified: mpich2/trunk/src/pm/hydra/pm/utils/pmi.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/utils/pmi.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/pm/utils/pmi.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -19,10 +19,8 @@
HYDU_FUNC_ENTER();
- HYDU_MALLOC(*kvs, HYD_PMCU_pmi_kvs_t *, sizeof(HYD_PMCU_pmi_kvs_t),
- status);
- MPIU_Snprintf((*kvs)->kvs_name, MAXNAMELEN, "kvs_%d_%d",
- (int) getpid(), pgid);
+ HYDU_MALLOC(*kvs, HYD_PMCU_pmi_kvs_t *, sizeof(HYD_PMCU_pmi_kvs_t), status);
+ MPIU_Snprintf((*kvs)->kvs_name, MAXNAMELEN, "kvs_%d_%d", (int) getpid(), pgid);
(*kvs)->key_pair = NULL;
fn_exit:
@@ -40,8 +38,7 @@
HYDU_FUNC_ENTER();
- HYDU_MALLOC(*pg, HYD_PMCU_pmi_pg_t *, sizeof(HYD_PMCU_pmi_pg_t),
- status);
+ HYDU_MALLOC(*pg, HYD_PMCU_pmi_pg_t *, sizeof(HYD_PMCU_pmi_pg_t), status);
(*pg)->id = pgid;
(*pg)->num_procs = 0;
(*pg)->barrier_count = 0;
@@ -72,8 +69,7 @@
HYDU_FUNC_ENTER();
- HYDU_MALLOC(process, HYD_PMCU_pmi_process_t *,
- sizeof(HYD_PMCU_pmi_process_t), status);
+ HYDU_MALLOC(process, HYD_PMCU_pmi_process_t *, sizeof(HYD_PMCU_pmi_process_t), status);
process->fd = fd;
process->pg = pg;
process->next = NULL;
@@ -166,8 +162,7 @@
HYDU_STR_ALLOC_AND_JOIN(tmp, cmd, status);
status = HYDU_Sock_writeline(fd, cmd, strlen(cmd));
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("sock utils returned error when writing PMI line\n");
+ HYDU_Error_printf("sock utils returned error when writing PMI line\n");
goto fn_fail;
}
HYDU_FREE(ssize);
@@ -210,12 +205,10 @@
if (pmi_version == 1 && pmi_subversion <= 1) {
/* We support PMI v1.0 and 1.1 */
- tmp[0] =
- "cmd=response_to_init pmi_version=1 pmi_subversion=1 rc=0\n";
+ tmp[0] = "cmd=response_to_init pmi_version=1 pmi_subversion=1 rc=0\n";
status = HYDU_Sock_writeline(fd, tmp[0], strlen(tmp[0]));
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("sock utils returned error when writing PMI line\n");
+ HYDU_Error_printf("sock utils returned error when writing PMI line\n");
goto fn_fail;
}
}
@@ -262,8 +255,7 @@
HYDU_STR_ALLOC_AND_JOIN(tmp, cmd, status);
status = HYDU_Sock_writeline(fd, cmd, strlen(cmd));
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("sock utils returned error when writing PMI line\n");
+ HYDU_Error_printf("sock utils returned error when writing PMI line\n");
goto fn_fail;
}
HYDU_FREE(cmd);
@@ -330,8 +322,7 @@
HYDU_STR_ALLOC_AND_JOIN(tmp, cmd, status);
status = HYDU_Sock_writeline(fd, cmd, strlen(cmd));
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("sock utils returned error when writing PMI line\n");
+ HYDU_Error_printf("sock utils returned error when writing PMI line\n");
goto fn_fail;
}
HYDU_FREE(cmd);
@@ -360,8 +351,7 @@
process = find_process(fd);
if (process == NULL) { /* We didn't find the process */
status = HYD_INTERNAL_ERROR;
- HYDU_Error_printf
- ("could not find the process structure for fd %d\n", fd);
+ HYDU_Error_printf("could not find the process structure for fd %d\n", fd);
goto fn_fail;
}
@@ -374,8 +364,7 @@
HYDU_STR_ALLOC_AND_JOIN(tmp, cmd, status);
status = HYDU_Sock_writeline(fd, cmd, strlen(cmd));
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("sock utils returned error when writing PMI line\n");
+ HYDU_Error_printf("sock utils returned error when writing PMI line\n");
goto fn_fail;
}
HYDU_FREE(cmd);
@@ -402,8 +391,7 @@
process = find_process(fd);
if (process == NULL) { /* We didn't find the process */
status = HYD_INTERNAL_ERROR;
- HYDU_Error_printf
- ("could not find the process structure for fd %d\n", fd);
+ HYDU_Error_printf("could not find the process structure for fd %d\n", fd);
goto fn_fail;
}
@@ -417,8 +405,7 @@
while (run) {
status = HYDU_Sock_writeline(run->fd, cmd, strlen(cmd));
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("sock utils returned error when writing PMI line\n");
+ HYDU_Error_printf("sock utils returned error when writing PMI line\n");
goto fn_fail;
}
run = run->next;
@@ -458,8 +445,7 @@
process = find_process(fd);
if (process == NULL) { /* We didn't find the process */
status = HYD_INTERNAL_ERROR;
- HYDU_Error_printf
- ("could not find the process structure for fd %d\n", fd);
+ HYDU_Error_printf("could not find the process structure for fd %d\n", fd);
goto fn_fail;
}
@@ -471,8 +457,7 @@
goto fn_fail;
}
- HYDU_MALLOC(key_pair, HYD_PMCU_pmi_kvs_pair_t *,
- sizeof(HYD_PMCU_pmi_kvs_pair_t), status);
+ HYDU_MALLOC(key_pair, HYD_PMCU_pmi_kvs_pair_t *, sizeof(HYD_PMCU_pmi_kvs_pair_t), status);
MPIU_Snprintf(key_pair->key, MAXKEYLEN, "%s", key);
MPIU_Snprintf(key_pair->val, MAXVALLEN, "%s", val);
key_pair->next = NULL;
@@ -503,8 +488,7 @@
HYDU_STR_ALLOC_AND_JOIN(tmp, cmd, status);
status = HYDU_Sock_writeline(fd, cmd, strlen(cmd));
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("sock utils returned error when writing PMI line\n");
+ HYDU_Error_printf("sock utils returned error when writing PMI line\n");
goto fn_fail;
}
HYDU_FREE(cmd);
@@ -539,8 +523,7 @@
process = find_process(fd);
if (process == NULL) { /* We didn't find the process */
status = HYD_INTERNAL_ERROR;
- HYDU_Error_printf
- ("could not find the process structure for fd %d\n", fd);
+ HYDU_Error_printf("could not find the process structure for fd %d\n", fd);
goto fn_fail;
}
@@ -582,8 +565,7 @@
HYDU_STR_ALLOC_AND_JOIN(tmp, cmd, status);
status = HYDU_Sock_writeline(fd, cmd, strlen(cmd));
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("sock utils returned error when writing PMI line\n");
+ HYDU_Error_printf("sock utils returned error when writing PMI line\n");
goto fn_fail;
}
HYDU_FREE(cmd);
@@ -608,8 +590,7 @@
cmd = "cmd=finalize_ack\n";
status = HYDU_Sock_writeline(fd, cmd, strlen(cmd));
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("sock utils returned error when writing PMI line\n");
+ HYDU_Error_printf("sock utils returned error when writing PMI line\n");
goto fn_fail;
}
@@ -632,8 +613,7 @@
status = HYD_BSCI_Get_universe_size(&usize);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("Unable to get universe size from the bootstrap server\n");
+ HYDU_Error_printf("Unable to get universe size from the bootstrap server\n");
goto fn_fail;
}
@@ -648,8 +628,7 @@
HYDU_STR_ALLOC_AND_JOIN(tmp, cmd, status);
status = HYDU_Sock_writeline(fd, cmd, strlen(cmd));
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("sock utils returned error when writing PMI line\n");
+ HYDU_Error_printf("sock utils returned error when writing PMI line\n");
goto fn_fail;
}
HYDU_FREE(cmd);
@@ -663,8 +642,7 @@
}
-static HYD_Status free_pmi_process_list(HYD_PMCU_pmi_process_t *
- process_list)
+static HYD_Status free_pmi_process_list(HYD_PMCU_pmi_process_t * process_list)
{
HYD_PMCU_pmi_process_t *process, *tmp;
HYD_Status status = HYD_SUCCESS;
Modified: mpich2/trunk/src/pm/hydra/utils/env/env.c
===================================================================
--- mpich2/trunk/src/pm/hydra/utils/env/env.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/utils/env/env.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -32,8 +32,7 @@
status = HYDU_Env_add_to_list(env_list, *env);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf
- ("launcher returned error adding env to list\n");
+ HYDU_Error_printf("launcher returned error adding env to list\n");
goto fn_fail;
}
HYDU_Env_free(env);
@@ -92,7 +91,7 @@
}
-HYD_Env_t *HYDU_Env_found_in_list(HYD_Env_t * env_list, HYD_Env_t * env)
+HYD_Env_t *HYDU_Env_found_in_list(HYD_Env_t * env_list, HYD_Env_t env)
{
HYD_Env_t *run;
@@ -100,7 +99,7 @@
run = env_list;
while (run->next) {
- if (!strcmp(run->env_name, env->env_name))
+ if (!strcmp(run->env_name, env.env_name))
goto fn_exit;
run = run->next;
}
@@ -206,8 +205,7 @@
HYD_Status HYDU_Env_create(HYD_Env_t ** env, char *env_name,
- char *env_value, HYD_Env_type_t env_type,
- int start)
+ char *env_value, HYD_Env_type_t env_type, int start)
{
HYD_Status status = HYD_SUCCESS;
@@ -262,3 +260,29 @@
HYDU_FUNC_EXIT();
return status;
}
+
+
+HYD_Status HYDU_Env_putenv(HYD_Env_t env) /* Export env into this process's environment as "<env_name>=<env_value>" via putenv(); returns status. */
+{
+ int i;
+ char *tmp[HYDU_NUM_JOIN_STR], *env_str; /* tmp: NULL-terminated list of pieces to join; env_str: the joined string */
+ HYD_Status status = HYD_SUCCESS;
+
+ HYDU_FUNC_ENTER();
+
+ i = 0;
+ tmp[i++] = MPIU_Strdup(env.env_name);
+ tmp[i++] = MPIU_Strdup("=");
+ tmp[i++] = MPIU_Strdup(env.env_value);
+ tmp[i++] = NULL; /* terminator for the piece list */
+ HYDU_STR_ALLOC_AND_JOIN(tmp, env_str, status); /* presumably allocates env_str and concatenates tmp[] into it; macro body is not visible here */
+
+ putenv(env_str); /* POSIX putenv() keeps the pointer it is given, so env_str must stay allocated. NOTE(review): the return value is ignored, and the tmp[] duplicates are not released here unless the join macro frees them - confirm */
+
+ fn_exit:
+ HYDU_FUNC_EXIT();
+ return status;
+
+ fn_fail: /* no explicit goto fn_fail appears in this function; presumably reached from inside HYDU_STR_ALLOC_AND_JOIN - confirm */
+ goto fn_exit;
+}
Modified: mpich2/trunk/src/pm/hydra/utils/launch/allocate.c
===================================================================
--- mpich2/trunk/src/pm/hydra/utils/launch/allocate.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/utils/launch/allocate.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -8,8 +8,6 @@
#include "hydra_mem.h"
#include "hydra_launch.h"
-HYD_Handle handle;
-
HYD_Status HYDU_Allocate_Partition(struct HYD_Partition_list **partition)
{
HYD_Status status = HYD_SUCCESS;
Modified: mpich2/trunk/src/pm/hydra/utils/launch/args.c
===================================================================
--- mpich2/trunk/src/pm/hydra/utils/launch/args.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/utils/launch/args.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -8,8 +8,6 @@
#include "hydra_mem.h"
#include "hydra_launch.h"
-HYD_Handle handle;
-
HYD_Status HYDU_Append_env(HYD_Env_t * env_list, char **client_arg, int id)
{
int i, j, csh_format;
@@ -78,7 +76,7 @@
}
-HYD_Status HYDU_Append_wdir(char **client_arg)
+HYD_Status HYDU_Append_wdir(char **client_arg, char *wdir)
{
int arg;
HYD_Status status = HYD_SUCCESS;
@@ -87,10 +85,26 @@
for (arg = 0; client_arg[arg]; arg++);
client_arg[arg++] = MPIU_Strdup("cd");
- client_arg[arg++] = MPIU_Strdup(handle.wdir);
+ client_arg[arg++] = MPIU_Strdup(wdir);
client_arg[arg++] = MPIU_Strdup(";");
client_arg[arg++] = NULL;
HYDU_FUNC_EXIT();
return status;
}
+
+
+HYD_Status HYDU_Dump_args(char **args) /* Print every string of the NULL-terminated array args to stderr on one line, each followed by a space. */
+{
+ int arg;
+ HYD_Status status = HYD_SUCCESS;
+
+ HYDU_FUNC_ENTER();
+
+ for (arg = 0; args[arg]; arg++) /* stops at the first NULL entry; args itself is not checked for NULL */
+ fprintf(stderr, "%s ", args[arg]); /* a trailing space is emitted after the last argument too */
+ fprintf(stderr, "\n"); /* runs once, after the loop, to end the line */
+
+ HYDU_FUNC_EXIT();
+ return status; /* always HYD_SUCCESS: status is never reassigned in this function */
+}
Modified: mpich2/trunk/src/pm/hydra/utils/launch/launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/utils/launch/launch.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/utils/launch/launch.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -8,10 +8,7 @@
#include "hydra_mem.h"
#include "hydra_launch.h"
-HYD_Handle handle;
-
-HYD_Status HYDU_Create_process(char **client_arg, int *in, int *out,
- int *err, int *pid)
+HYD_Status HYDU_Create_process(char **client_arg, int *in, int *out, int *err, int *pid)
{
int inpipe[2], outpipe[2], errpipe[2], arg, tpid;
HYD_Status status = HYD_SUCCESS;
@@ -67,17 +64,6 @@
}
}
- if (chdir(handle.wdir) < 0) {
- if (chdir(getenv("HOME")) < 0) {
- HYDU_Error_printf
- ("unable to set working directory to %s\n",
- handle.wdir);
- status = HYD_INTERNAL_ERROR;
- goto fn_fail;
- }
- }
-
- /* execvp the process */
if (execvp(client_arg[0], client_arg) < 0) {
HYDU_Error_printf("execvp error\n");
status = HYD_INTERNAL_ERROR;
Modified: mpich2/trunk/src/pm/hydra/utils/sock/sock.c
===================================================================
--- mpich2/trunk/src/pm/hydra/utils/sock/sock.c 2009-03-09 23:02:27 UTC (rev 3991)
+++ mpich2/trunk/src/pm/hydra/utils/sock/sock.c 2009-03-10 04:26:16 UTC (rev 3992)
@@ -7,8 +7,7 @@
#include "hydra_sock.h"
#include "hydra_dbg.h"
-HYD_Status HYDU_Sock_listen(int *listen_fd, char *port_range,
- uint16_t * port)
+HYD_Status HYDU_Sock_listen(int *listen_fd, char *port_range, uint16_t * port)
{
struct sockaddr_in sa;
int one = 1;
@@ -47,17 +46,14 @@
*listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (*listen_fd < 0) {
- HYDU_Error_printf("unable to create a stream socket (errno: %d)\n",
- errno);
+ HYDU_Error_printf("unable to create a stream socket (errno: %d)\n", errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
if (setsockopt(*listen_fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(int))
< 0) {
- HYDU_Error_printf
- ("unable to set the TCP_NODELAY socket option (errno: %d)\n",
- errno);
+ HYDU_Error_printf("unable to set the TCP_NODELAY socket option (errno: %d)\n", errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
@@ -75,8 +71,7 @@
if (errno != EADDRINUSE) {
status = HYD_SOCK_ERROR;
HYDU_Error_printf
- ("unable to bind listen socket %d (errno: %d)\n",
- *listen_fd, errno);
+ ("unable to bind listen socket %d (errno: %d)\n", *listen_fd, errno);
goto fn_fail;
}
}
@@ -93,8 +88,7 @@
status = HYDU_Sock_set_cloexec(*listen_fd);
if (status != HYD_SUCCESS) {
- HYDU_Error_printf("unable to set fd %d to close on exec\n",
- *listen_fd);
+ HYDU_Error_printf("unable to set fd %d to close on exec\n", *listen_fd);
goto fn_fail;
}
@@ -136,8 +130,7 @@
/* Get the remote host's IP address */
ht = gethostbyname(host);
if (ht == NULL) {
- HYDU_Error_printf("unable to get host address: %s (errno: %d)\n",
- host, errno);
+ HYDU_Error_printf("unable to get host address: %s (errno: %d)\n", host, errno);
status = HYD_INVALID_PARAM;
goto fn_fail;
}
@@ -146,16 +139,13 @@
/* Create a socket and set the required options */
*fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (*fd < 0) {
- HYDU_Error_printf("unable to create a stream socket (errno: %d)\n",
- errno);
+ HYDU_Error_printf("unable to create a stream socket (errno: %d)\n", errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
if (setsockopt(*fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)) < 0) {
- HYDU_Error_printf
- ("unable to set the SO_REUSEADDR socket option (errno: %d)\n",
- errno);
+ HYDU_Error_printf("unable to set the SO_REUSEADDR socket option (errno: %d)\n", errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
@@ -195,8 +185,7 @@
*fd = accept(listen_fd, 0, 0);
if (*fd < 0) {
- HYDU_Error_printf("accept error on socket %d (errno: %d)\n",
- listen_fd, errno);
+ HYDU_Error_printf("accept error on socket %d (errno: %d)\n", listen_fd, errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
@@ -244,8 +233,7 @@
if (errno == EINTR)
continue;
- HYDU_Error_printf("error reading from socket %d (errno: %d)\n",
- fd, errno);
+ HYDU_Error_printf("error reading from socket %d (errno: %d)\n", fd, errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
@@ -290,8 +278,7 @@
} while (*count < 0 && errno == EINTR);
if (*count < 0) {
- HYDU_Error_printf("error reading from socket %d (errno: %d)\n", fd,
- errno);
+ HYDU_Error_printf("error reading from socket %d (errno: %d)\n", fd, errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
@@ -323,8 +310,7 @@
} while (n < 0 && errno == EINTR);
if (n < maxsize) {
- HYDU_Error_printf("error writing to socket %d (errno: %d)\n", fd,
- errno);
+ HYDU_Error_printf("error writing to socket %d (errno: %d)\n", fd, errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
@@ -350,8 +336,7 @@
} while (n < 0 && errno == EINTR);
if (n < maxsize) {
- HYDU_Error_printf("error writing to socket %d (errno: %d)\n", fd,
- errno);
+ HYDU_Error_printf("error writing to socket %d (errno: %d)\n", fd, errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
@@ -410,3 +395,107 @@
HYDU_FUNC_EXIT();
return status;
}
+
+
+HYD_Status HYDU_Sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed) /* Copy one chunk read from fd to stdout_fd; *closed is set to 1 when read() reports end-of-file, 0 otherwise. */
+{
+ int count;
+ char buf[HYD_TMPBUF_SIZE]; /* scratch buffer for a single read/write round */
+ HYD_Status status = HYD_SUCCESS;
+
+ HYDU_FUNC_ENTER();
+
+ *closed = 0;
+
+ if (events & HYD_STDIN) { /* this handler rejects any event carrying the HYD_STDIN bit */
+ HYDU_Error_printf("stdout handler got an stdin event: %d\n", events);
+ status = HYD_INTERNAL_ERROR;
+ goto fn_fail;
+ }
+
+ count = read(fd, buf, HYD_TMPBUF_SIZE); /* one read per call; EINTR is not retried here */
+ if (count < 0) {
+ HYDU_Error_printf("socket read error on fd: %d (errno: %d)\n", fd, errno);
+ status = HYD_SOCK_ERROR;
+ goto fn_fail;
+ }
+ else if (count == 0) {
+ /* The connection has closed */
+ *closed = 1;
+ goto fn_exit;
+ }
+
+ count = write(stdout_fd, buf, count); /* NOTE(review): only a negative result is treated as failure; a short write would silently drop the unwritten tail */
+ if (count < 0) {
+ HYDU_Error_printf("socket write error on fd: %d (errno: %d)\n", fd, errno); /* NOTE(review): prints fd although the failed write targeted stdout_fd */
+ status = HYD_SOCK_ERROR;
+ goto fn_fail;
+ }
+
+ fn_exit:
+ HYDU_FUNC_EXIT();
+ return status;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+HYD_Status HYDU_Sock_stdin_cb(int fd, HYD_Event_t events, char *buf, int *buf_count, /* Write pending bytes from buf to fd, refilling buf from descriptor 0 first when nothing is pending. */
+ int *buf_offset, int *closed) /* *buf_count: bytes still pending; *buf_offset: where they start in buf; *closed: set to 1 when descriptor 0 reports end-of-file */
+{
+ int count;
+ HYD_Status status = HYD_SUCCESS;
+
+ HYDU_FUNC_ENTER();
+
+ *closed = 0;
+
+ if (events & HYD_STDIN) { /* NOTE(review): the message speaks of a "writeable event" while the test is on the HYD_STDIN bit; HYD_STDIN's definition is not visible here - confirm intent */
+ HYDU_Error_printf("stdin handler got a writeable event on local stdin: %d\n", events);
+ status = HYD_INTERNAL_ERROR;
+ goto fn_fail;
+ }
+
+ while (1) { /* a successful refill loops back once so the fresh data is written in the same call */
+ /* If we already have buffered data, send it out */
+ if (*buf_count) {
+ count = write(fd, buf + *buf_offset, *buf_count);
+ if (count < 0) {
+ /* We can't get an EAGAIN as we just got out of poll */
+ HYDU_Error_printf("socket write error on fd: %d (errno: %d)\n", fd, errno);
+ status = HYD_SOCK_ERROR;
+ goto fn_fail;
+ }
+ *buf_offset += count; /* advance past the bytes that were accepted */
+ *buf_count -= count;
+ break; /* one write per call; any remainder stays pending for the next invocation */
+ }
+
+ /* If we are still here, we need to refill our temporary buffer */
+ count = read(0, buf, HYD_TMPBUF_SIZE); /* reads into the start of buf; assumes buf holds at least HYD_TMPBUF_SIZE bytes - confirm. NOTE(review): *buf_offset is not reset here, so after earlier writes advanced it the next write would start past the fresh data */
+ if (count < 0) {
+ if (errno == EINTR || errno == EAGAIN) {
+ /* This call was interrupted or there was no data to read; just break out. */
+ break;
+ }
+
+ HYDU_Error_printf("socket read error on fd: %d (errno: %d)\n", fd, errno); /* NOTE(review): prints fd although the failed read was on descriptor 0 */
+ status = HYD_SOCK_ERROR;
+ goto fn_fail;
+ }
+ else if (count == 0) {
+ /* The connection has closed */
+ *closed = 1;
+ break;
+ }
+ *buf_count += count; /* reached only when *buf_count was 0, so this records the size of the refill */
+ }
+
+ fn_exit:
+ HYDU_FUNC_EXIT();
+ return status;
+
+ fn_fail:
+ goto fn_exit;
+}
More information about the mpich2-commits
mailing list