diff --git a/ec2/templates/docker-compose.yml b/ec2/templates/docker-compose.yml index 3beb3ae1..644879ae 100644 --- a/ec2/templates/docker-compose.yml +++ b/ec2/templates/docker-compose.yml @@ -65,6 +65,7 @@ nextbackenddocker: - rabbitmqredis:RABBITREDIS - mongodb:MONGODB - minionworker:MINIONWORKER + - minionredis:6379 environment: - PYTHONUNBUFFERED=TRUE - PYTHONPATH=:/next_backend diff --git a/local/docker-compose.yml.pre b/local/docker-compose.yml.pre index 36ceeaed..1637195b 100644 --- a/local/docker-compose.yml.pre +++ b/local/docker-compose.yml.pre @@ -66,6 +66,7 @@ nextbackenddocker: - rabbitmqredis:RABBITREDIS - mongodb:MONGODB - minionworker:MINIONWORKER + - minionredis:6379 environment: - PYTHONUNBUFFERED=TRUE - PYTHONPATH=:/next_backend diff --git a/next/apps/Butler.py b/next/apps/Butler.py index cfb7a84b..308c530c 100644 --- a/next/apps/Butler.py +++ b/next/apps/Butler.py @@ -1,6 +1,16 @@ import os +import sys import StringIO from functools import wraps +import pickle +import numpy as np +import traceback +import time +import ast +import inspect +from pprint import pprint +from functools import partial +import itertools import redis @@ -8,122 +18,186 @@ import next.utils as utils +def mem_except_wrap(func): + def f(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + utils.debug_print("Butler.Collection.Memory.get exception: {}".format(e)) + utils.debug_print(traceback.format_exc()) + return f + + class Memory(object): def __init__(self, collection='', exp_uid='', uid_prefix=''): self.key_prefix = collection + uid_prefix.format(exp_uid=exp_uid) self.cache = None self.max_entry_size = 500000000 # 500MB - - def check_prefix(self): - if self.key_prefix == '': - utils.debug_print("butler.memory is deprecated." - " Change to butler.experiment.memory or butler.algorithm.memory, etc." - " wherever appropriate") - - def ensure_connection(self): - try: - if self.cache is None: - self.cache = redis.StrictRedis(host=constants.MINIONREDIS_HOST, port=constants.MINIONREDIS_PORT) - except Exception as e: - raise Exception("Butler.Collection.Memory could not connect with RedisDB: {}".format(e)) - - def num_entries(self, size): + fns = inspect.getmembers(self, predicate=inspect.ismethod) + for name, fn in fns: + if name[0] == '_' or 'ensure_connection' in name: + continue + self.name = self._check_prefix(self._ensure_connection(fn)) + + def _check_prefix(self, fn=None): + def f(*args, **kwargs): + if self.key_prefix == '': + utils.debug_print("butler.memory is deprecated." + " Change to butler.experiment.memory or butler.algorithm.memory, etc." + " wherever appropriate") + return fn(*args, **kwargs) + return f + + def _ensure_connection(self, fn=None): + if self.cache is None: + self.cache = redis.StrictRedis(host='minionredis_1') + return fn + + + def _num_entries(self, size): if size % self.max_entry_size == 0: return size / self.max_entry_size else: return (size / self.max_entry_size) + 1 - def set(self, key, value): - self.check_prefix() + @mem_except_wrap + def set(self, key=None, value=None, uid='', verbose=True): + key = self.key_prefix + key + uid + l = sys.getsizeof(value) + if l < self.max_entry_size and not isinstance(value, np.ndarray): + return self.cache.set(key, value) + value = pickle.dumps(value) + n = self._num_entries(l) + if verbose: + utils.debug_print("Butler.py memory setting {} in {} entries".format(l, n)) + for i in range(n): + k = key + ":" + str(i) + self.cache.set(k, value[i*self.max_entry_size:(i+1)*self.max_entry_size]) + return self.cache.set(key, "__redis_set__" + "{}:{}".format(str(n), str(l))) + + @mem_except_wrap + def set_file(self, key, f, verbose=False): key = self.key_prefix + key - try: - self.ensure_connection() - l = len(value) - n = self.num_entries(l) - utils.debug_print("Setting {} in {} entries".format(l, n)) - for i in range(n): - k = key + ":" + str(i) - self.cache.set(k, value[i*self.max_entry_size:(i+1)*self.max_entry_size]) - return self.cache.set(key, "{}:{}".format(str(n), str(l))) - except Exception as e: - utils.debug_print("Butler.Collection.Memory.set exception: {}".format(e)) - return False + f.seek(0, os.SEEK_END) + l = f.tell() + f.seek(0, 0) + n = self._num_entries(l) + if verbose: + utils.debug_print("butler.py memory setting {} bytes in {} entries".format(l, n)) + for i in range(n): + k = key + ":" + str(i) + v = f.read(self.max_entry_size) + self.cache.set(k, v) + return self.cache.set(key, "{}:{}".format(str(n), str(l))) + + @mem_except_wrap + def get(self, key=None, uid='', verbose=False): + key = self.key_prefix + key + uid + d = self.cache.get(key) + msg = '__redis_set__' + pickled = isinstance(d, str) and len(d) >= len(msg) and d[:len(msg)] == msg + if not pickled: + return self._from_db_fmt(d) + + d = d[len(msg):] + n, l = d.split(":") + l = int(l) + n = int(n) + ans = "" + if verbose: + utils.debug_print("Getting {} bytes in {} entries".format(l, n)) + for i in range(n): + k = key + ":" + str(i) + ans += self.cache.get(k) + return pickle.loads(ans) - def set_file(self, key, f): - self.check_prefix() + @mem_except_wrap + def get_file(self, key): key = self.key_prefix + key - try: - self.ensure_connection() - f.seek(0, os.SEEK_END) - l = f.tell() - f.seek(0, 0) - n = self.num_entries(l) - utils.debug_print("Setting {} bytes in {} entries".format(l, n)) - for i in range(n): - k = key + ":" + str(i) - v = f.read(self.max_entry_size) - self.cache.set(k, v) - return self.cache.set(key, "{}:{}".format(str(n), str(l))) - except Exception as e: - utils.debug_print("Butler.Collection.Memory.set_file exception: {}".format(e)) - return False + d = self.cache.get(key) + f = StringIO.StringIO() + n, l = d.split(":") + l = int(l) + n = int(n) + utils.debug_print("Getting {} bytes in {} entries".format(l, n)) + for i in range(n): + k = key + ":" + str(i) + f.write(self.cache.get(k)) + f.seek(0, 0) + return f + + @mem_except_wrap + def lock(self, name, **kwargs): + name = self.key_prefix + name + return self.cache.lock(name, **kwargs) + + @mem_except_wrap + def exists(self, key, uid=''): + key = self.key_prefix + key + uid + return self.cache.exists(key) + + @mem_except_wrap + def append(self, key=None, value=None, uid=''): + if not isinstance(value, list): + value = [value] + if not self.exists(key, uid=uid): + old = "[]" + else: + old_key = self.key_prefix + key + uid + old = self.cache.get(old_key) + old = ast.literal_eval(old) - def get(self, key): - self.check_prefix() - try: - self.ensure_connection() - key = self.key_prefix + key - d = self.cache.get(key) - n, l = d.split(":") - l = int(l) - n = int(n) - ans = "" - utils.debug_print("Getting {} bytes in {} entries".format(l, n)) - for i in range(n): - k = key + ":" + str(i) - ans += self.cache.get(k) - return ans - except Exception as e: - utils.debug_print("Butler.Collection.Memory.get exception: {}".format(e)) - return None + key = self.key_prefix + key + uid + return self.cache.set(key, old + value) - def get_file(self, key): - self.check_prefix() - try: - self.ensure_connection() - key = self.key_prefix + key - d = self.cache.get(key) - f = StringIO.StringIO() - n, l = d.split(":") - l = int(l) - n = int(n) - utils.debug_print("Getting {} bytes in {} entries".format(l, n)) - for i in range(n): - k = key + ":" + str(i) - f.write(self.cache.get(k)) - f.seek(0, 0) - return f - except Exception as e: - utils.debug_print("Butler.Collection.Memory.get_file exception: {}".format(e)) - return None + @mem_except_wrap + def increment(self, key=None, value=1, uid=''): + key = key + uid + key = self.key_prefix + key + return self.cache.incr(key, amount=value) + + @mem_except_wrap + def set_many(self, key_value_dict=None, uid=''): + with self.cache.pipeline() as pipe: + for k, v in key_value_dict.items(): + k = self.key_prefix + k + uid + v = self._to_db_fmt(v) + pipe.set(k, v) + ret = pipe.execute() + return ret + + @mem_except_wrap + def get_many(self, key=None, uid=''): + keys = [k for k in key] + with self.cache.pipeline() as pipe: + out = [] + for key in keys: + key = self.key_prefix + key + uid + pipe.get(key) + values = pipe.execute() + return {k: self._from_db_fmt(v) for k, v in zip(keys, values)} + + @mem_except_wrap + def pipeline(self): + """ + Returns redis pipeline. Get values with pipe.execute(), + close with pipe.close() + """ + return self.cache.pipeline() - def lock(self, name, **kwargs): - try: - self.ensure_connection() - name = self.key_prefix + name - return self.cache.lock(name, **kwargs) - except Exception as e: - utils.debug_print("Butler.Collection.Memory.lock exception: {}".format(e)) - return None + def _to_db_fmt(self, v): + if isinstance(v, np.ndarray): + return pickle.dumps(v) + return v - def exists(self, key): + def _from_db_fmt(self, v): try: - self.ensure_connection() - key = self.key_prefix + key - return self.cache.exists(key) - except Exception as e: - utils.debug_print("Butler.Collection.Memory.exists exception: {}".format(e)) - return None + return ast.literal_eval(v) + except: + try: + return pickle.loads(v) + except: + return v class Collection(object): @@ -194,6 +268,9 @@ def get(self, uid="", key=None, pattern=None, exp=None): else: return self.db.get_docs_with_filter(self.collection, pattern) + def get_many(self, **kwargs): + return self.get(**kwargs) + @timed(op_type='get') def get_and_delete(self, uid="", key=None, exp=None): """ diff --git a/next/apps/tests/test_memory.py b/next/apps/tests/test_memory.py new file mode 100644 index 00000000..9760b084 --- /dev/null +++ b/next/apps/tests/test_memory.py @@ -0,0 +1,82 @@ +import sys +import pytest +import numpy as np +import itertools +try: + from next.apps.Butler import Memory +except: + sys.path.append('..') + sys.path.append('../..') + sys.path.append('../../..') + sys.path.append('../../../..') + from Butler import Memory + + +def test_ensure_connection(): + mem = Memory(exp_uid=hash(1), uid_prefix='abc') + mem._ensure_connection() + + +@pytest.mark.parametrize("value", [ + 'bar', 1, 2, {'dict': True}, ('tuple', True), + [1, 2, 3], ['1', '2'] +]) +def test_set_and_get(value): + mem = Memory(exp_uid=hash(1), uid_prefix='abc') + mem.set('foo-gs', value) + assert mem.get('foo-gs') == value + + +def test_exists(): + mem = Memory(exp_uid=hash(1), uid_prefix='abc') + mem.set('foo-exist', 'bar') + assert mem.exists('foo-exist') + + +def test_append(): + mem = Memory(exp_uid=hash(1), uid_prefix='abc') + mem.set('foo-append2', [1, 2, 3]) + assert mem.append('foo-append2', [4]) + assert mem.get('foo-append2') == [1, 2, 3, 4] + +def test_append_no_initial(): + mem = Memory(exp_uid=hash(1), uid_prefix='abc') + for i in itertools.count(start=0, step=1): + key = 'foo-append' + str(hash(i)) + if mem.exists(key): + continue + mem.append(key, [4]) + assert mem.get(key) == [4] + break + + +def test_get_ndarray(): + x = np.linspace(0, 1) + mem = Memory(exp_uid=hash(1), uid_prefix='abc') + mem.set('x', x) + y = mem.get('x') + assert np.allclose(x, y) + + +def test_increment(): + mem = Memory(exp_uid=hash(1), uid_prefix='abc') + mem.set('foo-incr', 0) + assert mem.increment('foo-incr') == 1 + assert mem.increment('foo-incr', value=2) == 3 + +def test_set_many(): + mem = Memory(exp_uid=hash(1), uid_prefix='abc') + values = {'a': 1, 'b': 2} + mem.set_many(values) + + out = {k: mem.get(k) for k in values} + assert out == values + +def test_get_many(): + mem = Memory(exp_uid=hash(1), uid_prefix='abc') + values = {'a': 1, 'b': 2} + for k, v in values.items(): + mem.set(k, v) + keys = list(values.keys()) + out = mem.get_many(keys) + assert values == out diff --git a/next/base_docker_image/requirements.txt b/next/base_docker_image/requirements.txt index 399a50a4..cc82ebb6 100644 --- a/next/base_docker_image/requirements.txt +++ b/next/base_docker_image/requirements.txt @@ -30,6 +30,7 @@ ply==3.10 pymongo==3.4.0 pyparsing==2.2.0 python-dateutil==2.6.1 +pytest==3.2.1 pytz==2017.2 PyYAML==3.12 redis==2.10.5