[Swift-commit] cog r3494

swift at ci.uchicago.edu swift at ci.uchicago.edu
Sun Oct 28 21:35:10 CDT 2012


------------------------------------------------------------------------
r3494 | hategan | 2012-10-28 21:33:33 -0500 (Sun, 28 Oct 2012) | 1 line

added command line client
------------------------------------------------------------------------
Index: modules/provider-coaster-c-client/configure.ac
===================================================================
--- modules/provider-coaster-c-client/configure.ac	(revision 3493)
+++ modules/provider-coaster-c-client/configure.ac	(working copy)
@@ -23,7 +23,7 @@
 CXXFLAGS=
 
 # Checks for header files.
-AC_CHECK_HEADERS([arpa/inet.h fcntl.h netdb.h netinet/in.h stdlib.h string.h sys/socket.h sys/time.h unistd.h])
+AC_CHECK_HEADERS([arpa/inet.h fcntl.h netdb.h netinet/in.h stdlib.h string.h sys/socket.h sys/time.h unistd.h getopt.h])
 
 # Libs
 AC_CHECK_LIB(pthread, pthread_create)
Index: modules/provider-coaster-c-client/.autotools
===================================================================
--- modules/provider-coaster-c-client/.autotools	(revision 3493)
+++ modules/provider-coaster-c-client/.autotools	(working copy)
@@ -30,11 +30,6 @@
 <option id="program-suffix" value=""/>
 <option id="program-transform-name" value=""/>
 <option id="enable-maintainer-mode" value="false"/>
-<flag id="CFLAGS">
-<flagvalue id="cflags-debug" value="false"/>
-<flagvalue id="cflags-gprof" value="false"/>
-<flagvalue id="cflags-gcov" value="false"/>
-</flag>
 <option id="user" value=""/>
 <option id="autogen" value="autogen.sh"/>
 <option id="autogenOpts" value=""/>
