diff --git a/jobq/jobq.py b/jobq/jobq.py index 2e808608..b68034da 100755 --- a/jobq/jobq.py +++ b/jobq/jobq.py @@ -3,6 +3,7 @@ import threading import time import types +from collections import deque from pykit import threadutil @@ -22,6 +23,10 @@ class Finish(object): pass +class NotMatch(object): + pass + + class JobWorkerError(Exception): pass @@ -35,16 +40,19 @@ class WorkerGroup(object): def __init__(self, index, worker, n_thread, input_queue, dispatcher, - probe, keep_order): + probe, keep_order, + partial_order, + expire): self.index = index self.worker = worker self.n_thread = n_thread - self.input_queue = input_queue - self.output_queue = _make_q() self.dispatcher = dispatcher self.probe = probe self.keep_order = keep_order + self.partial_order = partial_order + self.expire = expire + self.in_working = {} self.threads = {} @@ -61,11 +69,23 @@ def __init__(self, index, worker, n_thread, # protect reading/writing worker_group info self.worker_group_lock = threading.RLock() + need_coordinator_partial = False + need_coordinator = False + if self.partial_order: + need_coordinator_partial = True # dispatcher mode implies keep_order - if self.dispatcher is not None or self.keep_order: + elif self.dispatcher is not None or self.keep_order: need_coordinator = True + + if need_coordinator_partial: + self.partial_queue = input_queue + self.input_queue = _make_q() + self.output_queue = _make_q(1024, partial_order) + self.dispatcher = None else: - need_coordinator = False + self.partial_queue = None + self.input_queue = input_queue + self.output_queue = _make_q() # Coordinator guarantees worker-group level first-in first-out, by put # output-queue into a queue-of-output-queue. 
@@ -76,11 +96,15 @@ def __init__(self, index, worker, n_thread, # Protect input.get() and ouput.put(), only used by non-dispatcher # mode self.keep_order_lock = threading.RLock() - - self.coordinator_thread = start_thread(self._coordinate) else: self.queue_of_output_q = None + if need_coordinator: + self.coordinator_thread = start_thread(self._coordinate) + elif need_coordinator_partial: + self.partial_order_lock = threading.RLock() + self.coordinator_partial_thread = start_thread(self._coordinate_partial) + # `dispatcher` is a user-defined function to distribute args to workers. # It accepts the same args passed to worker and returns a number to # indicate which worker to used. @@ -162,6 +186,13 @@ def _exec(self, input_q, output_q, thread_index): _put_rst(output_q, rst) + if self.partial_order and args is not None: + k = str(args[0]) + with self.partial_order_lock: + if k in self.in_working: + del self.in_working[k] + self.partial_queue.not_match.set() + def _exec_in_order(self, input_q, output_q, thread_index): while self.running: @@ -226,18 +257,53 @@ def _dispatch(self): self.queue_of_output_q.put(outq) inq.put(args) + def _coordinate_partial(self): + + def check_expired(): + while self.running: + now = time.time() + with self.partial_order_lock: + for k in self.in_working.keys(): + if now - self.in_working[k] > self.expire: + del self.in_working[k] + self.partial_queue.not_match.set() + time.sleep(0.1) + + start_thread(check_expired) + + while self.running: + args = self.partial_queue.get(exclude=self.in_working) + if args is Finish: + return + elif args is NotMatch: + self.partial_queue.not_match.wait() + self.partial_queue.not_match.clear() + else: + if args is not None: + with self.partial_order_lock: + self.in_working[str(args[0])] = time.time() + + _put_rst(self.input_queue, args) class JobManager(object): - def __init__(self, workers, queue_size=1024, probe=None, keep_order=False): + def __init__(self, workers, queue_size=1024, expire=None, probe=None, 
keep_order=False, partial_order=False): + + if expire is None: + expire = 60 * 5 if probe is None: probe = {} + if keep_order and partial_order: + raise JobWorkerError('only one of keep_order and partial_order can be set to true') + self.workers = workers - self.head_queue = _make_q(queue_size) + self.head_queue = _make_q(queue_size, partial_order) self.probe = probe self.keep_order = keep_order + self.partial_order = partial_order + self.expire = expire self.worker_groups = [] @@ -267,7 +333,8 @@ def make_worker_groups(self): wg = WorkerGroup(i, worker, n, inq, dispatcher, - self.probe, self.keep_order) + self.probe, self.keep_order, self.partial_order, + self.expire) self.worker_groups.append(wg) inq = wg.output_queue @@ -349,6 +416,11 @@ def join(self, timeout=None): wg.exiting = True ths = wg.threads.values() + if wg.partial_order: + for th in ths: + wg.partial_queue.put(Finish) + wg.coordinator_partial_thread.join(endtime - time.time()) + if wg.dispatcher is None: # put nr = len(threads) Finish for th in ths: @@ -375,9 +447,102 @@ def stat(self): return stat(self.probe) -def run(input_it, workers, keep_order=False, timeout=None, probe=None): +class PartialOrderQueue(object): + + def __init__(self, maxsize): + self.maxsize = maxsize + self._init(maxsize) + self.mutex = threading.Lock() + self.not_empty = threading.Condition(self.mutex) + self.not_full = threading.Condition(self.mutex) + self.not_match = threading.Event() + + def put(self, item, block=True, timeout=None): + if item not in [Finish, None] and (type(item) not in [tuple, list] or len(item) == 0): + raise Exception('element type must be tuple or list and not empty') + + with self.not_full: + if self.maxsize > 0: + if not block: + if self._qsize() >= self.maxsize: + raise Queue.Full + elif timeout is None: + while self._qsize() >= self.maxsize: + self.not_full.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time.time() + timeout + while 
self._qsize() >= self.maxsize: + remaining = endtime - time.time() + if remaining <= 0.0: + raise Queue.Full + self.not_full.wait(remaining) + self._put(item) + self.not_empty.notify() + self.not_match.set() + + def get(self, block=True, timeout=None, exclude=None): + with self.not_empty: + if not block: + if not self._qsize(): + raise Queue.Empty + elif timeout is None: + while not self._qsize(): + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time.time() + timeout + while not self._qsize(): + remaining = endtime - time.time() + if remaining <= 0.0: + raise Queue.Empty + self.not_empty.wait(remaining) + item = self._get(exclude) + if item is not NotMatch: + self.not_full.notify() + return item + + def _init(self, maxsize): + self.queue = deque() + + def _qsize(self): + return len(self.queue) + + def _put(self, item): + self.queue.append(item) + + def _get(self, exclude): + item = NotMatch + index = None + selected = {} + + for i, v in enumerate(self.queue): + if v is Finish: + if i == 0: + index, item = i, v + break + elif v is None: + index, item = i, v + break + else: + key = str(v[0]) + if key in selected: + continue + if not exclude or key not in exclude: + index, item = i, v + break + selected[key] = True + + if index is not None: + del self.queue[index] + return item + + - mgr = JobManager(workers, probe=probe, keep_order=keep_order) +def run(input_it, workers, keep_order=False, partial_order=False, timeout=None, probe=None): + + mgr = JobManager(workers, probe=probe, keep_order=keep_order, partial_order=partial_order) try: for args in input_it: @@ -446,8 +611,11 @@ def _put_non_empty(q, val): q.put(val) -def _make_q(n=1024): - return Queue.Queue(n) +def _make_q(n=1024, partial_order=False): + if partial_order: + return PartialOrderQueue(n) + else: + return Queue.Queue(n) def start_thread(exec_func, *args): diff --git a/jobq/t.sh b/jobq/t.sh index 0c18a7f7..7b6c31d2 
100755 --- a/jobq/t.sh +++ b/jobq/t.sh @@ -1,6 +1,6 @@ #!/bin/sh -python2 -m unittest -v discover +python2 -m unittest discover -v # python2 -m unittest -v test.test_jobq # python2 -m unittest -v test.test_jobq.TestProbe diff --git a/jobq/test/test_jobq.py b/jobq/test/test_jobq.py index da9e57f5..e1512d9a 100755 --- a/jobq/test/test_jobq.py +++ b/jobq/test/test_jobq.py @@ -36,9 +36,25 @@ def sleep_5(args): return args +def is_partial_order(rst): + cache = {} + + for item in rst: + k = item[0] + if k not in cache: + cache[k] = item + + last = cache[k] + if last[1] > item[1]: + return False + cache[k] = item + + return True + + class TestProbe(unittest.TestCase): - def _start_jobq_in_thread(self, n_items, n_worker, keep_order=False): + def _start_jobq_in_thread(self, items, n_worker, keep_order=False, partial_order=False): def _sleep_1(args): time.sleep(0.1) @@ -48,11 +64,12 @@ def _nothing(args): return args probe = {} - th = threading.Thread(target=lambda: jobq.run(range(n_items), + th = threading.Thread(target=lambda: jobq.run(items, [(_sleep_1, n_worker), _nothing], probe=probe, keep_order=keep_order, + partial_order=partial_order, )) th.daemon = True th.start() @@ -67,13 +84,12 @@ def test_probe_single_thread(self): (0.2, 0, 'all done'), ) - th, probe = self._start_jobq_in_thread(3, 1) + th, probe = self._start_jobq_in_thread(range(3), 1) for sleep_time, doing, case_mes in cases: time.sleep(sleep_time) stat = jobq.stat(probe) - self.assertEqual(doing, stat['doing'], case_mes) # qsize() is not reliable. do not test the value of it. 
@@ -100,7 +116,7 @@ def test_probe_3_thread(self): (0.4, 0, 'all done'), ) - th, probe = self._start_jobq_in_thread(10, 3) + th, probe = self._start_jobq_in_thread(range(10), 3) for sleep_time, doing, case_mes in cases: @@ -124,7 +140,7 @@ def test_probe_3_thread_keep_order(self): (0.4, 0, 'all done'), ) - th, probe = self._start_jobq_in_thread(10, 3, keep_order=True) + th, probe = self._start_jobq_in_thread(range(10), 3, keep_order=True) for sleep_time, doing, case_mes in cases: @@ -140,6 +156,28 @@ def test_probe_3_thread_keep_order(self): th.join() + def test_probe_3_thread_partial_order(self): + cases = ( + (0.05, 3, '_sleep_1 is working on 1st 3 items'), + (0.1, 3, '_sleep_1 is working on 2nd 3 items'), + (0.4, 0, 'all done'), + ) + + th, probe = self._start_jobq_in_thread(([i, 0] for i in range(10)), 3, partial_order=True) + + for sleep_time, doing, case_mes in cases: + time.sleep(sleep_time) + stat = jobq.stat(probe) + + self.assertEqual(doing, stat['doing'], case_mes) + + # use the last stat + + workers = stat['workers'] + self.assertEqual(2, len(workers)) + + th.join() + class TestDispatcher(unittest.TestCase): @@ -264,6 +302,37 @@ def collect(args): self.assertEqual([0], rst) +class TestExpire(unittest.TestCase): + + def test_expire(self): + + def _sleep_1(args): + sleep_got.append(args) + time.sleep(0.5) + return args + + def collect(args): + rst.append(args) + + rst = [] + sleep_got = [] + + jm = jobq.JobManager([(_sleep_1, 3), collect], expire=0.2, partial_order=True) + + n = 6 + for i in range(n): + jm.put((i % 2, i)) + + time.sleep(0.1) + self.assertEqual(set([(0, 0), (1, 1)]), set(sleep_got)) + + time.sleep(0.3) + self.assertEqual(3, len(sleep_got)) + + jm.join() + self.assertEqual(set([(i % 2, i) for i in range(n)]), set(rst)) + + class TestJobManager(unittest.TestCase): def test_manager(self): @@ -434,6 +503,42 @@ def _change_thread_nr(): for th in ths: th.join() + def test_set_thread_num_partial_order(self): + + def _pass(args): + return args + + 
rst = [] + + jm = jobq.JobManager([_pass, rst.append], partial_order=True) + + setter = {'running': True} + + def _change_thread_nr(): + while setter['running']: + jm.set_thread_num(_pass, random.randint(1, 4)) + time.sleep(0.5) + + ths = [] + for ii in range(3): + th = threadutil.start_daemon_thread(_change_thread_nr) + ths.append(th) + + n = 10240 + for i in range(n): + jm.put([i % 2, i]) + + jm.join() + + self.assertEqual(n, len(rst)) + + self.assertTrue(is_partial_order(rst)) + + setter['running'] = False + + for th in ths: + th.join() + class TestJobQ(unittest.TestCase): @@ -506,6 +611,36 @@ def collect(args): jobq.run(inp, workers + [collect], keep_order=True) self.assertEqual(out, rst) + def add_list(args): + args[1] += 1 + return args + + def multi2_list(args): + args[1] *= 2 + return args + + def multi2_list_sleep(args): + time.sleep(0.02) + args[1] *= 2 + return args + + cases = ( + (list([i % 2, i] for i in range(100)), [add_list, (multi2_list_sleep, 10)], + list([i % 2, (i + 1) * 2] for i in range(100)) + ), + (list([i % 50, i] for i in range(1024 * 10)), [add_list, (multi2_list, 10)], + list([i % 50, (i + 1) * 2] for i in range(1024 * 10)) + ), + ) + for inp, workers, out in cases: + rst = [] + jobq.run(inp, workers + [collect], partial_order=True) + self.assertTrue(is_partial_order(rst)) + + out.sort() + rst.sort() + self.assertEqual(out, rst) + def test_generator(self): def gen(args): @@ -514,6 +649,7 @@ def gen(args): time.sleep(0.1) def collect(args): + time.sleep(random.uniform(0.005, 0.02)) rst.append(args) rst = [] @@ -528,6 +664,17 @@ def collect(args): self.assertEqual(9, len(rst), 'nr of elts') + def _gen(args): + k = args[0] + for i in range(3): + yield (k, i) + + rst = [] + jobq.run([(k, 0) for k in range(3)], [(_gen, 3), (collect, 3)], partial_order=True) + self.assertEqual(set([(k, v) for k in range(3) for v in range(3)]), set(rst), + "generator should get all") + self.assertTrue(is_partial_order(rst)) + class 
TestDefaultTimeout(unittest.TestCase):