Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ Options:

--port PORT port to listen on (default: 1234)
--gpus GPUS comma separated list of available gpu device ids
--ignore_gpus GPUS comma separated list of externally reserved gpu device ids
--threads THREADS number of available threads/cores
--memory MEMORY available main memory in mb
--abort_on_time_limit kill jobs if time limit is exceeded
Expand Down Expand Up @@ -159,6 +160,8 @@ Options:

-l, --local
Execute the script locally
-dp --depends_on JOB_ID
Jobs in the script are dependent on the execution of the currently running job with the id JOB_ID (from qstat)
-b BLOCK, --block BLOCK
Only submit/execute the specified block
-s BLOCK SUBTASK_ID, --subtask BLOCK SUBTASK_ID
Expand Down
7 changes: 4 additions & 3 deletions queue/block_parser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/python3

import re
import re, pathlib as pl
from typing import List, Dict, Tuple

class Block(object):

Expand Down Expand Up @@ -42,7 +43,7 @@ class Block_Parser(object):
def __init__(self):
self.blocks = []

def _parse_block(self, block):
def _parse_block(self, block: List[str]) -> Block:
# convert gpu=True/gpu=False to gpus=1/gpus=0 for compatibility with old scripts
block[0] = re.sub('gpu=true', 'gpus=1', block[0])
block[0] = re.sub('gpu=false', 'gpus=0', block[0])
Expand All @@ -61,7 +62,7 @@ def _parse_block(self, block):
parsed_block.check()
return parsed_block

def parse(self, script):
def parse(self, script: pl.Path) -> List[Block]:
try:
f = open(script, 'r')
content = f.read().split('\n')
Expand Down
21 changes: 11 additions & 10 deletions queue/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
import os
import signal
import datetime
import random
import pathlib as pl
import argparse
import socket
from block_parser import Block_Parser
from typing import Tuple, List, Dict


### JOB ###
class Job(object):

def __init__(self, requests, job_id, subtask_id, depends_on, user):
def __init__(self, requests: Dict, job_id: int, subtask_id: int, depends_on: List[int], user: str):
# requests information (name, requested resources, subtasks, script)
self.requests = requests
# job name
Expand All @@ -39,7 +40,7 @@ def __init__(self, requests, job_id, subtask_id, depends_on, user):
### EXECUTABLE JOB ###
class Executable_Job(Job):

def __init__(self, requests, job_id, subtask_id, depends_on, user, server_address):
def __init__(self, requests: Dict, job_id: int, subtask_id: int, depends_on: List[int], user: str, server_address: Tuple[str,int]):
super( Executable_Job, self ).__init__(requests, job_id, subtask_id, depends_on, user)
self.tmp_script_file = None
self.is_running = False
Expand All @@ -53,7 +54,7 @@ def __init__(self, requests, job_id, subtask_id, depends_on, user, server_addres
self.qlog = None


def write_log(self, msg):
def write_log(self, msg: str):
if self.qlog == None:
print(msg)
else:
Expand Down Expand Up @@ -101,10 +102,10 @@ def finalize(self):
now = datetime.datetime.now()
elapsed = now - self.time
self.write_log('----------------- end -----------------')
self.write_log('%s' % (now.strftime('%a %b %d %Y %H:%M:%S')) )
self.write_log('elapsed time: %s:%s:%s' % ( str(elapsed.days * 24 + (elapsed.seconds / 3600)).zfill(3),
str((elapsed.seconds / 60) % 60).zfill(2),
str(elapsed.seconds % 60).zfill(2) ))
self.write_log('%s' % (now.strftime('%a %b %d %Y %H:%M:%S')))
self.write_log('elapsed time: %s:%s:%s' % (str(elapsed.days * 24 + (elapsed.seconds // 3600)).zfill(3),
str((elapsed.seconds // 60) % 60).zfill(2),
str(elapsed.seconds % 60).zfill(2)))
self.write_log('---------------------------------------')
if not self.qlog == None:
self.qlog.close()
Expand Down Expand Up @@ -153,7 +154,7 @@ def run(self, script_params = []):
self.finalize()


def wait_for_message(self, pid):
def wait_for_message(self, pid: int):
while True:
connection, from_address = self.listener.accept()
received = connection.recv(1024).decode()
Expand Down Expand Up @@ -220,7 +221,7 @@ def main():
arg_parser.add_argument('--user', type=str, default='dummy', help='user who submitted the job')
args = arg_parser.parse_args()

script = args.script[0]
script = pl.Path(args.script[0])
script_params = args.script[1:]
server_address = (args.server_ip, args.server_port)
if args.depends_on == '':
Expand Down
Loading