[Swift-commit] cog r3494
swift at ci.uchicago.edu
swift at ci.uchicago.edu
Sun Oct 28 21:35:10 CDT 2012
------------------------------------------------------------------------
r3494 | hategan | 2012-10-28 21:33:33 -0500 (Sun, 28 Oct 2012) | 1 line
added command line client
------------------------------------------------------------------------
Index: modules/provider-coaster-c-client/configure.ac
===================================================================
--- modules/provider-coaster-c-client/configure.ac (revision 3493)
+++ modules/provider-coaster-c-client/configure.ac (working copy)
@@ -23,7 +23,7 @@
CXXFLAGS=
# Checks for header files.
-AC_CHECK_HEADERS([arpa/inet.h fcntl.h netdb.h netinet/in.h stdlib.h string.h sys/socket.h sys/time.h unistd.h])
+AC_CHECK_HEADERS([arpa/inet.h fcntl.h netdb.h netinet/in.h stdlib.h string.h sys/socket.h sys/time.h unistd.h getopt.h])
# Libs
AC_CHECK_LIB(pthread, pthread_create)
Index: modules/provider-coaster-c-client/.autotools
===================================================================
--- modules/provider-coaster-c-client/.autotools (revision 3493)
+++ modules/provider-coaster-c-client/.autotools (working copy)
@@ -30,11 +30,6 @@
<option id="program-suffix" value=""/>
<option id="program-transform-name" value=""/>
<option id="enable-maintainer-mode" value="false"/>
-<flag id="CFLAGS">
-<flagvalue id="cflags-debug" value="false"/>
-<flagvalue id="cflags-gprof" value="false"/>
-<flagvalue id="cflags-gcov" value="false"/>
-</flag>
<option id="user" value=""/>
<option id="autogen" value="autogen.sh"/>
<option id="autogenOpts" value=""/>
Index: modules/provider-coaster-c-client/src/RunCoasterJob.cpp
===================================================================
--- modules/provider-coaster-c-client/src/RunCoasterJob.cpp (revision 0)
+++ modules/provider-coaster-c-client/src/RunCoasterJob.cpp (revision 3494)
@@ -0,0 +1,268 @@
+
+#include <getopt.h>
+#include <stdlib.h>
+#include <list>
+#include <string.h>
+
+#include "CoasterError.h"
+#include "CoasterLoop.h"
+#include "CoasterClient.h"
+#include "Job.h"
+#include "Settings.h"
+#include "Logger.h"
+
+using namespace std;
+
+// Long-option table passed to getopt_long() in parseArguments().
+// Each entry's last field is the short option character that
+// getopt_long() returns for it (the flag field is 0), so long and
+// short spellings are handled by the same switch case.
+// The all-zero entry terminates the table.
+static struct option long_options[] = {
+ {"service", required_argument, 0, 's'},
+ {"job-manager", required_argument, 0, 'j'},
+ {"env", required_argument, 0, 'e'},
+ {"attr", required_argument, 0, 'a'},
+ {"option", required_argument, 0, 'o'},
+ {"stdout", required_argument, 0, 'O'},
+ {"stderr", required_argument, 0, 'E'},
+ {"verbosity", required_argument, 0, 'v'},
+ {"help", no_argument, 0, 'h'},
+ {0, 0, 0, 0}
+};
+
+// A "name=value" argument split by parsePair().
+// Both pointers refer into the string that was handed to parsePair();
+// the struct does not own any memory.
+struct KV {
+ char* key;
+ char* value;
+};
+
+// Forward declarations for the helpers defined below.
+void parsePair(char* s, KV* pair);
+void parseArguments(int argc, char* argv[]);
+void checkArguments();
+void configureLogging();
+int runJob();
+void displayHelp();
+
+// Command-line state filled in by parseArguments(). The pointers are
+// taken from optarg/argv and are not copied; jobManager and verbosity
+// may instead be given a default value by checkArguments().
+char* serviceUrl = NULL;   // -s / --service
+list<char*> envs;          // -e / --env, raw "name=value" strings
+list<char*> attrs;         // -a / --attr, raw "name=value" strings
+list<char*> options;       // -o / --option, raw "name=value" strings
+char* executable = NULL;   // first non-option argument
+list<char*> args;          // remaining non-option arguments
+char* jobManager = NULL;   // -j / --job-manager
+char* stdoutLoc = NULL;    // -O / --stdout
+char* stderrLoc = NULL;    // -E / --stderr
+char* verbosity = NULL;    // -v / --verbosity
+
+/*
+ * Program entry point.
+ *
+ * Parses and validates the command line, sets the logging threshold and
+ * runs the job. The process exit status is whatever runJob() returns, or
+ * EXIT_FAILURE if any step throws an exception. Note that the helpers
+ * may also terminate the process directly through exit().
+ */
+int main(int argc, char* argv[]) {
+    int exitCode = EXIT_FAILURE;
+    try {
+        parseArguments(argc, argv);
+        checkArguments();
+        configureLogging();
+        exitCode = runJob();
+    }
+    catch (exception& err) {
+        cerr << "Exception caught: " << err.what() << endl;
+    }
+    return exitCode;
+}
+
+/*
+ * Parses the command line into the file-level option variables.
+ *
+ * Options are consumed with getopt_long(); the first remaining argument
+ * becomes `executable` and all following ones are appended to `args`.
+ * The stored pointers refer directly to argv storage.
+ *
+ * Terminates the process with exit(0) after printing the help text for
+ * -h/--help, and with exit(1) on an unknown option, a missing option
+ * argument, or a missing executable.
+ */
+void parseArguments(int argc, char* argv[]) {
+    int oindex, c;
+    while ((c = getopt_long(argc, argv, "hs:e:a:o:j:O:E:v:", long_options, &oindex)) != -1) {
+        switch (c) {
+            case 's':
+                serviceUrl = optarg;
+                break;
+            case 'e':
+                envs.push_back(optarg);
+                break;
+            case 'a':
+                attrs.push_back(optarg);
+                break;
+            case 'o':
+                options.push_back(optarg);
+                break;
+            case 'j':
+                jobManager = optarg;
+                break;
+            case 'O':
+                stdoutLoc = optarg;
+                break;
+            case 'E':
+                stderrLoc = optarg;
+                break;
+            case 'v':
+                verbosity = optarg;
+                break;
+            case 'h':
+                displayHelp();
+                exit(0);
+            case '?':
+                // optopt holds the offending option character. If it is one
+                // of the options that take a value, the value was missing.
+                // (The previous check tested 'c', which is not an option,
+                // and omitted 's' and 'v'.)
+                switch (optopt) {
+                    case 's':
+                    case 'e':
+                    case 'a':
+                    case 'o':
+                    case 'j':
+                    case 'O':
+                    case 'E':
+                    case 'v':
+                        cerr << "Missing argument for option '-" << static_cast<char>(optopt) << "'" << endl;
+                        break;
+                    default:
+                        // Unknown option: with the default opterr setting
+                        // getopt_long() has already reported it.
+                        break;
+                }
+                exit(1);
+            default:
+                cerr << "Unknown error parsing command line" << endl;
+                exit(1);
+        }
+    }
+
+    // Everything after the options: EXECUTABLE [ARGUMENTS]
+    int index = optind;
+    if (index == argc) {
+        cerr << "Missing executable" << endl;
+        exit(1);
+    }
+    executable = argv[index++];
+    while (index < argc) {
+        args.push_back(argv[index++]);
+    }
+}
+
+/*
+ * Prints the usage message for run-coaster-job to standard output.
+ * Called from parseArguments() when -h/--help is given.
+ */
+void displayHelp() {
+ cout << "Usage: run-coaster-job [OPTIONS] EXECUTABLE [ARGUMENTS]" << endl;
+ cout << "where:" << endl << endl;
+ cout << "\tEXECUTABLE The executable to run (e.g. '/bin/date')" << endl;
+ cout << "\tARGUMENTS A list of arguments to pass to the EXECUTABLE" << endl;
+ cout << "\tOPTIONS An option. Short options use a space as separator" << endl;
+ cout << "\t between option name and value, while long options" << endl;
+ cout << "\t use the equal sign (e.g. --service=localhost). " << endl;
+ cout << "\t Can be one of the following:" << endl << endl;
+ cout << "\t--help|-h Displays this message" << endl << endl;
+ cout << "\t--service|-s <host>[:<port>]" << endl;
+ cout << "\t REQUIRED. The location of the coaster service." << endl << endl;
+ cout << "\t--job-manager|-j <value>" << endl;
+ cout << "\t The job manager to use." << endl << endl;
+ cout << "\t--env|-e <name>=<value>" << endl;
+ cout << "\t An environment variable to be passed to the " << endl;
+ cout << "\t executable. Can be used more than once." << endl << endl;
+ cout << "\t--attr|-a <name>=<value>" << endl;
+ cout << "\t A job attribute (such as 'maxwalltime')." << endl;
+ cout << "\t Can be used more than once." << endl << endl;
+ cout << "\t--option|-o <name>=<value>" << endl;
+ cout << "\t An option to pass to the coaster service " << endl;
+ cout << "\t (such as 'slots=10'). Can be used more than" << endl;
+ cout << "\t once." << endl << endl;
+ cout << "\t--stdout|-O <file>" << endl;
+ cout << "\t If present, the job standard output will be" << endl;
+ cout << "\t redirected to the specified remote file." << endl << endl;
+ cout << "\t--stderr|-E <file>" << endl;
+ cout << "\t If present, the job standard error will be" << endl;
+ cout << "\t redirected to the specified remote file." << endl << endl;
+ cout << "\t--verbosity|-v ['d'|'i'] " << endl;
+ cout << "\t Controls the verbosity of the logging messages" << endl;
+ cout << "\t printed on stdout. By default only WARN and ERROR" << endl;
+ cout << "\t levels are printed. 'i' further enables INFO" << endl;
+ cout << "\t message, while 'd' enables all messages." << endl << endl;
+ cout << endl;
+}
+
+/*
+ * Validates the parsed command line and fills in defaults.
+ *
+ * - Exits with status 1 if no service location was given.
+ * - Defaults the job manager to "local".
+ * - Defaults the verbosity to "w"; a user-supplied verbosity must be
+ *   exactly "i" or "d", otherwise the process exits with status 1.
+ */
+void checkArguments() {
+    // The globals are plain char*, so the defaults live in writable static
+    // buffers. This replaces malloc() calls whose results were passed to
+    // strcpy() without a NULL check.
+    static char defaultJobManager[] = "local";
+    static char defaultVerbosity[] = "w";
+
+    if (serviceUrl == NULL) {
+        cerr << "Missing service argument" << endl;
+        exit(1);
+    }
+    if (jobManager == NULL) {
+        jobManager = defaultJobManager;
+    }
+    if (verbosity == NULL) {
+        verbosity = defaultVerbosity;
+    }
+    else if (strcmp(verbosity, "i") != 0 && strcmp(verbosity, "d") != 0) {
+        cerr << "Invalid verbosity value (" << verbosity << "). Valid values are 'd' and 'i'" << endl;
+        exit(1);
+    }
+}
+
+/*
+ * Sets the logger threshold from the first character of `verbosity`:
+ * 'w' selects WARN, 'i' selects INFO and 'd' selects DEBUG. Any other
+ * character leaves the logger untouched.
+ */
+void configureLogging() {
+    const char level = *verbosity;
+    if (level == 'w') {
+        Logger::singleton().setThreshold(Logger::WARN);
+    }
+    else if (level == 'i') {
+        Logger::singleton().setThreshold(Logger::INFO);
+    }
+    else if (level == 'd') {
+        Logger::singleton().setThreshold(Logger::DEBUG);
+    }
+}
+
+/*
+ * Submits the job described by the file-level option variables and
+ * waits for it to finish.
+ *
+ * Starts a CoasterLoop and a CoasterClient for `serviceUrl`, applies the
+ * -o options as client settings, builds a Job from `executable`, `args`,
+ * `attrs`, `envs` and the optional stdout/stderr locations, submits it
+ * and blocks in waitForJob().
+ *
+ * Returns 2 if the final status code is FAILED, EXIT_SUCCESS otherwise.
+ * parsePair() exits the process if an option/attribute/env string has
+ * no '='.
+ *
+ * NOTE(review): `jobManager` is parsed and defaulted elsewhere in this
+ * file but is not referenced in this function - confirm whether it
+ * should be passed to the Job.
+ */
+int runJob() {
+ CoasterLoop loop;
+ loop.start();
+
+ CoasterClient client(serviceUrl, loop);
+ client.start();
+
+ Settings s;
+ list<char*>::iterator i;
+ KV pair;
+ for (i = options.begin(); i != options.end(); i++) {
+ // parsePair() splits the string in place by overwriting '=' with NUL
+ parsePair(*i, &pair);
+ // NOTE(review): this string is never deleted here; whether
+ // Settings::set() keeps a reference to it is not visible - confirm
+ // before changing the allocation.
+ string* skey = new string(pair.key);
+ s.set(*skey, pair.value);
+ }
+
+ client.setOptions(s);
+
+ Job j(executable);
+ for (i = args.begin(); i != args.end(); i++) {
+ j.addArgument(*i);
+ }
+
+ for (i = attrs.begin(); i != attrs.end(); i++) {
+ parsePair(*i, &pair);
+ j.setAttribute(pair.key, pair.value);
+ }
+
+ for (i = envs.begin(); i != envs.end(); i++) {
+ parsePair(*i, &pair);
+ j.setEnv(pair.key, pair.value);
+ }
+
+ if (stdoutLoc != NULL) {
+ // NOTE(review): heap string is never deleted here; presumably the
+ // Job keeps a reference to it - confirm against Job.cpp.
+ string* str = new string(stdoutLoc);
+ j.setStdoutLocation(*str);
+ }
+
+ if (stderrLoc != NULL) {
+ // NOTE(review): same ownership question as for stdout above.
+ string* str = new string(stderrLoc);
+ j.setStderrLocation(*str);
+ }
+
+ client.submit(j);
+
+ client.waitForJob(j);
+
+ if (j.getStatus()->getStatusCode() == FAILED) {
+ cerr << "Job failed: " << *j.getStatus()->getMessage() << endl;
+ return 2;
+ }
+
+ cout << "Job completed" << endl;
+ return EXIT_SUCCESS;
+}
+
+/*
+ * Splits a "name=value" string in place at the first '='.
+ *
+ * The '=' is overwritten with a NUL terminator, so on return pair->key
+ * points at the name and pair->value at the text after the '='; both
+ * point into `s`, which must therefore be writable and outlive `pair`.
+ *
+ * If `s` contains no '=', prints an error naming the offending argument
+ * and terminates the process with exit(1).
+ */
+void parsePair(char* s, KV* pair) {
+    pair->key = s;
+    char* eq = strchr(s, '=');
+    if (eq != NULL) {
+        *eq = 0;
+        pair->value = eq + 1;
+        return;
+    }
+    // Report the original string. The previous loop advanced `s` to the
+    // terminating NUL before printing, so the message was always empty.
+    cerr << "Invalid argument value: " << s << endl;
+    exit(1);
+}
Index: modules/provider-coaster-c-client/src/CoasterClientTest.cpp
===================================================================
--- modules/provider-coaster-c-client/src/CoasterClientTest.cpp (revision 3493)
+++ modules/provider-coaster-c-client/src/CoasterClientTest.cpp (working copy)
@@ -36,6 +36,8 @@
client.waitForJob(j2);
list<Job*>* doneJobs = client.getAndPurgeDoneJobs();
+ delete doneJobs;
+
if (j1.getStatus()->getStatusCode() == FAILED) {
cerr << "Job 1 failed: " << *j1.getStatus()->getMessage() << endl;
}
Index: modules/provider-coaster-c-client/src/CoasterLoop.cpp
===================================================================
--- modules/provider-coaster-c-client/src/CoasterLoop.cpp (revision 3493)
+++ modules/provider-coaster-c-client/src/CoasterLoop.cpp (working copy)
@@ -90,8 +90,8 @@
// can read or has data to write
// try to read from each socket
- if (loop->readSockets(&myrfds)) {
- // no channel sockets had anything to read, so there is data to write
+ if (ret > 0 && loop->readSockets(&myrfds)) {
+ // wake pipe has stuff to read
LogDebug << "Write requested" << endl;
{ Lock::Scoped l(loop->lock);
// synchronize when copying wfds since they are concurrently updated
@@ -105,7 +105,9 @@
// can be written to
int ret = select(loop->getMaxFD() + 1, NULL, &mywfds, NULL, &timeout);
checkSelectError(ret);
- loop->writeSockets(&mywfds);
+ if (ret > 0) {
+ loop->writeSockets(&mywfds);
+ }
}
loop->checkHeartbeats();
}
@@ -142,7 +144,7 @@
void checkSelectError(int ret) {
if (ret < 0) {
if (errno == EBADF) {
- // at least one fd is invalid/has an error
+ // TODO at least one fd is invalid/has an error
}
else {
throw CoasterError(string("Error in select: ") + strerror(errno));
@@ -246,6 +248,7 @@
void CoasterLoop::requestWrite(CoasterChannel* channel, int count) { Lock::Scoped l(lock);
LogDebug << "Channel " << channel << " requests " << count << " writes." << endl;
if (!FD_ISSET(channel->getSockFD(), &wfds)) {
+ // TODO there is nothing to remove a socket from wfds and there should be
FD_SET(channel->getSockFD(), &wfds);
updateMaxFD();
}
Index: modules/provider-coaster-c-client/src/Job.cpp
===================================================================
--- modules/provider-coaster-c-client/src/Job.cpp (revision 3493)
+++ modules/provider-coaster-c-client/src/Job.cpp (working copy)
@@ -194,6 +194,10 @@
}
void Job::setStatus(JobStatus* newStatus) {
+ // Since the client can process a job status while another
+ // status is coming in, a status cannot be deleted when a new status comes in.
+ // Instead, all statuses get chained and all get de-allocated
+ // when the job is de-allocated.
newStatus->setPreviousStatus(status);
status = newStatus;
}
Index: modules/provider-coaster-c-client/src/Makefile.am
===================================================================
--- modules/provider-coaster-c-client/src/Makefile.am (revision 3493)
+++ modules/provider-coaster-c-client/src/Makefile.am (working copy)
@@ -1,6 +1,8 @@
-bin_PROGRAMS = CoasterClientTest
-CoasterClientTest_SOURCES = CoasterClientTest.cpp
-CoasterClientTest_LDADD = libcoasterclient.la
+bin_PROGRAMS = coaster-client-test run-coaster-job
+coaster_client_test_SOURCES = CoasterClientTest.cpp
+run_coaster_job_SOURCES = RunCoasterJob.cpp
+coaster_client_test_LDADD = libcoasterclient.la
+run_coaster_job_LDADD = libcoasterclient.la
lib_LTLIBRARIES = libcoasterclient.la
Index: modules/provider-coaster-c-client/src/RemoteCoasterException.cpp
===================================================================
--- modules/provider-coaster-c-client/src/RemoteCoasterException.cpp (revision 3493)
+++ modules/provider-coaster-c-client/src/RemoteCoasterException.cpp (working copy)
@@ -15,6 +15,7 @@
string* getString(const char* s, int len, int& ptr, int n);
RemoteCoasterException::RemoteCoasterException(const char* data, int len) {
+ // These are actual serialized java exceptions
/*int cnt = 1;
char t[8];
for (int i = 0; i < len; i++) {
More information about the Swift-commit
mailing list