[mpich2-commits] r4154 - in mpich2/trunk/src/pm/hydra: . pm pm/pmiserv pm/utils
balaji at mcs.anl.gov
balaji at mcs.anl.gov
Fri Mar 20 18:39:20 CDT 2009
Author: balaji
Date: 2009-03-20 18:39:20 -0500 (Fri, 20 Mar 2009)
New Revision: 4154
Added:
mpich2/trunk/src/pm/hydra/pm/pmiserv/
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_query.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_query.h
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c
Removed:
mpich2/trunk/src/pm/hydra/pm/central/
mpich2/trunk/src/pm/hydra/pm/pmiserv/central.h
mpich2/trunk/src/pm/hydra/pm/pmiserv/central_cb.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/central_finalize.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/central_launch.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/proxy.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/proxy.h
mpich2/trunk/src/pm/hydra/pm/pmiserv/proxy_cb.c
mpich2/trunk/src/pm/hydra/pm/pmiserv/proxy_utils.c
mpich2/trunk/src/pm/hydra/pm/utils/pmcu_pmi.h
Modified:
mpich2/trunk/src/pm/hydra/configure.in
mpich2/trunk/src/pm/hydra/pm/Makefile.sm
mpich2/trunk/src/pm/hydra/pm/pmiserv/Makefile.sm
mpich2/trunk/src/pm/hydra/pm/utils/pmi.c
Log:
Process manager code refactoring. This is not complete yet, but it should
compile and work fine. There is still a lot of ugly code in here that
needs to be cleaned up.
Modified: mpich2/trunk/src/pm/hydra/configure.in
===================================================================
--- mpich2/trunk/src/pm/hydra/configure.in 2009-03-20 22:24:07 UTC (rev 4153)
+++ mpich2/trunk/src/pm/hydra/configure.in 2009-03-20 23:39:20 UTC (rev 4154)
@@ -91,9 +91,9 @@
dnl Check what process manager we should use
-AC_ARG_WITH(hydra-pm, [ --with-hydra-pm Process Manager (central or distributed)],
+AC_ARG_WITH(hydra-pm, [ --with-hydra-pm Process Manager],
[ hydra_pm=$withval ],
- [ hydra_pm=central ])
+ [ hydra_pm=pmiserv ])
AC_MSG_CHECKING(process manager)
AC_MSG_RESULT($hydra_pm)
AC_SUBST(hydra_pm)
Modified: mpich2/trunk/src/pm/hydra/pm/Makefile.sm
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/Makefile.sm 2009-03-20 22:24:07 UTC (rev 4153)
+++ mpich2/trunk/src/pm/hydra/pm/Makefile.sm 2009-03-20 23:39:20 UTC (rev 4154)
@@ -4,7 +4,7 @@
# See COPYRIGHT in top-level directory.
#
-SUBDIRS_hydra_pm = central
+SUBDIRS_hydra_pm = pmiserv
SUBDIRS = utils @hydra_pm@ .
mpich2-build-install mpich2-build-uninstall install install-alt:
Copied: mpich2/trunk/src/pm/hydra/pm/pmiserv (from rev 4149, mpich2/trunk/src/pm/hydra/pm/central)
Property changes on: mpich2/trunk/src/pm/hydra/pm/pmiserv
___________________________________________________________________
Name: svn:ignore
+ Makefile
Makefile.in
configure
configure.lineno
config.*
hydra_config.h*
autom4te*
mpiexec
*.gcov
*.gcda
*.gcno
Name: svn:mergeinfo
+
Modified: mpich2/trunk/src/pm/hydra/pm/pmiserv/Makefile.sm
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/Makefile.sm 2009-03-20 19:35:46 UTC (rev 4149)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/Makefile.sm 2009-03-20 23:39:20 UTC (rev 4154)
@@ -5,7 +5,7 @@
#
libhydra_a_DIR = ../../lib
-libhydra_a_SOURCES = central_launch.c central_finalize.c central_cb.c
+libhydra_a_SOURCES = pmi_serv_launch.c pmi_serv_finalize.c pmi_serv_cb.c pmi_query.c
INCLUDES = -I${abs_srcdir}/../../include \
-I${abs_srcdir}/../../../../include \
-I../../include \
@@ -17,10 +17,10 @@
-I../../bootstrap/include \
-I${abs_srcdir}/../../demux
-hydproxy_SOURCES = proxy.c proxy_utils.c proxy_cb.c
-hydproxy_LDADD = -L../../lib -lhydra -L../../../util -lmpiexec -L../../../../../lib -lmpich
-hydproxy_DEPADD = ../../../util/libmpiexec.a
-install_BIN = hydproxy
+pmi_proxy_SOURCES = pmi_proxy.c pmi_proxy_utils.c pmi_proxy_cb.c pmi_query.c
+pmi_proxy_LDADD = -L../../lib -lhydra -L../../../util -lmpiexec -L../../../../../lib -lmpich
+pmi_proxy_DEPADD = ../../../util/libmpiexec.a
+install_BIN = pmi_proxy
# Use the mpich2-build-install target to include mpiexec in the build bin
# directory (all pm's require these targets)
Deleted: mpich2/trunk/src/pm/hydra/pm/pmiserv/central.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/central.h 2009-03-20 19:35:46 UTC (rev 4149)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/central.h 2009-03-20 23:39:20 UTC (rev 4154)
@@ -1,21 +0,0 @@
-/* -*- Mode: C; c-basic-offset:4 ; -*- */
-/*
- * (C) 2008 by Argonne National Laboratory.
- * See COPYRIGHT in top-level directory.
- */
-
-#ifndef CENTRAL_H_INCLUDED
-#define CENTRAL_H_INCLUDED
-
-/* Currently we only have one command */
-enum HYD_Proxy_cmds {
- KILLALL_PROCS
-};
-
-extern int HYD_PMCD_Central_listenfd;
-
-HYD_Status HYD_PMCD_Central_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_PMCD_Central_cleanup(void);
-void HYD_PMCD_Central_signal_cb(int signal);
-
-#endif /* CENTRAL_H_INCLUDED */
Deleted: mpich2/trunk/src/pm/hydra/pm/pmiserv/central_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/central_cb.c 2009-03-20 19:35:46 UTC (rev 4149)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/central_cb.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -1,225 +0,0 @@
-/* -*- Mode: C; c-basic-offset:4 ; -*- */
-/*
- * (C) 2008 by Argonne National Laboratory.
- * See COPYRIGHT in top-level directory.
- */
-
-#include "hydra.h"
-#include "hydra_utils.h"
-#include "pmcu_pmi.h"
-#include "pmci.h"
-#include "bsci.h"
-#include "demux.h"
-#include "central.h"
-
-int HYD_PMCD_Central_listenfd;
-HYD_Handle handle;
-
-/*
- * HYD_PMCD_Central_cb: This is the core PMI server part of the
- * code. Here's the expected protocol:
- *
- * 1. The client (MPI process) connects to us, which will result in an
- * event on the listener socket.
- *
- * 2. The client sends us a "cmd" or "mcmd" string which means that a
- * single or multi-line command is to follow.
- *
- * 3. Here are the commands that we respect:
- * - initack [done]
- * - init [done]
- * - get_maxes [done]
- * - get_appnum [done]
- * - get_my_kvsname [done]
- * - barrier_in [done]
- * - put [done]
- * - get [done]
- * - finalize [done]
- * - get_universe_size [done]
- * - abort
- * - create_kvs
- * - destroy_kvs
- * - getbyidx
- * - spawn
- */
-HYD_Status HYD_PMCD_Central_cb(int fd, HYD_Event_t events)
-{
- int accept_fd, linelen, i;
- char *buf, *cmd, *args[HYD_PMCU_NUM_STR];
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- HYDU_MALLOC(buf, char *, HYD_TMPBUF_SIZE, status);
-
- if (fd == HYD_PMCD_Central_listenfd) { /* Someone is trying to connect to us */
- status = HYDU_sock_accept(fd, &accept_fd);
- HYDU_ERR_POP(status, "accept error\n");
-
- status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, HYD_PMCD_Central_cb);
- HYDU_ERR_POP(status, "unable to register fd\n");
- }
- else {
- status = HYDU_sock_readline(fd, buf, HYD_TMPBUF_SIZE, &linelen);
- HYDU_ERR_POP(status, "PMI read line error\n");
-
- if (linelen == 0) {
- /* This is not a clean close. If a finalize was called, we
- * would have deregistered this socket. The application
- * might have aborted. Just cleanup all the processes */
- status = HYD_PMCD_Central_cleanup();
- if (status != HYD_SUCCESS) {
- HYDU_Warn_printf("bootstrap server returned error cleaning up processes\n");
- status = HYD_SUCCESS;
- goto fn_fail;
- }
-
- status = HYD_DMX_deregister_fd(fd);
- if (status != HYD_SUCCESS) {
- HYDU_Warn_printf("unable to deregister fd %d\n", fd);
- status = HYD_SUCCESS;
- goto fn_fail;
- }
-
- close(fd);
- goto fn_exit;
- }
-
- /* Check what command we got and call the appropriate
- * function */
- buf[linelen - 1] = 0;
-
- cmd = strtok(buf, " ");
- for (i = 0; i < HYD_PMCU_NUM_STR; i++) {
- args[i] = strtok(NULL, " ");
- if (args[i] == NULL)
- break;
- }
-
- if (cmd == NULL) {
- status = HYD_SUCCESS;
- }
- else if (!strcmp("cmd=initack", cmd)) {
- status = HYD_PMCU_pmi_initack(fd, args);
- }
- else if (!strcmp("cmd=init", cmd)) {
- status = HYD_PMCU_pmi_init(fd, args);
- }
- else if (!strcmp("cmd=get_maxes", cmd)) {
- status = HYD_PMCU_pmi_get_maxes(fd, args);
- }
- else if (!strcmp("cmd=get_appnum", cmd)) {
- status = HYD_PMCU_pmi_get_appnum(fd, args);
- }
- else if (!strcmp("cmd=get_my_kvsname", cmd)) {
- status = HYD_PMCU_pmi_get_my_kvsname(fd, args);
- }
- else if (!strcmp("cmd=barrier_in", cmd)) {
- status = HYD_PMCU_pmi_barrier_in(fd, args);
- }
- else if (!strcmp("cmd=put", cmd)) {
- status = HYD_PMCU_pmi_put(fd, args);
- }
- else if (!strcmp("cmd=get", cmd)) {
- status = HYD_PMCU_pmi_get(fd, args);
- }
- else if (!strcmp("cmd=finalize", cmd)) {
- status = HYD_PMCU_pmi_finalize(fd, args);
-
- if (status == HYD_SUCCESS) {
- status = HYD_DMX_deregister_fd(fd);
- HYDU_ERR_POP(status, "unable to register fd\n");
- close(fd);
- }
- }
- else if (!strcmp("cmd=get_universe_size", cmd)) {
- status = HYD_PMCU_pmi_get_usize(fd, args);
- }
- else {
- /* We don't understand the command */
- HYDU_Error_printf("Unrecognized PMI command: %s; cleaning up processes\n", cmd);
-
- /* Cleanup all the processes and return. We don't need to
- * check the return status since we are anyway returning
- * an error */
- HYD_PMCD_Central_cleanup();
- status = HYD_SUCCESS;
- goto fn_fail;
- }
-
- HYDU_ERR_POP(status, "PMI server returned error\n");
- }
-
- fn_exit:
- HYDU_FREE(buf);
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-HYD_Status HYD_PMCD_Central_cleanup(void)
-{
- struct HYD_Proc_params *proc_params;
- struct HYD_Partition_list *partition;
- int fd;
- enum HYD_Proxy_cmds cmd;
- HYD_Status status = HYD_SUCCESS, overall_status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- /* FIXME: Instead of doing this from this process itself, fork a
- * bunch of processes to do this. */
- /* Connect to all proxies and send a KILL command */
- cmd = KILLALL_PROCS;
- for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
- for (partition = proc_params->partition; partition; partition = partition->next) {
- status = HYDU_sock_connect(partition->name, handle.proxy_port, &fd);
- if (status != HYD_SUCCESS) {
- HYDU_Warn_printf("unable to connect to the proxy on %s\n", partition->name);
- overall_status = HYD_INTERNAL_ERROR;
- continue; /* Move on to the next proxy */
- }
-
- status = HYDU_sock_write(fd, &cmd, sizeof(cmd));
- if (status != HYD_SUCCESS) {
- HYDU_Warn_printf("unable to send data to the proxy on %s\n", partition->name);
- overall_status = HYD_INTERNAL_ERROR;
- continue; /* Move on to the next proxy */
- }
-
- close(fd);
- }
- }
-
- HYDU_FUNC_EXIT();
-
- return overall_status;
-}
-
-
-void HYD_PMCD_Central_signal_cb(int signal)
-{
- HYDU_FUNC_ENTER();
-
- if (signal == SIGINT || signal == SIGQUIT || signal == SIGTERM
-#if defined SIGSTOP
- || signal == SIGSTOP
-#endif /* SIGSTOP */
-#if defined SIGCONT
- || signal == SIGCONT
-#endif /* SIGSTOP */
-) {
- /* There's nothing we can do with the return value for now. */
- HYD_PMCD_Central_cleanup();
- exit(-1);
- }
- else {
- /* Ignore other signals for now */
- }
-
- HYDU_FUNC_EXIT();
- return;
-}
Deleted: mpich2/trunk/src/pm/hydra/pm/pmiserv/central_finalize.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/central_finalize.c 2009-03-20 19:35:46 UTC (rev 4149)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/central_finalize.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -1,42 +0,0 @@
-/* -*- Mode: C; c-basic-offset:4 ; -*- */
-/*
- * (C) 2008 by Argonne National Laboratory.
- * See COPYRIGHT in top-level directory.
- */
-
-#include "hydra.h"
-#include "pmci.h"
-#include "pmcu_pmi.h"
-#include "bsci.h"
-#include "demux.h"
-#include "central.h"
-
-int HYD_PMCD_Central_listenfd;
-
-HYD_Status HYD_PMCI_finalize(void)
-{
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- /* Deregister the listen socket from the demux engine and close
- * it. */
- status = HYD_DMX_deregister_fd(HYD_PMCD_Central_listenfd);
- HYDU_ERR_POP(status, "unable to deregister fd\n");
-
- close(HYD_PMCD_Central_listenfd);
- HYD_PMCD_Central_listenfd = -1;
-
- status = HYD_PMCU_finalize();
- HYDU_ERR_POP(status, "unable to finalize process manager utils\n");
-
- status = HYD_BSCI_finalize();
- HYDU_ERR_POP(status, "unable to finalize bootstrap server\n");
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
Deleted: mpich2/trunk/src/pm/hydra/pm/pmiserv/central_launch.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/central_launch.c 2009-03-20 19:35:46 UTC (rev 4149)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/central_launch.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -1,215 +0,0 @@
-/* -*- Mode: C; c-basic-offset:4 ; -*- */
-/*
- * (C) 2008 by Argonne National Laboratory.
- * See COPYRIGHT in top-level directory.
- */
-
-#include "hydra.h"
-#include "hydra_utils.h"
-#include "pmci.h"
-#include "pmcu_pmi.h"
-#include "bsci.h"
-#include "demux.h"
-#include "central.h"
-
-int HYD_PMCD_Central_listenfd;
-HYD_Handle handle;
-
-/*
- * HYD_PMCI_launch_procs: Here are the steps we follow:
- *
- * 1. Find what all ports the user wants to allow and listen on one of
- * those ports.
- *
- * 2. Create a call-back function to accept connections and register
- * the listening socket with the demux engine.
- *
- * 3. Create a port string out of this hostname and port and add it to
- * the environment list under the variable "PMI_PORT".
- *
- * 4. Create an environment variable for PMI_ID. This is an
- * auto-incrementing variable; the bootstrap server will take care of
- * adding the process ID to the start value.
- *
- * 5. Create a process info setup and ask the bootstrap server to
- * launch the processes.
- */
-HYD_Status HYD_PMCI_launch_procs(void)
-{
- char *port_range, *port_str, *sport, *str;
- uint16_t port;
- int i, arg;
- int process_id, group_id;
- char hostname[MAX_HOSTNAME_LEN];
- HYD_Env_t *env;
- char *path_str[HYDU_NUM_JOIN_STR];
- struct HYD_Proc_params *proc_params;
- struct HYD_Partition_list *partition;
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- status = HYDU_set_common_signals(HYD_PMCD_Central_signal_cb);
- HYDU_ERR_POP(status, "unable to set signal\n");
-
- /* Check if the user wants us to use a port within a certain
- * range. */
- port_range = getenv("MPIEXEC_PORTRANGE");
- if (!port_range)
- port_range = getenv("MPIEXEC_PORT_RANGE");
- if (!port_range)
- port_range = getenv("MPICH_PORT_RANGE");
-
- /* Listen on a port in the port range */
- port = 0;
- status = HYDU_sock_listen(&HYD_PMCD_Central_listenfd, port_range, &port);
- HYDU_ERR_POP(status, "unable to listen on port\n");
-
- /* Register the listening socket with the demux engine */
- status = HYD_DMX_register_fd(1, &HYD_PMCD_Central_listenfd, HYD_STDOUT,
- HYD_PMCD_Central_cb);
- HYDU_ERR_POP(status, "unable to register fd\n");
-
- /* Create a port string for MPI processes to use to connect to */
- if (gethostname(hostname, MAX_HOSTNAME_LEN) < 0)
- HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR,
- "gethostname error (hostname: %s; errno: %d)\n", hostname,
- errno);
-
- status = HYDU_int_to_str(port, &sport);
- HYDU_ERR_POP(status, "cannot convert int to string\n");
-
- HYDU_MALLOC(port_str, char *, strlen(hostname) + 1 + strlen(sport) + 1, status);
- MPIU_Snprintf(port_str, strlen(hostname) + 1 + strlen(sport) + 1,
- "%s:%s", hostname, sport);
- HYDU_FREE(sport);
- HYDU_Debug("Process manager listening on PMI port %s\n", port_str);
-
- status = HYDU_env_create(&env, "PMI_PORT", port_str);
- HYDU_ERR_POP(status, "unable to create env\n");
-
- status = HYDU_append_env_to_list(*env, &handle.system_env);
- HYDU_ERR_POP(status, "unable to add env to list\n");
-
- HYDU_env_free(env);
- HYDU_FREE(port_str);
-
- /* Create a process group for the MPI processes in this
- * comm_world */
- status = HYD_PMCU_create_pg();
- HYDU_ERR_POP(status, "unable to create process group\n");
-
- /* Create the arguments list for each proxy */
- process_id = 0;
- group_id = 0;
- for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
- for (partition = proc_params->partition; partition; partition = partition->next) {
-
- partition->group_id = group_id++;
- partition->group_rank = 0;
-
- for (arg = 0; partition->args[arg]; arg++);
- i = 0;
- path_str[i++] = MPIU_Strdup(handle.base_path);
- path_str[i++] = MPIU_Strdup("hydproxy");
- path_str[i] = NULL;
- status = HYDU_str_alloc_and_join(path_str, &partition->args[arg++]);
- HYDU_ERR_POP(status, "unable to join strings\n");
-
- HYDU_free_strlist(path_str);
-
- status = HYDU_int_to_str(partition->proc_count, &str);
- HYDU_ERR_POP(status, "unable to convert int to string\n");
-
- partition->args[arg++] = MPIU_Strdup("--proc-count");
- partition->args[arg++] = MPIU_Strdup(str);
-
- partition->args[arg++] = MPIU_Strdup("--partition");
- partition->args[arg++] = MPIU_Strdup(partition->name);
- partition->args[arg++] = MPIU_Strdup(str);
- HYDU_FREE(str);
-
- status = HYDU_int_to_str(process_id, &str);
- HYDU_ERR_POP(status, "unable to convert int to string\n");
-
- partition->args[arg++] = MPIU_Strdup("--pmi-id");
- partition->args[arg++] = MPIU_Strdup(str);
- HYDU_FREE(str);
-
- status = HYDU_int_to_str(handle.proxy_port, &str);
- HYDU_ERR_POP(status, "unable to convert in to string\n");
-
- partition->args[arg++] = MPIU_Strdup("--proxy-port");
- partition->args[arg++] = MPIU_Strdup(str);
- HYDU_FREE(str);
-
- partition->args[arg++] = MPIU_Strdup("--wdir");
- partition->args[arg++] = MPIU_Strdup(handle.wdir);
-
- partition->args[arg++] = MPIU_Strdup("--environment");
- i = 0;
- for (env = handle.system_env; env; env = env->next)
- i++;
- for (env = handle.prop_env; env; env = env->next)
- i++;
- for (env = proc_params->prop_env; env; env = env->next)
- i++;
- status = HYDU_int_to_str(i, &str);
- HYDU_ERR_POP(status, "unable to convert in to string\n");
-
- partition->args[arg++] = MPIU_Strdup(str);
- partition->args[arg++] = NULL;
-
- HYDU_list_append_env_to_str(handle.system_env, partition->args);
- HYDU_list_append_env_to_str(handle.prop_env, partition->args);
- HYDU_list_append_env_to_str(proc_params->prop_env, partition->args);
-
- for (arg = 0; partition->args[arg]; arg++);
- partition->args[arg] = NULL;
- HYDU_list_append_strlist(proc_params->exec, partition->args);
-
- if (handle.debug) {
- HYDU_Debug("Executable passed to the bootstrap: ");
- HYDU_print_strlist(partition->args);
- }
-
- process_id += partition->proc_count;
- }
- }
-
- /* Initialize the bootstrap server and ask it to launch the
- * processes. */
- status = HYD_BSCI_init(handle.bootstrap);
- HYDU_ERR_POP(status, "bootstrap server initialization failed\n");
-
- status = HYD_BSCI_launch_procs();
- HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n");
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-HYD_Status HYD_PMCI_wait_for_completion(void)
-{
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- status = HYD_BSCI_wait_for_completion();
- if (status != HYD_SUCCESS) {
- status = HYD_PMCD_Central_cleanup();
- HYDU_ERR_POP(status, "process manager cannot cleanup processes\n");
- }
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
Copied: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c (from rev 4149, mpich2/trunk/src/pm/hydra/pm/central/proxy.c)
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c (rev 0)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -0,0 +1,241 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ * (C) 2008 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+#include "hydra.h"
+#include "hydra_utils.h"
+#include "demux.h"
+#include "pmi_proxy.h"
+
+struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
+int HYD_PMCD_pmi_proxy_listenfd;
+
+/*
+ * main: Entry point of the PMI proxy. Here are the steps we follow:
+ *
+ * 1. Parse the command line and listen on the proxy port for
+ *    commands from mpiexec.
+ *
+ * 2. Spawn proc_count local processes, each with its own PMI_ID
+ *    (the first gets --pmi-id, the rest are incremented from it).
+ *
+ * 3. Register their stdout/stderr (and, when hosting PMI_ID 0, our
+ *    own stdin) with the demux engine and wait until all output
+ *    streams are closed.
+ *
+ * 4. Reap all the processes.
+ *
+ * Returns -1 if the proxy itself failed; otherwise the bitwise OR of
+ * the exit statuses of the spawned processes. A process killed by a
+ * signal contributes 128 + the signal number, so it is never reported
+ * as a successful exit.
+ */
+int main(int argc, char **argv)
+{
+    int i, j, arg, count, pid, ret_status;
+    int stdin_fd, timeout;
+    uint16_t port;
+    char *str, *timeout_str;
+    char *client_args[HYD_EXEC_ARGS];
+    char *tmp[HYDU_NUM_JOIN_STR];
+    HYD_Status status = HYD_SUCCESS;
+
+    status = HYD_PMCD_pmi_proxy_get_params(argc, argv);
+    HYDU_ERR_POP(status, "bad parameters passed to the proxy\n");
+
+    /* HYDU_sock_listen takes a uint16_t port while proxy_port is an
+     * int. Casting &proxy_port to (uint16_t *) accesses only two bytes
+     * of the int (the high-order ones on big-endian hosts) and breaks
+     * strict aliasing, so go through a properly typed temporary. */
+    port = (uint16_t) HYD_PMCD_pmi_proxy_params.proxy_port;
+    status = HYDU_sock_listen(&HYD_PMCD_pmi_proxy_listenfd, NULL, &port);
+    HYDU_ERR_POP(status, "unable to listen on socket\n");
+    HYD_PMCD_pmi_proxy_params.proxy_port = port;
+
+    /* Register the listening socket with the demux engine */
+    status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_listenfd, HYD_STDOUT,
+                                 HYD_PMCD_pmi_proxy_listen_cb);
+    HYDU_ERR_POP(status, "unable to register fd\n");
+
+    /* FIXME: We do not use the bootstrap server right now, as the
+     * current implementation of the bootstrap launch directly reads
+     * the executable information from the HYD_Handle structure. Since
+     * we are a different process, we don't share this
+     * structure. Without the bootstrap server, we can only launch
+     * local processes. That is, we can only have a single-level
+     * hierarchy of proxies. */
+
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.out, int *,
+                HYD_PMCD_pmi_proxy_params.proc_count * sizeof(int), status);
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.err, int *,
+                HYD_PMCD_pmi_proxy_params.proc_count * sizeof(int), status);
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.pid, int *,
+                HYD_PMCD_pmi_proxy_params.proc_count * sizeof(int), status);
+    HYDU_MALLOC(HYD_PMCD_pmi_proxy_params.exit_status, int *,
+                HYD_PMCD_pmi_proxy_params.proc_count * sizeof(int), status);
+
+    /* Initialize the exit status; -1 marks a process that has not
+     * been reaped yet. */
+    for (i = 0; i < HYD_PMCD_pmi_proxy_params.proc_count; i++)
+        HYD_PMCD_pmi_proxy_params.exit_status[i] = -1;
+
+    /* Spawn the processes */
+    for (i = 0; i < HYD_PMCD_pmi_proxy_params.proc_count; i++) {
+
+        /* Build "PMI_ID=<pmi_id + i>" for this process */
+        j = 0;
+        tmp[j++] = MPIU_Strdup("PMI_ID=");
+        status = HYDU_int_to_str(HYD_PMCD_pmi_proxy_params.pmi_id + i, &str);
+        HYDU_ERR_POP(status, "unable to convert int to string\n");
+
+        tmp[j++] = MPIU_Strdup(str);
+        HYDU_FREE(str);
+        tmp[j++] = NULL;
+        status = HYDU_str_alloc_and_join(tmp, &str);
+        HYDU_ERR_POP(status, "unable to join strings\n");
+
+        HYDU_putenv(str);
+        for (j = 0; tmp[j]; j++)
+            HYDU_FREE(tmp[j]);
+
+        if (chdir(HYD_PMCD_pmi_proxy_params.wdir) < 0)
+            HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
+                                 "unable to change wdir (%s)\n", HYDU_strerror(errno));
+
+        for (j = 0, arg = 0; HYD_PMCD_pmi_proxy_params.args[j]; j++)
+            client_args[arg++] = MPIU_Strdup(HYD_PMCD_pmi_proxy_params.args[j]);
+        client_args[arg++] = NULL;
+
+        /* FIXME: We need to figure out how many total number of
+         * processes are there on this partition, and appropriately
+         * bind them. */
+        /* Only the process with PMI_ID 0 gets a stdin pipe. */
+        if ((i + HYD_PMCD_pmi_proxy_params.pmi_id) == 0) {
+            status = HYDU_create_process(client_args, &HYD_PMCD_pmi_proxy_params.in,
+                                         &HYD_PMCD_pmi_proxy_params.out[i],
+                                         &HYD_PMCD_pmi_proxy_params.err[i],
+                                         &HYD_PMCD_pmi_proxy_params.pid[i], i);
+        }
+        else {
+            status = HYDU_create_process(client_args, NULL,
+                                         &HYD_PMCD_pmi_proxy_params.out[i],
+                                         &HYD_PMCD_pmi_proxy_params.err[i],
+                                         &HYD_PMCD_pmi_proxy_params.pid[i],
+                                         i);
+        }
+        HYDU_ERR_POP(status, "spawn process returned error\n");
+
+        /* NOTE(review): str was handed to HYDU_putenv above; freeing it
+         * here assumes HYDU_putenv copies its argument -- TODO confirm. */
+        HYDU_FREE(str);
+    }
+
+    /* Everything is spawned, now wait for I/O */
+    status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.proc_count,
+                                 HYD_PMCD_pmi_proxy_params.out,
+                                 HYD_STDOUT, HYD_PMCD_pmi_proxy_stdout_cb);
+    HYDU_ERR_POP(status, "unable to register fd\n");
+
+    status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.proc_count,
+                                 HYD_PMCD_pmi_proxy_params.err,
+                                 HYD_STDOUT, HYD_PMCD_pmi_proxy_stderr_cb);
+    HYDU_ERR_POP(status, "unable to register fd\n");
+
+    /* The proxy hosting PMI_ID 0 relays its own stdin to that process */
+    if (HYD_PMCD_pmi_proxy_params.pmi_id == 0) {
+        status = HYDU_sock_set_nonblock(HYD_PMCD_pmi_proxy_params.in);
+        HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
+
+        stdin_fd = 0;
+        status = HYDU_sock_set_nonblock(stdin_fd);
+        HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
+
+        HYD_PMCD_pmi_proxy_params.stdin_buf_offset = 0;
+        HYD_PMCD_pmi_proxy_params.stdin_buf_count = 0;
+        status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, HYD_PMCD_pmi_proxy_stdin_cb);
+        HYDU_ERR_POP(status, "unable to register fd\n");
+    }
+
+    timeout_str = getenv("MPIEXEC_TIMEOUT");
+    if (timeout_str)
+        timeout = atoi(timeout_str);
+    else
+        timeout = -1;
+
+    while (1) {
+        /* Wait for some event to occur */
+        status = HYD_DMX_wait_for_event(timeout);
+        HYDU_ERR_POP(status, "demux engine error waiting for event\n");
+
+        /* Check to see if there's any open read socket left; if there
+         * are, we will just wait for more events. */
+        count = 0;
+        for (i = 0; i < HYD_PMCD_pmi_proxy_params.proc_count; i++) {
+            if (HYD_PMCD_pmi_proxy_params.out[i] != -1 || HYD_PMCD_pmi_proxy_params.err[i] != -1) {
+                count++;
+                break;
+            }
+        }
+
+        /* We are done */
+        if (!count)
+            break;
+    }
+
+    /* FIXME: If we did not break out yet, add a small usleep to yield
+     * CPU here. We can not just sleep for the remaining time, as the
+     * timeout value might be large and the application might exit
+     * much quicker. Note that the sched_yield() call is broken on
+     * newer linux kernel versions and should not be used. */
+    /* Once all the sockets are closed, wait for all the processes to
+     * finish. We poll here, but hopefully not for too long. */
+    do {
+        pid = waitpid(-1, &ret_status, WNOHANG);
+
+        /* Find the pid and record how it terminated. WEXITSTATUS is only
+         * meaningful when WIFEXITED is true; for a process killed by a
+         * signal it typically yields 0, which would make a killed process
+         * look successful. Any other status leaves the entry pending. */
+        if (pid > 0) {
+            for (i = 0; i < HYD_PMCD_pmi_proxy_params.proc_count; i++) {
+                if (HYD_PMCD_pmi_proxy_params.pid[i] != pid)
+                    continue;
+                if (WIFEXITED(ret_status))
+                    HYD_PMCD_pmi_proxy_params.exit_status[i] = WEXITSTATUS(ret_status);
+                else if (WIFSIGNALED(ret_status))
+                    HYD_PMCD_pmi_proxy_params.exit_status[i] = 128 + WTERMSIG(ret_status);
+            }
+        }
+
+        /* Check how many more processes are pending */
+        count = 0;
+        for (i = 0; i < HYD_PMCD_pmi_proxy_params.proc_count; i++) {
+            if (HYD_PMCD_pmi_proxy_params.exit_status[i] == -1) {
+                count++;
+                break;
+            }
+        }
+
+        if (count == 0)
+            break;
+
+        /* Check if there are any messages from the launcher */
+        status = HYD_DMX_wait_for_event(0);
+        HYDU_ERR_POP(status, "demux engine error waiting for event\n");
+    } while (1);
+
+    ret_status = 0;
+    for (i = 0; i < HYD_PMCD_pmi_proxy_params.proc_count; i++)
+        ret_status |= HYD_PMCD_pmi_proxy_params.exit_status[i];
+
+ fn_exit:
+    if (status != HYD_SUCCESS)
+        return -1;
+    else
+        return ret_status;
+
+ fn_fail:
+    goto fn_exit;
+}
Property changes on: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.c
___________________________________________________________________
Name: svn:mergeinfo
+
Copied: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h (from rev 4149, mpich2/trunk/src/pm/hydra/pm/central/proxy.h)
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h (rev 0)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h 2009-03-20 23:39:20 UTC (rev 4154)
@@ -0,0 +1,43 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ * (C) 2008 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+#ifndef PMI_PROXY_H_INCLUDED
+#define PMI_PROXY_H_INCLUDED
+
+#include "hydra_base.h"
+#include "hydra_utils.h"
+
+struct HYD_PMCD_pmi_proxy_params {  /* Global state of the PMI proxy process */
+ HYD_Env_t *global_env;        /* filled by HYDU_list_global_env() */
+ HYD_Env_t *env_list;          /* set to NULL by HYD_PMCD_pmi_proxy_get_params() */
+ int proc_count;               /* --proc-count: number of processes to spawn */
+ int proxy_port;               /* --proxy-port: port to listen on for mpiexec */
+ int pmi_id;                   /* --pmi-id: PMI_ID of the first process */
+ char *args[HYD_EXEC_ARGS];    /* NULL-terminated argv given to each process */
+ char *wdir;                   /* --wdir: working directory of the processes */
+ struct HYD_Partition_list *partition; /* --partition name/count list */
+
+ int *pid;                     /* [proc_count] pids of the spawned processes */
+ int *out;                     /* [proc_count] stdout fds; -1 once closed */
+ int *err;                     /* [proc_count] stderr fds; -1 once closed */
+ int *exit_status;             /* [proc_count] exit status; -1 until reaped */
+ int in;                       /* stdin fd of the PMI_ID 0 process; -1 once closed */
+
+ int stdin_buf_offset;         /* staging state passed to HYDU_sock_stdin_cb() */
+ int stdin_buf_count;          /* (see stdin_buf_offset) */
+ char stdin_tmp_buf[HYD_TMPBUF_SIZE];  /* (see stdin_buf_offset) */
+};
+
+extern struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
+extern int HYD_PMCD_pmi_proxy_listenfd;  /* socket mpiexec connects to */
+
+HYD_Status HYD_PMCD_pmi_proxy_get_params(int t_argc, char **t_argv);  /* parse proxy options */
+HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events);  /* demux cb: mpiexec connections */
+HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events);  /* demux cb: children's stdout */
+HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events);  /* demux cb: children's stderr */
+HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events);   /* demux cb: proxy's stdin */
+
+#endif /* PMI_PROXY_H_INCLUDED */
Property changes on: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy.h
___________________________________________________________________
Name: svn:mergeinfo
+
Copied: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c (from rev 4149, mpich2/trunk/src/pm/hydra/pm/central/proxy_cb.c)
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c (rev 0)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -0,0 +1,197 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ * (C) 2008 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+#include "hydra.h"
+#include "hydra_utils.h"
+#include "pmi_proxy.h"
+#include "demux.h"
+#include "pmi_serv.h"
+
+/* HYD_PMCD_pmi_proxy_params and HYD_PMCD_pmi_proxy_listenfd are defined in
+ * pmi_proxy.c and declared extern in pmi_proxy.h. Defining them again here
+ * relies on -fcommon and fails to link with -fno-common (GCC >= 10). */
+
+/*
+ * HYD_PMCD_pmi_proxy_listen_cb: Demux callback for the proxy's
+ * control connections from mpiexec.
+ *
+ * - If fd is the listening socket, accept the new connection and
+ *   register it with this same callback.
+ *
+ * - Otherwise read one enum HYD_PMCD_pmi_proxy_cmds value from fd.
+ *   On KILLALL_PROCS, send SIGKILL to every spawned process and
+ *   close the connection; on EOF, just close the connection. Any
+ *   other value, or a partial read, is an error.
+ */
+HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events)
+{
+    int accept_fd, count, i;
+    enum HYD_PMCD_pmi_proxy_cmds cmd;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    if (events & HYD_STDIN)
+        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "listen handler got stdin event\n");
+
+    if (fd == HYD_PMCD_pmi_proxy_listenfd) {    /* mpiexec is trying to connect */
+        status = HYDU_sock_accept(fd, &accept_fd);
+        HYDU_ERR_POP(status, "accept error\n");
+
+        status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, HYD_PMCD_pmi_proxy_listen_cb);
+        HYDU_ERR_POP(status, "unable to register fd\n");
+    }
+    else {      /* We got a command from mpiexec */
+        count = read(fd, &cmd, sizeof(cmd));
+        if (count < 0) {
+            HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n",
+                                 fd, HYDU_strerror(errno));
+        }
+        else if (count == 0) {
+            /* The connection has closed */
+            status = HYD_DMX_deregister_fd(fd);
+            HYDU_ERR_POP(status, "unable to deregister fd\n");
+            close(fd);
+            goto fn_exit;
+        }
+        else if (count != (int) sizeof(cmd)) {
+            /* A partial read would leave cmd partly uninitialized */
+            HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "short read of command on %d\n", fd);
+        }
+
+        if (cmd == KILLALL_PROCS) {     /* Got the killall command */
+            for (i = 0; i < HYD_PMCD_pmi_proxy_params.proc_count; i++)
+                if (HYD_PMCD_pmi_proxy_params.pid[i] != -1)
+                    kill(HYD_PMCD_pmi_proxy_params.pid[i], SIGKILL);
+
+            status = HYD_DMX_deregister_fd(fd);
+            HYDU_ERR_POP(status, "unable to deregister fd\n");
+            close(fd);
+        }
+        else {
+            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
+                                "got unrecognized command from mpiexec\n");
+        }
+    }
+
+ fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+ fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_proxy_stdout_cb: Demux callback for the stdout pipes
+ * of the spawned processes. The data is handled by HYDU_sock_stdout_cb
+ * (third argument 1, presumably the stdout stream). Once that reports
+ * the fd closed, it is deregistered and its out[] slot set to -1, which
+ * is how main detects that all output streams are done.
+ */
+HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events)
+{
+    int closed, i;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    status = HYDU_sock_stdout_cb(fd, events, 1, &closed);
+    HYDU_ERR_POP(status, "stdout callback error\n");
+
+    if (closed) {
+        /* The connection has closed */
+        status = HYD_DMX_deregister_fd(fd);
+        HYDU_ERR_POP(status, "unable to deregister fd\n");
+
+        for (i = 0; i < HYD_PMCD_pmi_proxy_params.proc_count; i++)
+            if (HYD_PMCD_pmi_proxy_params.out[i] == fd)
+                HYD_PMCD_pmi_proxy_params.out[i] = -1;
+    }
+
+ fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+ fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_proxy_stderr_cb: Same as the stdout callback, but for
+ * the stderr pipes (third argument 2) and the err[] slots, which are
+ * set to -1 once the corresponding fd is closed.
+ */
+HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events)
+{
+    int closed, i;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    status = HYDU_sock_stdout_cb(fd, events, 2, &closed);
+    HYDU_ERR_POP(status, "stderr callback error\n");
+
+    if (closed) {
+        /* The connection has closed */
+        status = HYD_DMX_deregister_fd(fd);
+        HYDU_ERR_POP(status, "unable to deregister fd\n");
+
+        for (i = 0; i < HYD_PMCD_pmi_proxy_params.proc_count; i++)
+            if (HYD_PMCD_pmi_proxy_params.err[i] == fd)
+                HYD_PMCD_pmi_proxy_params.err[i] = -1;
+    }
+
+ fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+ fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_proxy_stdin_cb: Demux callback for the proxy's own
+ * stdin, registered only when this proxy hosts PMI_ID 0. fd itself is
+ * not passed to HYDU_sock_stdin_cb; that helper gets the child's stdin
+ * fd (HYD_PMCD_pmi_proxy_params.in) and the staging buffer, and
+ * presumably reads our stdin itself -- TODO confirm. Once it reports
+ * closure, both fds are closed and "in" is reset to -1.
+ */
+HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events)
+{
+    int closed;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    status = HYDU_sock_stdin_cb(HYD_PMCD_pmi_proxy_params.in, events,
+                                HYD_PMCD_pmi_proxy_params.stdin_tmp_buf,
+                                &HYD_PMCD_pmi_proxy_params.stdin_buf_count,
+                                &HYD_PMCD_pmi_proxy_params.stdin_buf_offset, &closed);
+    HYDU_ERR_POP(status, "stdin callback error\n");
+
+    if (closed) {
+        /* The connection has closed */
+        status = HYD_DMX_deregister_fd(fd);
+        HYDU_ERR_POP(status, "unable to deregister fd\n");
+
+        close(HYD_PMCD_pmi_proxy_params.in);
+        close(fd);
+
+        HYD_PMCD_pmi_proxy_params.in = -1;
+    }
+
+ fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+ fn_fail:
+    goto fn_exit;
+}
Property changes on: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c
___________________________________________________________________
Name: svn:mergeinfo
+
Copied: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c (from rev 4149, mpich2/trunk/src/pm/hydra/pm/central/proxy_utils.c)
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c (rev 0)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -0,0 +1,117 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ * (C) 2008 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+#include "hydra.h"
+#include "pmi_proxy.h"
+
+struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
+
+/*
+ * next_arg_value: Advance *argv_p from an option to the value that
+ * follows it and return that value. argv is NULL-terminated (as for
+ * main()), so NULL is returned when the option was the last argument.
+ */
+static char *next_arg_value(char ***argv_p)
+{
+    if (**argv_p == NULL)
+        return NULL;
+    (*argv_p)++;
+    return **argv_p;
+}
+
+/*
+ * HYD_PMCD_pmi_proxy_get_params: Parse the proxy command line into
+ * HYD_PMCD_pmi_proxy_params.
+ *
+ * Recognized options:
+ *   --proc-count N, --proxy-port N, --pmi-id N, --wdir DIR
+ *   --partition NAME COUNT      (repeatable; kept in command-line order)
+ *   --environment COUNT VAR...  (each VAR is passed to HYDU_putenv)
+ * The first unrecognized argument and everything after it are copied
+ * into HYD_PMCD_pmi_proxy_params.args, terminated by NULL.
+ *
+ * Returns HYD_INTERNAL_ERROR if an option is missing its value(s),
+ * instead of reading past the end of argv.
+ */
+HYD_Status HYD_PMCD_pmi_proxy_get_params(int t_argc, char **t_argv)
+{
+    char **argv, *opt = NULL, *val, *name, *str;
+    int arg, i, count;
+    size_t len;
+    struct HYD_Partition_list *partition, *run;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    HYD_PMCD_pmi_proxy_params.global_env = NULL;
+    HYD_PMCD_pmi_proxy_params.env_list = NULL;
+    HYD_PMCD_pmi_proxy_params.partition = NULL;
+
+    status = HYDU_list_global_env(&HYD_PMCD_pmi_proxy_params.global_env);
+    HYDU_ERR_POP(status, "unable to get the global env list\n");
+
+    /* Skip the program name. The loop is bounded by argv's NULL
+     * terminator, so options that consume extra values can never walk
+     * past the end of the vector. */
+    for (argv = (t_argc > 0) ? t_argv + 1 : t_argv; *argv; argv++) {
+        opt = *argv;
+
+        /* Process count */
+        if (!strcmp(opt, "--proc-count")) {
+            if ((val = next_arg_value(&argv)) == NULL)
+                goto missing_value;
+            HYD_PMCD_pmi_proxy_params.proc_count = atoi(val);
+            continue;
+        }
+
+        /* Proxy port */
+        if (!strcmp(opt, "--proxy-port")) {
+            if ((val = next_arg_value(&argv)) == NULL)
+                goto missing_value;
+            HYD_PMCD_pmi_proxy_params.proxy_port = atoi(val);
+            continue;
+        }
+
+        /* PMI_ID: This is the PMI_ID for the first process;
+         * everything else is incremented from here. */
+        if (!strcmp(opt, "--pmi-id")) {
+            if ((val = next_arg_value(&argv)) == NULL)
+                goto missing_value;
+            HYD_PMCD_pmi_proxy_params.pmi_id = atoi(val);
+            continue;
+        }
+
+        /* Partition information is passed as two parameters; name
+         * followed by proc count. Multiple partitions are specified
+         * as multiple parameters. Both values are checked before
+         * anything is allocated. */
+        if (!strcmp(opt, "--partition")) {
+            if ((name = next_arg_value(&argv)) == NULL ||
+                (val = next_arg_value(&argv)) == NULL)
+                goto missing_value;
+
+            HYDU_alloc_partition(&partition);
+            partition->name = MPIU_Strdup(name);
+            partition->proc_count = atoi(val);
+
+            if (!HYD_PMCD_pmi_proxy_params.partition)
+                HYD_PMCD_pmi_proxy_params.partition = partition;
+            else {
+                for (run = HYD_PMCD_pmi_proxy_params.partition; run->next; run = run->next);
+                run->next = partition;
+            }
+            continue;
+        }
+
+        /* Working directory */
+        if (!strcmp(opt, "--wdir")) {
+            if ((val = next_arg_value(&argv)) == NULL)
+                goto missing_value;
+            HYD_PMCD_pmi_proxy_params.wdir = MPIU_Strdup(val);
+            continue;
+        }
+
+        /* Environment information is passed as a count followed by
+         * that many NAME=VALUE entries, each installed with
+         * HYDU_putenv. */
+        if (!strcmp(opt, "--environment")) {
+            if ((val = next_arg_value(&argv)) == NULL)
+                goto missing_value;
+            count = atoi(val);
+            for (i = 0; i < count; i++) {
+                if ((str = next_arg_value(&argv)) == NULL)
+                    goto missing_value;
+
+                /* Some bootstrap servers remove the quotes that we
+                 * added, while some others do not. For the cases
+                 * where they are not removed, we do it ourselves;
+                 * the trailing character is only dropped if it really
+                 * is the closing quote. */
+                if (*str == '\'') {
+                    str++;
+                    len = strlen(str);
+                    if (len > 0 && str[len - 1] == '\'')
+                        str[len - 1] = 0;
+                }
+                HYDU_putenv(str);
+            }
+            continue;
+        }
+
+        /* Fall through case is application parameters. Load
+         * everything that remains into the args variable. */
+        arg = 0;
+        while (*argv)
+            HYD_PMCD_pmi_proxy_params.args[arg++] = MPIU_Strdup(*argv++);
+        HYD_PMCD_pmi_proxy_params.args[arg] = NULL;
+        break;
+    }
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  missing_value:
+    /* An option was the last argument, so its value is absent. */
+    HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
+                         "missing value for proxy option %s\n", opt);
+
+  fn_fail:
+    goto fn_exit;
+}
Property changes on: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c
___________________________________________________________________
Name: svn:mergeinfo
+
Added: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_query.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_query.c (rev 0)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_query.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -0,0 +1,674 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ * (C) 2008 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+#include "hydra.h"
+#include "hydra_utils.h"
+#include "bsci.h"
+#include "pmi_query.h"
+
+/* The global handle is also (tentatively) defined by other hydra
+ * translation units such as pmi_serv_cb.c; only declare it here.
+ * Another tentative definition relies on common-symbol merging and
+ * fails to link under -fno-common (the GCC 10+ default). */
+extern HYD_Handle handle;
+
+/* Head of the list of process groups served by this PMI server. */
+HYD_PMCD_pmi_pg_t *pg_list = NULL;
+
+/*
+ * allocate_kvs: Allocate an empty key-value space for process group
+ * pgid and store it in *kvs. The space is named "kvs_<pid>_<pgid>",
+ * where <pid> is this server's process id.
+ */
+static HYD_Status allocate_kvs(HYD_PMCD_pmi_kvs_t ** kvs, int pgid)
+{
+    HYD_PMCD_pmi_kvs_t *space;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    HYDU_MALLOC(*kvs, HYD_PMCD_pmi_kvs_t *, sizeof(HYD_PMCD_pmi_kvs_t), status);
+    space = *kvs;
+
+    /* No key/value pairs yet. */
+    space->key_pair = NULL;
+    MPIU_Snprintf(space->kvs_name, MAXNAMELEN, "kvs_%d_%d", (int) getpid(), pgid);
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * create_pg: Allocate an empty process group with id pgid (no
+ * processes, barrier count 0, fresh KVS space) and return it in *pg.
+ * On failure *pg is set to NULL and the partially built group is
+ * released, so nothing is leaked.
+ */
+static HYD_Status create_pg(HYD_PMCD_pmi_pg_t ** pg, int pgid)
+{
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    HYDU_MALLOC(*pg, HYD_PMCD_pmi_pg_t *, sizeof(HYD_PMCD_pmi_pg_t), status);
+    (*pg)->id = pgid;
+    (*pg)->num_procs = 0;
+    (*pg)->barrier_count = 0;
+    (*pg)->process = NULL;
+    (*pg)->kvs = NULL;
+    (*pg)->next = NULL;
+
+    status = allocate_kvs(&(*pg)->kvs, pgid);
+    HYDU_ERR_POP(status, "unable to allocate kvs space\n");
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    /* Free a group whose KVS allocation failed. */
+    if (*pg) {
+        HYDU_FREE(*pg);
+    }
+    *pg = NULL;
+    goto fn_exit;
+}
+
+
+/*
+ * add_process_to_pg: Append a new process record for PMI connection fd
+ * to the end of pg's process list, so the list stays in connection
+ * order.
+ */
+static HYD_Status add_process_to_pg(HYD_PMCD_pmi_pg_t * pg, int fd)
+{
+    HYD_PMCD_pmi_process_t *entry, **link;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    HYDU_MALLOC(entry, HYD_PMCD_pmi_process_t *, sizeof(HYD_PMCD_pmi_process_t), status);
+    entry->next = NULL;
+    entry->pg = pg;
+    entry->fd = fd;
+
+    /* Walk to the terminating NULL link and hook the new entry there. */
+    for (link = &pg->process; *link != NULL; link = &(*link)->next)
+        ;
+    *link = entry;
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_create_pg: Create process group 0 as the head of
+ * pg_list. Its size is the sum of exec_proc_count over all entries of
+ * handle.proc_params.
+ */
+HYD_Status HYD_PMCD_pmi_create_pg(void)
+{
+    struct HYD_Proc_params *params;
+    int total_procs = 0;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    for (params = handle.proc_params; params != NULL; params = params->next)
+        total_procs += params->exec_proc_count;
+
+    status = create_pg(&pg_list, 0);
+    HYDU_ERR_POP(status, "unable to create pg\n");
+
+    pg_list->num_procs = total_procs;
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_query_initack: Handle "cmd=initack <name>=<id>".
+ *
+ * The reply contains:
+ *   - the job size (sum of exec_proc_count over handle.proc_params);
+ *   - the caller's rank (the id it sent);
+ *   - handle.debug.
+ * The connection is then recorded in the last process group of pg_list.
+ * Temporary strings are freed on every path.
+ */
+HYD_Status HYD_PMCD_pmi_query_initack(int fd, char *args[])
+{
+    int id, size, debug, i;
+    char *sid, *ssize = NULL, *srank = NULL, *sdebug = NULL, *cmd = NULL;
+    char *tmp[HYDU_NUM_JOIN_STR];
+    struct HYD_Proc_params *proc_params;
+    HYD_PMCD_pmi_pg_t *run;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    /* The id follows the first '=' of the first argument; reject a
+     * missing argument or '=' instead of passing NULL to atoi(). */
+    if (args[0] == NULL || (sid = strchr(args[0], '=')) == NULL) {
+        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "malformed PMI initack command\n");
+    }
+    id = atoi(sid + 1);
+
+    /* The process is attached to a process group below. */
+    if (pg_list == NULL) {
+        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "no PMI process group has been created\n");
+    }
+
+    size = 0;
+    for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next)
+        size += proc_params->exec_proc_count;
+    debug = handle.debug;
+
+    status = HYDU_int_to_str(size, &ssize);
+    HYDU_ERR_POP(status, "unable to convert int to string\n");
+
+    status = HYDU_int_to_str(id, &srank);
+    HYDU_ERR_POP(status, "unable to convert int to string\n");
+
+    status = HYDU_int_to_str(debug, &sdebug);
+    HYDU_ERR_POP(status, "unable to convert int to string\n");
+
+    i = 0;
+    tmp[i++] = "cmd=initack\ncmd=set size=";
+    tmp[i++] = ssize;
+    tmp[i++] = "\ncmd=set rank=";
+    tmp[i++] = srank;
+    tmp[i++] = "\ncmd=set debug=";
+    tmp[i++] = sdebug;
+    tmp[i++] = "\n";
+    tmp[i++] = NULL;
+
+    status = HYDU_str_alloc_and_join(tmp, &cmd);
+    HYDU_ERR_POP(status, "error while joining strings\n");
+
+    status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
+    HYDU_ERR_POP(status, "error writing PMI line\n");
+
+    run = pg_list;
+    while (run->next)
+        run = run->next;
+
+    /* Add the process to the last PG */
+    status = add_process_to_pg(run, fd);
+    HYDU_ERR_POP(status, "unable to add process to pg\n");
+
+  fn_exit:
+    /* Assumes HYDU_FREE accepts NULL, as other handlers in this file
+     * already rely on. */
+    HYDU_FREE(ssize);
+    HYDU_FREE(srank);
+    HYDU_FREE(sdebug);
+    HYDU_FREE(cmd);
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_query_init: Handle "cmd=init pmi_version=<v>
+ * pmi_subversion=<s>". PMI 1.0 and 1.1 are accepted and answered with
+ * a response_to_init for 1.1. Any other version, or a malformed
+ * request, returns HYD_INTERNAL_ERROR.
+ */
+HYD_Status HYD_PMCD_pmi_query_init(int fd, char *args[])
+{
+    int pmi_version, pmi_subversion;
+    char *sversion, *ssubversion, *cmd;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    /* Each number follows the first '=' of its argument. Short-circuit
+     * evaluation guarantees args[1] is only read when args[0] was a
+     * valid, non-terminating entry. */
+    if (args[0] == NULL || (sversion = strchr(args[0], '=')) == NULL ||
+        args[1] == NULL || (ssubversion = strchr(args[1], '=')) == NULL) {
+        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "malformed PMI init command\n");
+    }
+    pmi_version = atoi(sversion + 1);
+    pmi_subversion = atoi(ssubversion + 1);
+
+    if (pmi_version == 1 && pmi_subversion <= 1) {
+        /* We support PMI v1.0 and 1.1 */
+        cmd = "cmd=response_to_init pmi_version=1 pmi_subversion=1 rc=0\n";
+        status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
+        HYDU_ERR_POP(status, "error writing PMI line\n");
+    }
+    else {
+        /* PMI version mismatch */
+        HYDU_ERR_SETANDJUMP2(status, HYD_INTERNAL_ERROR,
+                             "PMI version mismatch; %d.%d\n", pmi_version, pmi_subversion);
+    }
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_query_get_maxes: Reply to "cmd=get_maxes" with
+ * MAXKVSNAME, MAXKEYLEN and MAXVALLEN. Temporary strings are released
+ * on every path, including errors.
+ */
+HYD_Status HYD_PMCD_pmi_query_get_maxes(int fd, char *args[])
+{
+    int i;
+    char *tmp[HYDU_NUM_JOIN_STR], *cmd = NULL;
+    char *maxkvsname = NULL, *maxkeylen = NULL, *maxvallen = NULL;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    status = HYDU_int_to_str(MAXKVSNAME, &maxkvsname);
+    HYDU_ERR_POP(status, "unable to convert int to string\n");
+
+    status = HYDU_int_to_str(MAXKEYLEN, &maxkeylen);
+    HYDU_ERR_POP(status, "unable to convert int to string\n");
+
+    status = HYDU_int_to_str(MAXVALLEN, &maxvallen);
+    HYDU_ERR_POP(status, "unable to convert int to string\n");
+
+    i = 0;
+    tmp[i++] = "cmd=maxes kvsname_max=";
+    tmp[i++] = maxkvsname;
+    tmp[i++] = " keylen_max=";
+    tmp[i++] = maxkeylen;
+    tmp[i++] = " vallen_max=";
+    tmp[i++] = maxvallen;
+    tmp[i++] = "\n";
+    tmp[i++] = NULL;
+
+    status = HYDU_str_alloc_and_join(tmp, &cmd);
+    HYDU_ERR_POP(status, "unable to join strings\n");
+
+    status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
+    HYDU_ERR_POP(status, "error writing PMI line\n");
+
+  fn_exit:
+    /* Assumes HYDU_FREE accepts NULL (unallocated strings). */
+    HYDU_FREE(cmd);
+    HYDU_FREE(maxkvsname);
+    HYDU_FREE(maxkeylen);
+    HYDU_FREE(maxvallen);
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * find_process: Return the process record whose PMI connection is fd,
+ * searching every group in pg_list, or NULL if fd is unknown.
+ *
+ * The search returns as soon as it finds a match. Merely breaking out
+ * of the inner loop would let the outer loop continue and overwrite
+ * the match, losing any hit that is not in the last group.
+ */
+static HYD_PMCD_pmi_process_t *find_process(int fd)
+{
+    HYD_PMCD_pmi_pg_t *pg;
+    HYD_PMCD_pmi_process_t *process;
+
+    for (pg = pg_list; pg; pg = pg->next) {
+        for (process = pg->process; process; process = process->next) {
+            if (process->fd == fd)
+                return process;
+        }
+    }
+
+    return NULL;
+}
+
+
+/*
+ * HYD_PMCD_pmi_query_get_appnum: Reply to "cmd=get_appnum". The appnum
+ * is the id of the process group this connection belongs to. Returns
+ * HYD_INTERNAL_ERROR if fd is not a known PMI connection. Temporary
+ * strings are freed on every path.
+ */
+HYD_Status HYD_PMCD_pmi_query_get_appnum(int fd, char *args[])
+{
+    char *tmp[HYDU_NUM_JOIN_STR], *cmd = NULL;
+    char *sapp_num = NULL;
+    int i;
+    HYD_PMCD_pmi_process_t *process;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    /* Find the group id corresponding to this fd */
+    process = find_process(fd);
+    if (process == NULL) { /* We didn't find the process */
+        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
+                             "unable to find process structure for fd %d\n", fd);
+    }
+
+    status = HYDU_int_to_str(process->pg->id, &sapp_num);
+    HYDU_ERR_POP(status, "unable to convert int to string\n");
+
+    i = 0;
+    tmp[i++] = "cmd=appnum appnum=";
+    tmp[i++] = sapp_num;
+    tmp[i++] = "\n";
+    tmp[i++] = NULL;
+
+    status = HYDU_str_alloc_and_join(tmp, &cmd);
+    HYDU_ERR_POP(status, "unable to join strings\n");
+
+    status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
+    HYDU_ERR_POP(status, "error writing PMI line\n");
+
+  fn_exit:
+    /* Assumes HYDU_FREE accepts NULL (unallocated strings). */
+    HYDU_FREE(cmd);
+    HYDU_FREE(sapp_num);
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_query_get_my_kvsname: Reply to "cmd=get_my_kvsname"
+ * with the name of the KVS space of the caller's process group.
+ * Returns HYD_INTERNAL_ERROR if fd is not a known PMI connection. The
+ * reply buffer is freed on every path, including a failed write.
+ */
+HYD_Status HYD_PMCD_pmi_query_get_my_kvsname(int fd, char *args[])
+{
+    char *tmp[HYDU_NUM_JOIN_STR], *cmd = NULL;
+    int i;
+    HYD_PMCD_pmi_process_t *process;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    /* Find the group id corresponding to this fd */
+    process = find_process(fd);
+    if (process == NULL) { /* We didn't find the process */
+        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
+                             "unable to find process structure for fd %d\n", fd);
+    }
+
+    i = 0;
+    tmp[i++] = "cmd=my_kvsname kvsname=";
+    tmp[i++] = process->pg->kvs->kvs_name;
+    tmp[i++] = "\n";
+    tmp[i++] = NULL;
+
+    status = HYDU_str_alloc_and_join(tmp, &cmd);
+    HYDU_ERR_POP(status, "unable to join strings\n");
+
+    status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
+    HYDU_ERR_POP(status, "error writing PMI line\n");
+
+  fn_exit:
+    /* Assumes HYDU_FREE accepts NULL (join never ran). */
+    HYDU_FREE(cmd);
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_query_barrier_in: Record that the process on fd has
+ * reached the barrier. When arrivals equal the group's num_procs,
+ * "cmd=barrier_out" is sent to every process in the group. Returns
+ * HYD_INTERNAL_ERROR if fd is unknown, or the first write error.
+ */
+HYD_Status HYD_PMCD_pmi_query_barrier_in(int fd, char *args[])
+{
+    HYD_PMCD_pmi_process_t *process, *run;
+    char *cmd;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    /* Find the group id corresponding to this fd */
+    process = find_process(fd);
+    if (process == NULL) { /* We didn't find the process */
+        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
+                             "unable to find process structure for fd %d\n", fd);
+    }
+
+    process->pg->barrier_count++;
+
+    /* All the processes have arrived at the barrier. Reset the counter
+     * before releasing them: otherwise a failed write would leave the
+     * count at num_procs and every later barrier would overshoot and
+     * never complete. Then send barrier_out to every member. */
+    if (process->pg->barrier_count == process->pg->num_procs) {
+        process->pg->barrier_count = 0;
+
+        cmd = "cmd=barrier_out\n";
+        for (run = process->pg->process; run; run = run->next) {
+            status = HYDU_sock_writeline(run->fd, cmd, strlen(cmd));
+            HYDU_ERR_POP(status, "error writing PMI line\n");
+        }
+    }
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_query_put: Handle "cmd=put kvsname=<name> key=<key>
+ * value=<val>".
+ *
+ * The pair is appended to the caller's KVS space. Keys and values are
+ * truncated to MAXKEYLEN-1 / MAXVALLEN-1 characters.
+ *
+ * If the key already exists, the stored pair is left untouched and the
+ * reply is "rc=-1 msg=duplicate_key_<key>". Otherwise the reply is
+ * "rc=0 msg=success".
+ *
+ * Returns HYD_INTERNAL_ERROR for a malformed request, an unknown fd, or
+ * a kvsname that is not the caller's.
+ */
+HYD_Status HYD_PMCD_pmi_query_put(int fd, char *args[])
+{
+    int i;
+    HYD_PMCD_pmi_process_t *process;
+    HYD_PMCD_pmi_kvs_pair_t *key_pair, *run, *last;
+    char *field[3], *kvsname, *key, *val;
+    char *tmp[HYDU_NUM_JOIN_STR], *cmd = NULL;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    /* The arguments arrive in the order kvsname, key, value.
+     *
+     * Each one is split at its first '=', so a value may itself contain
+     * '=' and may be empty. strtok() would truncate the former and
+     * return NULL for the latter.
+     *
+     * Stop at the first NULL argument; later entries may be
+     * uninitialized. */
+    for (i = 0; i < 3; i++) {
+        if (args[i] == NULL || (field[i] = strchr(args[i], '=')) == NULL) {
+            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "malformed PMI put command\n");
+        }
+        field[i]++;
+    }
+    kvsname = field[0];
+    key = field[1];
+    val = field[2];
+
+    /* Find the group id corresponding to this fd */
+    process = find_process(fd);
+    if (process == NULL) { /* We didn't find the process */
+        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
+                             "unable to find process structure for fd %d\n", fd);
+    }
+
+    if (strcmp(process->pg->kvs->kvs_name, kvsname)) {
+        HYDU_ERR_SETANDJUMP2(status, HYD_INTERNAL_ERROR,
+                             "kvsname (%s) does not match this process' kvs space (%s)\n",
+                             kvsname, process->pg->kvs->kvs_name);
+    }
+
+    HYDU_MALLOC(key_pair, HYD_PMCD_pmi_kvs_pair_t *, sizeof(HYD_PMCD_pmi_kvs_pair_t), status);
+    MPIU_Snprintf(key_pair->key, MAXKEYLEN, "%s", key);
+    MPIU_Snprintf(key_pair->val, MAXVALLEN, "%s", val);
+    key_pair->next = NULL;
+
+    /* Scan the whole list, including its last element, for the
+     * (truncated) key and remember the tail in case the key is new. */
+    last = NULL;
+    for (run = process->pg->kvs->key_pair; run; run = run->next) {
+        if (!strcmp(run->key, key_pair->key))
+            break;
+        last = run;
+    }
+
+    i = 0;
+    tmp[i++] = "cmd=put_result rc=";
+    if (run != NULL) {
+        /* Duplicate key: keep the existing pair, drop the new one. */
+        HYDU_FREE(key_pair);
+        tmp[i++] = "-1 msg=duplicate_key_";
+        tmp[i++] = run->key;
+    }
+    else {
+        if (last == NULL)
+            process->pg->kvs->key_pair = key_pair;
+        else
+            last->next = key_pair;
+        tmp[i++] = "0 msg=success";
+    }
+    tmp[i++] = "\n";
+    tmp[i++] = NULL;
+
+    status = HYDU_str_alloc_and_join(tmp, &cmd);
+    HYDU_ERR_POP(status, "unable to join strings\n");
+
+    status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
+    HYDU_ERR_POP(status, "error writing PMI line\n");
+
+  fn_exit:
+    /* Assumes HYDU_FREE accepts NULL (join never ran). */
+    HYDU_FREE(cmd);
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_query_get: Handle "cmd=get kvsname=<name> key=<key>".
+ *
+ * If the key is in the caller's KVS space, the reply is
+ * "rc=0 msg=success value=<val>". Otherwise it is
+ * "rc=-1 msg=key_<key>_not_found value=unknown".
+ *
+ * Returns HYD_INTERNAL_ERROR for a malformed request, an unknown fd, or
+ * a kvsname that is not the caller's.
+ */
+HYD_Status HYD_PMCD_pmi_query_get(int fd, char *args[])
+{
+    int i;
+    HYD_PMCD_pmi_process_t *process;
+    HYD_PMCD_pmi_kvs_pair_t *run;
+    char *kvsname, *key;
+    char *tmp[HYDU_NUM_JOIN_STR], *cmd = NULL;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    /* Each value follows the first '=' of its argument. Reject missing
+     * tokens instead of passing NULL to strcmp(); args[1] is only
+     * examined once args[0] is known to be a real entry. */
+    if (args[0] == NULL || (kvsname = strchr(args[0], '=')) == NULL ||
+        args[1] == NULL || (key = strchr(args[1], '=')) == NULL) {
+        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "malformed PMI get command\n");
+    }
+    kvsname++;
+    key++;
+
+    /* Find the group id corresponding to this fd */
+    process = find_process(fd);
+    if (process == NULL) { /* We didn't find the process */
+        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
+                             "unable to find process structure for fd %d\n", fd);
+    }
+
+    if (strcmp(process->pg->kvs->kvs_name, kvsname)) {
+        HYDU_ERR_SETANDJUMP2(status, HYD_INTERNAL_ERROR,
+                             "kvsname (%s) does not match this process' kvs space (%s)\n",
+                             kvsname, process->pg->kvs->kvs_name);
+    }
+
+    for (run = process->pg->kvs->key_pair; run; run = run->next) {
+        if (!strcmp(run->key, key))
+            break;
+    }
+
+    /* The join below copies its inputs, so run->val can be referenced
+     * directly without duplicating it. */
+    i = 0;
+    tmp[i++] = "cmd=get_result rc=";
+    if (run != NULL) {
+        tmp[i++] = "0 msg=success value=";
+        tmp[i++] = run->val;
+    }
+    else {
+        tmp[i++] = "-1 msg=key_";
+        tmp[i++] = key;
+        tmp[i++] = "_not_found value=unknown";
+    }
+    tmp[i++] = "\n";
+    tmp[i++] = NULL;
+
+    status = HYDU_str_alloc_and_join(tmp, &cmd);
+    HYDU_ERR_POP(status, "unable to join strings\n");
+
+    status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
+    HYDU_ERR_POP(status, "error writing PMI line\n");
+
+  fn_exit:
+    /* Assumes HYDU_FREE accepts NULL (join never ran). */
+    HYDU_FREE(cmd);
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_query_finalize: Acknowledge "cmd=finalize" by writing
+ * "cmd=finalize_ack" on fd. This function does not close fd.
+ */
+HYD_Status HYD_PMCD_pmi_query_finalize(int fd, char *args[])
+{
+    char *ack = "cmd=finalize_ack\n";
+    HYD_Status status;
+
+    HYDU_FUNC_ENTER();
+
+    status = HYDU_sock_writeline(fd, ack, strlen(ack));
+    HYDU_ERR_POP(status, "error writing PMI line\n");
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_query_get_usize: Reply to "cmd=get_universe_size" with
+ * the universe size reported by the bootstrap server. Both temporary
+ * strings are freed on every path. The original never freed
+ * usize_str, leaking it on every call.
+ */
+HYD_Status HYD_PMCD_pmi_query_get_usize(int fd, char *args[])
+{
+    int usize, i;
+    char *tmp[HYDU_NUM_JOIN_STR], *cmd = NULL, *usize_str = NULL;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    status = HYD_BSCI_get_usize(&usize);
+    HYDU_ERR_POP(status, "unable to get bootstrap universe size\n");
+
+    status = HYDU_int_to_str(usize, &usize_str);
+    HYDU_ERR_POP(status, "unable to convert int to string\n");
+
+    i = 0;
+    tmp[i++] = "cmd=universe_size size=";
+    tmp[i++] = usize_str;
+    tmp[i++] = "\n";
+    tmp[i++] = NULL;
+
+    status = HYDU_str_alloc_and_join(tmp, &cmd);
+    HYDU_ERR_POP(status, "unable to join strings\n");
+
+    status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
+    HYDU_ERR_POP(status, "error writing PMI line\n");
+
+  fn_exit:
+    /* Assumes HYDU_FREE accepts NULL (unallocated strings). */
+    HYDU_FREE(cmd);
+    HYDU_FREE(usize_str);
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * free_pmi_process_list: Free every record of a singly linked process
+ * list. Always returns HYD_SUCCESS.
+ */
+static HYD_Status free_pmi_process_list(HYD_PMCD_pmi_process_t * process_list)
+{
+    HYD_PMCD_pmi_process_t *following;
+
+    HYDU_FUNC_ENTER();
+
+    /* Use the parameter itself as the cursor; grab the successor before
+     * the current node is released. */
+    for (; process_list != NULL; process_list = following) {
+        following = process_list->next;
+        HYDU_FREE(process_list);
+    }
+
+    HYDU_FUNC_EXIT();
+    return HYD_SUCCESS;
+}
+
+
+/*
+ * free_pmi_kvs_list: Free all key/value pairs of a KVS space and then
+ * the space itself. kvs_list must not be NULL. Always returns
+ * HYD_SUCCESS.
+ */
+static HYD_Status free_pmi_kvs_list(HYD_PMCD_pmi_kvs_t * kvs_list)
+{
+    HYD_PMCD_pmi_kvs_pair_t *pair, *following;
+
+    HYDU_FUNC_ENTER();
+
+    for (pair = kvs_list->key_pair; pair != NULL; pair = following) {
+        following = pair->next;
+        HYDU_FREE(pair);
+    }
+    HYDU_FREE(kvs_list);
+
+    HYDU_FUNC_EXIT();
+    return HYD_SUCCESS;
+}
+
+
+/*
+ * HYD_PMCD_pmi_finalize: Release every process group in pg_list,
+ * together with its process records and KVS space.
+ *
+ * Each group is unlinked from pg_list before it is freed. pg_list is
+ * therefore never left pointing at freed memory and is NULL on return,
+ * which makes a repeated call or a later find_process() harmless.
+ */
+HYD_Status HYD_PMCD_pmi_finalize(void)
+{
+    HYD_PMCD_pmi_pg_t *pg;
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    while (pg_list) {
+        pg = pg_list;
+        pg_list = pg->next;
+
+        status = free_pmi_process_list(pg->process);
+        HYDU_ERR_POP(status, "unable to free process list\n");
+
+        status = free_pmi_kvs_list(pg->kvs);
+        HYDU_ERR_POP(status, "unable to free kvs list\n");
+
+        HYDU_FREE(pg);
+    }
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
Added: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_query.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_query.h (rev 0)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_query.h 2009-03-20 23:39:20 UTC (rev 4154)
@@ -0,0 +1,67 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ * (C) 2008 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+/* PMI server data structures (process groups, processes, KVS spaces)
+ * and the per-command query handlers implemented in pmi_query.c. */
+#ifndef PMI_QUERY_H_INCLUDED
+#define PMI_QUERY_H_INCLUDED
+
+#include "hydra.h"
+#include "hydra_utils.h"
+
+/* Limits reported to clients by get_maxes; they also size the fixed
+ * key/value/name buffers below (longer strings are truncated). */
+#define MAXKEYLEN 64 /* max length of key in keyval space */
+#define MAXVALLEN 256 /* max length of value in keyval space */
+#define MAXNAMELEN 256 /* max length of various names */
+#define MAXKVSNAME MAXNAMELEN /* max length of a kvsname */
+
+/* One key/value entry; entries form a singly linked list kept in
+ * insertion order. */
+typedef struct HYD_PMCD_pmi_kvs_pair {
+ char key[MAXKEYLEN];
+ char val[MAXVALLEN];
+ struct HYD_PMCD_pmi_kvs_pair *next;
+} HYD_PMCD_pmi_kvs_pair_t;
+
+/* A named key-value space; there is one per process group. */
+typedef struct HYD_PMCD_pmi_kvs {
+ char kvs_name[MAXNAMELEN]; /* Name of this kvs */
+ HYD_PMCD_pmi_kvs_pair_t *key_pair;
+} HYD_PMCD_pmi_kvs_t;
+
+typedef struct HYD_PMCD_pmi_pg HYD_PMCD_pmi_pg_t;
+typedef struct HYD_PMCD_pmi_process HYD_PMCD_pmi_process_t;
+
+/* A PMI client, identified by the socket it is connected on. */
+struct HYD_PMCD_pmi_process {
+ /* This is a bad design if we need to tie in an FD to a PMI
+ * process. This essentially kills any chance of PMI server
+ * masquerading. However, PMI v1 requires this state to be
+ * maintained. */
+ int fd;
+ struct HYD_PMCD_pmi_pg *pg;
+ struct HYD_PMCD_pmi_process *next;
+};
+
+/* A process group: its members, its KVS space and barrier state. */
+struct HYD_PMCD_pmi_pg {
+ int id;
+
+ int num_procs; /* Number of processes in the group */
+ /* Number of members currently waiting in barrier_in; the barrier
+ * completes when this reaches num_procs. */
+ int barrier_count;
+
+ struct HYD_PMCD_pmi_process *process;
+ HYD_PMCD_pmi_kvs_t *kvs;
+
+ struct HYD_PMCD_pmi_pg *next;
+};
+
+/* Create the initial process group. */
+HYD_Status HYD_PMCD_pmi_create_pg(void);
+/* Handlers for individual PMI commands received on a client fd;
+ * args holds the space-separated tokens that followed the command. */
+HYD_Status HYD_PMCD_pmi_query_initack(int fd, char *args[]);
+HYD_Status HYD_PMCD_pmi_query_init(int fd, char *args[]);
+HYD_Status HYD_PMCD_pmi_query_get_maxes(int fd, char *args[]);
+HYD_Status HYD_PMCD_pmi_query_get_appnum(int fd, char *args[]);
+HYD_Status HYD_PMCD_pmi_query_get_my_kvsname(int fd, char *args[]);
+HYD_Status HYD_PMCD_pmi_query_barrier_in(int fd, char *args[]);
+HYD_Status HYD_PMCD_pmi_query_put(int fd, char *args[]);
+HYD_Status HYD_PMCD_pmi_query_get(int fd, char *args[]);
+HYD_Status HYD_PMCD_pmi_query_finalize(int fd, char *args[]);
+HYD_Status HYD_PMCD_pmi_query_get_usize(int fd, char *args[]);
+/* Free all process groups and their contents. */
+HYD_Status HYD_PMCD_pmi_finalize(void);
+
+#endif /* PMI_QUERY_H_INCLUDED */
Copied: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h (from rev 4149, mpich2/trunk/src/pm/hydra/pm/central/central.h)
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h (rev 0)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h 2009-03-20 23:39:20 UTC (rev 4154)
@@ -0,0 +1,21 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ * (C) 2008 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+/* Interface shared by the PMI server source files (pmi_serv_*.c). */
+/* NOTE(review): this header uses HYD_Status and HYD_Event_t without
+ * including their definitions; includers must include hydra.h first. */
+#ifndef PMI_SERV_H_INCLUDED
+#define PMI_SERV_H_INCLUDED
+
+/* Commands the PMI server sends to proxies; the enum value itself is
+ * written raw over the socket by HYD_PMCD_pmi_serv_cleanup. */
+/* Currently we only have one command */
+enum HYD_PMCD_pmi_proxy_cmds {
+ KILLALL_PROCS
+};
+
+/* Socket on which the PMI server accepts client connections. */
+extern int HYD_PMCD_pmi_serv_listenfd;
+
+/* Demux callback for the listen socket and all client connections. */
+HYD_Status HYD_PMCD_pmi_serv_cb(int fd, HYD_Event_t events);
+/* Ask every proxy to kill its processes. */
+HYD_Status HYD_PMCD_pmi_serv_cleanup(void);
+/* Signal handler: cleans up and exits on termination signals. */
+void HYD_PMCD_pmi_serv_signal_cb(int signal);
+
+#endif /* PMI_SERV_H_INCLUDED */
Property changes on: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv.h
___________________________________________________________________
Name: svn:mergeinfo
+
Copied: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c (from rev 4149, mpich2/trunk/src/pm/hydra/pm/central/central_cb.c)
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c (rev 0)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -0,0 +1,225 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ * (C) 2008 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+#include "hydra.h"
+#include "hydra_utils.h"
+#include "pmi_query.h"
+#include "pmci.h"
+#include "bsci.h"
+#include "demux.h"
+#include "pmi_serv.h"
+
+/* Definition of the PMI listen socket declared extern in pmi_serv.h. */
+int HYD_PMCD_pmi_serv_listenfd;
+/* NOTE(review): tentative definition of the global handle; confirm
+ * which translation unit owns it (duplicates break -fno-common links). */
+HYD_Handle handle;
+
+/*
+ * HYD_PMCD_pmi_serv_cb: This is the core PMI server part of the
+ * code. Here's the expected protocol:
+ *
+ * 1. The client (MPI process) connects to us, which will result in an
+ * event on the listener socket.
+ *
+ * 2. The client sends us a "cmd" or "mcmd" string which means that a
+ * single or multi-line command is to follow.
+ *
+ * 3. Here are the commands that we respect:
+ * - initack [done]
+ * - init [done]
+ * - get_maxes [done]
+ * - get_appnum [done]
+ * - get_my_kvsname [done]
+ * - barrier_in [done]
+ * - put [done]
+ * - get [done]
+ * - finalize [done]
+ * - get_universe_size [done]
+ * - abort
+ * - create_kvs
+ * - destroy_kvs
+ * - getbyidx
+ * - spawn
+ */
+HYD_Status HYD_PMCD_pmi_serv_cb(int fd, HYD_Event_t events)
+{
+    int accept_fd, linelen, i;
+    char *buf, *cmd, *args[HYD_EXEC_ARGS];
+    HYD_Status status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    HYDU_MALLOC(buf, char *, HYD_TMPBUF_SIZE, status);
+
+    if (fd == HYD_PMCD_pmi_serv_listenfd) { /* Someone is trying to connect to us */
+        status = HYDU_sock_accept(fd, &accept_fd);
+        HYDU_ERR_POP(status, "accept error\n");
+
+        status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, HYD_PMCD_pmi_serv_cb);
+        HYDU_ERR_POP(status, "unable to register fd\n");
+    }
+    else {
+        status = HYDU_sock_readline(fd, buf, HYD_TMPBUF_SIZE, &linelen);
+        HYDU_ERR_POP(status, "PMI read line error\n");
+
+        if (linelen == 0) {
+            /* This is not a clean close. If a finalize was called, we
+             * would have deregistered this socket. The application
+             * might have aborted. Just cleanup all the processes.
+             *
+             * A cleanup failure is only reported. The socket is still
+             * deregistered, so the demux engine does not keep polling
+             * a dead descriptor. */
+            if (HYD_PMCD_pmi_serv_cleanup() != HYD_SUCCESS) {
+                HYDU_Warn_printf("bootstrap server returned error cleaning up processes\n");
+            }
+
+            if (HYD_DMX_deregister_fd(fd) != HYD_SUCCESS) {
+                HYDU_Warn_printf("unable to deregister fd %d\n", fd);
+            }
+            else {
+                close(fd);
+            }
+
+            status = HYD_SUCCESS;
+            goto fn_exit;
+        }
+
+        /* Check what command we got and call the appropriate
+         * function */
+        buf[linelen - 1] = 0;
+
+        /* Split the line into the command and its arguments. One slot
+         * is always reserved, so args is NULL-terminated even when the
+         * line has more tokens than fit. */
+        cmd = strtok(buf, " ");
+        for (i = 0; i < HYD_EXEC_ARGS - 1; i++) {
+            args[i] = strtok(NULL, " ");
+            if (args[i] == NULL)
+                break;
+        }
+        args[i] = NULL;
+
+        if (cmd == NULL) {
+            status = HYD_SUCCESS;
+        }
+        else if (!strcmp("cmd=initack", cmd)) {
+            status = HYD_PMCD_pmi_query_initack(fd, args);
+        }
+        else if (!strcmp("cmd=init", cmd)) {
+            status = HYD_PMCD_pmi_query_init(fd, args);
+        }
+        else if (!strcmp("cmd=get_maxes", cmd)) {
+            status = HYD_PMCD_pmi_query_get_maxes(fd, args);
+        }
+        else if (!strcmp("cmd=get_appnum", cmd)) {
+            status = HYD_PMCD_pmi_query_get_appnum(fd, args);
+        }
+        else if (!strcmp("cmd=get_my_kvsname", cmd)) {
+            status = HYD_PMCD_pmi_query_get_my_kvsname(fd, args);
+        }
+        else if (!strcmp("cmd=barrier_in", cmd)) {
+            status = HYD_PMCD_pmi_query_barrier_in(fd, args);
+        }
+        else if (!strcmp("cmd=put", cmd)) {
+            status = HYD_PMCD_pmi_query_put(fd, args);
+        }
+        else if (!strcmp("cmd=get", cmd)) {
+            status = HYD_PMCD_pmi_query_get(fd, args);
+        }
+        else if (!strcmp("cmd=finalize", cmd)) {
+            status = HYD_PMCD_pmi_query_finalize(fd, args);
+
+            /* A clean finalize: stop watching and close the client. */
+            if (status == HYD_SUCCESS) {
+                status = HYD_DMX_deregister_fd(fd);
+                HYDU_ERR_POP(status, "unable to deregister fd\n");
+                close(fd);
+            }
+        }
+        else if (!strcmp("cmd=get_universe_size", cmd)) {
+            status = HYD_PMCD_pmi_query_get_usize(fd, args);
+        }
+        else {
+            /* We don't understand the command */
+            HYDU_Error_printf("Unrecognized PMI command: %s; cleaning up processes\n", cmd);
+
+            /* Cleanup all the processes and return. We don't need to
+             * check the return status since we are anyway returning
+             * an error */
+            HYD_PMCD_pmi_serv_cleanup();
+            status = HYD_SUCCESS;
+            goto fn_fail;
+        }
+
+        HYDU_ERR_POP(status, "PMI server returned error\n");
+    }
+
+  fn_exit:
+    HYDU_FREE(buf);
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCD_pmi_serv_cleanup: Tell every proxy to kill its processes.
+ *
+ * For each partition of each handle.proc_params entry, connect to
+ * handle.proxy_port on that partition and write KILLALL_PROCS.
+ *
+ * A failure for one proxy is reported and the loop moves on to the
+ * next. The return value is HYD_INTERNAL_ERROR if any proxy could not
+ * be reached or written to, and HYD_SUCCESS otherwise.
+ */
+HYD_Status HYD_PMCD_pmi_serv_cleanup(void)
+{
+    struct HYD_Proc_params *proc_params;
+    struct HYD_Partition_list *partition;
+    int fd;
+    enum HYD_PMCD_pmi_proxy_cmds cmd;
+    HYD_Status status = HYD_SUCCESS, overall_status = HYD_SUCCESS;
+
+    HYDU_FUNC_ENTER();
+
+    /* FIXME: Instead of doing this from this process itself, fork a
+     * bunch of processes to do this. */
+    /* Connect to all proxies and send a KILL command */
+    cmd = KILLALL_PROCS;
+    for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
+        for (partition = proc_params->partition; partition; partition = partition->next) {
+            status = HYDU_sock_connect(partition->name, handle.proxy_port, &fd);
+            if (status != HYD_SUCCESS) {
+                HYDU_Warn_printf("unable to connect to the proxy on %s\n", partition->name);
+                overall_status = HYD_INTERNAL_ERROR;
+                continue; /* Move on to the next proxy */
+            }
+
+            status = HYDU_sock_write(fd, &cmd, sizeof(cmd));
+            if (status != HYD_SUCCESS) {
+                HYDU_Warn_printf("unable to send data to the proxy on %s\n", partition->name);
+                overall_status = HYD_INTERNAL_ERROR;
+            }
+
+            /* Close the connection whether or not the write succeeded,
+             * so a failed proxy does not leak a descriptor. */
+            close(fd);
+        }
+    }
+
+    HYDU_FUNC_EXIT();
+
+    return overall_status;
+}
+
+
+/*
+ * HYD_PMCD_pmi_serv_signal_cb: Signal handler for the PMI server. On
+ * SIGINT, SIGQUIT, SIGTERM (and SIGSTOP/SIGCONT where defined) it asks
+ * all proxies to kill their processes and exits with -1; any other
+ * signal is ignored.
+ *
+ * NOTE(review): HYD_PMCD_pmi_serv_cleanup connects sockets and prints
+ * warnings, and exit() runs atexit handlers; none of these are
+ * async-signal-safe -- confirm this is acceptable.
+ * NOTE(review): POSIX does not allow SIGSTOP to be caught, so that
+ * comparison presumably never matches.
+ */
+void HYD_PMCD_pmi_serv_signal_cb(int signal)
+{
+ HYDU_FUNC_ENTER();
+
+ if (signal == SIGINT || signal == SIGQUIT || signal == SIGTERM
+#if defined SIGSTOP
+ || signal == SIGSTOP
+#endif /* SIGSTOP */
+#if defined SIGCONT
+ || signal == SIGCONT
+#endif /* SIGCONT */
+) {
+ /* There's nothing we can do with the return value for now. */
+ HYD_PMCD_pmi_serv_cleanup();
+ exit(-1);
+ }
+ else {
+ /* Ignore other signals for now */
+ }
+
+ HYDU_FUNC_EXIT();
+ return;
+}
Property changes on: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c
___________________________________________________________________
Name: svn:mergeinfo
+
Copied: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c (from rev 4149, mpich2/trunk/src/pm/hydra/pm/central/central_finalize.c)
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c (rev 0)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -0,0 +1,42 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ * (C) 2008 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+#include "hydra.h"
+#include "pmci.h"
+#include "pmi_query.h"
+#include "bsci.h"
+#include "demux.h"
+#include "pmi_serv.h"
+
+/* Defined in pmi_serv_cb.c and declared in pmi_serv.h. A second
+ * (tentative) definition here fails to link under -fno-common. */
+extern int HYD_PMCD_pmi_serv_listenfd;
+
+/*
+ * HYD_PMCI_finalize: Tear down the PMI server. Steps run in order and
+ * stop at the first failure:
+ *   1. Deregister the listen socket from the demux engine, close it,
+ *      and reset it to -1.
+ *   2. Release the PMI server state.
+ *   3. Finalize the bootstrap server.
+ */
+HYD_Status HYD_PMCI_finalize(void)
+{
+    HYD_Status status;
+
+    HYDU_FUNC_ENTER();
+
+    /* 1. Stop accepting PMI connections. */
+    status = HYD_DMX_deregister_fd(HYD_PMCD_pmi_serv_listenfd);
+    HYDU_ERR_POP(status, "unable to deregister fd\n");
+    close(HYD_PMCD_pmi_serv_listenfd);
+    HYD_PMCD_pmi_serv_listenfd = -1;
+
+    /* 2. Free process groups and KVS spaces. */
+    status = HYD_PMCD_pmi_finalize();
+    HYDU_ERR_POP(status, "unable to finalize process manager utils\n");
+
+    /* 3. Shut down the bootstrap layer. */
+    status = HYD_BSCI_finalize();
+    HYDU_ERR_POP(status, "unable to finalize bootstrap server\n");
+
+  fn_exit:
+    HYDU_FUNC_EXIT();
+    return status;
+
+  fn_fail:
+    goto fn_exit;
+}
Property changes on: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_finalize.c
___________________________________________________________________
Name: svn:mergeinfo
+
Copied: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c (from rev 4149, mpich2/trunk/src/pm/hydra/pm/central/central_launch.c)
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c (rev 0)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -0,0 +1,243 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ * (C) 2008 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+#include "hydra.h"
+#include "hydra_utils.h"
+#include "pmci.h"
+#include "pmi_query.h"
+#include "bsci.h"
+#include "demux.h"
+#include "pmi_serv.h"
+
+int HYD_PMCD_pmi_serv_listenfd;
+HYD_Handle handle;
+
+/*
+ * HYD_PMCI_launch_procs: Here are the steps we follow:
+ *
+ * 1. Install the common signal handlers (HYD_PMCD_pmi_serv_signal_cb).
+ *
+ * 2. Find what range of ports the user wants to allow (checked in the
+ * order MPIEXEC_PORTRANGE, MPIEXEC_PORT_RANGE, MPICH_PORT_RANGE) and
+ * listen on one of those ports.
+ *
+ * 3. Register the listening socket with the demux engine, with
+ * HYD_PMCD_pmi_serv_cb as its call-back function.
+ *
+ * 4. Create a "hostname:port" string and add it to the system
+ * environment list under the variable "PMI_PORT".
+ *
+ * 5. Create the PMI process group for the processes in this
+ * comm_world.
+ *
+ * 6. For each partition, append the pmi_proxy command line to the
+ * partition's argument list: process count, partition name and
+ * count, the PMI_ID of the partition's first process (the number of
+ * processes in all preceding partitions), proxy port, working
+ * directory, the environment, and finally the executable.
+ *
+ * 7. Initialize the bootstrap server and ask it to launch the
+ * proxies.
+ */
+HYD_Status HYD_PMCI_launch_procs(void)
+{
+ char *port_range, *port_str, *sport, *str;
+ uint16_t port;
+ int i, arg;
+ int process_id, group_id;
+ char hostname[MAX_HOSTNAME_LEN];
+ HYD_Env_t *env;
+ char *path_str[HYDU_NUM_JOIN_STR];
+ struct HYD_Proc_params *proc_params;
+ struct HYD_Partition_list *partition;
+ HYD_Status status = HYD_SUCCESS;
+
+ HYDU_FUNC_ENTER();
+
+ status = HYDU_set_common_signals(HYD_PMCD_pmi_serv_signal_cb);
+ HYDU_ERR_POP(status, "unable to set signal\n");
+
+ /* Check if the user wants us to use a port within a certain
+ * range. */
+ port_range = getenv("MPIEXEC_PORTRANGE");
+ if (!port_range)
+ port_range = getenv("MPIEXEC_PORT_RANGE");
+ if (!port_range)
+ port_range = getenv("MPICH_PORT_RANGE");
+
+ /* Listen on a port in the port range */
+ port = 0;
+ status = HYDU_sock_listen(&HYD_PMCD_pmi_serv_listenfd, port_range, &port);
+ HYDU_ERR_POP(status, "unable to listen on port\n");
+
+ /* Register the listening socket with the demux engine */
+ status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_serv_listenfd, HYD_STDOUT,
+ HYD_PMCD_pmi_serv_cb);
+ HYDU_ERR_POP(status, "unable to register fd\n");
+
+ /* Create a port string for MPI processes to use to connect to.
+ * If gethostname() fails, the contents of hostname are unspecified,
+ * so they are not printed in the error message. */
+ if (gethostname(hostname, MAX_HOSTNAME_LEN) < 0)
+ HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR,
+ "gethostname error (errno: %d)\n", errno);
+ /* POSIX does not guarantee NUL-termination of a truncated name */
+ hostname[MAX_HOSTNAME_LEN - 1] = '\0';
+
+ status = HYDU_int_to_str(port, &sport);
+ HYDU_ERR_POP(status, "cannot convert int to string\n");
+
+ HYDU_MALLOC(port_str, char *, strlen(hostname) + 1 + strlen(sport) + 1, status);
+ MPIU_Snprintf(port_str, strlen(hostname) + 1 + strlen(sport) + 1,
+ "%s:%s", hostname, sport);
+ HYDU_FREE(sport);
+ HYDU_Debug("Process manager listening on PMI port %s\n", port_str);
+
+ status = HYDU_env_create(&env, "PMI_PORT", port_str);
+ HYDU_ERR_POP(status, "unable to create env\n");
+
+ status = HYDU_append_env_to_list(*env, &handle.system_env);
+ HYDU_ERR_POP(status, "unable to add env to list\n");
+
+ HYDU_env_free(env);
+ HYDU_FREE(port_str);
+
+ /* Create a process group for the MPI processes in this
+ * comm_world */
+ status = HYD_PMCD_pmi_create_pg();
+ HYDU_ERR_POP(status, "unable to create process group\n");
+
+ /* Create the arguments list for each proxy */
+ process_id = 0;
+ group_id = 0;
+ for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
+ for (partition = proc_params->partition; partition; partition = partition->next) {
+
+ partition->group_id = group_id++;
+ partition->group_rank = 0;
+
+ /* Append after any arguments already in the list */
+ for (arg = 0; partition->args[arg]; arg++);
+ i = 0;
+ path_str[i++] = MPIU_Strdup(handle.base_path);
+ path_str[i++] = MPIU_Strdup("pmi_proxy");
+ path_str[i] = NULL;
+ status = HYDU_str_alloc_and_join(path_str, &partition->args[arg++]);
+ HYDU_ERR_POP(status, "unable to join strings\n");
+
+ HYDU_free_strlist(path_str);
+
+ status = HYDU_int_to_str(partition->proc_count, &str);
+ HYDU_ERR_POP(status, "unable to convert int to string\n");
+
+ partition->args[arg++] = MPIU_Strdup("--proc-count");
+ partition->args[arg++] = MPIU_Strdup(str);
+
+ /* --partition is followed by the name and the process count */
+ partition->args[arg++] = MPIU_Strdup("--partition");
+ partition->args[arg++] = MPIU_Strdup(partition->name);
+ partition->args[arg++] = MPIU_Strdup(str);
+ HYDU_FREE(str);
+
+ /* PMI_ID of the first process on this partition */
+ status = HYDU_int_to_str(process_id, &str);
+ HYDU_ERR_POP(status, "unable to convert int to string\n");
+
+ partition->args[arg++] = MPIU_Strdup("--pmi-id");
+ partition->args[arg++] = MPIU_Strdup(str);
+ HYDU_FREE(str);
+
+ status = HYDU_int_to_str(handle.proxy_port, &str);
+ HYDU_ERR_POP(status, "unable to convert int to string\n");
+
+ partition->args[arg++] = MPIU_Strdup("--proxy-port");
+ partition->args[arg++] = MPIU_Strdup(str);
+ HYDU_FREE(str);
+
+ partition->args[arg++] = MPIU_Strdup("--wdir");
+ partition->args[arg++] = MPIU_Strdup(handle.wdir);
+
+ /* --environment is followed by the number of variables and then
+ * the variables themselves */
+ partition->args[arg++] = MPIU_Strdup("--environment");
+ i = 0;
+ for (env = handle.system_env; env; env = env->next)
+ i++;
+ for (env = handle.prop_env; env; env = env->next)
+ i++;
+ for (env = proc_params->prop_env; env; env = env->next)
+ i++;
+ status = HYDU_int_to_str(i, &str);
+ HYDU_ERR_POP(status, "unable to convert int to string\n");
+
+ partition->args[arg++] = MPIU_Strdup(str);
+ HYDU_FREE(str);
+ partition->args[arg++] = NULL;
+
+ HYDU_list_append_env_to_str(handle.system_env, partition->args);
+ HYDU_list_append_env_to_str(handle.prop_env, partition->args);
+ HYDU_list_append_env_to_str(proc_params->prop_env, partition->args);
+
+ for (arg = 0; partition->args[arg]; arg++);
+ partition->args[arg] = NULL;
+ HYDU_list_append_strlist(proc_params->exec, partition->args);
+
+ if (handle.debug) {
+ HYDU_Debug("Executable passed to the bootstrap: ");
+ HYDU_print_strlist(partition->args);
+ }
+
+ process_id += partition->proc_count;
+ }
+ }
+
+ /* Initialize the bootstrap server and ask it to launch the
+ * processes. */
+ status = HYD_BSCI_init(handle.bootstrap);
+ HYDU_ERR_POP(status, "bootstrap server initialization failed\n");
+
+ status = HYD_BSCI_launch_procs();
+ HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n");
+
+ fn_exit:
+ HYDU_FUNC_EXIT();
+ return status;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+/*
+ * HYD_PMCI_wait_for_completion: Wait for the bootstrap server to
+ * report that the launched processes have completed. If the wait
+ * fails, clean up the processes (HYD_PMCD_pmi_serv_cleanup) and then
+ * return the wait error, so that the failure is not reported as
+ * success when the cleanup itself succeeds.
+ */
+HYD_Status HYD_PMCI_wait_for_completion(void)
+{
+ HYD_Status status = HYD_SUCCESS, wait_status;
+
+ HYDU_FUNC_ENTER();
+
+ wait_status = HYD_BSCI_wait_for_completion();
+ if (wait_status != HYD_SUCCESS) {
+ status = HYD_PMCD_pmi_serv_cleanup();
+ HYDU_ERR_POP(status, "process manager cannot cleanup processes\n");
+
+ /* Propagate the original failure */
+ status = wait_status;
+ HYDU_ERR_POP(status, "bootstrap server error waiting for completion\n");
+ }
+
+ fn_exit:
+ HYDU_FUNC_EXIT();
+ return status;
+
+ fn_fail:
+ goto fn_exit;
+}
Property changes on: mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c
___________________________________________________________________
Name: svn:mergeinfo
+
Deleted: mpich2/trunk/src/pm/hydra/pm/pmiserv/proxy.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/proxy.c 2009-03-20 19:35:46 UTC (rev 4149)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/proxy.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -1,195 +0,0 @@
-/* -*- Mode: C; c-basic-offset:4 ; -*- */
-/*
- * (C) 2008 by Argonne National Laboratory.
- * See COPYRIGHT in top-level directory.
- */
-
-#include "hydra.h"
-#include "hydra_utils.h"
-#include "demux.h"
-#include "proxy.h"
-
-struct HYD_Proxy_params HYD_Proxy_params;
-int HYD_Proxy_listenfd;
-
-int main(int argc, char **argv)
-{
- int i, j, arg, count, pid, ret_status;
- int stdin_fd, timeout;
- char *str, *timeout_str;
- char *client_args[HYD_EXEC_ARGS];
- char *tmp[HYDU_NUM_JOIN_STR];
- HYD_Status status = HYD_SUCCESS;
-
- status = HYD_Proxy_get_params(argc, argv);
- HYDU_ERR_POP(status, "bad parameters passed to the proxy\n");
-
- status = HYDU_sock_listen(&HYD_Proxy_listenfd, NULL,
- (uint16_t *) & HYD_Proxy_params.proxy_port);
- HYDU_ERR_POP(status, "unable to listen on socket\n");
-
- /* Register the listening socket with the demux engine */
- status = HYD_DMX_register_fd(1, &HYD_Proxy_listenfd, HYD_STDOUT, HYD_Proxy_listen_cb);
- HYDU_ERR_POP(status, "unable to register fd\n");
-
- /* FIXME: We do not use the bootstrap server right now, as the
- * current implementation of the bootstrap launch directly reads
- * the executable information from the HYD_Handle structure. Since
- * we are a different process, we don't share this
- * structure. Without the bootstrap server, we can only launch
- * local processes. That is, we can only have a single-level
- * hierarchy of proxies. */
-
- HYDU_MALLOC(HYD_Proxy_params.out, int *,
- HYD_Proxy_params.proc_count * sizeof(int), status);
- HYDU_MALLOC(HYD_Proxy_params.err, int *,
- HYD_Proxy_params.proc_count * sizeof(int), status);
- HYDU_MALLOC(HYD_Proxy_params.pid, int *,
- HYD_Proxy_params.proc_count * sizeof(int), status);
- HYDU_MALLOC(HYD_Proxy_params.exit_status, int *,
- HYD_Proxy_params.proc_count * sizeof(int), status);
-
- /* Initialize the exit status */
- for (i = 0; i < HYD_Proxy_params.proc_count; i++)
- HYD_Proxy_params.exit_status[i] = -1;
-
- /* Spawn the processes */
- for (i = 0; i < HYD_Proxy_params.proc_count; i++) {
-
- j = 0;
- tmp[j++] = MPIU_Strdup("PMI_ID=");
- status = HYDU_int_to_str(HYD_Proxy_params.pmi_id + i, &str);
- HYDU_ERR_POP(status, "unable to convert int to string\n");
-
- tmp[j++] = MPIU_Strdup(str);
- HYDU_FREE(str);
- tmp[j++] = NULL;
- status = HYDU_str_alloc_and_join(tmp, &str);
- HYDU_ERR_POP(status, "unable to join strings\n");
-
- HYDU_putenv(str);
- for (j = 0; tmp[j]; j++)
- HYDU_FREE(tmp[j]);
-
- if (chdir(HYD_Proxy_params.wdir) < 0)
- HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
- "unable to change wdir (%s)\n", HYDU_strerror(errno));
-
- for (j = 0, arg = 0; HYD_Proxy_params.args[j]; j++)
- client_args[arg++] = MPIU_Strdup(HYD_Proxy_params.args[j]);
- client_args[arg++] = NULL;
-
- /* FIXME: We need to figure out how many total number of
- * processes are there on this partition, and appropriately
- * bind them. */
- if ((i + HYD_Proxy_params.pmi_id) == 0) {
- status = HYDU_create_process(client_args, &HYD_Proxy_params.in,
- &HYD_Proxy_params.out[i], &HYD_Proxy_params.err[i],
- &HYD_Proxy_params.pid[i], i);
- }
- else {
- status = HYDU_create_process(client_args, NULL,
- &HYD_Proxy_params.out[i],
- &HYD_Proxy_params.err[i], &HYD_Proxy_params.pid[i],
- i);
- }
- HYDU_ERR_POP(status, "spawn process returned error\n");
-
- HYDU_FREE(str);
- }
-
- /* Everything is spawned, now wait for I/O */
- status = HYD_DMX_register_fd(HYD_Proxy_params.proc_count, HYD_Proxy_params.out,
- HYD_STDOUT, HYD_Proxy_stdout_cb);
- HYDU_ERR_POP(status, "unable to register fd\n");
-
- status = HYD_DMX_register_fd(HYD_Proxy_params.proc_count, HYD_Proxy_params.err,
- HYD_STDOUT, HYD_Proxy_stderr_cb);
- HYDU_ERR_POP(status, "unable to register fd\n");
-
- if (HYD_Proxy_params.pmi_id == 0) {
- status = HYDU_sock_set_nonblock(HYD_Proxy_params.in);
- HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
-
- stdin_fd = 0;
- status = HYDU_sock_set_nonblock(stdin_fd);
- HYDU_ERR_POP(status, "unable to set socket as non-blocking\n");
-
- HYD_Proxy_params.stdin_buf_offset = 0;
- HYD_Proxy_params.stdin_buf_count = 0;
- status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, HYD_Proxy_stdin_cb);
- HYDU_ERR_POP(status, "unable to register fd\n");
- }
-
- timeout_str = getenv("MPIEXEC_TIMEOUT");
- if (timeout_str)
- timeout = atoi(timeout_str);
- else
- timeout = -1;
-
- while (1) {
- /* Wait for some event to occur */
- status = HYD_DMX_wait_for_event(timeout);
- HYDU_ERR_POP(status, "demux engine error waiting for event\n");
-
- /* Check to see if there's any open read socket left; if there
- * are, we will just wait for more events. */
- count = 0;
- for (i = 0; i < HYD_Proxy_params.proc_count; i++) {
- if (HYD_Proxy_params.out[i] != -1 || HYD_Proxy_params.err[i] != -1) {
- count++;
- break;
- }
- }
-
- /* We are done */
- if (!count)
- break;
- }
-
- /* FIXME: If we did not break out yet, add a small usleep to yield
- * CPU here. We can not just sleep for the remaining time, as the
- * timeout value might be large and the application might exit
- * much quicker. Note that the sched_yield() call is broken on
- * newer linux kernel versions and should not be used. */
- /* Once all the sockets are closed, wait for all the processes to
- * finish. We poll here, but hopefully not for too long. */
- do {
- pid = waitpid(-1, &ret_status, WNOHANG);
-
- /* Find the pid and mark it as complete. */
- if (pid > 0)
- for (i = 0; i < HYD_Proxy_params.proc_count; i++)
- if (HYD_Proxy_params.pid[i] == pid)
- HYD_Proxy_params.exit_status[i] = WEXITSTATUS(ret_status);
-
- /* Check how many more processes are pending */
- count = 0;
- for (i = 0; i < HYD_Proxy_params.proc_count; i++) {
- if (HYD_Proxy_params.exit_status[i] == -1) {
- count++;
- break;
- }
- }
-
- if (count == 0)
- break;
-
- /* Check if there are any messages from the launcher */
- status = HYD_DMX_wait_for_event(0);
- HYDU_ERR_POP(status, "demux engine error waiting for event\n");
- } while (1);
-
- ret_status = 0;
- for (i = 0; i < HYD_Proxy_params.proc_count; i++)
- ret_status |= HYD_Proxy_params.exit_status[i];
-
- fn_exit:
- if (status != HYD_SUCCESS)
- return -1;
- else
- return ret_status;
-
- fn_fail:
- goto fn_exit;
-}
Deleted: mpich2/trunk/src/pm/hydra/pm/pmiserv/proxy.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/proxy.h 2009-03-20 19:35:46 UTC (rev 4149)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/proxy.h 2009-03-20 23:39:20 UTC (rev 4154)
@@ -1,43 +0,0 @@
-/* -*- Mode: C; c-basic-offset:4 ; -*- */
-/*
- * (C) 2008 by Argonne National Laboratory.
- * See COPYRIGHT in top-level directory.
- */
-
-#ifndef PROXY_H_INCLUDED
-#define PROXY_H_INCLUDED
-
-#include "hydra_base.h"
-#include "hydra_utils.h"
-
-struct HYD_Proxy_params {
- HYD_Env_t *global_env;
- HYD_Env_t *env_list;
- int proc_count;
- int proxy_port;
- int pmi_id;
- char *args[HYD_EXEC_ARGS];
- char *wdir;
- struct HYD_Partition_list *partition;
-
- int *pid;
- int *out;
- int *err;
- int *exit_status;
- int in;
-
- int stdin_buf_offset;
- int stdin_buf_count;
- char stdin_tmp_buf[HYD_TMPBUF_SIZE];
-};
-
-extern struct HYD_Proxy_params HYD_Proxy_params;
-extern int HYD_Proxy_listenfd;
-
-HYD_Status HYD_Proxy_get_params(int t_argc, char **t_argv);
-HYD_Status HYD_Proxy_listen_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_Proxy_stdout_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_Proxy_stderr_cb(int fd, HYD_Event_t events);
-HYD_Status HYD_Proxy_stdin_cb(int fd, HYD_Event_t events);
-
-#endif /* PROXY_H_INCLUDED */
Deleted: mpich2/trunk/src/pm/hydra/pm/pmiserv/proxy_cb.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/proxy_cb.c 2009-03-20 19:35:46 UTC (rev 4149)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/proxy_cb.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -1,159 +0,0 @@
-/* -*- Mode: C; c-basic-offset:4 ; -*- */
-/*
- * (C) 2008 by Argonne National Laboratory.
- * See COPYRIGHT in top-level directory.
- */
-
-#include "hydra.h"
-#include "hydra_utils.h"
-#include "proxy.h"
-#include "demux.h"
-#include "central.h"
-
-struct HYD_Proxy_params HYD_Proxy_params;
-int HYD_Proxy_listenfd;
-
-HYD_Status HYD_Proxy_listen_cb(int fd, HYD_Event_t events)
-{
- int accept_fd, count, i;
- enum HYD_Proxy_cmds cmd;
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- if (events & HYD_STDIN)
- HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "stdout handler got stdin event\n");
-
- if (fd == HYD_Proxy_listenfd) { /* mpiexec is trying to connect */
- status = HYDU_sock_accept(fd, &accept_fd);
- HYDU_ERR_POP(status, "accept error\n");
-
- status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, HYD_Proxy_listen_cb);
- HYDU_ERR_POP(status, "unable to register fd\n");
- }
- else { /* We got a command from mpiexec */
- count = read(fd, &cmd, sizeof(enum HYD_Proxy_cmds));
- if (count < 0) {
- HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n",
- fd, HYDU_strerror(errno));
- }
- else if (count == 0) {
- /* The connection has closed */
- status = HYD_DMX_deregister_fd(fd);
- HYDU_ERR_POP(status, "unable to deregister fd\n");
- close(fd);
- goto fn_exit;
- }
-
- if (cmd == KILLALL_PROCS) { /* Got the killall command */
- for (i = 0; i < HYD_Proxy_params.proc_count; i++)
- if (HYD_Proxy_params.pid[i] != -1)
- kill(HYD_Proxy_params.pid[i], SIGKILL);
-
- status = HYD_DMX_deregister_fd(fd);
- HYDU_ERR_POP(status, "unable to register fd\n");
- close(fd);
- }
- else {
- HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
- "got unrecognized command from mpiexec\n");
- }
- }
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-HYD_Status HYD_Proxy_stdout_cb(int fd, HYD_Event_t events)
-{
- int closed, i;
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- status = HYDU_sock_stdout_cb(fd, events, 1, &closed);
- HYDU_ERR_POP(status, "stdout callback error\n");
-
- if (closed) {
- /* The connection has closed */
- status = HYD_DMX_deregister_fd(fd);
- HYDU_ERR_POP(status, "unable to deregister fd\n");
-
- for (i = 0; i < HYD_Proxy_params.proc_count; i++)
- if (HYD_Proxy_params.out[i] == fd)
- HYD_Proxy_params.out[i] = -1;
- }
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-HYD_Status HYD_Proxy_stderr_cb(int fd, HYD_Event_t events)
-{
- int closed, i;
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- status = HYDU_sock_stdout_cb(fd, events, 2, &closed);
- HYDU_ERR_POP(status, "stdout callback error\n");
-
- if (closed) {
- /* The connection has closed */
- status = HYD_DMX_deregister_fd(fd);
- HYDU_ERR_POP(status, "unable to deregister fd\n");
-
- for (i = 0; i < HYD_Proxy_params.proc_count; i++)
- if (HYD_Proxy_params.err[i] == fd)
- HYD_Proxy_params.err[i] = -1;
- }
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-HYD_Status HYD_Proxy_stdin_cb(int fd, HYD_Event_t events)
-{
- int closed;
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- status = HYDU_sock_stdin_cb(HYD_Proxy_params.in, events, HYD_Proxy_params.stdin_tmp_buf,
- &HYD_Proxy_params.stdin_buf_count,
- &HYD_Proxy_params.stdin_buf_offset, &closed);
- HYDU_ERR_POP(status, "stdin callback error\n");
-
- if (closed) {
- /* The connection has closed */
- status = HYD_DMX_deregister_fd(fd);
- HYDU_ERR_POP(status, "unable to deregister fd\n");
-
- close(HYD_Proxy_params.in);
- close(fd);
-
- HYD_Proxy_params.in = -1;
- }
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
Deleted: mpich2/trunk/src/pm/hydra/pm/pmiserv/proxy_utils.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/central/proxy_utils.c 2009-03-20 19:35:46 UTC (rev 4149)
+++ mpich2/trunk/src/pm/hydra/pm/pmiserv/proxy_utils.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -1,117 +0,0 @@
-/* -*- Mode: C; c-basic-offset:4 ; -*- */
-/*
- * (C) 2008 by Argonne National Laboratory.
- * See COPYRIGHT in top-level directory.
- */
-
-#include "hydra.h"
-#include "proxy.h"
-
-struct HYD_Proxy_params HYD_Proxy_params;
-
-HYD_Status HYD_Proxy_get_params(int t_argc, char **t_argv)
-{
- int argc = t_argc;
- char **argv = t_argv, *str;
- int arg, i, count;
- struct HYD_Partition_list *partition, *run;
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- HYD_Proxy_params.global_env = NULL;
- HYD_Proxy_params.env_list = NULL;
- HYD_Proxy_params.partition = NULL;
-
- status = HYDU_list_global_env(&HYD_Proxy_params.global_env);
- HYDU_ERR_POP(status, "unable to get the global env list\n");
-
- while (--argc && ++argv) {
-
- /* Process count */
- if (!strcmp(*argv, "--proc-count")) {
- argv++;
- HYD_Proxy_params.proc_count = atoi(*argv);
- continue;
- }
-
- /* Proxy port */
- if (!strcmp(*argv, "--proxy-port")) {
- argv++;
- HYD_Proxy_params.proxy_port = atoi(*argv);
- continue;
- }
-
- /* PMI_ID: This is the PMI_ID for the first process;
- * everything else is incremented from here. */
- if (!strcmp(*argv, "--pmi-id")) {
- argv++;
- HYD_Proxy_params.pmi_id = atoi(*argv);
- continue;
- }
-
- /* Partition information is passed as two parameters; name
- * followed by proc count. Multiple partitions are specified
- * as multiple parameters. */
- if (!strcmp(*argv, "--partition")) {
- argv++;
- HYDU_alloc_partition(&partition);
- partition->name = MPIU_Strdup(*argv);
- argv++;
- partition->proc_count = atoi(*argv);
-
- if (!HYD_Proxy_params.partition)
- HYD_Proxy_params.partition = partition;
- else {
- for (run = HYD_Proxy_params.partition; run->next; run = run->next);
- run->next = partition;
- }
- continue;
- }
-
- /* Working directory */
- if (!strcmp(*argv, "--wdir")) {
- argv++;
- HYD_Proxy_params.wdir = MPIU_Strdup(*argv);
- continue;
- }
-
- /* Environment information is passed as a list of names; we
- * need to find the values from our environment. */
- if (!strcmp(*argv, "--environment")) {
- argv++;
- count = atoi(*argv);
- for (i = 0; i < count; i++) {
- argv++;
- str = *argv;
-
- /* Some bootstrap servers remove the quotes that we
- * added, while some others do not. For the cases
- * where they are not removed, we do it ourselves. */
- if (*str == '\'') {
- str++;
- str[strlen(str) - 1] = 0;
- }
- HYDU_putenv(str);
- }
- continue;
- }
-
- /* Fall through case is application parameters. Load
- * everything into the args variable. */
- for (arg = 0; *argv;) {
- HYD_Proxy_params.args[arg++] = MPIU_Strdup(*argv);
- ++argv;
- --argc;
- }
- HYD_Proxy_params.args[arg++] = NULL;
- break;
- }
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
Deleted: mpich2/trunk/src/pm/hydra/pm/utils/pmcu_pmi.h
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/utils/pmcu_pmi.h 2009-03-20 22:24:07 UTC (rev 4153)
+++ mpich2/trunk/src/pm/hydra/pm/utils/pmcu_pmi.h 2009-03-20 23:39:20 UTC (rev 4154)
@@ -1,69 +0,0 @@
-/* -*- Mode: C; c-basic-offset:4 ; -*- */
-/*
- * (C) 2008 by Argonne National Laboratory.
- * See COPYRIGHT in top-level directory.
- */
-
-#ifndef PMCU_PMI_H_INCLUDED
-#define PMCU_PMI_H_INCLUDED
-
-#include "hydra.h"
-#include "hydra_utils.h"
-
-#define MAXKEYLEN 64 /* max length of key in keyval space */
-#define MAXVALLEN 256 /* max length of value in keyval space */
-#define MAXNAMELEN 256 /* max length of various names */
-#define MAXKVSNAME MAXNAMELEN /* max length of a kvsname */
-
-#define HYD_PMCU_NUM_STR 100
-
-typedef struct HYD_PMCU_pmi_kvs_pair {
- char key[MAXKEYLEN];
- char val[MAXVALLEN];
- struct HYD_PMCU_pmi_kvs_pair *next;
-} HYD_PMCU_pmi_kvs_pair_t;
-
-typedef struct HYD_PMCU_pmi_kvs {
- char kvs_name[MAXNAMELEN]; /* Name of this kvs */
- HYD_PMCU_pmi_kvs_pair_t *key_pair;
-} HYD_PMCU_pmi_kvs_t;
-
-typedef struct HYD_PMCU_pmi_pg HYD_PMCU_pmi_pg_t;
-typedef struct HYD_PMCU_pmi_process HYD_PMCU_pmi_process_t;
-
-struct HYD_PMCU_pmi_process {
- /* This is a bad design if we need to tie in an FD to a PMI
- * process. This essentially kills any chance of PMI server
- * masquerading. However, PMI v1 requires this state to be
- * maintained. */
- int fd;
- struct HYD_PMCU_pmi_pg *pg;
- struct HYD_PMCU_pmi_process *next;
-};
-
-struct HYD_PMCU_pmi_pg {
- int id;
-
- int num_procs; /* Number of processes in the group */
- int barrier_count;
-
- struct HYD_PMCU_pmi_process *process;
- HYD_PMCU_pmi_kvs_t *kvs;
-
- struct HYD_PMCU_pmi_pg *next;
-};
-
-HYD_Status HYD_PMCU_create_pg(void);
-HYD_Status HYD_PMCU_pmi_initack(int fd, char *args[]);
-HYD_Status HYD_PMCU_pmi_init(int fd, char *args[]);
-HYD_Status HYD_PMCU_pmi_get_maxes(int fd, char *args[]);
-HYD_Status HYD_PMCU_pmi_get_appnum(int fd, char *args[]);
-HYD_Status HYD_PMCU_pmi_get_my_kvsname(int fd, char *args[]);
-HYD_Status HYD_PMCU_pmi_barrier_in(int fd, char *args[]);
-HYD_Status HYD_PMCU_pmi_put(int fd, char *args[]);
-HYD_Status HYD_PMCU_pmi_get(int fd, char *args[]);
-HYD_Status HYD_PMCU_pmi_finalize(int fd, char *args[]);
-HYD_Status HYD_PMCU_pmi_get_usize(int fd, char *args[]);
-HYD_Status HYD_PMCU_finalize(void);
-
-#endif /* PMCU_PMI_H_INCLUDED */
Modified: mpich2/trunk/src/pm/hydra/pm/utils/pmi.c
===================================================================
--- mpich2/trunk/src/pm/hydra/pm/utils/pmi.c 2009-03-20 22:24:07 UTC (rev 4153)
+++ mpich2/trunk/src/pm/hydra/pm/utils/pmi.c 2009-03-20 23:39:20 UTC (rev 4154)
@@ -4,671 +4,4 @@
* See COPYRIGHT in top-level directory.
*/
-#include "hydra.h"
-#include "hydra_utils.h"
-#include "bsci.h"
-#include "pmcu_pmi.h"
-HYD_Handle handle;
-HYD_PMCU_pmi_pg_t *pg_list = NULL;
-
-static HYD_Status allocate_kvs(HYD_PMCU_pmi_kvs_t ** kvs, int pgid)
-{
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- HYDU_MALLOC(*kvs, HYD_PMCU_pmi_kvs_t *, sizeof(HYD_PMCU_pmi_kvs_t), status);
- MPIU_Snprintf((*kvs)->kvs_name, MAXNAMELEN, "kvs_%d_%d", (int) getpid(), pgid);
- (*kvs)->key_pair = NULL;
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-static HYD_Status create_pg(HYD_PMCU_pmi_pg_t ** pg, int pgid)
-{
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- HYDU_MALLOC(*pg, HYD_PMCU_pmi_pg_t *, sizeof(HYD_PMCU_pmi_pg_t), status);
- (*pg)->id = pgid;
- (*pg)->num_procs = 0;
- (*pg)->barrier_count = 0;
- (*pg)->process = NULL;
-
- status = allocate_kvs(&(*pg)->kvs, pgid);
- HYDU_ERR_POP(status, "unable to allocate kvs space\n");
-
- (*pg)->next = NULL;
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-static HYD_Status add_process_to_pg(HYD_PMCU_pmi_pg_t * pg, int fd)
-{
- HYD_PMCU_pmi_process_t *process, *tmp;
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- HYDU_MALLOC(process, HYD_PMCU_pmi_process_t *, sizeof(HYD_PMCU_pmi_process_t), status);
- process->fd = fd;
- process->pg = pg;
- process->next = NULL;
- if (pg->process == NULL)
- pg->process = process;
- else {
- tmp = pg->process;
- while (tmp->next)
- tmp = tmp->next;
- tmp->next = process;
- }
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-HYD_Status HYD_PMCU_create_pg(void)
-{
- struct HYD_Proc_params *proc_params;
- int num_procs;
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- /* Find the number of processes in the PG */
- num_procs = 0;
- proc_params = handle.proc_params;
- while (proc_params) {
- num_procs += proc_params->exec_proc_count;
- proc_params = proc_params->next;
- }
-
- status = create_pg(&pg_list, 0);
- HYDU_ERR_POP(status, "unable to create pg\n");
- pg_list->num_procs = num_procs;
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-HYD_Status HYD_PMCU_pmi_initack(int fd, char *args[])
-{
- int id, size, debug, i;
- char *ssize, *srank, *sdebug, *tmp[HYDU_NUM_JOIN_STR], *cmd;
- struct HYD_Proc_params *proc_params;
- HYD_PMCU_pmi_pg_t *run;
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- strtok(args[0], "=");
- id = atoi(strtok(NULL, "="));
-
- size = 0;
- proc_params = handle.proc_params;
- while (proc_params) {
- size += proc_params->exec_proc_count;
- proc_params = proc_params->next;
- }
- debug = handle.debug;
-
- status = HYDU_int_to_str(size, &ssize);
- HYDU_ERR_POP(status, "unable to convert int to string\n");
-
- status = HYDU_int_to_str(id, &srank);
- HYDU_ERR_POP(status, "unable to convert int to string\n");
-
- status = HYDU_int_to_str(debug, &sdebug);
- HYDU_ERR_POP(status, "unable to convert int to string\n");
-
- i = 0;
- tmp[i++] = "cmd=initack\ncmd=set size=";
- tmp[i++] = ssize;
- tmp[i++] = "\ncmd=set rank=";
- tmp[i++] = srank;
- tmp[i++] = "\ncmd=set debug=";
- tmp[i++] = sdebug;
- tmp[i++] = "\n";
- tmp[i++] = NULL;
-
- status = HYDU_str_alloc_and_join(tmp, &cmd);
- HYDU_ERR_POP(status, "error while joining strings\n");
-
- status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
- HYDU_ERR_POP(status, "error writing PMI line\n");
-
- HYDU_FREE(ssize);
- HYDU_FREE(srank);
- HYDU_FREE(sdebug);
- HYDU_FREE(cmd);
-
- run = pg_list;
- while (run->next)
- run = run->next;
-
- /* Add the process to the last PG */
- status = add_process_to_pg(run, fd);
- HYDU_ERR_POP(status, "unable to add process to pg\n");
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-HYD_Status HYD_PMCU_pmi_init(int fd, char *args[])
-{
- int pmi_version, pmi_subversion;
- char *tmp[HYDU_NUM_JOIN_STR];
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- strtok(args[0], "=");
- pmi_version = atoi(strtok(NULL, "="));
- strtok(args[1], "=");
- pmi_subversion = atoi(strtok(NULL, "="));
-
- if (pmi_version == 1 && pmi_subversion <= 1) {
- /* We support PMI v1.0 and 1.1 */
- tmp[0] = "cmd=response_to_init pmi_version=1 pmi_subversion=1 rc=0\n";
- status = HYDU_sock_writeline(fd, tmp[0], strlen(tmp[0]));
- HYDU_ERR_POP(status, "error writing PMI line\n");
- }
- else {
- /* PMI version mismatch */
- HYDU_ERR_SETANDJUMP2(status, HYD_INTERNAL_ERROR,
- "PMI version mismatch; %d.%d\n", pmi_version, pmi_subversion);
- }
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-HYD_Status HYD_PMCU_pmi_get_maxes(int fd, char *args[])
-{
- int i;
- char *tmp[HYDU_NUM_JOIN_STR], *cmd;
- char *maxkvsname, *maxkeylen, *maxvallen;
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- status = HYDU_int_to_str(MAXKVSNAME, &maxkvsname);
- HYDU_ERR_POP(status, "unable to convert int to string\n");
-
- status = HYDU_int_to_str(MAXKEYLEN, &maxkeylen);
- HYDU_ERR_POP(status, "unable to convert int to string\n");
-
- status = HYDU_int_to_str(MAXVALLEN, &maxvallen);
- HYDU_ERR_POP(status, "unable to convert int to string\n");
-
- i = 0;
- tmp[i++] = "cmd=maxes kvsname_max=";
- tmp[i++] = maxkvsname;
- tmp[i++] = " keylen_max=";
- tmp[i++] = maxkeylen;
- tmp[i++] = " vallen_max=";
- tmp[i++] = maxvallen;
- tmp[i++] = "\n";
- tmp[i++] = NULL;
-
- status = HYDU_str_alloc_and_join(tmp, &cmd);
- HYDU_ERR_POP(status, "unable to join strings\n");
-
- status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
- HYDU_ERR_POP(status, "error writing PMI line\n");
- HYDU_FREE(cmd);
-
- HYDU_FREE(maxkvsname);
- HYDU_FREE(maxkeylen);
- HYDU_FREE(maxvallen);
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-static HYD_PMCU_pmi_process_t *find_process(int fd)
-{
- HYD_PMCU_pmi_pg_t *pg;
- HYD_PMCU_pmi_process_t *process = NULL;
-
- pg = pg_list;
- while (pg) {
- process = pg->process;
- while (process) {
- if (process->fd == fd)
- break;
- process = process->next;
- }
- pg = pg->next;
- }
-
- return process;
-}
-
-
-HYD_Status HYD_PMCU_pmi_get_appnum(int fd, char *args[])
-{
- char *tmp[HYDU_NUM_JOIN_STR], *cmd;
- char *sapp_num;
- int i;
- HYD_PMCU_pmi_process_t *process;
- HYD_Status status = HYD_SUCCESS;
-
- HYDU_FUNC_ENTER();
-
- /* Find the group id corresponding to this fd */
- process = find_process(fd);
- if (process == NULL) /* We didn't find the process */
- HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
- "unable to find process structure\n");
-
- status = HYDU_int_to_str(process->pg->id, &sapp_num);
- HYDU_ERR_POP(status, "unable to convert int to string\n");
-
- i = 0;
- tmp[i++] = "cmd=appnum appnum=";
- tmp[i++] = sapp_num;
- tmp[i++] = "\n";
- tmp[i++] = NULL;
-
- status = HYDU_str_alloc_and_join(tmp, &cmd);
- HYDU_ERR_POP(status, "unable to join strings\n");
-
- status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
- HYDU_ERR_POP(status, "error writing PMI line\n");
- HYDU_FREE(cmd);
-
- HYDU_FREE(sapp_num);
-
- fn_exit:
- HYDU_FUNC_EXIT();
- return status;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-/* Handle a PMI "get_my_kvsname" request from the process connected on fd.
- * Replies "cmd=my_kvsname kvsname=<name>", where <name> is the kvs_name of
- * the caller's process-group KVS. args is not used. */
-HYD_Status HYD_PMCU_pmi_get_my_kvsname(int fd, char *args[])
-{
-    char *tmp[HYDU_NUM_JOIN_STR], *cmd = NULL;
-    int i;
-    HYD_PMCU_pmi_process_t *process;
-    HYD_Status status = HYD_SUCCESS;
-
-    HYDU_FUNC_ENTER();
-
-    /* Find the group id corresponding to this fd */
-    process = find_process(fd);
-    if (process == NULL)        /* We didn't find the process */
-        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
-                             "unable to find process structure for fd %d\n", fd);
-
-    i = 0;
-    tmp[i++] = "cmd=my_kvsname kvsname=";
-    tmp[i++] = process->pg->kvs->kvs_name;
-    tmp[i++] = "\n";
-    tmp[i++] = NULL;
-
-    status = HYDU_str_alloc_and_join(tmp, &cmd);
-    HYDU_ERR_POP(status, "unable to join strings\n");
-
-    status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
-    HYDU_ERR_POP(status, "error writing PMI line\n");
-
-  fn_exit:
-    /* Released on every path, so a failed write no longer leaks it */
-    HYDU_FREE(cmd);
-    HYDU_FUNC_EXIT();
-    return status;
-
-  fn_fail:
-    goto fn_exit;
-}
-
-
-/* Handle a PMI "barrier_in" request from the process connected on fd.
- * Each call counts one arrival in the caller's process group. When the
- * count reaches the group's num_procs, "cmd=barrier_out" is written to
- * every process of the group and the count is reset to zero.
- * args is not used. */
-HYD_Status HYD_PMCU_pmi_barrier_in(int fd, char *args[])
-{
-    HYD_PMCU_pmi_process_t *caller, *member;
-    char *reply = "cmd=barrier_out\n";
-    HYD_Status status = HYD_SUCCESS;
-
-    HYDU_FUNC_ENTER();
-
-    caller = find_process(fd);
-    if (!caller)
-        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
-                             "unable to find process structure for fd %d\n", fd);
-
-    caller->pg->barrier_count++;
-
-    /* Other members of the group have not arrived yet: nothing to send. */
-    if (caller->pg->barrier_count != caller->pg->num_procs)
-        goto fn_exit;
-
-    /* Everyone is in: release the whole group, starting from its head. */
-    for (member = caller->pg->process; member != NULL; member = member->next) {
-        status = HYDU_sock_writeline(member->fd, reply, strlen(reply));
-        HYDU_ERR_POP(status, "error writing PMI line\n");
-    }
-
-    caller->pg->barrier_count = 0;
-
-  fn_exit:
-    HYDU_FUNC_EXIT();
-    return status;
-
-  fn_fail:
-    goto fn_exit;
-}
-
-
-/* Handle a PMI "put" request from the process connected on fd.
- * args[0], args[1] and args[2] are expected to be "name=value" tokens
- * carrying, in order, the kvsname, the key and the value; only the text
- * after the first '=' is used (note: strtok stops at any later '=').
- * On success the pair is appended to the caller's process-group KVS and
- * "cmd=put_result rc=0 msg=success" is sent. If the key is already present,
- * the KVS is left unchanged and "rc=-1 msg=duplicate_key_<key>" is sent. */
-HYD_Status HYD_PMCU_pmi_put(int fd, char *args[])
-{
-    int i;
-    HYD_PMCU_pmi_process_t *process;
-    HYD_PMCU_pmi_kvs_pair_t *key_pair = NULL, *run, *last = NULL;
-    char *kvsname, *key, *val;
-    char *tmp[HYDU_NUM_JOIN_STR], *cmd = NULL;
-    HYD_Status status = HYD_SUCCESS;
-
-    HYDU_FUNC_ENTER();
-
-    strtok(args[0], "=");
-    kvsname = strtok(NULL, "=");
-    strtok(args[1], "=");
-    key = strtok(NULL, "=");
-    strtok(args[2], "=");
-    val = strtok(NULL, "=");
-
-    /* strtok yields NULL when nothing follows the '='; never hand that to
-     * strcmp or to a "%s" conversion. An empty value is stored as "". */
-    if (kvsname == NULL || key == NULL)
-        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
-                            "malformed PMI put command\n");
-    if (val == NULL)
-        val = "";
-
-    /* Find the group id corresponding to this fd */
-    process = find_process(fd);
-    if (process == NULL)        /* We didn't find the process */
-        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
-                             "unable to find process structure for fd %d\n", fd);
-
-    if (strcmp(process->pg->kvs->kvs_name, kvsname))
-        HYDU_ERR_SETANDJUMP2(status, HYD_INTERNAL_ERROR,
-                             "kvsname (%s) does not match this process' kvs space (%s)\n",
-                             kvsname, process->pg->kvs->kvs_name);
-
-    HYDU_MALLOC(key_pair, HYD_PMCU_pmi_kvs_pair_t *, sizeof(HYD_PMCU_pmi_kvs_pair_t), status);
-    MPIU_Snprintf(key_pair->key, MAXKEYLEN, "%s", key);
-    MPIU_Snprintf(key_pair->val, MAXVALLEN, "%s", val);
-    key_pair->next = NULL;
-
-    /* Compare against every node, the tail included. Stop at a duplicate
-     * (run != NULL) or fall off the end with `last` at the tail (NULL when
-     * the list is empty). The stored, possibly truncated key is compared so
-     * the check matches what is actually kept in the KVS. */
-    for (run = process->pg->kvs->key_pair; run; run = run->next) {
-        if (!strcmp(run->key, key_pair->key))
-            break;
-        last = run;
-    }
-
-    i = 0;
-    tmp[i++] = "cmd=put_result rc=";
-    if (run) {
-        /* Duplicate: report it and leave the existing list untouched.
-         * key_pair stays allocated until fn_exit, so its key is valid here. */
-        tmp[i++] = "-1 msg=duplicate_key_";
-        tmp[i++] = key_pair->key;
-    }
-    else {
-        if (last)
-            last->next = key_pair;
-        else
-            process->pg->kvs->key_pair = key_pair;
-        key_pair = NULL;        /* ownership moved to the KVS */
-        tmp[i++] = "0 msg=success";
-    }
-    tmp[i++] = "\n";
-    tmp[i++] = NULL;
-
-    status = HYDU_str_alloc_and_join(tmp, &cmd);
-    HYDU_ERR_POP(status, "unable to join strings\n");
-
-    status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
-    HYDU_ERR_POP(status, "error writing PMI line\n");
-
-  fn_exit:
-    /* key_pair is non-NULL only if it was not linked into the KVS */
-    HYDU_FREE(key_pair);
-    HYDU_FREE(cmd);
-    HYDU_FUNC_EXIT();
-    return status;
-
-  fn_fail:
-    goto fn_exit;
-}
-
-
-/* Handle a PMI "get" request from the process connected on fd.
- * args[0] and args[1] are expected to be "name=value" tokens carrying the
- * kvsname and the key. Looks the key up in the caller's process-group KVS
- * and replies "cmd=get_result rc=0 msg=success value=<val>" when found, or
- * "rc=-1 msg=key_<key>_not_found value=unknown" otherwise. */
-HYD_Status HYD_PMCU_pmi_get(int fd, char *args[])
-{
-    int i;
-    HYD_PMCU_pmi_process_t *process;
-    HYD_PMCU_pmi_kvs_pair_t *run;
-    char *kvsname, *key;
-    char *tmp[HYDU_NUM_JOIN_STR], *cmd = NULL;
-    HYD_Status status = HYD_SUCCESS;
-
-    HYDU_FUNC_ENTER();
-
-    strtok(args[0], "=");
-    kvsname = strtok(NULL, "=");
-    strtok(args[1], "=");
-    key = strtok(NULL, "=");
-
-    /* strtok yields NULL when nothing follows the '='; never strcmp it */
-    if (kvsname == NULL || key == NULL)
-        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
-                            "malformed PMI get command\n");
-
-    /* Find the group id corresponding to this fd */
-    process = find_process(fd);
-    if (process == NULL)        /* We didn't find the process */
-        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
-                             "unable to find process structure for fd %d\n", fd);
-
-    if (strcmp(process->pg->kvs->kvs_name, kvsname))
-        HYDU_ERR_SETANDJUMP2(status, HYD_INTERNAL_ERROR,
-                             "kvsname (%s) does not match this process' kvs space (%s)\n",
-                             kvsname, process->pg->kvs->kvs_name);
-
-    /* Linear search; run ends up NULL when the key is absent (this also
-     * covers an empty list). */
-    for (run = process->pg->kvs->key_pair; run; run = run->next)
-        if (!strcmp(run->key, key))
-            break;
-
-    i = 0;
-    tmp[i++] = "cmd=get_result rc=";
-    if (run) {
-        /* run->val is a buffer inside the KVS node, which stays alive for
-         * this whole call, so it can be joined without copying. */
-        tmp[i++] = "0 msg=success value=";
-        tmp[i++] = run->val;
-    }
-    else {
-        tmp[i++] = "-1 msg=key_";
-        tmp[i++] = key;
-        tmp[i++] = "_not_found value=unknown";
-    }
-    tmp[i++] = "\n";
-    tmp[i++] = NULL;
-
-    status = HYDU_str_alloc_and_join(tmp, &cmd);
-    HYDU_ERR_POP(status, "unable to join strings\n");
-
-    status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
-    HYDU_ERR_POP(status, "error writing PMI line\n");
-
-  fn_exit:
-    /* Released on every path, so a failed write no longer leaks it */
-    HYDU_FREE(cmd);
-    HYDU_FUNC_EXIT();
-    return status;
-
-  fn_fail:
-    goto fn_exit;
-}
-
-
-/* Handle a PMI "finalize" request: acknowledge it by writing
- * "cmd=finalize_ack" to the process connected on fd. args is not used. */
-HYD_Status HYD_PMCU_pmi_finalize(int fd, char *args[])
-{
-    HYD_Status status = HYD_SUCCESS;
-    char *ack = "cmd=finalize_ack\n";
-
-    HYDU_FUNC_ENTER();
-
-    status = HYDU_sock_writeline(fd, ack, strlen(ack));
-    HYDU_ERR_POP(status, "error writing PMI line\n");
-
-  fn_exit:
-    HYDU_FUNC_EXIT();
-    return status;
-
-  fn_fail:
-    goto fn_exit;
-}
-
-
-/* Handle a PMI "get_universe_size" request from the process connected on
- * fd. Replies "cmd=universe_size size=<n>", where <n> comes from
- * HYD_BSCI_get_usize(). args is not used. */
-HYD_Status HYD_PMCU_pmi_get_usize(int fd, char *args[])
-{
-    int usize, i;
-    char *tmp[HYDU_NUM_JOIN_STR], *cmd = NULL, *usize_str = NULL;
-    HYD_Status status = HYD_SUCCESS;
-
-    HYDU_FUNC_ENTER();
-
-    status = HYD_BSCI_get_usize(&usize);
-    HYDU_ERR_POP(status, "unable to get bootstrap universe size\n");
-
-    status = HYDU_int_to_str(usize, &usize_str);
-    HYDU_ERR_POP(status, "unable to convert int to string\n");
-
-    i = 0;
-    tmp[i++] = "cmd=universe_size size=";
-    tmp[i++] = usize_str;
-    tmp[i++] = "\n";
-    tmp[i++] = NULL;
-
-    status = HYDU_str_alloc_and_join(tmp, &cmd);
-    HYDU_ERR_POP(status, "unable to join strings\n");
-
-    status = HYDU_sock_writeline(fd, cmd, strlen(cmd));
-    HYDU_ERR_POP(status, "error writing PMI line\n");
-
-  fn_exit:
-    /* usize_str was previously never freed; release both temporaries on
-     * every path. */
-    HYDU_FREE(cmd);
-    HYDU_FREE(usize_str);
-    HYDU_FUNC_EXIT();
-    return status;
-
-  fn_fail:
-    goto fn_exit;
-}
-
-
-/* Free every node of a singly linked list of PMI process records, following
- * the next pointers from process_list. Always returns HYD_SUCCESS. */
-static HYD_Status free_pmi_process_list(HYD_PMCU_pmi_process_t * process_list)
-{
-    HYD_PMCU_pmi_process_t *victim;
-    HYD_Status status = HYD_SUCCESS;
-
-    HYDU_FUNC_ENTER();
-
-    /* Advance past the node before freeing it so we never read freed memory */
-    while (process_list != NULL) {
-        victim = process_list;
-        process_list = process_list->next;
-        HYDU_FREE(victim);
-    }
-
-    HYDU_FUNC_EXIT();
-    return status;
-}
-
-
-/* Free a KVS: every key/value pair on its key_pair list, then the KVS
- * structure itself. kvs_list is dereferenced unconditionally, so it must
- * not be NULL. Always returns HYD_SUCCESS. */
-static HYD_Status free_pmi_kvs_list(HYD_PMCU_pmi_kvs_t * kvs_list)
-{
-    HYD_PMCU_pmi_kvs_pair_t *pair, *next_pair;
-    HYD_Status status = HYD_SUCCESS;
-
-    HYDU_FUNC_ENTER();
-
-    /* Grab the successor before releasing the current pair */
-    for (pair = kvs_list->key_pair; pair != NULL; pair = next_pair) {
-        next_pair = pair->next;
-        HYDU_FREE(pair);
-    }
-    HYDU_FREE(kvs_list);
-
-    HYDU_FUNC_EXIT();
-    return status;
-}
-
-
-/* Release all process-group state. For each group on the global pg_list,
- * free its process list, its KVS and the group itself. Each group is
- * unlinked from pg_list before it is freed, so pg_list never points at freed
- * memory (a later find_process() stays safe) and is NULL once every group
- * has been released. */
-HYD_Status HYD_PMCU_finalize(void)
-{
-    HYD_PMCU_pmi_pg_t *pg;
-    HYD_Status status = HYD_SUCCESS;
-
-    HYDU_FUNC_ENTER();
-
-    while (pg_list) {
-        pg = pg_list;
-
-        status = free_pmi_process_list(pg->process);
-        HYDU_ERR_POP(status, "unable to free process list\n");
-
-        status = free_pmi_kvs_list(pg->kvs);
-        HYDU_ERR_POP(status, "unable to free kvs list\n");
-
-        /* Unlink before freeing so the global never dangles */
-        pg_list = pg->next;
-        HYDU_FREE(pg);
-    }
-
-  fn_exit:
-    HYDU_FUNC_EXIT();
-    return status;
-
-  fn_fail:
-    goto fn_exit;
-}
More information about the mpich2-commits
mailing list