From 9ced3929590f63a08546746662c0202a8e3c6d1f Mon Sep 17 00:00:00 2001 From: timbu Date: Fri, 9 Dec 2016 17:27:55 +0000 Subject: [PATCH 1/2] sketch of multiple pools --- nameko/constants.py | 2 ++ nameko/containers.py | 39 ++++++++++++++++++++++++++++++++++----- nameko/extensions.py | 4 ++++ nameko/rpc.py | 5 ++++- nameko/web/handlers.py | 3 ++- 5 files changed, 46 insertions(+), 7 deletions(-) diff --git a/nameko/constants.py b/nameko/constants.py index c504508f2..8132839fe 100644 --- a/nameko/constants.py +++ b/nameko/constants.py @@ -5,6 +5,7 @@ HEARTBEAT_CONFIG_KEY = 'HEARTBEAT' MAX_WORKERS_CONFIG_KEY = 'max_workers' +MAX_WORKERS_PER_GROUP_CONFIG_KEY = 'MAX_WORKERS_PER_GROUP' PARENT_CALLS_CONFIG_KEY = 'parent_calls_tracked' DEFAULT_MAX_WORKERS = 10 @@ -12,6 +13,7 @@ DEFAULT_SERIALIZER = 'json' DEFAULT_RETRY_POLICY = {'max_retries': 3} DEFAULT_HEARTBEAT = 60 +DEFAULT_WORKER_GROUP = 'DEFAULT' CALL_ID_STACK_CONTEXT_KEY = 'call_id_stack' AUTH_TOKEN_CONTEXT_KEY = 'auth_token' diff --git a/nameko/containers.py b/nameko/containers.py index aad0db9ce..ad27a4047 100644 --- a/nameko/containers.py +++ b/nameko/containers.py @@ -5,7 +5,7 @@ import uuid import warnings from collections import deque -from logging import getLogger +from logging import getLogger, DEBUG import eventlet import six @@ -14,7 +14,8 @@ from greenlet import GreenletExit # pylint: disable=E0611 from nameko.constants import ( CALL_ID_STACK_CONTEXT_KEY, DEFAULT_MAX_WORKERS, - DEFAULT_PARENT_CALLS_TRACKED, DEFAULT_SERIALIZER, MAX_WORKERS_CONFIG_KEY, + DEFAULT_PARENT_CALLS_TRACKED, DEFAULT_SERIALIZER, DEFAULT_WORKER_GROUP, + MAX_WORKERS_CONFIG_KEY, MAX_WORKERS_PER_GROUP_CONFIG_KEY, PARENT_CALLS_CONFIG_KEY, SERIALIZER_CONFIG_KEY) from nameko.exceptions import ConfigurationError, ContainerBeingKilled from nameko.extensions import ( @@ -142,6 +143,9 @@ def __init__(self, service_cls, config, worker_ctx_cls=None): self.max_workers = ( config.get(MAX_WORKERS_CONFIG_KEY) or DEFAULT_MAX_WORKERS) + self.max_workers_per_group = config.get( + MAX_WORKERS_PER_GROUP_CONFIG_KEY) + self.serializer = config.get( SERIALIZER_CONFIG_KEY, DEFAULT_SERIALIZER) @@ -157,6 +161,8 @@ def __init__(self, service_cls, config, worker_ctx_cls=None): self.dependencies.add(bound) self.subextensions.update(iter_extensions(bound)) + self._worker_groups = {DEFAULT_WORKER_GROUP} + for method_name, method in inspect.getmembers(service_cls, is_method): entrypoints = getattr(method, ENTRYPOINT_EXTENSIONS_ATTR, []) for entrypoint in entrypoints: @@ -165,7 +171,11 @@ def __init__(self, service_cls, config, worker_ctx_cls=None): self.subextensions.update(iter_extensions(bound)) self.started = False - self._worker_pool = GreenPool(size=self.max_workers) + + self._worker_pools = {} + for group in self._worker_groups: + size = self.max_workers_per_group.get(group) or self.max_workers + self._worker_pools[group] = GreenPool(size=size) self._worker_threads = {} self._managed_threads = {} @@ -238,7 +248,7 @@ def stop(self): # there might still be some running workers, which we have to # wait for to complete before we can stop dependencies - self._worker_pool.waitall() + self._wait_for_worker_pools() # it should be safe now to stop any dependency as there is no # active worker which could be using it @@ -320,6 +330,9 @@ def wait(self): """ return self._died.wait() + def register_worker_group(self, worker_group): + self._worker_groups.add(worker_group) + def spawn_worker(self, entrypoint, args, kwargs, context_data=None, handle_result=None): """ Spawn a worker thread for running the service method decorated @@ -345,12 +358,20 @@ def spawn_worker(self, entrypoint, args, kwargs, self, service, entrypoint, args, kwargs, data=context_data) _log.debug('spawning %s', worker_ctx) - gt = self._worker_pool.spawn( + worker_pool = self._get_worker_pool_for_entrypoint(entrypoint) + gt = worker_pool.spawn( self._run_worker, worker_ctx, handle_result ) gt.link(self._handle_worker_thread_exited, worker_ctx) self._worker_threads[worker_ctx] = gt + + if _log.isEnabledFor(DEBUG): + for key, pool in self._worker_pools.items(): + _log.debug( + 'worker-pool:%s has %s free of %s' % ( + key, pool.free(), pool.size)) + return worker_ctx def spawn_managed_thread(self, fn, protected=None, identifier=None): @@ -383,6 +404,14 @@ def spawn_managed_thread(self, fn, protected=None, identifier=None): gt.link(self._handle_managed_thread_exited, identifier) return gt + def _get_worker_pool_for_entrypoint(self, entrypoint): + key = entrypoint.worker_group or DEFAULT_WORKER_GROUP + return self._worker_pools[key] + + def _wait_for_worker_pools(self): + for pool in self._worker_pools.values(): + pool.waitall() + def _run_worker(self, worker_ctx, handle_result): _log.debug('setting up %s', worker_ctx) diff --git a/nameko/extensions.py b/nameko/extensions.py index 0d95294aa..c5d394f6a 100644 --- a/nameko/extensions.py +++ b/nameko/extensions.py @@ -263,6 +263,7 @@ def register_entrypoint(fn, entrypoint): class Entrypoint(Extension): method_name = None + worker_group = None def bind(self, container, method_name): """ Get an instance of this Entrypoint to bind to `container` with @@ -270,6 +271,9 @@ def bind(self, container, method_name): """ instance = super(Entrypoint, self).bind(container) instance.method_name = method_name + if instance.worker_group: + container.register_worker_group(instance.worker_group) + return instance def check_signature(self, args, kwargs): diff --git a/nameko/rpc.py b/nameko/rpc.py index e84b58846..471c5e9c8 100644 --- a/nameko/rpc.py +++ b/nameko/rpc.py @@ -128,7 +128,9 @@ class Rpc(Entrypoint, HeaderDecoder): rpc_consumer = RpcConsumer() - def __init__(self, expected_exceptions=(), sensitive_variables=()): + def __init__( + self, expected_exceptions=(), sensitive_variables=(), worker_group=None + ): """ Mark a method to be exposed over rpc :Parameters: @@ -148,6 +150,7 @@ def __init__(self, expected_exceptions=(), sensitive_variables=()): """ self.expected_exceptions = expected_exceptions self.sensitive_variables = sensitive_variables + self.worker_group = worker_group def setup(self): self.rpc_consumer.register_provider(self) diff --git a/nameko/web/handlers.py b/nameko/web/handlers.py index d9a2c9f58..90a2bccf0 100644 --- a/nameko/web/handlers.py +++ b/nameko/web/handlers.py @@ -17,10 +17,11 @@ class HttpRequestHandler(Entrypoint): server = WebServer() - def __init__(self, method, url, expected_exceptions=()): + def __init__(self, method, url, expected_exceptions=(), worker_group=None): self.method = method self.url = url self.expected_exceptions = expected_exceptions + self.worker_group = worker_group def get_url_rule(self): return Rule(self.url, methods=[self.method]) From bec511f3aac146b6ba11b548f2e8cb2b9bfb09f4 Mon Sep 17 00:00:00 2001 From: timbu Date: Mon, 16 Jan 2017 14:22:27 +0000 Subject: [PATCH 2/2] remove rpc from sketch --- nameko/rpc.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/nameko/rpc.py b/nameko/rpc.py index 471c5e9c8..e84b58846 100644 --- a/nameko/rpc.py +++ b/nameko/rpc.py @@ -128,9 +128,7 @@ class Rpc(Entrypoint, HeaderDecoder): rpc_consumer = RpcConsumer() - def __init__( - self, expected_exceptions=(), sensitive_variables=(), worker_group=None - ): + def __init__(self, expected_exceptions=(), sensitive_variables=()): """ Mark a method to be exposed over rpc :Parameters: @@ -150,7 +148,6 @@ def __init__( """ self.expected_exceptions = expected_exceptions self.sensitive_variables = sensitive_variables - self.worker_group = worker_group def setup(self): self.rpc_consumer.register_provider(self)