From 59d4fccbb644426b6484de55b06bbb2d6c8d2dfb Mon Sep 17 00:00:00 2001 From: lzle Date: Fri, 15 Apr 2022 18:41:01 +0800 Subject: [PATCH 1/6] jobq support partial order --- jobq/jobq.py | 119 ++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 98 insertions(+), 21 deletions(-) diff --git a/jobq/jobq.py b/jobq/jobq.py index 2e808608..8c67a1ff 100755 --- a/jobq/jobq.py +++ b/jobq/jobq.py @@ -1,3 +1,4 @@ +import copy import logging import sys import threading @@ -35,16 +36,17 @@ class WorkerGroup(object): def __init__(self, index, worker, n_thread, input_queue, dispatcher, - probe, keep_order): + probe, keep_order, + partial_order): self.index = index self.worker = worker self.n_thread = n_thread - self.input_queue = input_queue - self.output_queue = _make_q() self.dispatcher = dispatcher self.probe = probe self.keep_order = keep_order + self.partial_order = partial_order + self.exec_cache = {} self.threads = {} @@ -61,6 +63,15 @@ def __init__(self, index, worker, n_thread, # protect reading/writing worker_group info self.worker_group_lock = threading.RLock() + if self.partial_order: + self.buffer_queue = input_queue + self.input_queue = _make_q() + self.output_queue = _make_q(0, partial_order) + else: + self.buffer_queue = None + self.input_queue = input_queue + self.output_queue = _make_q() + # dispatcher mode implies keep_order if self.dispatcher is not None or self.keep_order: need_coordinator = True @@ -76,15 +87,16 @@ def __init__(self, index, worker, n_thread, # Protect input.get() and ouput.put(), only used by non-dispatcher # mode self.keep_order_lock = threading.RLock() - - self.coordinator_thread = start_thread(self._coordinate) else: self.queue_of_output_q = None + if need_coordinator or self.partial_order: + self.coordinator_thread = start_thread(self._coordinate) + # `dispatcher` is a user-defined function to distribute args to workers. # It accepts the same args passed to worker and returns a number to # indicate which worker to used. - if self.dispatcher is not None: + if self.dispatcher is not None and not self.partial_order: self.dispatch_queues = [] for i in range(self.n_thread): self.dispatch_queues.append({ @@ -157,6 +169,9 @@ def _exec(self, input_q, output_q, thread_index): with self.probe['probe_lock']: self.probe['out'] += 1 + if self.partial_order and args[0] in self.exec_cache: + del self.exec_cache[args[0]] + # If rst is an iterator, it procures more than one args to next job. # In order to be accurate, we only count an iterator as one. @@ -202,15 +217,28 @@ def _exec_in_order(self, input_q, output_q, thread_index): def _coordinate(self): while self.running: + if not self.partial_order: + outq = self.queue_of_output_q.get() + if outq is Finish: + return - outq = self.queue_of_output_q.get() - if outq is Finish: - return + _put_rst(self.output_queue, outq.get()) - _put_rst(self.output_queue, outq.get()) + else: + now = time.time() + for key in self.exec_cache.keys(): + if now - self.exec_cache[key] > 60 * 2: + del self.exec_cache[key] + exec_cache = copy.copy(self.exec_cache) + for args in self.buffer_queue.get(exec_cache): + if args is Finish: + return + self.exec_cache[args[0]] = time.time() + _put_rst(self.input_queue, args) + + time.sleep(0.01) def _dispatch(self): - while self.running: args = self.input_queue.get() @@ -226,18 +254,21 @@ def _dispatch(self): self.queue_of_output_q.put(outq) inq.put(args) - class JobManager(object): - def __init__(self, workers, queue_size=1024, probe=None, keep_order=False): + def __init__(self, workers, queue_size=1024, probe=None, keep_order=False, partial_order=False): if probe is None: probe = {} + if keep_order and partial_order: + raise JobWorkerError('only one of keep_order and partial_order can be set to true') + self.workers = workers - self.head_queue = _make_q(queue_size) + self.head_queue = _make_q(queue_size, partial_order) self.probe = probe self.keep_order = keep_order + self.partial_order = partial_order self.worker_groups = [] @@ -266,11 +297,12 @@ def make_worker_groups(self): worker, n, dispatcher = worker wg = WorkerGroup(i, worker, n, inq, - dispatcher, - self.probe, self.keep_order) + dispatcher, + self.probe, self.keep_order, self.partial_order) self.worker_groups.append(wg) inq = wg.output_queue + # print(inq) def set_thread_num(self, worker, n): @@ -349,6 +381,11 @@ def join(self, timeout=None): wg.exiting = True ths = wg.threads.values() + if wg.partial_order: + for th in ths: + wg.buffer_queue.put(Finish) + wg.coordinator_thread.join(endtime - time.time()) + if wg.dispatcher is None: # put nr = len(threads) Finish for th in ths: @@ -375,9 +412,47 @@ def stat(self): return stat(self.probe) -def run(input_it, workers, keep_order=False, timeout=None, probe=None): +class PartialOrderQueue(object): + + def __init__(self): + self.buffer = [] + self.lock = threading.Lock() + + def put(self, args): + + if args is None: + return - mgr = JobManager(workers, probe=probe, keep_order=keep_order) + if not Finish and (type(args) not in [tuple, list] or len(args) == 0): + raise Exception('element type must be tuple or list and not empty') + + with self.lock: + self.buffer.append(args) + + def get(self, exec_cache): + ret = [] + selected = [] + with self.lock: + for args in self.buffer: + if args is Finish: + if len(self.buffer) == 1: + ret.append(args) + else: + key = args[0] + if key not in selected and key not in exec_cache: + selected.append(key) + ret.append(args) + + for args in ret: + self.buffer.remove(args) + + for args in ret: + yield args + + +def run(input_it, workers, keep_order=False, partial_order=False, timeout=None, probe=None): + + mgr = JobManager(workers, probe=probe, keep_order=keep_order, partial_order=partial_order) try: for args in input_it: @@ -386,7 +461,6 @@ def run(input_it, workers, keep_order=False, timeout=None, probe=None): finally: mgr.join(timeout=timeout) - def stat(probe): with probe['probe_lock']: @@ -446,8 +520,11 @@ def _put_non_empty(q, val): q.put(val) -def _make_q(n=1024): - return Queue.Queue(n) +def _make_q(n=1024, partial_order=False): + if partial_order: + return PartialOrderQueue() + else: + return Queue.Queue(n) def start_thread(exec_func, *args): From 67fc476b1140dbeb6110e9faaa89add6c126eaae Mon Sep 17 00:00:00 2001 From: lzle Date: Wed, 27 Apr 2022 16:40:02 +0800 Subject: [PATCH 2/6] optimize code --- jobq/jobq.py | 188 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 122 insertions(+), 66 deletions(-) diff --git a/jobq/jobq.py b/jobq/jobq.py index 8c67a1ff..ebe4d066 100755 --- a/jobq/jobq.py +++ b/jobq/jobq.py @@ -46,7 +46,7 @@ def __init__(self, index, worker, n_thread, self.probe = probe self.keep_order = keep_order self.partial_order = partial_order - self.exec_cache = {} + self.in_working = {} self.threads = {} @@ -63,21 +63,24 @@ def __init__(self, index, worker, n_thread, # protect reading/writing worker_group info self.worker_group_lock = threading.RLock() + need_handle_buffer = False + need_coordinator = False if self.partial_order: + need_handle_buffer = True + # dispatcher mode implies keep_order + elif self.dispatcher is not None or self.keep_order: + need_coordinator = True + + if need_handle_buffer: self.buffer_queue = input_queue self.input_queue = _make_q() - self.output_queue = _make_q(0, partial_order) + self.output_queue = _make_q(1024, partial_order) + self.dispatcher = None else: self.buffer_queue = None self.input_queue = input_queue self.output_queue = _make_q() - # dispatcher mode implies keep_order - if self.dispatcher is not None or self.keep_order: - need_coordinator = True - else: - need_coordinator = False - # Coordinator guarantees worker-group level first-in first-out, by put # output-queue into a queue-of-output-queue. if need_coordinator: @@ -90,13 +93,16 @@ def __init__(self, index, worker, n_thread, else: self.queue_of_output_q = None - if need_coordinator or self.partial_order: + if need_coordinator: self.coordinator_thread = start_thread(self._coordinate) + elif need_handle_buffer: + self.buffer_lock = threading.RLock() + self.buffer_thread = start_thread(self._handle_buffer) # `dispatcher` is a user-defined function to distribute args to workers. # It accepts the same args passed to worker and returns a number to # indicate which worker to used. - if self.dispatcher is not None and not self.partial_order: + if self.dispatcher is not None: self.dispatch_queues = [] for i in range(self.n_thread): self.dispatch_queues.append({ @@ -169,8 +175,11 @@ def _exec(self, input_q, output_q, thread_index): with self.probe['probe_lock']: self.probe['out'] += 1 - if self.partial_order and args[0] in self.exec_cache: - del self.exec_cache[args[0]] + if self.partial_order: + k = str(args[0]) + with self.buffer_lock: + if k in self.in_working: + del self.in_working[k] # If rst is an iterator, it procures more than one args to next job. # In order to be accurate, we only count an iterator as one. @@ -217,26 +226,12 @@ def _exec_in_order(self, input_q, output_q, thread_index): def _coordinate(self): while self.running: - if not self.partial_order: - outq = self.queue_of_output_q.get() - if outq is Finish: - return - _put_rst(self.output_queue, outq.get()) + outq = self.queue_of_output_q.get() + if outq is Finish: + return - else: - now = time.time() - for key in self.exec_cache.keys(): - if now - self.exec_cache[key] > 60 * 2: - del self.exec_cache[key] - exec_cache = copy.copy(self.exec_cache) - for args in self.buffer_queue.get(exec_cache): - if args is Finish: - return - self.exec_cache[args[0]] = time.time() - _put_rst(self.input_queue, args) - - time.sleep(0.01) + _put_rst(self.output_queue, outq.get()) def _dispatch(self): while self.running: @@ -254,6 +249,24 @@ def _dispatch(self): self.queue_of_output_q.put(outq) inq.put(args) + def _handle_buffer(self): + + while self.running: + now = time.time() + with self.buffer_lock: + for k in self.in_working.keys(): + if now - self.in_working[k] > 60 * 2: + del self.in_working[k] + exclude = copy.copy(self.in_working) + args = self.buffer_queue.get(exclude=exclude) + if args is Finish: + return + elif args is None: + time.sleep(0.001) + else: + self.in_working[str(args[0])] = time.time() + _put_rst(self.input_queue, args) + class JobManager(object): def __init__(self, workers, queue_size=1024, probe=None, keep_order=False, partial_order=False): @@ -297,12 +310,11 @@ def make_worker_groups(self): worker, n, dispatcher = worker wg = WorkerGroup(i, worker, n, inq, - dispatcher, - self.probe, self.keep_order, self.partial_order) + dispatcher, + self.probe, self.keep_order, self.partial_order) self.worker_groups.append(wg) inq = wg.output_queue - # print(inq) def set_thread_num(self, worker, n): @@ -384,7 +396,7 @@ def join(self, timeout=None): if wg.partial_order: for th in ths: wg.buffer_queue.put(Finish) - wg.coordinator_thread.join(endtime - time.time()) + wg.buffer_thread.join(endtime - time.time()) if wg.dispatcher is None: # put nr = len(threads) Finish @@ -414,41 +426,84 @@ def stat(self): class PartialOrderQueue(object): - def __init__(self): - self.buffer = [] - self.lock = threading.Lock() - - def put(self, args): - - if args is None: - return + def __init__(self, maxsize): + self.maxsize = maxsize + self.queue = [] + self.mutex = threading.Lock() + self.not_empty = threading.Condition(self.mutex) + self.not_full = threading.Condition(self.mutex) - if not Finish and (type(args) not in [tuple, list] or len(args) == 0): + def put(self, item, block=True, timeout=None): + if item not in [Finish, None] and (type(item) not in [tuple, list] or len(item) == 0): raise Exception('element type must be tuple or list and not empty') - with self.lock: - self.buffer.append(args) - - def get(self, exec_cache): - ret = [] - selected = [] - with self.lock: - for args in self.buffer: - if args is Finish: - if len(self.buffer) == 1: - ret.append(args) + with self.not_full: + if self.maxsize > 0: + if not block: + if self._qsize() >= self.maxsize: + raise Queue.Full + elif timeout is None: + while self._qsize() >= self.maxsize: + self.not_full.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") else: - key = args[0] - if key not in selected and key not in exec_cache: - selected.append(key) - ret.append(args) - - for args in ret: - self.buffer.remove(args) - - for args in ret: - yield args - + endtime = time.monotonic() + timeout + while self._qsize() >= self.maxsize: + remaining = endtime - time.monotonic() + if remaining <= 0.0: + raise Queue.Full + self.not_full.wait(remaining) + self._put(item) + self.not_empty.notify() + + def get(self, block=True, timeout=None, exclude=None): + with self.not_empty: + if not block: + if not self._qsize(): + raise Queue.Empty + elif timeout is None: + while not self._qsize(): + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time.monotonic() + timeout + while not self._qsize(): + remaining = endtime - time.monotonic() + if remaining <= 0.0: + raise Queue.Empty + self.not_empty.wait(remaining) + item = self._get(exclude) + if item is not None: + self.not_full.notify() + return item + + def _qsize(self): + return len(self.queue) + + def _put(self, item): + self.queue.append(item) + + def _get(self, exclude): + item = None + index = None + for i, v in enumerate(self.queue): + if v is Finish: + if i == 0: + index, item = i, v + break + elif v is None: + index, item = i, v + break + else: + key = str(v[0]) + if not exclude or key not in exclude: + index, item = i, v + break + if index is not None: + del self.queue[index] + return item def run(input_it, workers, keep_order=False, partial_order=False, timeout=None, probe=None): @@ -461,6 +516,7 @@ def run(input_it, workers, keep_order=False, partial_order=False, timeout=None, finally: mgr.join(timeout=timeout) + def stat(probe): with probe['probe_lock']: @@ -522,7 +578,7 @@ def _put_non_empty(q, val): def _make_q(n=1024, partial_order=False): if partial_order: - return PartialOrderQueue() + return PartialOrderQueue(n) else: return Queue.Queue(n) From a15944c7bad5679509a02656a267a185014a4d7f Mon Sep 17 00:00:00 2001 From: lzle Date: Sat, 7 May 2022 17:06:06 +0800 Subject: [PATCH 3/6] imporve performace --- jobq/jobq.py | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/jobq/jobq.py b/jobq/jobq.py index ebe4d066..bd7045ec 100755 --- a/jobq/jobq.py +++ b/jobq/jobq.py @@ -1,4 +1,3 @@ -import copy import logging import sys import threading @@ -180,6 +179,7 @@ def _exec(self, input_q, output_q, thread_index): with self.buffer_lock: if k in self.in_working: del self.in_working[k] + self.buffer_queue.not_match.set() # If rst is an iterator, it procures more than one args to next job. # In order to be accurate, we only count an iterator as one. @@ -234,6 +234,7 @@ def _coordinate(self): _put_rst(self.output_queue, outq.get()) def _dispatch(self): + while self.running: args = self.input_queue.get() @@ -251,20 +252,27 @@ def _dispatch(self): def _handle_buffer(self): + def check_expired(): + while self.running: + now = time.time() + with self.buffer_lock: + for k in self.in_working.keys(): + if now - self.in_working[k] > 60 * 2: + del self.in_working[k] + self.buffer_queue.not_match.set() + time.sleep(0.1) + + start_thread(check_expired) + while self.running: - now = time.time() - with self.buffer_lock: - for k in self.in_working.keys(): - if now - self.in_working[k] > 60 * 2: - del self.in_working[k] - exclude = copy.copy(self.in_working) - args = self.buffer_queue.get(exclude=exclude) + args = self.buffer_queue.get(exclude=self.in_working) if args is Finish: return elif args is None: - time.sleep(0.001) + self.buffer_queue.not_match.wait() else: - self.in_working[str(args[0])] = time.time() + with self.buffer_lock: + self.in_working[str(args[0])] = time.time() _put_rst(self.input_queue, args) class JobManager(object): @@ -432,6 +440,7 @@ def __init__(self, maxsize): self.mutex = threading.Lock() self.not_empty = threading.Condition(self.mutex) self.not_full = threading.Condition(self.mutex) + self.not_match = threading.Event() def put(self, item, block=True, timeout=None): if item not in [Finish, None] and (type(item) not in [tuple, list] or len(item) == 0): @@ -456,6 +465,7 @@ def put(self, item, block=True, timeout=None): self.not_full.wait(remaining) self._put(item) self.not_empty.notify() + self.not_match.set() def get(self, block=True, timeout=None, exclude=None): with self.not_empty: @@ -477,6 +487,8 @@ def get(self, block=True, timeout=None, exclude=None): item = self._get(exclude) if item is not None: self.not_full.notify() + else: + self.not_match.clear() return item def _qsize(self): From 60ba84d5800db119a2ddc0c93a01ea945a45b103 Mon Sep 17 00:00:00 2001 From: lzle Date: Wed, 11 May 2022 10:37:36 +0800 Subject: [PATCH 4/6] add argument expriation --- jobq/jobq.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/jobq/jobq.py b/jobq/jobq.py index bd7045ec..9b1792f9 100755 --- a/jobq/jobq.py +++ b/jobq/jobq.py @@ -36,7 +36,8 @@ def __init__(self, index, worker, n_thread, input_queue, dispatcher, probe, keep_order, - partial_order): + partial_order, + expire): self.index = index self.worker = worker @@ -45,6 +46,7 @@ def __init__(self, index, worker, n_thread, self.probe = probe self.keep_order = keep_order self.partial_order = partial_order + self.expire = expire self.in_working = {} self.threads = {} @@ -257,7 +259,7 @@ def check_expired(): now = time.time() with self.buffer_lock: for k in self.in_working.keys(): - if now - self.in_working[k] > 60 * 2: + if now - self.in_working[k] > self.expire: del self.in_working[k] self.buffer_queue.not_match.set() time.sleep(0.1) @@ -277,7 +279,7 @@ def check_expired(): class JobManager(object): - def __init__(self, workers, queue_size=1024, probe=None, keep_order=False, partial_order=False): + def __init__(self, workers, queue_size=1024, expire=3000, probe=None, keep_order=False, partial_order=False): if probe is None: probe = {} @@ -290,6 +292,7 @@ def __init__(self, workers, queue_size=1024, probe=None, keep_order=False, parti self.probe = probe self.keep_order = keep_order self.partial_order = partial_order + self.expire = expire self.worker_groups = [] @@ -319,7 +322,8 @@ def make_worker_groups(self): wg = WorkerGroup(i, worker, n, inq, dispatcher, - self.probe, self.keep_order, self.partial_order) + self.probe, self.keep_order, self.partial_order, + self.expire) self.worker_groups.append(wg) inq = wg.output_queue From 13955b918260a42978b05abd78caa1ec007d6eaf Mon Sep 17 00:00:00 2001 From: lzle Date: Sat, 28 May 2022 19:52:31 +0800 Subject: [PATCH 5/6] add unittest for partial order --- jobq/jobq.py | 42 +++++++---- jobq/t.sh | 2 +- jobq/test/test_jobq.py | 159 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 182 insertions(+), 21 deletions(-) diff --git a/jobq/jobq.py b/jobq/jobq.py index 9b1792f9..c3ee6380 100755 --- a/jobq/jobq.py +++ b/jobq/jobq.py @@ -22,6 +22,10 @@ class Finish(object): pass +class NotMatch(object): + pass + + class JobWorkerError(Exception): pass @@ -176,18 +180,18 @@ def _exec(self, input_q, output_q, thread_index): with self.probe['probe_lock']: self.probe['out'] += 1 - if self.partial_order: + # If rst is an iterator, it procures more than one args to next job. + # In order to be accurate, we only count an iterator as one. + + _put_rst(output_q, rst) + + if self.partial_order and args is not None: k = str(args[0]) with self.buffer_lock: if k in self.in_working: del self.in_working[k] self.buffer_queue.not_match.set() - # If rst is an iterator, it procures more than one args to next job. - # In order to be accurate, we only count an iterator as one. - - _put_rst(output_q, rst) - def _exec_in_order(self, input_q, output_q, thread_index): while self.running: @@ -270,16 +274,22 @@ def check_expired(): args = self.buffer_queue.get(exclude=self.in_working) if args is Finish: return - elif args is None: + elif args is NotMatch: self.buffer_queue.not_match.wait() + self.buffer_queue.not_match.clear() else: - with self.buffer_lock: - self.in_working[str(args[0])] = time.time() + if args is not None: + with self.buffer_lock: + self.in_working[str(args[0])] = time.time() + _put_rst(self.input_queue, args) class JobManager(object): - def __init__(self, workers, queue_size=1024, expire=3000, probe=None, keep_order=False, partial_order=False): + def __init__(self, workers, queue_size=1024, expire=None, probe=None, keep_order=False, partial_order=False): + + if expire is None: + expire = 60 * 5 if probe is None: probe = {} @@ -489,10 +499,8 @@ def get(self, block=True, timeout=None, exclude=None): raise Queue.Empty self.not_empty.wait(remaining) item = self._get(exclude) - if item is not None: + if item is not NotMatch: self.not_full.notify() - else: - self.not_match.clear() return item def _qsize(self): @@ -502,8 +510,10 @@ def _put(self, item): self.queue.append(item) def _get(self, exclude): - item = None + item = NotMatch index = None + selected = {} + for i, v in enumerate(self.queue): if v is Finish: if i == 0: @@ -514,9 +524,13 @@ def _get(self, exclude): break else: key = str(v[0]) + if key in selected: + continue if not exclude or key not in exclude: index, item = i, v break + selected[key] = True + if index is not None: del self.queue[index] return item diff --git a/jobq/t.sh b/jobq/t.sh index 0c18a7f7..7b6c31d2 100755 --- a/jobq/t.sh +++ b/jobq/t.sh @@ -1,6 +1,6 @@ #!/bin/sh -python2 -m unittest -v discover +python2 -m unittest discover -v # python2 -m unittest -v test.test_jobq # python2 -m unittest -v test.test_jobq.TestProbe diff --git a/jobq/test/test_jobq.py b/jobq/test/test_jobq.py index da9e57f5..e1512d9a 100755 --- a/jobq/test/test_jobq.py +++ b/jobq/test/test_jobq.py @@ -36,9 +36,25 @@ def sleep_5(args): return args +def is_partial_order(rst): + cache = {} + + for item in rst: + k = item[0] + if k not in cache: + cache[k] = item + + last = cache[k] + if last[1] > item[1]: + return False + cache[k] = item + + return True + + class TestProbe(unittest.TestCase): - def _start_jobq_in_thread(self, n_items, n_worker, keep_order=False): + def _start_jobq_in_thread(self, items, n_worker, keep_order=False, partial_order=False): def _sleep_1(args): time.sleep(0.1) @@ -48,11 +64,12 @@ def _nothing(args): return args probe = {} - th = threading.Thread(target=lambda: jobq.run(range(n_items), + th = threading.Thread(target=lambda: jobq.run(items, [(_sleep_1, n_worker), _nothing], probe=probe, keep_order=keep_order, + partial_order=partial_order, )) th.daemon = True th.start() @@ -67,13 +84,12 @@ def test_probe_single_thread(self): (0.2, 0, 'all done'), ) - th, probe = self._start_jobq_in_thread(3, 1) + th, probe = self._start_jobq_in_thread(range(3), 1) for sleep_time, doing, case_mes in cases: time.sleep(sleep_time) stat = jobq.stat(probe) - self.assertEqual(doing, stat['doing'], case_mes) # qsize() is not reliable. do not test the value of it. @@ -100,7 +116,7 @@ def test_probe_3_thread(self): (0.4, 0, 'all done'), ) - th, probe = self._start_jobq_in_thread(10, 3) + th, probe = self._start_jobq_in_thread(range(10), 3) for sleep_time, doing, case_mes in cases: @@ -124,7 +140,7 @@ def test_probe_3_thread_keep_order(self): (0.4, 0, 'all done'), ) - th, probe = self._start_jobq_in_thread(10, 3, keep_order=True) + th, probe = self._start_jobq_in_thread(range(10), 3, keep_order=True) for sleep_time, doing, case_mes in cases: @@ -140,6 +156,28 @@ def test_probe_3_thread_keep_order(self): th.join() + def test_probe_3_thread_partial_order(self): + cases = ( + (0.05, 3, '_sleep_1 is working on 1st 3 items'), + (0.1, 3, '_sleep_1 is working on 2nd 3 items'), + (0.4, 0, 'all done'), + ) + + th, probe = self._start_jobq_in_thread(([i, 0] for i in range(10)), 3, partial_order=True) + + for sleep_time, doing, case_mes in cases: + time.sleep(sleep_time) + stat = jobq.stat(probe) + + self.assertEqual(doing, stat['doing'], case_mes) + + # use the last stat + + workers = stat['workers'] + self.assertEqual(2, len(workers)) + + th.join() + class TestDispatcher(unittest.TestCase): @@ -264,6 +302,37 @@ def collect(args): self.assertEqual([0], rst) +class TestExpire(unittest.TestCase): + + def test_expire(self): + + def _sleep_1(args): + sleep_got.append(args) + time.sleep(0.5) + return args + + def collect(args): + rst.append(args) + + rst = [] + sleep_got = [] + + jm = jobq.JobManager([(_sleep_1, 3), collect], expire=0.2, partial_order=True) + + n = 6 + for i in range(n): + jm.put((i % 2, i)) + + time.sleep(0.1) + self.assertEqual(set([(0, 0), (1, 1)]), set(sleep_got)) + + time.sleep(0.3) + self.assertEqual(3, len(sleep_got)) + + jm.join() + self.assertEqual(set([(i % 2, i) for i in range(n)]), set(rst)) + + class TestJobManager(unittest.TestCase): def test_manager(self): @@ -434,6 +503,42 @@ def _change_thread_nr(): for th in ths: th.join() + def test_set_thread_num_partial_order(self): + + def _pass(args): + return args + + rst = [] + + jm = jobq.JobManager([_pass, rst.append], partial_order=True) + + setter = {'running': True} + + def _change_thread_nr(): + while setter['running']: + jm.set_thread_num(_pass, random.randint(1, 4)) + time.sleep(0.5) + + ths = [] + for ii in range(3): + th = threadutil.start_daemon_thread(_change_thread_nr) + ths.append(th) + + n = 10240 + for i in range(n): + jm.put([i % 2, i]) + + jm.join() + + self.assertEqual(n, len(rst)) + + self.assertTrue(is_partial_order(rst)) + + setter['running'] = False + + for th in ths: + th.join() + class TestJobQ(unittest.TestCase): @@ -506,6 +611,36 @@ def collect(args): jobq.run(inp, workers + [collect], keep_order=True) self.assertEqual(out, rst) + def add_list(args): + args[1] += 1 + return args + + def multi2_list(args): + args[1] *= 2 + return args + + def multi2_list_sleep(args): + time.sleep(0.02) + args[1] *= 2 + return args + + cases = ( + (list([i % 2, i] for i in range(100)), [add_list, (multi2_list_sleep, 10)], + list([i % 2, (i + 1) * 2] for i in range(100)) + ), + (list([i % 50, i] for i in range(1024 * 10)), [add_list, (multi2_list, 10)], + list([i % 50, (i + 1) * 2] for i in range(1024 * 10)) + ), + ) + for inp, workers, out in cases: + rst = [] + jobq.run(inp, workers + [collect], partial_order=True) + self.assertTrue(is_partial_order(rst)) + + out.sort() + rst.sort() + self.assertEqual(out, rst) + def test_generator(self): def gen(args): @@ -514,6 +649,7 @@ def gen(args): time.sleep(0.1) def collect(args): + time.sleep(random.uniform(0.005, 0.02)) rst.append(args) rst = [] @@ -528,6 +664,17 @@ def collect(args): self.assertEqual(9, len(rst), 'nr of elts') + def _gen(args): + k = args[0] + for i in range(3): + yield (k, i) + + rst = [] + jobq.run([(k, 0) for k in range(3)], [(_gen, 3), (collect, 3)], partial_order=True) + self.assertEqual(set([(k, v) for k in range(3) for v in range(3)]), set(rst), + "generator should get all") + self.assertTrue(is_partial_order(rst)) + class TestDefaultTimeout(unittest.TestCase): From 40aa1d499efe73e0aad381bf27c6a8056248d4d8 Mon Sep 17 00:00:00 2001 From: lzle Date: Wed, 1 Jun 2022 16:02:57 +0800 Subject: [PATCH 6/6] update --- jobq/jobq.py | 45 +++++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/jobq/jobq.py b/jobq/jobq.py index c3ee6380..b68034da 100755 --- a/jobq/jobq.py +++ b/jobq/jobq.py @@ -3,6 +3,7 @@ import threading import time import types +from collections import deque from pykit import threadutil @@ -68,21 +69,21 @@ def __init__(self, index, worker, n_thread, # protect reading/writing worker_group info self.worker_group_lock = threading.RLock() - need_handle_buffer = False + need_coordinator_partial = False need_coordinator = False if self.partial_order: - need_handle_buffer = True + need_coordinator_partial = True # dispatcher mode implies keep_order elif self.dispatcher is not None or self.keep_order: need_coordinator = True - if need_handle_buffer: - self.buffer_queue = input_queue + if need_coordinator_partial: + self.partial_queue = input_queue self.input_queue = _make_q() self.output_queue = _make_q(1024, partial_order) self.dispatcher = None else: - self.buffer_queue = None + self.partial_queue = None self.input_queue = input_queue self.output_queue = _make_q() @@ -100,9 +101,9 @@ def __init__(self, index, worker, n_thread, if need_coordinator: self.coordinator_thread = start_thread(self._coordinate) - elif need_handle_buffer: - self.buffer_lock = threading.RLock() - self.buffer_thread = start_thread(self._handle_buffer) + elif need_coordinator_partial: + self.partial_order_lock = threading.RLock() + self.coordinator_partial_thread = start_thread(self._coordinate_partial) # `dispatcher` is a user-defined function to distribute args to workers. # It accepts the same args passed to worker and returns a number to @@ -187,10 +188,10 @@ def _exec(self, input_q, output_q, thread_index): if self.partial_order and args is not None: k = str(args[0]) - with self.buffer_lock: + with self.partial_order_lock: if k in self.in_working: del self.in_working[k] - self.buffer_queue.not_match.set() + self.partial_queue.not_match.set() def _exec_in_order(self, input_q, output_q, thread_index): @@ -256,30 +257,30 @@ def _dispatch(self): self.queue_of_output_q.put(outq) inq.put(args) - def _handle_buffer(self): + def _coordinate_partial(self): def check_expired(): while self.running: now = time.time() - with self.buffer_lock: + with self.partial_order_lock: for k in self.in_working.keys(): if now - self.in_working[k] > self.expire: del self.in_working[k] - self.buffer_queue.not_match.set() + self.partial_queue.not_match.set() time.sleep(0.1) start_thread(check_expired) while self.running: - args = self.buffer_queue.get(exclude=self.in_working) + args = self.partial_queue.get(exclude=self.in_working) if args is Finish: return elif args is NotMatch: - self.buffer_queue.not_match.wait() - self.buffer_queue.not_match.clear() + self.partial_queue.not_match.wait() + self.partial_queue.not_match.clear() else: if args is not None: - with self.buffer_lock: + with self.partial_order_lock: self.in_working[str(args[0])] = time.time() _put_rst(self.input_queue, args) @@ -417,8 +418,8 @@ def join(self, timeout=None): if wg.partial_order: for th in ths: - wg.buffer_queue.put(Finish) - wg.buffer_thread.join(endtime - time.time()) + wg.partial_queue.put(Finish) + wg.coordinator_partial_thread.join(endtime - time.time()) if wg.dispatcher is None: # put nr = len(threads) Finish @@ -450,7 +451,7 @@ class PartialOrderQueue(object): def __init__(self, maxsize): self.maxsize = maxsize - self.queue = [] + self._init(maxsize) self.mutex = threading.Lock() self.not_empty = threading.Condition(self.mutex) self.not_full = threading.Condition(self.mutex) @@ -503,6 +504,9 @@ def get(self, block=True, timeout=None, exclude=None): self.not_full.notify() return item + def _init(self, maxsize): + self.queue = deque() + def _qsize(self): return len(self.queue) @@ -535,6 +539,7 @@ def _get(self, exclude): del self.queue[index] return item + def run(input_it, workers, keep_order=False, partial_order=False, timeout=None, probe=None): mgr = JobManager(workers, probe=probe, keep_order=keep_order, partial_order=partial_order)