Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 182 additions & 14 deletions jobq/jobq.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
import time
import types
from collections import deque

from pykit import threadutil

Expand All @@ -22,6 +23,10 @@ class Finish(object):
pass


class NotMatch(object):
    """Sentinel meaning "items are queued, but none is eligible right now".

    `PartialOrderQueue.get()` returns this class itself (never an instance)
    when every queued item is skipped by the `exclude` filter.  Compare with
    `is`.
    """


class JobWorkerError(Exception):
    """Error raised by the job queue itself.

    For example `JobManager` raises it when both `keep_order` and
    `partial_order` are requested at the same time.
    """

Expand All @@ -35,16 +40,19 @@ class WorkerGroup(object):
def __init__(self, index, worker, n_thread,
input_queue,
dispatcher,
probe, keep_order):
probe, keep_order,
partial_order,
expire):

self.index = index
self.worker = worker
self.n_thread = n_thread
self.input_queue = input_queue
self.output_queue = _make_q()
self.dispatcher = dispatcher
self.probe = probe
self.keep_order = keep_order
self.partial_order = partial_order
self.expire = expire
self.in_working = {}

self.threads = {}

Expand All @@ -61,11 +69,23 @@ def __init__(self, index, worker, n_thread,
# protect reading/writing worker_group info
self.worker_group_lock = threading.RLock()

need_coordinator_partial = False
need_coordinator = False
if self.partial_order:
need_coordinator_partial = True
# dispatcher mode implies keep_order
if self.dispatcher is not None or self.keep_order:
elif self.dispatcher is not None or self.keep_order:
need_coordinator = True

if need_coordinator_partial:
self.partial_queue = input_queue
self.input_queue = _make_q()
self.output_queue = _make_q(1024, partial_order)
self.dispatcher = None
else:
need_coordinator = False
self.partial_queue = None
self.input_queue = input_queue
self.output_queue = _make_q()

# Coordinator guarantees worker-group level first-in first-out, by putting
# each output-queue into a queue-of-output-queue.
Expand All @@ -76,11 +96,15 @@ def __init__(self, index, worker, n_thread,
# Protect input.get() and output.put(), only used by non-dispatcher
# mode
self.keep_order_lock = threading.RLock()

self.coordinator_thread = start_thread(self._coordinate)
else:
self.queue_of_output_q = None

if need_coordinator:
self.coordinator_thread = start_thread(self._coordinate)
elif need_coordinator_partial:
self.partial_order_lock = threading.RLock()
self.coordinator_partial_thread = start_thread(self._coordinate_partial)

# `dispatcher` is a user-defined function to distribute args to workers.
# It accepts the same args passed to worker and returns a number to
# indicate which worker to use.
Expand Down Expand Up @@ -162,6 +186,13 @@ def _exec(self, input_q, output_q, thread_index):

_put_rst(output_q, rst)

if self.partial_order and args is not None:
k = str(args[0])
with self.partial_order_lock:
if k in self.in_working:
del self.in_working[k]
self.partial_queue.not_match.set()

def _exec_in_order(self, input_q, output_q, thread_index):

while self.running:
Expand Down Expand Up @@ -226,18 +257,53 @@ def _dispatch(self):
self.queue_of_output_q.put(outq)
inq.put(args)

def _coordinate_partial(self):
    """Feed the workers' input queue from the partial-order queue.

    Runs in its own thread.  Each job taken from `self.partial_queue` has
    the key `str(args[0])`; while a key is recorded in `self.in_working`
    no other job with the same key is handed to the workers, so jobs
    sharing a key are processed one at a time in arrival order.  An entry
    is dropped from `self.in_working` either by the worker once the job is
    done, or by the sweeper below once it is older than `self.expire`
    seconds.

    The loop ends when `Finish` is taken from the queue or when
    `self.running` becomes false.
    """

    def check_expired():
        # Sweeper thread: forget keys that have been "in working" for more
        # than `self.expire` seconds, so a stuck job can not block all
        # later jobs with the same key forever.
        while self.running:
            now = time.time()
            with self.partial_order_lock:
                # Collect first, delete afterwards: never mutate the dict
                # while iterating over it (a live key view would raise
                # "dictionary changed size during iteration").
                expired = [k for k, since in list(self.in_working.items())
                           if now - since > self.expire]
                for k in expired:
                    del self.in_working[k]
                if expired:
                    # Wake up the coordinator: previously blocked jobs
                    # may be eligible now.
                    self.partial_queue.not_match.set()
            time.sleep(0.1)

    start_thread(check_expired)

    while self.running:
        args = self.partial_queue.get(exclude=self.in_working)

        if args is Finish:
            return

        if args is NotMatch:
            # Jobs are queued but every key is busy.  Sleep until a key
            # is released, expires, or a new job is put, then rescan.
            self.partial_queue.not_match.wait()
            self.partial_queue.not_match.clear()
            continue

        if args is not None:
            with self.partial_order_lock:
                # Remember when this key entered processing, for expiry.
                self.in_working[str(args[0])] = time.time()

        _put_rst(self.input_queue, args)

class JobManager(object):

def __init__(self, workers, queue_size=1024, probe=None, keep_order=False):
def __init__(self, workers, queue_size=1024, expire=None, probe=None, keep_order=False, partial_order=False):

if expire is None:
expire = 60 * 5

if probe is None:
probe = {}

if keep_order and partial_order:
raise JobWorkerError('only one of keep_order and partial_order can be set to true')

self.workers = workers
self.head_queue = _make_q(queue_size)
self.head_queue = _make_q(queue_size, partial_order)
self.probe = probe
self.keep_order = keep_order
self.partial_order = partial_order
self.expire = expire

self.worker_groups = []

Expand Down Expand Up @@ -267,7 +333,8 @@ def make_worker_groups(self):

wg = WorkerGroup(i, worker, n, inq,
dispatcher,
self.probe, self.keep_order)
self.probe, self.keep_order, self.partial_order,
self.expire)

self.worker_groups.append(wg)
inq = wg.output_queue
Expand Down Expand Up @@ -349,6 +416,11 @@ def join(self, timeout=None):
wg.exiting = True
ths = wg.threads.values()

if wg.partial_order:
for th in ths:
wg.partial_queue.put(Finish)
wg.coordinator_partial_thread.join(endtime - time.time())

if wg.dispatcher is None:
# put nr = len(threads) Finish
for th in ths:
Expand All @@ -375,9 +447,102 @@ def stat(self):
return stat(self.probe)


def run(input_it, workers, keep_order=False, timeout=None, probe=None):
class PartialOrderQueue(object):
    """Bounded FIFO queue whose `get()` can skip items with a busy key.

    A regular item must be a non-empty tuple or list; `str(item[0])` is its
    key.  `get(exclude=...)` returns the oldest item whose key is not in
    `exclude`, so items sharing a key keep their insertion order while
    items with different keys may overtake each other.

    Two control values are accepted besides regular items:

    - `None` is returned as soon as the scan reaches it.
    - `Finish` is returned only when it is the head of the queue.

    When the queue is not empty but nothing is eligible, `get()` returns
    `NotMatch` without removing anything.  The `not_match` event is set on
    every `put()`; callers also set it when a key is released, so that a
    consumer waiting on it knows a rescan may succeed.

    `maxsize <= 0` means the queue is unbounded.
    """

    # `time.monotonic` is not available on Python 2; fall back to
    # `time.time`, which is what the Python 2 `Queue` module uses.
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, maxsize):
        self.maxsize = maxsize
        self._init(maxsize)
        # One mutex shared by both conditions, like `Queue.Queue`.
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.not_match = threading.Event()

    def qsize(self):
        """Return the number of queued items (approximate under threads)."""
        with self.mutex:
            return self._qsize()

    def empty(self):
        """Return True if nothing is queued (approximate under threads)."""
        with self.mutex:
            return self._is_empty()

    def full(self):
        """Return True if the queue is bounded and at capacity."""
        with self.mutex:
            return self.maxsize > 0 and self._is_full()

    def put(self, item, block=True, timeout=None):
        """Append `item` to the queue.

        `item` must be `Finish`, `None`, or a non-empty tuple/list.

        Raises:
            ValueError: `item` has an unsupported type or is empty, or
                `timeout` is negative.
            Queue.Full: the queue is full and `block` is false, or it
                stayed full for `timeout` seconds.
        """
        is_control = item is Finish or item is None
        if not is_control:
            if not isinstance(item, (tuple, list)) or len(item) == 0:
                raise ValueError(
                    'element type must be tuple or list and not empty')

        with self.not_full:
            if self.maxsize > 0:
                self._wait_while(self.not_full, self._is_full,
                                 block, timeout, Queue.Full)
            self._put(item)
            self.not_empty.notify()
            # A new item may be eligible even if all older ones are not.
            self.not_match.set()

    def get(self, block=True, timeout=None, exclude=None):
        """Remove and return the oldest item whose key is not in `exclude`.

        `block` and `timeout` only apply to waiting for the queue to become
        non-empty.  If it is non-empty but no item is eligible, `NotMatch`
        is returned immediately and the queue is left unchanged.

        Raises:
            ValueError: `timeout` is negative.
            Queue.Empty: the queue is empty and `block` is false, or it
                stayed empty for `timeout` seconds.
        """
        with self.not_empty:
            self._wait_while(self.not_empty, self._is_empty,
                             block, timeout, Queue.Empty)
            item = self._get(exclude)
            if item is not NotMatch:
                # Only a real removal frees a slot for a blocked producer.
                self.not_full.notify()
            return item

    def _wait_while(self, cond, is_blocked, block, timeout, exc):
        # Wait on `cond` (its lock must be held) while `is_blocked()` is
        # true; raise `exc` when not allowed to wait or on timeout.
        if not block:
            if is_blocked():
                raise exc
        elif timeout is None:
            while is_blocked():
                cond.wait()
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            deadline = self._clock() + timeout
            while is_blocked():
                remaining = deadline - self._clock()
                if remaining <= 0.0:
                    raise exc
                cond.wait(remaining)

    def _is_full(self):
        # Only meaningful when maxsize > 0.
        return self._qsize() >= self.maxsize

    def _is_empty(self):
        return not self._qsize()

    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self, exclude):
        # Scan from the oldest item and pick the first eligible one.
        # Returns NotMatch, removing nothing, if there is none.
        found_at = None
        found = NotMatch

        # Keys already rejected in this scan.  `exclude` may be mutated by
        # other threads while we scan; once an item has been skipped, a
        # later item with the same key must be skipped too, or it would
        # overtake the earlier one.
        skipped = set()

        for i, v in enumerate(self.queue):
            if v is Finish:
                # Finish is only delivered once everything before it has
                # been consumed.
                if i == 0:
                    found_at, found = i, v
                    break
            elif v is None:
                found_at, found = i, v
                break
            else:
                key = str(v[0])
                if key in skipped:
                    continue
                if not exclude or key not in exclude:
                    found_at, found = i, v
                    break
                skipped.add(key)

        if found_at is not None:
            del self.queue[found_at]
        return found


mgr = JobManager(workers, probe=probe, keep_order=keep_order)
def run(input_it, workers, keep_order=False, partial_order=False, timeout=None, probe=None):

mgr = JobManager(workers, probe=probe, keep_order=keep_order, partial_order=partial_order)

try:
for args in input_it:
Expand Down Expand Up @@ -446,8 +611,11 @@ def _put_non_empty(q, val):
q.put(val)


def _make_q(n=1024):
return Queue.Queue(n)
def _make_q(n=1024, partial_order=False):
if partial_order:
return PartialOrderQueue(n)
else:
return Queue.Queue(n)


def start_thread(exec_func, *args):
Expand Down
2 changes: 1 addition & 1 deletion jobq/t.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/sh

python2 -m unittest -v discover
python2 -m unittest discover -v

# python2 -m unittest -v test.test_jobq
# python2 -m unittest -v test.test_jobq.TestProbe
Loading