Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 42 additions & 46 deletions batch_job/src/vine_factory.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,18 @@ static char *batch_submit_options = NULL;
static const char *password_file = 0;
char *password;

/* A wrapper command applied to the worker itself. */
static char *wrapper_command = 0;

/* Additional files required by the worker wrapper. */
struct list *wrapper_inputs = 0;

/* The executable name of the worker itself. */
static char *worker_command = 0;

/* A wrapper command to be applied to every task executed. */
static char *task_wrapper = 0;

/* Unique number assigned to each worker instance for troubleshooting. */
static int worker_instance = 0;

Expand Down Expand Up @@ -525,54 +531,38 @@ static int submit_worker( struct batch_queue *queue )
debug_worker_options = string_format("-d all -o %s",worker_log_file);
}

char *features_string = make_features_string(features_table);
char *transfer_port_arg = transfer_port_range ? string_format("--transfer-port %s", transfer_port_range) : "";

/* The basic command differs whether using a project name or simple host:port. */

if(using_catalog) {
static char *submission_regex_escaped = NULL;
submission_regex_escaped = submission_regex ? string_escape_shell(submission_regex) : NULL;

cmd = string_format(
"./%s --parent-death -M %s -t %d -C '%s' %s %s %s %s %s %s %s %s %s",
worker_command,
submission_regex_escaped,
worker_timeout,
catalog_host,
debug_workers ? debug_worker_options : "",
factory_name ? string_format("--from-factory \"%s\"", factory_name) : "",
password_file ? "-P pwfile" : "",
resource_args ? resource_args : "",
manual_ssl_option ? "--ssl" : "",
features_string,
single_shot ? "--single-shot" : "",
transfer_port_arg,
extra_worker_args ? extra_worker_args : ""
);

free(submission_regex_escaped);
cmd = string_format("./%s -M %s", worker_command, submission_regex);
} else {
cmd = string_format(
"./%s --parent-death %s %d -t %d -C '%s' %s %s %s %s %s %s %s %s",
worker_command,
manager_host,
manager_port,
worker_timeout,
catalog_host,
debug_workers ? debug_worker_options : "",
password_file ? "-P pwfile" : "",
resource_args ? resource_args : "",
manual_ssl_option ? "--ssl" : "",
features_string,
single_shot ? "--single-shot" : "",
transfer_port_arg,
extra_worker_args ? extra_worker_args : ""
);
}
cmd = string_format("./%s %s %d", worker_command, manager_host, manager_port);
}

/* Add a formatted argument to a string s, reallocating and returning s. */
#define ADD_ARG2(s,format,value) s = string_combine(s,string_format(format,value))
#define ADD_ARG1(s,format) s = string_combine(s,string_format(format))

/* Add each of the fixed options. */
/* Be careful that each format string begins with a space. */
ADD_ARG2(cmd," -t %d",worker_timeout);
ADD_ARG2(cmd," -C '%s'",catalog_host);
ADD_ARG1(cmd," --parent-death");

/* Add each of the optional features. */
if(debug_workers) ADD_ARG2(cmd," %s",debug_worker_options);
if(factory_name) ADD_ARG2(cmd," --from-factory \"%s\"",factory_name);
if(password_file) ADD_ARG1(cmd," -P pwfile");
if(resource_args) ADD_ARG2(cmd," %s",resource_args);
if(manual_ssl_option) ADD_ARG1(cmd," --ssl");
if(single_shot) ADD_ARG1(cmd," --single-shot");
if(task_wrapper) ADD_ARG2(cmd," --task-wrapper \"%s\"",task_wrapper);
if(transfer_port_range) ADD_ARG2(cmd," --transfer-port %s",transfer_port_range);
if(extra_worker_args) ADD_ARG2(cmd," %s",extra_worker_args);

char *features_string = make_features_string(features_table);
ADD_ARG2(cmd," %s",features_string);
free(features_string);
if(transfer_port_range) {
free(transfer_port_arg);
}