Index: modules/provider-coaster-c-client/src/RunCoasterJob.cpp
===================================================================
--- modules/provider-coaster-c-client/src/RunCoasterJob.cpp	(revision 0)
+++ modules/provider-coaster-c-client/src/RunCoasterJob.cpp	(revision 3494)
@@ -0,0 +1,268 @@
+
+#include <getopt.h>
+#include <stdlib.h>
+#include <list>
+#include <string.h>
+
+#include "CoasterError.h"
+#include "CoasterLoop.h"
+#include "CoasterClient.h"
+#include "Job.h"
+#include "Settings.h"
+#include "Logger.h"
+
+using namespace std;
+
+static struct option long_options[] = { // long-option table passed to getopt_long() in parseArguments()
+   {"service", required_argument, 0, 's'}, // each entry maps a long name to the short option character handled in the switch
+   {"job-manager", required_argument, 0, 'j'},
+   {"env", required_argument, 0, 'e'},
+   {"attr",  required_argument, 0, 'a'},
+   {"option",  required_argument, 0, 'o'},
+   {"stdout",  required_argument, 0, 'O'},
+   {"stderr",  required_argument, 0, 'E'},
+   {"verbosity",  required_argument, 0, 'v'},
+   {"help",  no_argument, 0, 'h'},
+   {0, 0, 0, 0} // all-zero entry terminates the table
+};
+
+struct KV { // key/value pair filled in by parsePair(); it owns nothing
+	char* key; // start of the "name=value" text, cut at the '=' by parsePair()
+	char* value; // points just past the '=' inside the same buffer
+};
+
+void parsePair(char* s, KV* pair); // forward declarations; the definitions follow main()
+void parseArguments(int argc, char* argv[]);
+void checkArguments();
+void configureLogging();
+int runJob();
+void displayHelp();
+
+char* serviceUrl = NULL; // -s/--service value; required (see checkArguments())
+list<char*> envs; // raw -e/--env arguments, split by parsePair() in runJob()
+list<char*> attrs; // raw -a/--attr arguments, split by parsePair() in runJob()
+list<char*> options; // raw -o/--option arguments, split by parsePair() in runJob()
+char* executable = NULL; // first non-option argument
+list<char*> args; // remaining non-option arguments, passed to the executable
+char* jobManager = NULL; // -j/--job-manager value; checkArguments() defaults it to "local"
+char* stdoutLoc = NULL; // -O/--stdout value, optional
+char* stderrLoc = NULL; // -E/--stderr value, optional
+char* verbosity = NULL; // -v/--verbosity value; checkArguments() defaults it to "w"
+
+int main(int argc, char* argv[]) { // Entry point: parse and validate the command line, set up logging, run one job.
+	try {
+		parseArguments(argc, argv);
+		checkArguments();
+		configureLogging();
+		return runJob(); // runJob()'s return value becomes the process exit status
+	}
+	catch (exception& e) { // anything thrown by the steps above is reported and mapped to EXIT_FAILURE
+		cerr << "Exception caught: " << e.what() << endl;
+		return EXIT_FAILURE;
+	}
+}
+
+void parseArguments(int argc, char* argv[]) {
+	int oindex, c;
+	while (true) {
+		c = getopt_long(argc, argv, "hs:e:a:o:j:O:E:v:", long_options, &oindex);
+		if (c == -1) {
+			break;
+		}
+		switch (c) {
+			case 's':
+				serviceUrl = optarg;
+				break;
+			case 'e':
+				envs.push_back(optarg);
+				break;
+			case 'a':
+				attrs.push_back(optarg);
+				break;
+			case 'o':
+				options.push_back(optarg);
+				break;
+			case 'j':
+				jobManager = optarg;
+				break;
+			case 'O':
+				stdoutLoc = optarg;
+				break;
+			case 'E':
+				stderrLoc = optarg;
+				break;
+			case 'v':
+				verbosity = optarg;
+				break;
+			case 'h':
+				displayHelp();
+				exit(0);
+				break;
+			case '?':
+				if (optopt == 'c' || optopt == 'e' || optopt == 'a' || optopt == 'o'
+						|| optopt == 'j' || optopt == 'O' || optopt == 'E') {
+					cerr << "Missing argument for option '-" << (char) optopt << "'" << endl;
+					exit(1);
+				}
+				else {
+					exit(1);
+				}
+			default:
+				cerr << "Unknown error parsing command line" << endl;
+				exit(1);
+		}
+	}
+
+	int index = optind;
+	if (index == argc) {
+		cerr << "Missing executable" << endl;
+		exit(1);
+	}
+	executable = argv[index++];
+	while (index < argc) {
+		args.push_back(argv[index++]);
+	}
+}
+
+void displayHelp() { // Prints the usage text for run-coaster-job to stdout.
+	cout << "Usage: run-coaster-job [OPTIONS] EXECUTABLE [ARGUMENTS]" << endl;
+	cout << "where:" << endl << endl;
+	cout << "\tEXECUTABLE   The executable to run (e.g. '/bin/date')" << endl;
+	cout << "\tARGUMENTS    A list of arguments to pass to the EXECUTABLE" << endl;
+	cout << "\tOPTIONS      An option. Short options use a space as separator" << endl;
+	cout << "\t             between option name and value, while long options" << endl;
+	cout <<	"\t             use the equal sign (e.g. --service=localhost). " << endl;
+	cout << "\t             Can be one of the following:" << endl << endl;
+	cout << "\t--help|-h     Displays this message" << endl << endl;
+	cout << "\t--service|-s <host>[:<port>]" << endl;
+	cout <<	"\t              REQUIRED. The location of the coaster service." << endl << endl;
+	cout << "\t--job-manager|-j <value>" << endl;
+	cout << "\t              The job manager to use." << endl << endl;
+	cout << "\t--env|-e <name>=<value>" << endl;
+	cout << "\t              An environment variable to be passed to the " << endl;
+	cout << "\t              executable. Can be used more than once." << endl << endl;
+	cout << "\t--attr|-a <name>=<value>" << endl;
+	cout << "\t              A job attribute (such as 'maxwalltime')." << endl;
+	cout << "\t              Can be used more than once." << endl << endl;
+	cout << "\t--option|-o <name>=<value>" << endl;
+	cout << "\t              An option to pass to the coaster service " << endl;
+	cout << "\t              (such as 'slots=10'). Can be used more than" << endl;
+	cout << "\t              once." << endl << endl;
+	cout << "\t--stdout|-O <file>" << endl;
+	cout << "\t              If present, the job standard output will be" << endl;
+	cout << "\t              redirected to the specified remote file." << endl << endl;
+	cout << "\t--stderr|-E <file>" << endl;
+	cout << "\t              If present, the job standard error will be" << endl;
+	cout << "\t              redirected to the specified remote file." << endl << endl;
+	cout << "\t--verbosity|-v ['d'|'i'] " << endl;
+	cout << "\t              Controls the verbosity of the logging messages" << endl;
+	cout << "\t              printed on stdout. By default only WARN and ERROR" << endl;
+	cout << "\t              levels are printed. 'i' further enables INFO" << endl;
+	cout << "\t              message, while 'd' enables all messages." << endl << endl;
+	cout << endl;
+}
+
+void checkArguments() { // Validates the parsed options and fills in defaults; exits the process on invalid input.
+	if (serviceUrl == NULL) { // the service location is the only mandatory option
+		cerr << "Missing service argument" << endl;
+		exit(1);
+	}
+	if (jobManager == NULL) {
+		jobManager = (char*) malloc(16); // never freed in this file; the malloc result is not checked
+		strcpy(jobManager, "local"); // default job manager
+	}
+	if (verbosity == NULL) {
+		verbosity = (char*) malloc(2); // room for "w" plus the terminator
+		strcpy(verbosity, "w"); // default level; configureLogging() maps 'w' to Logger::WARN
+	}
+	else {
+		if (strcmp(verbosity, "i") && strcmp(verbosity, "d")) { // true when the value is neither "i" nor "d"
+			cerr << "Invalid verbosity value (" << verbosity << "). Valid values are 'd' and 'i'" << endl;
+			exit(1);
+		}
+	}
+}
+
+void configureLogging() { // Sets the Logger threshold from the first character of the verbosity global.
+	switch (*verbosity) { // no default case: checkArguments() leaves only "w", "i" or "d" here
+		case 'w':
+			Logger::singleton().setThreshold(Logger::WARN);
+			break;
+		case 'i':
+			Logger::singleton().setThreshold(Logger::INFO);
+			break;
+		case 'd':
+			Logger::singleton().setThreshold(Logger::DEBUG);
+			break;
+	}
+}
+
+int runJob() { // Submits one job built from the option globals, waits for it, and returns the exit status for main().
+	CoasterLoop loop;
+	loop.start();
+
+	CoasterClient client(serviceUrl, loop);
+	client.start();
+
+	Settings s; // service settings built from the -o/--option arguments
+	list<char*>::iterator i;
+	KV pair;
+	for (i = options.begin(); i != options.end(); i++) {
+		parsePair(*i, &pair); // splits "name=value" in place; exits the process if there is no '='
+		string* skey = new string(pair.key); // NOTE(review): never deleted here - confirm whether Settings keeps a reference to it
+		s.set(*skey, pair.value);
+	}
+
+	client.setOptions(s);
+
+	Job j(executable);
+	for (i = args.begin(); i != args.end(); i++) {
+		j.addArgument(*i);
+	}
+
+	for (i = attrs.begin(); i != attrs.end(); i++) {
+		parsePair(*i, &pair);
+		j.setAttribute(pair.key, pair.value);
+	}
+
+	for (i = envs.begin(); i != envs.end(); i++) {
+		parsePair(*i, &pair);
+		j.setEnv(pair.key, pair.value);
+	}
+
+	if (stdoutLoc != NULL) { // the stdout location is only set when -O/--stdout was given
+		string* str = new string(stdoutLoc); // NOTE(review): never deleted here - confirm whether Job keeps a reference to it
+		j.setStdoutLocation(*str);
+	}
+
+	if (stderrLoc != NULL) { // the stderr location is only set when -E/--stderr was given
+		string* str = new string(stderrLoc); // NOTE(review): never deleted here - confirm whether Job keeps a reference to it
+		j.setStderrLocation(*str);
+	}
+
+	client.submit(j);
+
+	client.waitForJob(j);
+
+	if (j.getStatus()->getStatusCode() == FAILED) {
+		cerr << "Job failed: " << *j.getStatus()->getMessage() << endl;
+		return 2; // exit status for a failed job (main() uses EXIT_FAILURE for exceptions)
+	}
+
+	cout << "Job completed" << endl;
+	return EXIT_SUCCESS;
+}
+
+void parsePair(char* s, KV* pair) {
+	pair->key = s;
+	while (*s) {
+		if (*s == '=') {
+			*s = 0;
+			pair->value = s + 1;
+			return;
+		}
+		s++;
+	}
+	cerr << "Invalid argument value: " << s << endl;
+	exit(1);
+}
Index: modules/provider-coaster-c-client/src/CoasterClientTest.cpp
===================================================================
--- modules/provider-coaster-c-client/src/CoasterClientTest.cpp	(revision 3493)
+++ modules/provider-coaster-c-client/src/CoasterClientTest.cpp	(working copy)
@@ -36,6 +36,8 @@
 		client.waitForJob(j2);
 		list<Job*>* doneJobs = client.getAndPurgeDoneJobs();
 
