[mpich2-commits] r4229 - in mpich2/trunk/src/pm/hydra: . include pm/pmiserv utils/launch utils/sock
balaji at mcs.anl.gov
balaji at mcs.anl.gov
Tue Mar 31 21:12:41 CDT 2009
Author: balaji
Date: 2009-03-31 21:12:40 -0500 (Tue, 31 Mar 2009)
New Revision: 4229
Modified:
mpich2/trunk/src/pm/hydra/configure.in
mpich2/trunk/src/pm/hydra/include/hydra_base.h
mpich2/trunk/src/pm/hydra/include/hydra_utils.h
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c
mpich2/trunk/src/pm/hydra/utils/launch/launch.c
mpich2/trunk/src/pm/hydra/utils/sock/sock.c
Log:
We now use threads to connect to different persistent proxies in
parallel and send them relevant executable information.
Modified: mpich2/trunk/src/pm/hydra/configure.in
===================================================================
--- mpich2/trunk/src/pm/hydra/configure.in 2009-03-31 17:56:21 UTC (rev 4228)
+++ mpich2/trunk/src/pm/hydra/configure.in 2009-04-01 02:12:40 UTC (rev 4229)
@@ -35,13 +35,15 @@
CFLAGS=$save_cflags
dnl Check if the necessary headers are available
-AC_CHECK_HEADERS(unistd.h stdlib.h string.h strings.h stdarg.h sys/types.h sys/socket.h sched.h)
+AC_CHECK_HEADERS(unistd.h stdlib.h string.h strings.h stdarg.h sys/types.h sys/socket.h \
+ sched.h pthread.h)
# Check for special types
AC_TYPE_SIZE_T
dnl Check for necessary functions
-AC_CHECK_FUNCS(gettimeofday time strdup sigaction signal usleep alloca snprintf unsetenv strerror strsignal)
+AC_CHECK_FUNCS(gettimeofday time strdup sigaction signal usleep alloca snprintf unsetenv \
+ strerror strsignal)
dnl Check what we need to do about the environ extern
AC_CACHE_CHECK([for environ in unistd.h],pac_cv_environ_in_unistd,
Modified: mpich2/trunk/src/pm/hydra/include/hydra_base.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra_base.h 2009-03-31 17:56:21 UTC (rev 4228)
+++ mpich2/trunk/src/pm/hydra/include/hydra_base.h 2009-04-01 02:12:40 UTC (rev 4229)
@@ -57,6 +57,23 @@
extern char **environ;
#endif /* MANUAL_EXTERN_ENVIRON */
+
+/* sockets required headers */
+#include <poll.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
+
+/* No size_t fallback macro here: size_t is a typedef, never a macro,
+ * so an "#if !defined size_t" test is always true and a
+ * "#define size_t unsigned int" would silently rewrite every later use
+ * of size_t with the wrong width on LP64 systems (and clash with the
+ * prototypes already declared by the system headers above). A missing
+ * size_t is handled by the AC_TYPE_SIZE_T check in configure.in. */
+
#define HYD_DEFAULT_PROXY_PORT 9899
#define HYD_STDOUT (1)
@@ -132,9 +149,21 @@
struct HYD_Partition_exec *next;
};
+/* Thread support is mandatory: stop the build if configure did not
+ * find pthread.h (HAVE_PTHREAD_H comes from AC_CHECK_HEADERS). */
+#ifndef HAVE_PTHREAD_H
+#error "pthread.h needed"
+#else
+#include <pthread.h>
+#endif /* HAVE_PTHREAD_H */
+
+/* Handle for a thread started with HYDU_create_thread(); it is passed
+ * to HYDU_join_thread() to wait for that thread. */
+struct HYD_Thread_context {
+ pthread_t thread;
+};
+
/* Partition information */
struct HYD_Partition {
char *name;
+ struct sockaddr_in sa;
+
char *user_bind_map;
int total_proc_count;
@@ -167,6 +196,6 @@
HYD_Env_t *prop_env;
struct HYD_Exec_info *next;
-} *exec_info;
+};
#endif /* HYDRA_BASE_H_INCLUDED */
Modified: mpich2/trunk/src/pm/hydra/include/hydra_utils.h
===================================================================
--- mpich2/trunk/src/pm/hydra/include/hydra_utils.h 2009-03-31 17:56:21 UTC (rev 4228)
+++ mpich2/trunk/src/pm/hydra/include/hydra_utils.h 2009-04-01 02:12:40 UTC (rev 4229)
@@ -171,6 +171,9 @@
HYD_Status HYDU_create_host_list(char *host_file, struct HYD_Partition **partition_list);
HYD_Status HYDU_create_process(char **client_arg, HYD_Env_t * env_list,
int *in, int *out, int *err, int *pid, int core);
+HYD_Status HYDU_create_thread(void *(func)(void *), void *args,
+ struct HYD_Thread_context *ctxt);
+HYD_Status HYDU_join_thread(struct HYD_Thread_context ctxt);
/* signals */
@@ -193,22 +196,10 @@
/* Sock utilities */
-#include <poll.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-
-#ifdef HAVE_SYS_SOCKET_H
-#include <sys/socket.h>
-#endif
-
-#if !defined size_t
-#define size_t unsigned int
-#endif /* size_t */
-
HYD_Status HYDU_sock_listen(int *listen_fd, char *port_range, uint16_t * port);
-HYD_Status HYDU_sock_connect(const char *host, uint16_t port, int *fd);
+HYD_Status HYDU_sock_gethostbyname(const char *host, struct sockaddr_in *sa, uint16_t port);
+HYD_Status HYDU_sock_connect(struct sockaddr_in sa, int *fd);
+HYD_Status HYDU_sock_tryconnect(struct sockaddr_in sa, int *fd);
HYD_Status HYDU_sock_accept(int listen_fd, int *fd);
HYD_Status HYDU_sock_readline(int fd, char *buf, int maxlen, int *linelen);
HYD_Status HYDU_sock_read(int fd, void *buf, int maxlen, int *count);
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c 2009-03-31 17:56:21 UTC (rev 4228)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c 2009-04-01 02:12:40 UTC (rev 4229)
@@ -115,6 +115,8 @@
for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
if (HYD_PMCD_pmi_proxy_params.out[i] == fd)
HYD_PMCD_pmi_proxy_params.out[i] = -1;
+
+ close(fd);
}
fn_exit:
@@ -145,6 +147,8 @@
for (i = 0; i < HYD_PMCD_pmi_proxy_params.exec_proc_count; i++)
if (HYD_PMCD_pmi_proxy_params.err[i] == fd)
HYD_PMCD_pmi_proxy_params.err[i] = -1;
+
+ close(fd);
}
fn_exit:
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c 2009-03-31 17:56:21 UTC (rev 4228)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c 2009-04-01 02:12:40 UTC (rev 4229)
@@ -179,7 +179,9 @@
* bunch of processes to do this. */
/* Connect to all proxies and send a KILL command */
for (partition = handle.partition_list; partition; partition = partition->next) {
- status = HYDU_sock_connect(partition->name, handle.proxy_port, &fd);
+ /* We only "try" to connect here, since the proxy might have
+ * already exited and the connect might fail. */
+ status = HYDU_sock_tryconnect(partition->sa, &fd);
if (status != HYD_SUCCESS) {
HYDU_Warn_printf("unable to connect to the proxy on %s\n", partition->name);
overall_status = HYD_INTERNAL_ERROR;
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c 2009-03-31 17:56:21 UTC (rev 4228)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c 2009-04-01 02:12:40 UTC (rev 4229)
@@ -15,6 +15,102 @@
int HYD_PMCD_pmi_serv_listenfd;
HYD_Handle handle;
+/*
+ * launch_helper: thread start routine that sets up the connections to
+ * the persistent proxy of one partition.
+ *
+ * args is the struct HYD_Partition * to handle. Every connect uses the
+ * partition's pre-resolved address (partition->sa). On the control
+ * socket it writes the PROC_INFO command, the argument count computed
+ * by HYDU_strlist_lastidx(), and then each argument as (length
+ * including the terminating NUL, bytes). It then opens separate
+ * sockets for stdout and stderr, and, for the first partition in
+ * handle.partition_list only, a stdin socket stored in handle.in.
+ *
+ * It is passed to HYDU_create_thread(), which hands it to
+ * pthread_create(), so it must have the start-routine signature
+ * void *(*)(void *). It always returns NULL.
+ *
+ * NOTE(review): failures are handled only via HYDU_ERR_POP inside this
+ * thread; the resulting HYD_Status is not reported back to the thread
+ * that joins it.
+ */
+static void *launch_helper(void *args)
+{
+ struct HYD_Partition *partition = (struct HYD_Partition *) args;
+ int first_partition = (partition == handle.partition_list);
+ enum HYD_PMCD_pmi_proxy_cmds cmd;
+ int i, list_len, arg_len;
+ HYD_Status status = HYD_SUCCESS;
+
+ /*
+ * Here are the steps we will follow:
+ *
+ * 1. Put all the arguments to pass in to a string list.
+ *
+ * 2. Connect to the proxy (this will be our primary control
+ * socket).
+ *
+ * 3. Read this string list and write the following to the socket:
+ * (a) The PROC_INFO command.
+ * (b) Integer sized data with the number of arguments to
+ * follow.
+ * (c) For each argument to pass, first send an integer which
+ * tells the proxy how many bytes are coming in that
+ * argument.
+ *
+ * 4. Open two new sockets and connect them to the proxy.
+ *
+ * 5. On the first new socket, send USE_AS_STDOUT and the second
+ * send USE_AS_STDERR.
+ *
+ * 6. For PMI_ID "0", open a separate socket and send the
+ * USE_AS_STDIN command on it.
+ *
+ * 7. We need to figure out what to do with the LAUNCH_JOB
+ * command; since it's going on a different socket, it might go
+ * out-of-order. Maybe a state machine on the proxy to see if
+ * it got all the information it needs to launch the job would
+ * work.
+ */
+
+ status = HYDU_sock_connect(partition->sa, &partition->control_fd);
+ HYDU_ERR_POP(status, "unable to connect to proxy\n");
+
+ cmd = PROC_INFO;
+ status = HYDU_sock_write(partition->control_fd, &cmd,
+ sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+ HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+ /* Check how many arguments we have */
+ list_len = HYDU_strlist_lastidx(partition->proxy_args);
+ status = HYDU_sock_write(partition->control_fd, &list_len, sizeof(int));
+ HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+ /* Convert the string list to parseable data and send */
+ for (i = 0; partition->proxy_args[i]; i++) {
+ arg_len = strlen(partition->proxy_args[i]) + 1;
+
+ status = HYDU_sock_write(partition->control_fd, &arg_len, sizeof(int));
+ HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+ status = HYDU_sock_write(partition->control_fd, partition->proxy_args[i], arg_len);
+ HYDU_ERR_POP(status, "unable to write data to proxy\n");
+ }
+
+ /* Create an stdout socket */
+ status = HYDU_sock_connect(partition->sa, &partition->out);
+ HYDU_ERR_POP(status, "unable to connect to proxy\n");
+
+ cmd = USE_AS_STDOUT;
+ status = HYDU_sock_write(partition->out, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+ HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+ /* Create an stderr socket */
+ status = HYDU_sock_connect(partition->sa, &partition->err);
+ HYDU_ERR_POP(status, "unable to connect to proxy\n");
+
+ cmd = USE_AS_STDERR;
+ status = HYDU_sock_write(partition->err, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+ HYDU_ERR_POP(status, "unable to write data to proxy\n");
+
+ /* If rank 0 is here, create an stdin socket. Only the thread for the
+ * first partition writes handle.in. */
+ if (first_partition) {
+ status = HYDU_sock_connect(partition->sa, &handle.in);
+ HYDU_ERR_POP(status, "unable to connect to proxy\n");
+
+ cmd = USE_AS_STDIN;
+ status = HYDU_sock_write(handle.in, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
+ HYDU_ERR_POP(status, "unable to write data to proxy\n");
+ }
+
+ fn_exit:
+ /* pthread start routines must return a void *; nothing is passed back */
+ return NULL;
+
+ fn_fail:
+ goto fn_exit;
+}
+
static HYD_Status fill_in_proxy_args(void)
{
int i, arg, process_id;
@@ -198,7 +294,7 @@
HYD_Status status = HYD_SUCCESS;
for (partition = handle.partition_list; partition; partition = partition->next) {
- status = HYDU_sock_connect(partition->name, handle.proxy_port, &fd);
+ status = HYDU_sock_connect(partition->sa, &fd);
HYDU_ERR_POP(status, "unable to connect to proxy\n");
cmd = PROXY_SHUTDOWN;
@@ -218,41 +314,10 @@
static HYD_Status launch_procs_in_persistent_mode(void)
{
struct HYD_Partition *partition;
- enum HYD_PMCD_pmi_proxy_cmds cmd;
- int i, list_len, arg_len, first_partition;
+ int len, id;
+ struct HYD_Thread_context *thread_context;
HYD_Status status = HYD_SUCCESS;
- /*
- * Here are the steps we will follow:
- *
- * 1. Put all the arguments to pass in to a string list.
- *
- * 2. Connect to the proxy (this will be our primary control
- * socket).
- *
- * 3. Read this string list and write the following to the socket:
- * (a) The PROC_INFO command.
- * (b) Integer sized data with the number of arguments to
- * follow.
- * (c) For each argument to pass, first send an integer which
- * tells the proxy how many bytes are coming in that
- * argument.
- *
- * 4. Open two new sockets and connect them to the proxy.
- *
- * 5. On the first new socket, send USE_AS_STDOUT and the second
- * send USE_AS_STDERR.
- *
- * 6. For PMI_ID "0", open a separate socket and send the
- * USE_AS_STDIN command on it.
- *
- * 7. We need to figure out what to do with the LAUNCH_JOB
- * command; since it's going on a different socket, it might go
- * out-of-order. Maybe a state machine on the proxy to see if
- * it got all the information it needs to launch the job would
- * work.
- */
-
status = fill_in_proxy_args();
HYDU_ERR_POP(status, "unable to fill in proxy arguments\n");
@@ -262,65 +327,31 @@
status = HYD_BSCI_init(handle.bootstrap);
HYDU_ERR_POP(status, "bootstrap server initialization failed\n");
- first_partition = 1;
+ len = 0;
for (partition = handle.partition_list; partition && partition->exec_list;
- partition = partition->next) {
+ partition = partition->next)
+ len++;
- status = HYDU_sock_connect(partition->name, handle.proxy_port, &partition->control_fd);
- HYDU_ERR_POP(status, "unable to connect to proxy\n");
+ HYDU_MALLOC(thread_context, struct HYD_Thread_context *,
+ len * sizeof(struct HYD_Thread_context), status);
- cmd = PROC_INFO;
- status = HYDU_sock_write(partition->control_fd, &cmd,
- sizeof(enum HYD_PMCD_pmi_proxy_cmds));
- HYDU_ERR_POP(status, "unable to write data to proxy\n");
+ id = 0;
+ for (partition = handle.partition_list; partition && partition->exec_list;
+ partition = partition->next) {
+ HYDU_create_thread(launch_helper, (void *) partition, &thread_context[id]);
+ id++;
+ }
- /* Check how many arguments we have */
- list_len = HYDU_strlist_lastidx(partition->proxy_args);
- status = HYDU_sock_write(partition->control_fd, &list_len, sizeof(int));
- HYDU_ERR_POP(status, "unable to write data to proxy\n");
+ id = 0;
+ for (partition = handle.partition_list; partition && partition->exec_list;
+ partition = partition->next) {
+ HYDU_join_thread(thread_context[id]);
- /* Convert the string list to parseable data and send */
- for (i = 0; partition->proxy_args[i]; i++) {
- arg_len = strlen(partition->proxy_args[i]) + 1;
-
- status = HYDU_sock_write(partition->control_fd, &arg_len, sizeof(int));
- HYDU_ERR_POP(status, "unable to write data to proxy\n");
-
- status = HYDU_sock_write(partition->control_fd, partition->proxy_args[i], arg_len);
- HYDU_ERR_POP(status, "unable to write data to proxy\n");
- }
-
- /* Register the control socket with the demux engine */
status = HYD_DMX_register_fd(1, &partition->control_fd, HYD_STDOUT, partition,
HYD_PMCD_pmi_serv_control_cb);
+ HYDU_ERR_POP(status, "unable to register control fd\n");
- /* Create an stdout socket */
- status = HYDU_sock_connect(partition->name, handle.proxy_port, &partition->out);
- HYDU_ERR_POP(status, "unable to connect to proxy\n");
-
- cmd = USE_AS_STDOUT;
- status = HYDU_sock_write(partition->out, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
- HYDU_ERR_POP(status, "unable to write data to proxy\n");
-
- /* Create an stderr socket */
- status = HYDU_sock_connect(partition->name, handle.proxy_port, &partition->err);
- HYDU_ERR_POP(status, "unable to connect to proxy\n");
-
- cmd = USE_AS_STDERR;
- status = HYDU_sock_write(partition->err, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
- HYDU_ERR_POP(status, "unable to write data to proxy\n");
-
- /* If rank 0 is here, create an stdin socket */
- if (first_partition) {
- status = HYDU_sock_connect(partition->name, handle.proxy_port, &handle.in);
- HYDU_ERR_POP(status, "unable to connect to proxy\n");
-
- cmd = USE_AS_STDIN;
- status = HYDU_sock_write(handle.in, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
- HYDU_ERR_POP(status, "unable to write data to proxy\n");
-
- first_partition = 0;
- }
+ id++;
}
fn_exit:
@@ -330,6 +361,7 @@
goto fn_exit;
}
+
/*
* HYD_PMCI_launch_procs: Here are the steps we follow:
*
@@ -355,6 +387,7 @@
uint16_t port;
char hostname[MAX_HOSTNAME_LEN];
HYD_Env_t *env;
+ struct HYD_Partition *partition;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
@@ -406,6 +439,13 @@
status = HYD_PMCD_pmi_create_pg();
HYDU_ERR_POP(status, "unable to create process group\n");
+ /* For each partition, get the appropriate sockaddr to connect to */
+ for (partition = handle.partition_list; partition; partition = partition->next) {
+ status = HYDU_sock_gethostbyname(partition->name, &partition->sa,
+ handle.proxy_port);
+ HYDU_ERR_POP(status, "unable to get sockaddr information\n");
+ }
+
if (handle.launch_mode == HYD_LAUNCH_RUNTIME) {
status = launch_procs_in_runtime_mode();
HYDU_ERR_POP(status, "error launching procs in runtime mode\n");
Modified: mpich2/trunk/src/pm/hydra/utils/launch/launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/utils/launch/launch.c 2009-03-31 17:56:21 UTC (rev 4228)
+++ mpich2/trunk/src/pm/hydra/utils/launch/launch.c 2009-04-01 02:12:40 UTC (rev 4229)
@@ -82,3 +82,46 @@
fn_fail:
goto fn_exit;
}
+
+
+/*
+ * HYDU_create_thread: start a new thread running func(args) and store
+ * its handle in ctxt->thread so it can later be passed to
+ * HYDU_join_thread().
+ *
+ * pthread_create() reports failure by returning a non-zero error number
+ * and does not set errno, so the return value itself is checked and
+ * handed to HYDU_strerror().
+ *
+ * Returns HYD_SUCCESS, or HYD_INTERNAL_ERROR if the thread could not be
+ * created.
+ */
+HYD_Status HYDU_create_thread(void *(func)(void *), void *args,
+ struct HYD_Thread_context *ctxt)
+{
+ int ret;
+ HYD_Status status = HYD_SUCCESS;
+
+ HYDU_FUNC_ENTER();
+
+ ret = pthread_create(&ctxt->thread, NULL, func, args);
+ if (ret != 0)
+ HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "pthread create failed (%s)\n",
+ HYDU_strerror(ret));
+
+ fn_exit:
+ HYDU_FUNC_EXIT();
+ return status;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+/*
+ * HYDU_join_thread: wait for the thread recorded in ctxt (as filled in
+ * by HYDU_create_thread()) to terminate. The thread's return value is
+ * discarded.
+ *
+ * pthread_join() reports failure by returning a non-zero error number
+ * and does not set errno, so the return value itself is checked and
+ * handed to HYDU_strerror().
+ *
+ * Returns HYD_SUCCESS, or HYD_INTERNAL_ERROR if the join failed.
+ */
+HYD_Status HYDU_join_thread(struct HYD_Thread_context ctxt)
+{
+ int ret;
+ HYD_Status status = HYD_SUCCESS;
+
+ HYDU_FUNC_ENTER();
+
+ ret = pthread_join(ctxt.thread, NULL);
+ if (ret != 0)
+ HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "pthread join failed (%s)\n",
+ HYDU_strerror(ret));
+
+ fn_exit:
+ HYDU_FUNC_EXIT();
+ return status;
+
+ fn_fail:
+ goto fn_exit;
+}
Modified: mpich2/trunk/src/pm/hydra/utils/sock/sock.c
===================================================================
--- mpich2/trunk/src/pm/hydra/utils/sock/sock.c 2009-03-31 17:56:21 UTC (rev 4228)
+++ mpich2/trunk/src/pm/hydra/utils/sock/sock.c 2009-04-01 02:12:40 UTC (rev 4229)
@@ -105,25 +105,39 @@
}
-HYD_Status HYDU_sock_connect(const char *host, uint16_t port, int *fd)
+/*
+ * HYDU_sock_gethostbyname: resolve host once and fill *sa with an
+ * AF_INET address for it, using port (converted with htons()). The
+ * first address returned by gethostbyname() is used. Returns
+ * HYD_SUCCESS, or HYD_INVALID_PARAM if the lookup fails.
+ *
+ * NOTE(review): gethostbyname() is not guaranteed to be thread-safe;
+ * callers in this change resolve all partitions before starting the
+ * launch threads.
+ */
+HYD_Status HYDU_sock_gethostbyname(const char *host, struct sockaddr_in *sa, uint16_t port)
{
- struct sockaddr_in sa;
struct hostent *ht;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
- memset((char *) &sa, 0, sizeof(sa));
- sa.sin_family = AF_INET;
- sa.sin_port = htons(port);
+ memset((char *) sa, 0, sizeof(struct sockaddr_in));
+ (*sa).sin_family = AF_INET;
+ (*sa).sin_port = htons(port);
/* Get the remote host's IP address */
+ /* NOTE(review): gethostbyname() reports failures through h_errno, not
+ * errno, so the errno text in the message below may be stale. */
ht = gethostbyname(host);
if (ht == NULL)
HYDU_ERR_SETANDJUMP1(status, HYD_INVALID_PARAM,
"unable to get host address (%s)\n", HYDU_strerror(errno));
- memcpy(&sa.sin_addr, ht->h_addr_list[0], ht->h_length);
+ /* NOTE(review): h_length bytes are copied without checking that
+ * h_addrtype is AF_INET / h_length fits in sin_addr. */
+ memcpy(&sa->sin_addr, ht->h_addr_list[0], ht->h_length);
+ fn_exit:
+ HYDU_FUNC_EXIT();
+ return status;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+HYD_Status HYDU_sock_connect(struct sockaddr_in sa, int *fd)
+{
+ HYD_Status status = HYD_SUCCESS;
+
+ HYDU_FUNC_ENTER();
+
/* Create a socket and set the required options */
*fd = socket(AF_INET, SOCK_STREAM, 0);
if (*fd < 0)
@@ -148,6 +162,36 @@
}
+/*
+ * HYDU_sock_tryconnect: open a TCP socket and attempt to connect it to
+ * sa, storing the descriptor in *fd.
+ *
+ * Unlike a hard connect, a connect() failure only prints a warning and
+ * returns HYD_SOCK_ERROR, so the caller can decide how serious it is
+ * (e.g. the proxy may already have exited). On that failure the socket
+ * is closed and *fd is set to -1 so no descriptor is leaked.
+ *
+ * Returns HYD_SUCCESS on a successful connect, HYD_SOCK_ERROR otherwise.
+ */
+HYD_Status HYDU_sock_tryconnect(struct sockaddr_in sa, int *fd)
+{
+ HYD_Status status = HYD_SUCCESS;
+
+ HYDU_FUNC_ENTER();
+
+ /* Create a socket and set the required options */
+ *fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (*fd < 0)
+ HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "cannot open socket (%s)\n",
+ HYDU_strerror(errno));
+
+ /* Not being able to connect is not an error in all cases. So we
+ * return an error, but only print a warning message. The upper
+ * layer can decide what to do with the return status. */
+ if (connect(*fd, (struct sockaddr *) &sa, sizeof(sa)) < 0) {
+ /* Report errno before close() can overwrite it */
+ HYDU_Warn_printf("connect error (%s)\n", HYDU_strerror(errno));
+ close(*fd);
+ *fd = -1;
+ status = HYD_SOCK_ERROR;
+ goto fn_fail;
+ }
+
+ fn_exit:
+ HYDU_FUNC_EXIT();
+ return status;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
HYD_Status HYDU_sock_accept(int listen_fd, int *fd)
{
HYD_Status status = HYD_SUCCESS;
More information about the mpich2-commits
mailing list