diff --git a/flocker/control/_diffing.py b/flocker/control/_diffing.py index 038d628fde..ea20ba5c0f 100644 --- a/flocker/control/_diffing.py +++ b/flocker/control/_diffing.py @@ -7,6 +7,8 @@ flocker configuration or the flocker state. """ +from eliot import MessageType, Field + from pyrsistent import ( PClass, PMap, @@ -16,7 +18,12 @@ pvector_field, ) -from zope.interface import Interface, implementer +# XXX The _EvolverProxy (or something similar) should be provided by +# Pyrsistent, but meanwhile we'll import this useful private function. +# https://github.com/tobgu/pyrsistent/issues/89 +from pyrsistent._transformations import _get + +from zope.interface import Attribute, Interface, implementer, classImplements class _IDiffChange(Interface): @@ -42,6 +49,19 @@ def apply(obj): """ +@implementer(_IDiffChange) +class _Replace(PClass): + """ + A ``_IDiffChange`` that returns a new root object. + + :ivar value: The item to be the new root object for subsequent operations. + """ + value = field() + + def apply(self, obj): + return _TransformProxy(self.value) + + @implementer(_IDiffChange) class _Remove(PClass): """ @@ -67,16 +87,18 @@ class _Set(PClass): A ``_IDiffChange`` that sets a field in a ``PClass`` or sets a key in a ``PMap``. - :ivar path: The path in the nested object to the field/key to be set to a - new value. - - :ivar value: The value to set the field/key to. + :ivar path: The path in the nested object which supports `set` operations. + :ivar key: The key to set. + :ivar value: The value to set. """ path = pvector_field(object) + key = field() value = field() def apply(self, obj): - return obj.transform(self.path, self.value) + return obj.transform( + self.path, lambda o: o.set(self.key, self.value) + ) @implementer(_IDiffChange) @@ -95,6 +117,300 @@ def apply(self, obj): return obj.transform(self.path, lambda x: x.add(self.item)) +_sentinel = object() + + +class _IEvolvable(Interface): + """ + An interface to mark classes that provide a ``Pyrsistent`` style + ``evolver`` method. + """ + def evolver(): + """ + :returns: A mutable version of the underlying object. + """ + +classImplements(PSet, _IEvolvable) +classImplements(PMap, _IEvolvable) +classImplements(PClass, _IEvolvable) + + +class _ISetType(Interface): + """ + The operations that can be performed when transforming a ``PSet`` object. + """ + def add(item): + """ + Add ``item`` to set. + """ + + def remove(item): + """ + Remove ``item`` from set. + """ + +classImplements(PSet, _ISetType) + + +class _IRecordType(Interface): + """ + The operations that can be performed when transforming a ``PSet`` object. + """ + def set(key, value): + """ + Add or replace the ``key`` in a ``PMap`` with ``value``. + """ + + def remove(item): + """ + Remove the ``key`` in a ``PMap``. + """ + +classImplements(PMap, _IRecordType) +classImplements(PClass, _IRecordType) + + +class _IRecursiveEvolverProxy(Interface): + """ + An interface which allows a structure of nested ``PClass``, ``PMap``, and + ``PSet`` to be evolved recursively. + """ + _original = Attribute( + "The root Pyrsistent object that is being evolved. " + "Must provide ``_IEvolvable``" + ) + _children = Attribute( + "A collection of child ``_IRecursiveEvolverProxy`` objects." + ) + + def commit(): + """ + Recursively persist the structure rooted at ``_original`` starting with + leaf nodes. + + :returns: The persisted immutable structure. + """ + + +@implementer(_IRecursiveEvolverProxy) +@implementer(_ISetType) +class _EvolverProxyForSet(object): + """ + A proxy for recursively evolving a ``PSet``. + """ + def __init__(self, original): + """ + :param _ISetType original: See ``_IRecursiveEvolverProxy._original``. + """ + self._original = original + self._evolver = original.evolver() + self._children = {} + + def add(self, item): + """ + Add ``item`` to the ``original`` ``Pset`` or if the item is itself a + Pyrsistent object, add a new proxy for that item so that further + operations can be performed on it without triggering invariant checks + until the tree is finally committed. + + :param item: An object to be added to the ``PSet`` wrapped by this + proxy. + :returns: ``self`` + """ + if _IEvolvable.providedBy(item): + self._children[item] = _proxy_for_evolvable_object(item) + else: + self._evolver.add(item) + return self + + def remove(self, item): + """ + Remove the ``item`` in an evolver of the ``original`` ``PSet``, and if + the item is an uncommitted ``_EvolverProxy`` remove it from the list of + children so that the item is not persisted when the structure is + finally committed. + + :param item: The object to be removed from the wrapped ``PSet`` + :returns: ``self`` + """ + self._children.pop(item, None) + # Attempt to remove the item from the evolver too. It may be something + # that was replaced rather than added by a previous ``set`` operation. + try: + self._evolver.remove(item) + except KeyError: + pass + return self + + def commit(self): + for segment, child_evolver_proxy in self._children.items(): + child = child_evolver_proxy.commit() + self._evolver.add(child) + return self._evolver.persistent() + + +@implementer(_IRecursiveEvolverProxy) +@implementer(_IRecordType) +class _EvolverProxyForRecord(object): + """ + A proxy for recursively evolving a ``PMap`` or ``PClass``. + """ + def __init__(self, original): + """ + :param _IRecordType original: See + ``_IRecursiveEvolverProxy._original``. + """ + self._original = original + self._evolver = original.evolver() + self._children = {} + + def set(self, key, item): + """ + Set the ``item`` in an evolver of the ``original`` ``PMap`` or + ``PClass`` or if the item is itself a Pyrsistent object, add a new + proxy for that item so that further operations can be performed on it + without triggering invariant checks until the tree is finally + committed. + + :param item: An object to be added or set on the ``PMap`` wrapped by + this proxy. + :returns: ``self`` + """ + if _IEvolvable.providedBy(item): + # This will replace any existing proxy. + self._children[key] = _proxy_for_evolvable_object(item) + else: + self._evolver.set(key, item) + return self + + def remove(self, key): + """ + Remove the ``key`` in an evolver of the ``original`` ``PMap``, or + ``PClass`` and if the item is an uncommitted ``_EvolverProxy`` remove + it from the list of children so that the item is not persisted when the + structure is finally committed. + + :param key: The key to be removed from the wrapped ``PMap`` + :returns: ``self`` + """ + self._children.pop(key, None) + # Attempt to remove the item from the evolver too. It may be something + # that was replaced rather than added by a previous ``set`` operation. + try: + self._evolver.remove(key) + except KeyError: + pass + return self + + def commit(self): + for segment, child_evolver_proxy in self._children.items(): + child = child_evolver_proxy.commit() + self._evolver.set(segment, child) + return self._evolver.persistent() + + +def _proxy_for_evolvable_object(obj): + """ + :returns: an ``_IRecursiveEvolverProxy`` suitable for the type of ``obj``. + """ + if not _IEvolvable.providedBy(obj): + raise TypeError( + "{!r} does not provide {}".format( + obj, + _IEvolvable.__name__ + ) + ) + if _ISetType.providedBy(obj): + return _EvolverProxyForSet(obj) + elif _IRecordType.providedBy(obj): + return _EvolverProxyForRecord(obj) + else: + raise TypeError("Object '{}' does not provide a supported interface") + + +def _get_or_add_proxy_child(parent_proxy, segment): + """ + Returns a proxy wrapper around the ``_IEvolvable`` object corresponding to + ``segment``. A new proxy is created if one does not already exist and it is + added to ``parent_proxy._children``. + + :param _IParentProxy parent_proxy: The parent. + :param unicode segment: The label in a ``path`` supplied to ``transform``. + :returns: + """ + child = parent_proxy._children.get(segment) + if child is not None: + return child + child = _get(parent_proxy._original, segment, _sentinel) + if child is _sentinel: + raise KeyError( + "Attribute or key '{}' not found in {}".format( + segment, parent_proxy._original + ) + ) + proxy_for_child = _proxy_for_evolvable_object(child) + parent_proxy._children[segment] = proxy_for_child + return proxy_for_child + + +@implementer(_IRecursiveEvolverProxy) +class _TransformProxy(object): + """ + This attempts to bunch a the ``transform`` operations performed when + applying a sequence of diffs into a single transaction so that related + attributes can be ``set`` without triggering an in invariant error. + Leaf nodes are persisted first and in isolation, so as not to trigger + invariant errors in ancestor nodes. + """ + def __init__(self, original): + """ + :param _IEvolvable original: The root object to which transformations + will be applied. + """ + self._root = _proxy_for_evolvable_object(original) + + def transform(self, path, operation): + """ + Traverse each segment of ``path`` to create a hierarchy of + ``_EvolverProxy`` objects and perform the ``operation`` on the + resulting leaf proxy object. This will infact perform the operation on + an evolver of the original Pyrsistent object. + + The object corresponding to the last segment of ``path`` must provide + the ``_IEvolvable`` interface. + + :param PVector path: The path relative to ``original`` which will be + operated on. + :param callable operation: A function to be applied to an evolver of + the object at ``path`` + :returns: ``self`` + """ + target = self._root + for segment in path: + target = _get_or_add_proxy_child(target, segment) + operation(target) + return self + + def commit(self): + return self._root.commit() + + +TARGET_OBJECT = Field( + u"target_object", repr, + u"The object to which the diff was applied." +) +CHANGES = Field( + u"changes", repr, + u"The changes being applied." +) + +DIFF_COMMIT_ERROR = MessageType( + u"flocker:control:Diff:commit_error", + [TARGET_OBJECT, CHANGES], + u"The target and changes that failed to apply." +) + + @implementer(_IDiffChange) class Diff(PClass): """ @@ -111,9 +427,19 @@ class Diff(PClass): changes = pvector_field(object) def apply(self, obj): + proxy = _TransformProxy(original=obj) for c in self.changes: - obj = c.apply(obj) - return obj + proxy = c.apply(proxy) + try: + return proxy.commit() + except: + # Imported here to avoid circular dependencies. + from ._persistence import wire_encode + DIFF_COMMIT_ERROR( + target_object=wire_encode(obj), + changes=wire_encode(self.changes), + ).write() + raise def _create_diffs_for_sets(current_path, set_a, set_b): @@ -176,7 +502,7 @@ def _create_diffs_for_mappings(current_path, mapping_a, mapping_b): ) for key in b_keys.difference(a_keys): resulting_diffs.append( - _Set(path=current_path.append(key), value=mapping_b[key]) + _Set(path=current_path, key=key, value=mapping_b[key]) ) for key in a_keys.difference(b_keys): resulting_diffs.append( @@ -204,8 +530,6 @@ def _create_diffs_for(current_path, subobj_a, subobj_b): """ if subobj_a == subobj_b: return pvector([]) - elif type(subobj_a) != type(subobj_b): - return pvector([_Set(path=current_path, value=subobj_b)]) elif isinstance(subobj_a, PClass) and isinstance(subobj_b, PClass): a_dict = subobj_a._to_dict() b_dict = subobj_b._to_dict() @@ -219,7 +543,20 @@ def _create_diffs_for(current_path, subobj_a, subobj_b): # If the objects are not equal, and there is no intelligent way to recurse # inside the objects to make a smaller diff, simply set the current path # to the object in b. - return pvector([_Set(path=current_path, value=subobj_b)]) + if len(current_path) > 0: + return pvector([ + _Set( + path=current_path[:-1], + key=current_path[-1], + value=subobj_b + ) + ]) + # Or if there's no path we're replacing the root object to which subsequent + # ``_IDiffChange`` operations will be applied. + else: + return pvector([ + _Replace(value=subobj_b) + ]) def create_diff(object_a, object_b): @@ -261,5 +598,5 @@ def compose_diffs(iterable_of_diffs): # Ensure that the representation of a ``Diff`` is entirely serializable: DIFF_SERIALIZABLE_CLASSES = [ - _Set, _Remove, _Add, Diff + _Set, _Remove, _Add, Diff, _Replace ] diff --git a/flocker/control/test/test_diffing.py b/flocker/control/test/test_diffing.py index 8ba39db626..0970c64701 100644 --- a/flocker/control/test/test_diffing.py +++ b/flocker/control/test/test_diffing.py @@ -7,16 +7,24 @@ from json import dumps from uuid import uuid4 +from eliot.testing import capture_logging, assertHasMessage from hypothesis import given import hypothesis.strategies as st -from pyrsistent import PClass, field, pmap, pset +from pyrsistent import PClass, field, pmap, pset, InvariantException +from twisted.python.monkey import MonkeyPatcher -from .._diffing import create_diff, compose_diffs +from .._diffing import ( + create_diff, + compose_diffs, + DIFF_COMMIT_ERROR, + _TransformProxy, +) from .._persistence import wire_encode, wire_decode from .._model import Node, Port from ..testtools import ( application_strategy, deployment_strategy, + node_strategy, related_deployments_strategy ) @@ -183,7 +191,6 @@ def test_different_objects(self): object_a = DiffTestObj(a=pset(xrange(1000))) object_b = pmap({'1': 34}) diff = create_diff(object_a, object_b) - self.assertThat( wire_decode(wire_encode(diff)).apply(object_a), Equals(object_b) @@ -202,3 +209,286 @@ def test_different_uuids(self): wire_decode(wire_encode(diff)).apply(object_a), Equals(object_b) ) + + +class DiffTestObjInvariant(PClass): + """ + Simple pyrsistent object with an invariant that spans multiple fields. + + Diffs which swap the values of the fields will trigger ``InvariantError` + unless ``_perform_invariant_check`` is set to ``False`` or the diff is + applied to an evolver object. + """ + _perform_invariant_check = True + a = field() + b = field() + + def __invariant__(self): + if self._perform_invariant_check and self.a == self.b: + return (False, "a must not equal b") + else: + return (True, "") + + +class InvariantDiffTests(TestCase): + """ + Tests for creating and applying diffs to objects with invariant checks. + """ + def test_straight_swap(self): + """ + A diff composed of two separate ``set`` operations can be applied to an + object without triggering an invariant exception. + """ + o1 = DiffTestObjInvariant( + a=1, + b=2, + ) + o2 = DiffTestObjInvariant( + a=2, + b=1, + ) + diff = create_diff(o1, o2) + self.expectThat(len(diff.changes), Equals(2)) + self.assertEqual( + o2, + diff.apply(o1) + ) + + def test_deep_swap(self): + """ + A diff composed of two separate ``set`` operations can be applied to a + nested object without triggering an invariant exception. + """ + a = DiffTestObjInvariant( + a=1, + b=2, + ) + b = DiffTestObjInvariant( + a=3, + b=4, + ) + o1 = DiffTestObjInvariant( + a=a, + b=b, + ) + o2 = o1.transform( + ['a'], + DiffTestObjInvariant( + a=2, + b=1, + ) + ) + diff = create_diff(o1, o2) + self.expectThat(len(diff.changes), Equals(2)) + self.assertEqual( + o2, + diff.apply(o1) + ) + + @capture_logging(assertHasMessage, DIFF_COMMIT_ERROR) + def test_error_logging(self, logger): + """ + Failures while applying a diff emit a log message containing the full + diff. + """ + o1 = DiffTestObjInvariant( + a=1, + b=2, + ) + patcher = MonkeyPatcher() + patcher.addPatch( + DiffTestObjInvariant, + '_perform_invariant_check', + False + ) + patcher.patch() + try: + o2 = o1.set('b', 1) + finally: + patcher.restore() + diff = create_diff(o1, o2) + self.assertRaises( + InvariantException, + diff.apply, + o1, + ) + + def test_application_add(self): + """ + A diff on a Node, which *adds* and application with a volume *and* the + manifestation for the volume, can be applied without triggering an + invariant error on the Node. + """ + node2 = node_strategy( + min_number_of_applications=1, + stateful_applications=True, + ).example() + application = node2.applications.values()[0] + node1 = node2.transform( + ['applications'], + lambda o: o.remove(application.name) + ).transform( + ['manifestations'], + lambda o: o.remove(application.volume.manifestation.dataset_id) + ) + diff = create_diff(node1, node2) + self.assertEqual( + node2, + diff.apply(node1), + ) + + def test_application_modify(self): + """ + A diff on a Node, which adds a volume to an *existing* application + volume *and* the manifestation for the volume, can be applied without + triggering an invariant error on the Node. + """ + node2 = node_strategy( + min_number_of_applications=1, + stateful_applications=True, + ).example() + application = node2.applications.values()[0] + volume = application.volume + node1 = node2.transform( + ['applications', application.name], + lambda o: o.set('volume', None) + ).transform( + ['manifestations'], + lambda o: o.remove(volume.manifestation.dataset_id) + ) + diff = create_diff(node1, node2) + self.assertEqual( + node2, + diff.apply(node1), + ) + + +class TransformProxyTests(TestCase): + """ + Tests for ``_TransformProxy``. + """ + def test_type_error(self): + """ + The wrapped object must provide _IEvolvable. + """ + e = self.assertRaises( + TypeError, + _TransformProxy, + 1 + ) + self.assertEqual( + '1 does not provide _IEvolvable', + e.message, + ) + + def test_commit_no_change(self): + """ + ``commit`` returns the original object if no changes have been + performed. + """ + original = pmap() + self.assertIs(original, _TransformProxy(original).commit()) + + def test_transform_keyerror(self): + """ + ``transform`` raises ``KeyError`` if the supplied ``path`` is not + found. + """ + e = self.assertRaises( + KeyError, + _TransformProxy(pmap()).transform, + ['a'], 1 + ) + self.assertEqual( + "Attribute or key 'a' not found in pmap({})", + e.message, + ) + + def test_transform_typeerror(self): + """ + ``transform`` raises ``TypeError`` if the object at the supplied + ``path`` does not provide ``_IEvolvable``. + """ + proxy = _TransformProxy(pmap({'a': 1})) + + e = self.assertRaises( + TypeError, + proxy.transform, + ['a'], 2, + ) + self.assertEqual( + "1 does not provide _IEvolvable", + e.message + ) + + def test_transform_empty_path(self): + """ + If ``transform`` is supplied with an empty path, the operation is + performed on the root object. + """ + proxy = _TransformProxy(pmap({'a': 1})) + proxy.transform([], lambda o: o.set('a', 2)) + self.assertEqual( + pmap({'a': 2}), + proxy.commit(), + ) + + def test_transform_deep_path(self): + """ + If ``transform`` is supplied with a path containing multiple segments, + the operation is performed on the object corresponding to the last + segment. + """ + proxy = _TransformProxy( + pmap({ + 'a': pmap({ + 'b': pmap({ + 'c': 1 + }) + }) + }) + ) + proxy.transform(['a', 'b'], lambda o: o.set('c', 2)) + self.assertEqual( + pmap({ + 'a': pmap({ + 'b': pmap({ + 'c': 2 + }) + }) + }), + proxy.commit(), + ) + + def test_transform_deep_evolver(self): + """ + ``transform`` can perform operations on nested objects that have + invariant constraints, without triggering the InvariantException. + """ + proxy = _TransformProxy( + pmap({ + 'a': pmap({ + 'b': pmap({ + 'c': DiffTestObjInvariant( + a=1, b=2 + ) + }) + }) + }) + ) + # If these operations were performed directly on the Pyrsistent + # structure it'd trigger InvariantException. + proxy.transform(['a', 'b', 'c'], lambda o: o.set('a', 2)) + proxy.transform(['a', 'b', 'c'], lambda o: o.set('b', 1)) + self.assertEqual( + pmap({ + 'a': pmap({ + 'b': pmap({ + 'c': DiffTestObjInvariant( + a=2, b=1 + ) + }) + }) + }), + proxy.commit(), + ) diff --git a/flocker/control/testtools.py b/flocker/control/testtools.py index 32248a24fd..520b415a80 100644 --- a/flocker/control/testtools.py +++ b/flocker/control/testtools.py @@ -3,12 +3,14 @@ """ Tools for testing :py:module:`flocker.control`. """ +from uuid import uuid4 from zope.interface.verify import verifyObject from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.internet.ssl import ClientContextFactory from twisted.internet.task import Clock +from twisted.python.filepath import FilePath from twisted.test.proto_helpers import MemoryReactor from ..testtools import TestCase @@ -21,10 +23,13 @@ from ._registry import IStatePersister, InMemoryStatePersister from ._model import ( Application, + AttachedVolume, + Dataset, DatasetAlreadyOwned, Deployment, DockerImage, Lease, + Manifestation, Node, PersistentState, Port, @@ -214,7 +219,7 @@ def docker_image_strategy( @composite -def application_strategy(draw, min_number_of_ports=0): +def application_strategy(draw, min_number_of_ports=0, stateful=False): """ A hypothesis strategy to generate an ``Application`` @@ -227,7 +232,8 @@ def application_strategy(draw, min_number_of_ports=0): max_value=max(8, min_number_of_ports+1) ) ) - return Application( + dataset_id = unicode(uuid4()) + application = Application( name=draw(unique_name_strategy()), image=draw(docker_image_strategy()), ports=frozenset( @@ -235,13 +241,30 @@ def application_strategy(draw, min_number_of_ports=0): internal_port=8000+i, external_port=8000+i+1 ) for i in xrange(num_ports) - ) + ), ) + if stateful: + application = application.set( + 'volume', + AttachedVolume( + manifestation=Manifestation( + dataset=Dataset( + dataset_id=dataset_id, + deleted=False, + ), + primary=True, + ), + mountpoint=FilePath('/flocker').child(dataset_id) + ) + ) + return application @composite def node_strategy( draw, + min_number_of_applications=0, + stateful_applications=False, uuid=st.uuids(), applications=application_strategy() ): @@ -253,17 +276,24 @@ def node_strategy( :param applications: The strategy to use to generate the applications on the Node. """ - applications = draw(st.lists( - application_strategy(), - min_size=0, - average_size=2, - max_size=5 - )) + applications = { + a.name: a for a in + draw( + st.lists( + application_strategy(stateful=stateful_applications), + min_size=min_number_of_applications, + average_size=2, + max_size=5 + ) + ) + } return Node( uuid=draw(uuid), - applications={ - a.name: a - for a in applications + applications=applications, + manifestations={ + a.volume.manifestation.dataset_id: a.volume.manifestation + for a in applications.values() + if a.volume is not None } ) @@ -341,7 +371,7 @@ def deployment_strategy( draw( lease_strategy( dataset_id=st.just(dataset_id), - node_uuid=st.just(node_uuid) + node_id=st.just(node_uuid) ) ) for dataset_id, node_uuid in ( dataset_id_node_mapping.items()[i] for i in lease_indexes