+		delete doneJobs;
+
 		if (j1.getStatus()->getStatusCode() == FAILED) {
 			cerr << "Job 1 failed: " << *j1.getStatus()->getMessage() << endl;
 		}
Index: modules/provider-coaster-c-client/src/CoasterLoop.cpp
===================================================================
--- modules/provider-coaster-c-client/src/CoasterLoop.cpp	(revision 3493)
+++ modules/provider-coaster-c-client/src/CoasterLoop.cpp	(working copy)
@@ -90,8 +90,8 @@
 
 		// can read or has data to write
 		// try to read from each socket
-		if (loop->readSockets(&myrfds)) {
-			// no channel sockets had anything to read, so there is data to write
+		if (ret > 0 && loop->readSockets(&myrfds)) {
+			// wake pipe has stuff to read
 			LogDebug << "Write requested" << endl;
 			{ Lock::Scoped l(loop->lock);
 				// synchronize when copying wfds since they are concurrently updated
@@ -105,7 +105,9 @@
 			// can be written to
 			int ret = select(loop->getMaxFD() + 1, NULL, &mywfds, NULL, &timeout);
 			checkSelectError(ret);
-			loop->writeSockets(&mywfds);
+			if (ret > 0) {
+				loop->writeSockets(&mywfds);
+			}
 		}
 		loop->checkHeartbeats();
 	}
@@ -142,7 +144,7 @@
 void checkSelectError(int ret) {
 	if (ret < 0) {
 		if (errno == EBADF) {
-			// at least one fd is invalid/has an error
+			// TODO at least one fd is invalid/has an error
 		}
 		else {
 			throw CoasterError(string("Error in select: ") + strerror(errno));
@@ -246,6 +248,7 @@
 void CoasterLoop::requestWrite(CoasterChannel* channel, int count) { Lock::Scoped l(lock);
 	LogDebug << "Channel " << channel << " requests " << count << " writes." << endl;
 	if (!FD_ISSET(channel->getSockFD(), &wfds)) {
+		// TODO there is nothing to remove a socket from wfds and there should be
 		FD_SET(channel->getSockFD(), &wfds);
 		updateMaxFD();
 	}
Index: modules/provider-coaster-c-client/src/Job.cpp
===================================================================
--- modules/provider-coaster-c-client/src/Job.cpp	(revision 3493)
+++ modules/provider-coaster-c-client/src/Job.cpp	(working copy)
@@ -194,6 +194,10 @@
 }
 
 void Job::setStatus(JobStatus* newStatus) {
+	// Since the client can process a job status while another
+	// status is coming in, a status cannot be deleted when a new status comes in.
+	// Instead, all statuses get chained and all get de-allocated
+	// when the job is de-allocated.
 	newStatus->setPreviousStatus(status);
 	status = newStatus;
 }
Index: modules/provider-coaster-c-client/src/Makefile.am
===================================================================
--- modules/provider-coaster-c-client/src/Makefile.am	(revision 3493)
+++ modules/provider-coaster-c-client/src/Makefile.am	(working copy)
@@ -1,6 +1,8 @@
-bin_PROGRAMS = CoasterClientTest
-CoasterClientTest_SOURCES = CoasterClientTest.cpp
-CoasterClientTest_LDADD = libcoasterclient.la
+bin_PROGRAMS = coaster-client-test run-coaster-job
+coaster_client_test_SOURCES = CoasterClientTest.cpp
+run_coaster_job_SOURCES = RunCoasterJob.cpp
+coaster_client_test_LDADD = libcoasterclient.la
+run_coaster_job_LDADD = libcoasterclient.la
 
 lib_LTLIBRARIES = libcoasterclient.la
 
Index: modules/provider-coaster-c-client/src/RemoteCoasterException.cpp
===================================================================
--- modules/provider-coaster-c-client/src/RemoteCoasterException.cpp	(revision 3493)
+++ modules/provider-coaster-c-client/src/RemoteCoasterException.cpp	(working copy)
@@ -15,6 +15,7 @@
 string* getString(const char* s, int len, int& ptr, int n);
 
 RemoteCoasterException::RemoteCoasterException(const char* data, int len) {
+	// These are actual serialized java exceptions
 	/*int cnt = 1;
 	char t[8];
 	for (int i = 0; i < len; i++) {



More information about the Swift-commit mailing list