if(wrapper_command) {
// Note that we don't use string_wrap_command here,
Expand Down Expand Up @@ -1379,8 +1369,9 @@ static void show_help(const char *cmd)
printf(" %-30s Extra options to give to worker.\n", "-E,--extra-options=<options>");
printf(" %-30s Port range for worker-worker transfers (e.g. 10000:11000). Passed as --transfer-port.\n", "--transfer-port=<port|min:max>");
printf(" %-30s Alternate binary instead of vine_worker.\n", "--worker-binary=<file>");
printf(" %-30s Wrap factory with this command prefix.\n","--wrapper");
printf(" %-30s Add this input file needed by the wrapper.\n","--wrapper-input");
printf(" %-30s Wrap worker with this command prefix.\n","--wrapper");
printf(" %-30s Wrap tasks with this command prefix.\n","--task-wrapper");
printf(" %-30s Add this input file needed by a task or worker wrapper.\n","--wrapper-input");
printf(" %-30s Run each worker inside this poncho environment.\n","--poncho-env=<file.tar.gz>");

printf("\nOptions specific to batch systems:\n");
Expand Down Expand Up @@ -1409,6 +1400,7 @@ enum{ LONG_OPT_CORES = 255,
LONG_OPT_WRAPPER,
LONG_OPT_WRAPPER_INPUT,
LONG_OPT_WORKER_BINARY,
LONG_OPT_TASK_WRAPPER,
LONG_OPT_K8S_IMAGE,
LONG_OPT_K8S_WORKER_IMAGE,
LONG_OPT_CATALOG,
Expand Down Expand Up @@ -1472,6 +1464,7 @@ static const struct option long_options[] = {
{"wrapper",required_argument, 0, LONG_OPT_WRAPPER},
{"wrapper-input",required_argument, 0, LONG_OPT_WRAPPER_INPUT},
{"ssl",no_argument, 0, LONG_OPT_USE_SSL},
{"task-wrapper",required_argument, 0, LONG_OPT_TASK_WRAPPER},
{"tls-sni", required_argument, 0, LONG_OPT_TLS_SNI},
{"factory-name",required_argument, 0, LONG_OPT_FACTORY_NAME},
{"single-shot", no_argument, 0, LONG_OPT_SINGLE_SHOT},
Expand Down Expand Up @@ -1621,6 +1614,9 @@ int main(int argc, char *argv[])
case LONG_OPT_WORKER_BINARY:
worker_command = xxstrdup(optarg);
break;
case LONG_OPT_TASK_WRAPPER:
task_wrapper = xxstrdup(optarg);
break;
case 'P':
password_file = optarg;
if(copy_file_to_buffer(optarg, &password, NULL) < 0) {
Expand Down
68 changes: 37 additions & 31 deletions batch_job/src/work_queue_factory.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,18 @@ static char *batch_submit_options = NULL;
static const char *password_file = 0;
char *password;

/* A wrapper command applied to the worker itself. */
static char *wrapper_command = 0;

/* Additional files required by the worker wrapper. */
struct list *wrapper_inputs = 0;

/* The executable name of the worker itself. */
static char *worker_command = 0;

/* A wrapper command to be applied to every task executed. */
static char *task_wrapper = 0;

static char *runos_os = 0;

/* -1 means 'not specified' */
Expand Down Expand Up @@ -446,39 +452,33 @@ static int submit_worker( struct batch_queue *queue )
worker = string_format("./%s", worker_command);
}

char *features_string = make_features_string(features_table);
/* The basic command differs whether using a project name or simple host:port. */

if(using_catalog) {
cmd = string_format(
"%s -M %s -t %d -C '%s' -d all -o worker.log %s %s %s %s %s %s",
worker,
submission_regex,
worker_timeout,
catalog_host,
factory_name ? string_format("--from-factory \"%s\"", factory_name) : "",
password_file ? "-P pwfile" : "",
resource_args ? resource_args : "",
manual_ssl_option ? "--ssl" : "",
features_string,
extra_worker_args ? extra_worker_args : ""
);
}
else {
cmd = string_format(
"%s %s %d -t %d -C '%s' -d all -o worker.log %s %s %s %s %s",
worker,
manager_host,
manager_port,
worker_timeout,
catalog_host,
password_file ? "-P pwfile" : "",
resource_args ? resource_args : "",
manual_ssl_option ? "--ssl" : "",
features_string,
extra_worker_args ? extra_worker_args : ""
);
cmd = string_format("./%s -M %s", worker, submission_regex);
} else {
cmd = string_format("./%s %s %d", worker, manager_host, manager_port);
}

/* Add a formatted argument to a string s, reallocating and returning s. */
#define ADD_ARG2(s,format,value) s = string_combine(s,string_format(format,value))
#define ADD_ARG1(s,format) s = string_combine(s,string_format(format))

/* Add each of the fixed options. */
/* Be careful that each format string begins with a space. */
ADD_ARG2(cmd," -t %d",worker_timeout);
ADD_ARG2(cmd," -C '%s'",catalog_host);

/* Add each of the optional features. */
if(factory_name) ADD_ARG2(cmd," --from-factory \"%s\"",factory_name);
if(password_file) ADD_ARG1(cmd," -P pwfile");
if(resource_args) ADD_ARG2(cmd," %s",resource_args);
if(manual_ssl_option) ADD_ARG1(cmd," --ssl");
if(task_wrapper) ADD_ARG2(cmd," --task-wrapper \"%s\"",task_wrapper);
if(extra_worker_args) ADD_ARG2(cmd," %s",extra_worker_args);

char *features_string = make_features_string(features_table);
ADD_ARG2(cmd," %s",features_string);
free(features_string);

if(wrapper_command) {
Expand Down Expand Up @@ -1236,8 +1236,9 @@ static void show_help(const char *cmd)
printf(" %-30s Environment variable to add to worker.\n", "--env=<variable=value>");
printf(" %-30s Extra options to give to worker.\n", "-E,--extra-options=<options>");
printf(" %-30s Alternate binary instead of work_queue_worker.\n", "--worker-binary=<file>");
printf(" %-30s Wrap factory with this command prefix.\n","--wrapper");
printf(" %-30s Add this input file needed by the wrapper.\n","--wrapper-input");
printf(" %-30s Wrap worker with this command prefix.\n","--wrapper");
printf(" %-30s Wrap tasks with this command prefix.\n","--task-wrapper");
printf(" %-30s Add this input file needed by a task or worker wrapper.\n","--wrapper-input");
printf(" %-30s Use runos tool to create environment (ND only).\n","--runos=<img>");
printf(" %-30s Run each worker inside this poncho environment.\n","--poncho-env=<file.tar.gz>");

Expand Down Expand Up @@ -1266,6 +1267,7 @@ enum{ LONG_OPT_CORES = 255,
LONG_OPT_WRAPPER,
LONG_OPT_WRAPPER_INPUT,
LONG_OPT_WORKER_BINARY,
LONG_OPT_TASK_WRAPPER,
LONG_OPT_K8S_IMAGE,
LONG_OPT_K8S_WORKER_IMAGE,
LONG_OPT_CATALOG,
Expand Down Expand Up @@ -1317,6 +1319,7 @@ static const struct option long_options[] = {
{"runos", required_argument, 0, LONG_OPT_RUN_OS},
{"scratch-dir", required_argument, 0, 'S' },
{"tasks-per-worker", required_argument, 0, LONG_OPT_TASKS_PER_WORKER},
{"task-wrapper", required_argument, 0, LONG_OPT_TASK_WRAPPER},
{"timeout", required_argument, 0, 't'},
{"version", no_argument, 0, 'v'},
{"worker-binary", required_argument, 0, LONG_OPT_WORKER_BINARY},
Expand Down Expand Up @@ -1469,6 +1472,9 @@ int main(int argc, char *argv[])
case LONG_OPT_WORKER_BINARY:
worker_command = xxstrdup(optarg);
break;
case LONG_OPT_TASK_WRAPPER:
task_wrapper = xxstrdup(optarg);
break;
case 'P':
password_file = optarg;
if(copy_file_to_buffer(optarg, &password, NULL) < 0) {
Expand Down
7 changes: 5 additions & 2 deletions doc/man/m4/vine_factory.m4
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,11 @@ OPTION_ARG_LONG(transfer-port,port) Port range for worker-worker transfers (e.g.
OPTION_ARG_LONG(worker-binary,file)
Alternate binary instead of vine_worker.
OPTION_ARG_LONG(wrapper,cmd)
Wrap factory with this command prefix.
OPTION_ARG_LONG(wrapper-input,file) Add this input file needed by the wrapper.
Wrap worker with this command prefix.
OPTION_ARG_LONG(task-wrapper,cmd)
Wrap tasks with this command prefix.
OPTION_ARG_LONG(wrapper-input,file)
Add this input file needed by a task or worker wrapper.
OPTION_ARG_LONG(python-env,file.tar.gz) Run each worker inside this python environment.
OPTIONS_END

Expand Down
7 changes: 5 additions & 2 deletions doc/man/m4/work_queue_factory.m4
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,11 @@ OPTION_ARG(E,extra-options,options)
OPTION_ARG_LONG(worker-binary,file)
Alternate binary instead of work_queue_worker.
OPTION_ARG_LONG(wrapper,cmd)
Wrap factory with this command prefix.
OPTION_ARG_LONG(wrapper-input,file) Add this input file needed by the wrapper.
Wrap worker with this command prefix.
OPTION_ARG_LONG(task-wrapper,cmd)
Wrap tasks with this command prefix.
OPTION_ARG_LONG(wrapper-input,file)
Add this input file needed by a task or worker wrapper.
OPTION_ARG_LONG(python-env,file.tar.gz) Run each worker inside this python environment.
OPTIONS_END

Expand Down
25 changes: 16 additions & 9 deletions taskvine/src/worker/vine_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -391,22 +391,29 @@ int vine_process_execute(struct vine_process *p)
/* Finally, add things that were explicitly given in the task description. */
export_environment(p);

/* Library task passes the file descriptors to talk to the manager via
* the command line plus the worker pid to wake the worker up
* so it requires a special execl. */
if (p->type != VINE_PROCESS_TYPE_LIBRARY) {
execl("/bin/sh", "sh", "-c", p->task->command_line, (char *)0);
} else {
char *final_command = string_format("%s --in-pipe-fd %d --out-pipe-fd %d --task-id %d --library-cores %d --function-slots %d --worker-pid %d",
p->task->command_line,
/* All process types begin with the provided command line */
char *final_command = strdup(p->task->command_line);

/* Library tasks get additional arguments to configure communication and convey resources. */
if (p->type == VINE_PROCESS_TYPE_LIBRARY) {
final_command = string_format("%s --in-pipe-fd %d --out-pipe-fd %d --task-id %d --library-cores %d --function-slots %d --worker-pid %d",
final_command,
in_pipe_fd,
out_pipe_fd,
p->task->task_id,
(int)p->task->resources_requested->cores,
p->task->function_slots_total,
getppid());
execl("/bin/sh", "sh", "-c", final_command, (char *)0);
}

/* Only standard and library processes are affected by task wrappers. */
if (p->type == VINE_PROCESS_TYPE_STANDARD || p->type == VINE_PROCESS_TYPE_LIBRARY) {
if (options->task_wrapper) {
final_command = string_wrap_command(final_command, options->task_wrapper);
}
}

execl("/bin/sh", "sh", "-c", final_command, (char *)0);
_exit(127); // Failed to execute the cmd.

/* NOTREACHED */
Expand Down
6 changes: 6 additions & 0 deletions taskvine/src/worker/vine_worker_options.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ void vine_worker_options_show_help(const char *cmd, struct vine_worker_options *
printf(" %-30s Defaults to %d.\n", "", options->disk_percent);

printf(" %-30s Use loop devices for task sandboxes (default=disabled, requires root access).\n", "--disk-allocation");
printf(" %-30s Apply a wrapper command to each task executed.\n", "--task-wrapper");
printf(" %-30s Specifies a user-defined feature the worker provides. May be specified several times.\n", "--feature");
printf(" %-30s Set the maximum number of seconds the worker may be active. (in s).\n", "--wall-time=<s>");

Expand Down Expand Up @@ -174,6 +175,7 @@ enum {
LONG_OPT_WORKSPACE,
LONG_OPT_KEEP_WORKSPACE,
LONG_OPT_MAX_TRANSFER_PROCS,
LONG_OPT_TASK_WRAPPER,
};

static const struct option long_options[] = {{"advertise", no_argument, 0, 'a'},
Expand Down Expand Up @@ -216,6 +218,7 @@ static const struct option long_options[] = {{"advertise", no_argument, 0, 'a'},
{"transfer-port", required_argument, 0, LONG_OPT_TRANSFER_PORT},
{"max-transfer-procs", required_argument, 0, LONG_OPT_MAX_TRANSFER_PROCS},
{"contact-hostport", required_argument, 0, LONG_OPT_CONTACT_HOSTPORT},
{"task-wrapper", required_argument, 0, LONG_OPT_TASK_WRAPPER},
{0, 0, 0, 0}};

static void vine_worker_options_get_env(const char *name, int64_t *manual_option)
Expand Down Expand Up @@ -466,6 +469,9 @@ void vine_worker_options_get(struct vine_worker_options *options, int argc, char
case LONG_OPT_MAX_TRANSFER_PROCS:
options->max_transfer_procs = atoi(optarg);
break;
case LONG_OPT_TASK_WRAPPER:
options->task_wrapper = optarg;
break;
default:
vine_worker_options_show_help(argv[0], options);
exit(1);
Expand Down
10 changes: 7 additions & 3 deletions taskvine/src/worker/vine_worker_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,13 @@ struct vine_worker_options {
/* Maximum number of concurrent worker transfer requests made by worker */
int max_transfer_procs;

/* Explicit contact host (address or hostname) for transfers bewteen workers. */
char *reported_transfer_host;
int reported_transfer_port;
/* Explicit contact host (address or hostname) for transfers bewteen workers. */
char *reported_transfer_host;
int reported_transfer_port;

/* Wrapper command to be applied to beginning of each task command. */
const char *task_wrapper;

};

struct vine_worker_options * vine_worker_options_create();
Expand Down
10 changes: 10 additions & 0 deletions taskvine/test/TR_vine_valgrind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ export CORES=4
export TASKS=20
export VALGRIND="valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=definite,indirect,possible --track-origins=yes --track-fds=yes"

# valgrind creates a named fifo in $PWD/temp.
# This is not permitted when using AFS file system.
# Change the location to /tmp in that case.

location=$(echo "pwd" | cut -d/ -f2)
if $location eq afs
then
export VALGRIND="${VALGRIND} --vgdb-prefix=/tmp/valgrind-taskvine-fifo-$$"
fi

check_needed()
{
if ! ${VALGRIND} --version > /dev/null 2>&1
Expand Down
Loading
Loading