diff --git a/source/ftrack_api/event/hub_proxy.py b/source/ftrack_api/event/hub_proxy.py new file mode 100644 index 00000000..fb756b3a --- /dev/null +++ b/source/ftrack_api/event/hub_proxy.py @@ -0,0 +1,86 @@ +# :coding: utf-8 +# :copyright: Copyright (c) 2024 ftrack + +"""Session-scoped EventHub proxy for tracking subscriptions.""" + +from __future__ import absolute_import + +from builtins import object + + +class SessionEventHubProxy(object): + """Proxy for EventHub that tracks subscriptions per session. + + This wrapper intercepts subscribe() and unsubscribe() calls to maintain + a session-local list of subscriber IDs. When the session is closed, only + the subscribers created by that session are unsubscribed, leaving other + sessions' subscribers intact on the shared EventHub. + """ + + def __init__(self, event_hub, session): + """Initialize the proxy. + + Args: + event_hub: The underlying EventHub instance (may be shared). + session: The Session instance that owns this proxy. + """ + self._event_hub = event_hub + self._session = session + + def subscribe(self, subscription, callback, subscriber=None, priority=100): + """Subscribe to events and track the subscriber ID for this session. + + All arguments are passed through to the underlying EventHub.subscribe(). + The returned subscriber ID is tracked for cleanup when this session closes. + + Args: + subscription (str): The subscription expression. + callback (callable): The callback to invoke when matching events occur. + subscriber (dict): Optional subscriber metadata. + priority (int): Optional priority (lower = earlier execution). + + Returns: + str: The subscriber identifier. + """ + subscriber_id = self._event_hub.subscribe( + subscription, callback, subscriber=subscriber, priority=priority + ) + + # Track this subscriber for session cleanup + if not hasattr(self._session, '_session_subscribers'): + self._session._session_subscribers = [] + self._session._session_subscribers.append(subscriber_id) + + return subscriber_id + + def unsubscribe(self, subscriber_identifier): + """Unsubscribe and remove from session tracking. + + Args: + subscriber_identifier (str): The subscriber identifier to unsubscribe. + """ + self._event_hub.unsubscribe(subscriber_identifier) + + # Remove from session tracking + if hasattr(self._session, '_session_subscribers'): + if subscriber_identifier in self._session._session_subscribers: + self._session._session_subscribers.remove( + subscriber_identifier) + + def __getattr__(self, name): + """Delegate all other method/attribute access to the underlying EventHub. + + This allows the proxy to be used transparently as if it were the + actual EventHub instance. + + Args: + name (str): The attribute name to access. + + Returns: + The attribute from the underlying EventHub. + """ + return getattr(self._event_hub, name) + + def __repr__(self): + """Return string representation.""" + return ''.format(self._event_hub) diff --git a/source/ftrack_api/event/hub_registry.py b/source/ftrack_api/event/hub_registry.py new file mode 100644 index 00000000..bab68071 --- /dev/null +++ b/source/ftrack_api/event/hub_registry.py @@ -0,0 +1,221 @@ +# :coding: utf-8 +# :copyright: Copyright (c) 2024 ftrack + +"""EventHub singleton registry for connection sharing across sessions.""" + +from __future__ import absolute_import + +import threading +import weakref +import logging + + +class EventHubRegistry(object): + """Global registry for shared EventHub instances. + + Manages EventHub instances to ensure that sessions with identical + credentials can share the same WebSocket connection to the server. + + This prevents connection exhaustion when multiple Session instances + are created with the same credentials. + """ + + def __init__(self): + """Initialize the registry.""" + self._hubs = {} # (server_url, api_user, api_key, plugin_paths_key) -> EventHub + self._hub_sessions = {} # hub_key -> WeakSet of session weakrefs + self._hub_connect_locks = {} # hub_key -> Lock (for first connection) + self._hub_plugins_discovered = {} # hub_key -> bool (plugins discovered?) + self._lock = threading.RLock() + self.logger = logging.getLogger( + __name__ + '.' + self.__class__.__name__) + + def get_or_create( + self, + server_url, + api_user, + api_key, + plugin_paths=None, + headers=None, + cookies=None, + auto_connect=False, + ): + """Get existing EventHub or create new one for credentials. + + Args: + server_url (str): The ftrack server URL. + api_user (str): The API user to authenticate as. + api_key (str): The API key to authenticate with. + plugin_paths (list): Optional list of plugin paths (affects hub key). + headers (dict): Optional custom headers. + cookies (dict): Optional custom cookies. + auto_connect (bool): Whether to automatically connect the hub. + + Returns: + tuple: (EventHub, hub_key) - Shared EventHub instance and its key. + + Note: + Headers and cookies are only used when creating a new hub. + If a hub already exists for these credentials, the provided + headers/cookies are ignored (the existing hub is returned). + + Plugin paths are included in the hub key to ensure sessions with + different plugin configurations use separate hubs, preventing + duplicate plugin subscriptions. + """ + # Create key from credentials and plugin paths + # Include plugin_paths to avoid plugin subscription conflicts + plugin_paths_key = tuple(sorted(plugin_paths)) if plugin_paths else () + key = (server_url, api_user, api_key, plugin_paths_key) + + with self._lock: + if key not in self._hubs: + # Import here to avoid circular dependency + import ftrack_api.event.hub + + # Create new hub + hub = ftrack_api.event.hub.EventHub( + server_url, api_user, api_key, headers=headers, cookies=cookies + ) + + self._hubs[key] = hub + self._hub_sessions[key] = weakref.WeakSet() + self._hub_connect_locks[key] = threading.Lock() + self._hub_plugins_discovered[key] = False + + self.logger.debug( + 'Created new shared EventHub for {0}@{1}'.format( + api_user, server_url + ) + ) + + # Connect if requested + if auto_connect: + self._ensure_connected(key, hub) + else: + existing_hub = self._hubs[key] + self.logger.debug( + 'Reusing existing EventHub for {0}@{1} ({2} active sessions)'.format( + api_user, server_url, len(self._hub_sessions[key]) + ) + ) + + # If auto_connect requested and hub not connected, connect it + if auto_connect and not existing_hub.connected: + self._ensure_connected(key, existing_hub) + + hub = existing_hub + + return hub, key + + def _ensure_connected(self, hub_key, hub): + """Ensure the hub is connected (thread-safe). + + Args: + hub_key: The hub registry key. + hub: The EventHub instance to connect. + """ + # Use per-hub lock to ensure only one thread initiates connection + with self._hub_connect_locks[hub_key]: + if not hub.connected and not hub._connection_initialised: + hub.init_connection() + # Connect in background thread + connect_thread = threading.Thread(target=hub.connect) + connect_thread.daemon = True + connect_thread.start() + + def register_session(self, hub_key, session): + """Register a session as using this hub. + + Args: + hub_key: The hub registry key (server_url, api_user, api_key). + session: The session object (WeakSet will create weak ref automatically). + """ + with self._lock: + if hub_key in self._hub_sessions: + self._hub_sessions[hub_key].add(session) + + def on_session_deleted(self, hub_key): + """Callback when a session is garbage collected. + + Args: + hub_key: The hub registry key. + """ + with self._lock: + if hub_key not in self._hub_sessions: + return + + # WeakSet automatically removes dead references + active_sessions = len(self._hub_sessions[hub_key]) + + if active_sessions == 0: + # No more sessions using this hub - disconnect and clean up + hub = self._hubs.pop(hub_key, None) + self._hub_sessions.pop(hub_key, None) + self._hub_connect_locks.pop(hub_key, None) + self._hub_plugins_discovered.pop(hub_key, None) + + if hub: + try: + if hub.connected: + # Disconnect without unsubscribing - subscribers already cleaned up + hub.disconnect(unsubscribe=False) + self.logger.debug( + 'Disconnected unused shared EventHub for {0}@{1}'.format( + hub_key[1], hub_key[0] + ) + ) + except Exception as error: + self.logger.debug( + 'Error disconnecting unused EventHub: {0}'.format( + error) + ) + + def get_session_count(self, hub_key): + """Get the number of active sessions for a hub. + + Args: + hub_key: The hub registry key. + + Returns: + int: Number of active sessions using this hub. + """ + with self._lock: + if hub_key in self._hub_sessions: + return len(self._hub_sessions[hub_key]) + return 0 + + def mark_plugins_discovered(self, hub_key): + """Mark that plugins have been discovered for this hub. + + Args: + hub_key: The hub registry key. + """ + with self._lock: + if hub_key in self._hub_plugins_discovered: + self._hub_plugins_discovered[hub_key] = True + + def plugins_discovered(self, hub_key): + """Check if plugins have been discovered for this hub. + + Args: + hub_key: The hub registry key. + + Returns: + bool: True if plugins already discovered for this hub. + """ + with self._lock: + return self._hub_plugins_discovered.get(hub_key, False) + + +# Global singleton instance +_GLOBAL_EVENT_HUB_REGISTRY = EventHubRegistry() + + +def get_event_hub_registry(): + """Get the global EventHub registry instance. + + Returns: + EventHubRegistry: The global registry instance. + """ + return _GLOBAL_EVENT_HUB_REGISTRY diff --git a/source/ftrack_api/session.py b/source/ftrack_api/session.py index c0a4bcf2..7fdf3e30 100644 --- a/source/ftrack_api/session.py +++ b/source/ftrack_api/session.py @@ -40,6 +40,8 @@ import ftrack_api.attribute import ftrack_api.collection import ftrack_api.event.hub +import ftrack_api.event.hub_registry +import ftrack_api.event.hub_proxy import ftrack_api.event.base import ftrack_api.plugin import ftrack_api.inspection @@ -52,6 +54,7 @@ import ftrack_api.logging from ftrack_api.logging import LazyLogMessage as L +import weakref from weakref import WeakMethod @@ -91,6 +94,7 @@ def __init__( cookies=None, headers=None, strict_api=False, + force_new_connection=False, ): """Initialise session. @@ -148,6 +152,18 @@ def __init__( status explicitly. Subscribing to events does *not* require a connected event hub. + .. note:: + + By default, sessions with identical credentials share a single + WebSocket connection to prevent connection exhaustion. Set + *force_new_connection* to True to create a dedicated connection + for this session instead. + + If *force_new_connection* is True, this session will create its own + dedicated EventHub connection instead of sharing one with other sessions + that have the same credentials. This is useful for advanced use cases + where connection isolation is required. Default is False (shared connection). + Enable schema caching by setting *schema_cache_path* to a folder path. If not set, :envvar:`FTRACK_API_SCHEMA_CACHE_PATH` will be used to determine the path to store cache in. If the environment variable is @@ -277,40 +293,93 @@ def __init__( # Now check compatibility of server based on retrieved information. self.check_server_compatibility() - # Construct event hub and load plugins. - self._event_hub = ftrack_api.event.hub.EventHub( - self._server_url, - self._api_user, - self._api_key, - headers=headers, - cookies=requests.utils.dict_from_cookiejar(self._request.cookies), - ) + # Construct event hub - either dedicated or shared from registry. + # Track subscribers created by this session for cleanup. + self._session_subscribers = [] + self._is_shared_hub = not force_new_connection + + # Set plugin paths before creating EventHub (needed for hub key) + self._plugin_paths = plugin_paths + if self._plugin_paths is None: + self._plugin_paths = os.environ.get("FTRACK_EVENT_PLUGIN_PATH", "").split( + os.pathsep + ) - self._auto_connect_event_hub_thread = None - if auto_connect_event_hub: - # Connect to event hub in background thread so as not to block main - # session usage waiting for event hub connection. + if force_new_connection: + # Create dedicated EventHub for this session (traditional behavior) + self._event_hub_impl = ftrack_api.event.hub.EventHub( + self._server_url, + self._api_user, + self._api_key, + headers=headers, + cookies=requests.utils.dict_from_cookiejar( + self._request.cookies), + ) + + self._auto_connect_event_hub_thread = None + if auto_connect_event_hub: + # Connect to event hub in background thread so as not to block main + # session usage waiting for event hub connection. - # set the connection as initialising from the main thread so that - # we can queue up any potential published messages. - self._event_hub.init_connection() + # set the connection as initialising from the main thread so that + # we can queue up any potential published messages. + self._event_hub_impl.init_connection() - self._auto_connect_event_hub_thread = threading.Thread( - target=self._event_hub.connect + self._auto_connect_event_hub_thread = threading.Thread( + target=self._event_hub_impl.connect + ) + self._auto_connect_event_hub_thread.daemon = True + self._auto_connect_event_hub_thread.start() + else: + # Get shared EventHub from registry (default behavior) + # Multiple sessions with same credentials will share one connection + self._hub_registry = ftrack_api.event.hub_registry.get_event_hub_registry() + + # Get or create hub - plugin_paths included in key to prevent duplication + self._event_hub_impl, self._hub_key = self._hub_registry.get_or_create( + self._server_url, + self._api_user, + self._api_key, + plugin_paths=self._plugin_paths, + headers=headers, + cookies=requests.utils.dict_from_cookiejar( + self._request.cookies), + auto_connect=auto_connect_event_hub, ) - self._auto_connect_event_hub_thread.daemon = True - self._auto_connect_event_hub_thread.start() + + # Register this session with the registry + # WeakSet will automatically create weak reference + self._hub_registry.register_session(self._hub_key, self) + + # Set up cleanup callback for when session is garbage collected + # This will disconnect the hub if no other sessions are using it + self._hub_finalizer = weakref.finalize( + self, + self._hub_registry.on_session_deleted, + self._hub_key + ) + + # Auto-connect is handled by the registry + self._auto_connect_event_hub_thread = None # Register to auto-close session on exit. atexit.register(WeakMethod(self.close)) - self._plugin_paths = plugin_paths - if self._plugin_paths is None: - self._plugin_paths = os.environ.get("FTRACK_EVENT_PLUGIN_PATH", "").split( - os.pathsep - ) + # Discover plugins - but skip if already discovered for this shared hub + should_discover = True + if not force_new_connection and hasattr(self, '_hub_registry'): + # Check if plugins already discovered for this shared hub + if self._hub_registry.plugins_discovered(self._hub_key): + should_discover = False + self.logger.debug( + 'Skipping plugin discovery - already done for shared EventHub' + ) - self._discover_plugins(plugin_arguments=plugin_arguments) + if should_discover: + self._discover_plugins(plugin_arguments=plugin_arguments) + # Mark plugins as discovered for this hub + if not force_new_connection and hasattr(self, '_hub_registry'): + self._hub_registry.mark_plugins_discovered(self._hub_key) # TODO: Make schemas read-only and non-mutable (or at least without # rebuilding types)? @@ -411,8 +480,17 @@ def api_key(self): @property def event_hub(self): - """Return event hub.""" - return self._event_hub + """Return event hub. + + Returns a session-scoped proxy that tracks subscriptions created + by this session. This allows multiple sessions to share the same + underlying EventHub connection while maintaining independent subscriber + lists for proper cleanup. + """ + # Return a proxy that tracks this session's subscribers + return ftrack_api.event.hub_proxy.SessionEventHubProxy( + self._event_hub_impl, self + ) @property def _local_cache(self): @@ -476,12 +554,28 @@ def close(self): self._request.close() self._request = None - try: - self.event_hub.disconnect() - if self._auto_connect_event_hub_thread: - self._auto_connect_event_hub_thread.join() - except ftrack_api.exception.EventHubConnectionError: - pass + # Handle EventHub cleanup based on whether it's shared or dedicated + if self._is_shared_hub: + # Shared hub: only unsubscribe this session's subscribers + # The hub itself will be disconnected by the registry when the last session closes + for subscriber_id in self._session_subscribers[:]: + try: + self._event_hub_impl.unsubscribe(subscriber_id) + except Exception as error: + self.logger.debug( + 'Failed to unsubscribe {0}: {1}'.format( + subscriber_id, error) + ) + self._session_subscribers = [] + # Registry will auto-disconnect hub via weak reference callback if needed + else: + # Dedicated hub: disconnect it completely (traditional behavior) + try: + self._event_hub_impl.disconnect() + if self._auto_connect_event_hub_thread: + self._auto_connect_event_hub_thread.join() + except ftrack_api.exception.EventHubConnectionError: + pass self.logger.debug("Session closed.") @@ -601,7 +695,8 @@ def reset_remote(self, reset_type, entity=None): if entity is not None: payload.update( - {"entity_type": entity.entity_type, "entity_key": entity.get("id")} + {"entity_type": entity.entity_type, + "entity_key": entity.get("id")} ) result = self.call([payload]) @@ -719,7 +814,8 @@ def ensure(self, entity_type, data, identifying_keys=None): # Server does not store microsecond or timezone currently so # need to strip from query. # TODO: When datetime handling improved, update this logic. - value = arrow.get(value).naive.replace(microsecond=0).isoformat() + value = arrow.get(value).naive.replace( + microsecond=0).isoformat() value = '"{0}"'.format(value) criteria.append("{0} is {1}".format(identifying_key, value)) @@ -747,7 +843,8 @@ def ensure(self, entity_type, data, identifying_keys=None): updated = True if updated: - self.logger.debug("Updating existing entity to match new data.") + self.logger.debug( + "Updating existing entity to match new data.") self.commit() return entity @@ -757,7 +854,8 @@ def delete(self, entity): if self.record_operations: self.recorded_operations.push( ftrack_api.operation.DeleteEntityOperation( - entity.entity_type, ftrack_api.inspection.primary_key(entity) + entity.entity_type, ftrack_api.inspection.primary_key( + entity) ) ) @@ -793,12 +891,14 @@ def get(self, entity_type, entity_key): except KeyError: # Query for matching entity. - self.logger.debug("Entity not present in cache. Issuing new query.") + self.logger.debug( + "Entity not present in cache. Issuing new query.") condition = [] for key, value in zip(primary_key_definition, entity_key): condition.append('{0} is "{1}"'.format(key, value)) - expression = "{0} where ({1})".format(entity_type, " and ".join(condition)) + expression = "{0} where ({1})".format( + entity_type, " and ".join(condition)) results = self.query(expression).all() if results: @@ -817,10 +917,12 @@ def _get(self, entity_type, entity_key): cache_key = self.cache_key_maker.key( (str(entity_type), list(map(str, entity_key))) ) - self.logger.debug(L("Checking cache for entity with key {0}", cache_key)) + self.logger.debug( + L("Checking cache for entity with key {0}", cache_key)) entity = self.cache.get(cache_key) self.logger.debug( - L("Retrieved existing entity from cache: {0} at {1}", entity, id(entity)) + L("Retrieved existing entity from cache: {0} at {1}", entity, id( + entity)) ) return entity @@ -899,7 +1001,8 @@ def _merge(self, value, merged): with self._thread_lock: if isinstance(value, ftrack_api.entity.base.Entity): log_debug and self.logger.debug( - "Merging entity into session: {0} at {1}".format(value, id(value)) + "Merging entity into session: {0} at {1}".format( + value, id(value)) ) return self._merge_entity(value, merged=merged) @@ -1005,7 +1108,8 @@ def _merge_entity(self, entity, merged=None): return attached_entity else: log_debug and self.logger.debug( - "Entity not already processed for key {0}.".format(entity_key) + "Entity not already processed for key {0}.".format( + entity_key) ) # Check for existing instance of entity in cache. @@ -1130,7 +1234,8 @@ def populate(self, entities, projections): query = "{0} where {1} in ({2})".format( query, primary_key, - ",".join([str(entity_key[0]) for entity_key in entity_keys]), + ",".join([str(entity_key[0]) + for entity_key in entity_keys]), ) else: query = "{0} where {1} is {2}".format( @@ -1217,10 +1322,12 @@ def commit(self): for payload in batch: if payload["action"] == "create": - created.add((payload["entity_type"], str(payload["entity_key"]))) + created.add((payload["entity_type"], + str(payload["entity_key"]))) elif payload["action"] == "delete": - deleted.add((payload["entity_type"], str(payload["entity_key"]))) + deleted.add((payload["entity_type"], + str(payload["entity_key"]))) created_then_deleted = deleted.intersection(created) if created_then_deleted: @@ -1393,7 +1500,8 @@ def register(session): """ plugin_arguments = plugin_arguments or {} - ftrack_api.plugin.discover(self._plugin_paths, [self], plugin_arguments) + ftrack_api.plugin.discover( + self._plugin_paths, [self], plugin_arguments) def _read_schemas_from_cache(self, schema_cache_path): """Return schemas and schema hash from *schema_cache_path*. @@ -1402,10 +1510,12 @@ def _read_schemas_from_cache(self, schema_cache_path): schemas in JSON format. """ - self.logger.debug(L("Reading schemas from cache {0!r}", schema_cache_path)) + self.logger.debug( + L("Reading schemas from cache {0!r}", schema_cache_path)) if not os.path.exists(schema_cache_path): - self.logger.info(L("Cache file not found at {0!r}.", schema_cache_path)) + self.logger.info( + L("Cache file not found at {0!r}.", schema_cache_path)) return [], None @@ -1425,7 +1535,8 @@ def _write_schemas_to_cache(self, schemas, schema_cache_path): """ self.logger.debug( - L("Updating schema cache {0!r} with new schemas.", schema_cache_path) + L("Updating schema cache {0!r} with new schemas.", + schema_cache_path) ) with open(schema_cache_path, "w") as local_cache_file: @@ -1476,11 +1587,13 @@ def _load_schemas(self, schema_cache_path): self._write_schemas_to_cache(schemas, schema_cache_path) except (IOError, TypeError): self.logger.exception( - L("Failed to update schema cache {0!r}.", schema_cache_path) + L("Failed to update schema cache {0!r}.", + schema_cache_path) ) else: - self.logger.debug(L("Using cached schemas from {0!r}", schema_cache_path)) + self.logger.debug( + L("Using cached schemas from {0!r}", schema_cache_path)) return schemas @@ -1532,7 +1645,8 @@ def _configure_locations(self): # Origin. location = self.create( "Location", - data=dict(name="ftrack.origin", id=ftrack_api.symbol.ORIGIN_LOCATION_ID), + data=dict(name="ftrack.origin", + id=ftrack_api.symbol.ORIGIN_LOCATION_ID), reconstructing=True, ) ftrack_api.mixin( @@ -1567,7 +1681,8 @@ def _configure_locations(self): # Review. location = self.create( "Location", - data=dict(name="ftrack.review", id=ftrack_api.symbol.REVIEW_LOCATION_ID), + data=dict(name="ftrack.review", + id=ftrack_api.symbol.REVIEW_LOCATION_ID), reconstructing=True, ) ftrack_api.mixin( @@ -1582,7 +1697,8 @@ def _configure_locations(self): # Server. location = self.create( "Location", - data=dict(name="ftrack.server", id=ftrack_api.symbol.SERVER_LOCATION_ID), + data=dict(name="ftrack.server", + id=ftrack_api.symbol.SERVER_LOCATION_ID), reconstructing=True, ) ftrack_api.mixin( @@ -1590,7 +1706,8 @@ def _configure_locations(self): ftrack_api.entity.location.ServerLocationMixin, name="ServerLocation", ) - location.accessor = ftrack_api.accessor.server._ServerAccessor(session=self) + location.accessor = ftrack_api.accessor.server._ServerAccessor( + session=self) location.structure = ftrack_api.structure.entity_id.EntityIdStructure() location.priority = 150 @@ -1617,7 +1734,8 @@ def _configure_locations(self): def call(self, data): """Make request to server with *data* batch describing the actions.""" url = self._server_url + "/api" - headers = {"content-type": "application/json", "accept": "application/json"} + headers = {"content-type": "application/json", + "accept": "application/json"} data = self.encode(data, entity_attribute_strategy="modified_only") self.logger.debug(L("Calling server {0} with {1!r}", url, data)) @@ -1630,7 +1748,8 @@ def call(self, data): data=data, timeout=self.request_timeout, ) - self.logger.debug(L("Call took: {0}", response.elapsed.total_seconds())) + self.logger.debug( + L("Call took: {0}", response.elapsed.total_seconds())) self.logger.debug(L("Response: {0!r}", response.text)) result = self.decode(response.text) @@ -1694,7 +1813,8 @@ def encode(self, data, entity_attribute_strategy="set_only"): raise ValueError( 'Unsupported entity_attribute_strategy "{0}". Must be one of ' "{1}".format( - entity_attribute_strategy, ", ".join(entity_attribute_strategies) + entity_attribute_strategy, ", ".join( + entity_attribute_strategies) ) ) @@ -1801,7 +1921,8 @@ def _decode(self, item): item = arrow.get(item["value"]) elif "__entity_type__" in item: - item = self._create(item["__entity_type__"], item, reconstructing=True) + item = self._create( + item["__entity_type__"], item, reconstructing=True) return item @@ -1817,7 +1938,8 @@ def _get_locations(self, filter_inaccessible=True): # Filter. if filter_inaccessible: - locations = [location for location in locations if location.accessor] + locations = [ + location for location in locations if location.accessor] # Sort by priority. locations = sorted(locations, key=lambda location: location.priority) @@ -1918,7 +2040,8 @@ def create_component(self, path, data=None, location="auto"): "ftrackreview-webm", "ftrackreview-image", ): - location = self.get("Location", ftrack_api.symbol.REVIEW_LOCATION_ID) + location = self.get( + "Location", ftrack_api.symbol.REVIEW_LOCATION_ID) else: location = self.pick_location() @@ -1962,7 +2085,8 @@ def retrieve_file_type(_path): if container_size is not None: if len(collection.indexes) > 0: - member_size = int(round(container_size / len(collection.indexes))) + member_size = int( + round(container_size / len(collection.indexes))) for item in collection: member_sizes[item] = member_size @@ -2003,7 +2127,8 @@ def retrieve_file_type(_path): origin_location = self.get( "Location", ftrack_api.symbol.ORIGIN_LOCATION_ID ) - location.add_component(container, origin_location, recursive=True) + location.add_component( + container, origin_location, recursive=True) return container @@ -2017,7 +2142,8 @@ def _create_component(self, entity_type, path, data, location): # Add to special origin location so that it is possible to add to other # locations. - origin_location = self.get("Location", ftrack_api.symbol.ORIGIN_LOCATION_ID) + origin_location = self.get( + "Location", ftrack_api.symbol.ORIGIN_LOCATION_ID) origin_location.add_component(component, path, recursive=False) if location: @@ -2073,7 +2199,8 @@ def get_component_availabilities(self, components, locations=None): # Perform queries. if standard_components: - self.populate(standard_components, "component_locations.location_id") + self.populate(standard_components, + "component_locations.location_id") if container_components: self.populate( @@ -2199,7 +2326,8 @@ def encode_media(self, media, version_id=None, keep_original="auto"): """ if isinstance(media, str): # Media is a path to a file. - server_location = self.get("Location", ftrack_api.symbol.SERVER_LOCATION_ID) + server_location = self.get( + "Location", ftrack_api.symbol.SERVER_LOCATION_ID) if keep_original == "auto": keep_original = False @@ -2221,7 +2349,8 @@ def encode_media(self, media, version_id=None, keep_original="auto"): keep_original = True else: - raise ValueError("Unable to encode media of type: {0}".format(type(media))) + raise ValueError( + "Unable to encode media of type: {0}".format(type(media))) operation = { "action": "encode_media", @@ -2303,7 +2432,8 @@ def send_user_invites(self, users): operations = [] for user in users: - operations.append({"action": "send_user_invite", "user_id": user["id"]}) + operations.append( + {"action": "send_user_invite", "user_id": user["id"]}) try: self.call(operations) diff --git a/test/unit/event/test_hub_registry.py b/test/unit/event/test_hub_registry.py new file mode 100644 index 00000000..d3e4e78c --- /dev/null +++ b/test/unit/event/test_hub_registry.py @@ -0,0 +1,274 @@ +# :coding: utf-8 +# :copyright: Copyright (c) 2024 ftrack + +import pytest +import weakref + +import ftrack_api +import ftrack_api.event.hub +import ftrack_api.event.hub_registry +import ftrack_api.event.hub_proxy + + +@pytest.fixture() +def registry(): + """Return a fresh EventHub registry instance for testing.""" + # Create a new registry instance for isolation between tests + return ftrack_api.event.hub_registry.EventHubRegistry() + + +def test_get_or_create_returns_same_hub_for_same_credentials(registry): + """Return same EventHub instance for identical credentials.""" + hub1 = registry.get_or_create( + 'https://test.ftrack.com', + 'user1', + 'key1', + auto_connect=False + ) + + hub2 = registry.get_or_create( + 'https://test.ftrack.com', + 'user1', + 'key1', + auto_connect=False + ) + + assert hub1 is hub2 + + +def test_get_or_create_returns_different_hub_for_different_credentials(registry): + """Return different EventHub instances for different credentials.""" + hub1 = registry.get_or_create( + 'https://test.ftrack.com', + 'user1', + 'key1', + auto_connect=False + ) + + hub2 = registry.get_or_create( + 'https://test.ftrack.com', + 'user2', # Different user + 'key2', + auto_connect=False + ) + + assert hub1 is not hub2 + + +def test_get_or_create_returns_different_hub_for_different_server(registry): + """Return different EventHub instances for different servers.""" + hub1 = registry.get_or_create( + 'https://test.ftrack.com', + 'user1', + 'key1', + auto_connect=False + ) + + hub2 = registry.get_or_create( + 'https://other.ftrack.com', # Different server + 'user1', + 'key1', + auto_connect=False + ) + + assert hub1 is not hub2 + + +def test_register_session_tracks_active_sessions(registry): + """Track active sessions using a hub.""" + hub_key = ('https://test.ftrack.com', 'user1', 'key1') + + # Create hub + registry.get_or_create(*hub_key, auto_connect=False) + + # Create mock sessions + class MockSession: + pass + + session1 = MockSession() + session2 = MockSession() + + # Register sessions + registry.register_session(hub_key, weakref.ref(session1)) + registry.register_session(hub_key, weakref.ref(session2)) + + # Check count + count = registry.get_session_count(hub_key) + assert count == 2 + + +def test_on_session_deleted_cleans_up_unused_hub(registry): + """Disconnect and clean up hub when last session is deleted.""" + hub_key = ('https://test.ftrack.com', 'user1', 'key1') + + # Create hub + hub = registry.get_or_create(*hub_key, auto_connect=False) + + # Mock the disconnect method to track if it was called + disconnect_called = [] + original_disconnect = hub.disconnect + + def mock_disconnect(*args, **kwargs): + disconnect_called.append(True) + # Don't actually disconnect in test + + hub.disconnect = mock_disconnect + + # Create and register a mock session + class MockSession: + pass + + session = MockSession() + session_ref = weakref.ref(session) + registry.register_session(hub_key, session_ref) + + # Delete the session + del session + + # Trigger cleanup + registry.on_session_deleted(hub_key) + + # Hub should be removed from registry + assert hub_key not in registry._hubs + assert hub_key not in registry._hub_sessions + + +def test_session_uses_shared_hub_by_default(session): + """Session uses shared EventHub by default.""" + assert session._is_shared_hub is True + assert hasattr(session, '_hub_registry') + assert hasattr(session, '_hub_key') + + +def test_session_with_force_new_connection_creates_dedicated_hub(session): + """Session with force_new_connection=True creates dedicated EventHub.""" + # Create a new session with force_new_connection + dedicated_session = ftrack_api.Session( + server_url=session.server_url, + api_user=session.api_user, + api_key=session.api_key, + auto_connect_event_hub=False, + force_new_connection=True + ) + + try: + assert dedicated_session._is_shared_hub is False + assert dedicated_session._event_hub_impl is not session._event_hub_impl + finally: + dedicated_session.close() + + +def test_multiple_sessions_share_same_hub(): + """Multiple sessions with same credentials share one EventHub.""" + # Note: Using environment variables for credentials from conftest + session1 = ftrack_api.Session(auto_connect_event_hub=False) + session2 = ftrack_api.Session(auto_connect_event_hub=False) + session3 = ftrack_api.Session(auto_connect_event_hub=False) + + try: + # All should share the same underlying EventHub + assert session1._event_hub_impl is session2._event_hub_impl + assert session2._event_hub_impl is session3._event_hub_impl + + # But should have independent subscriber lists + assert session1._session_subscribers is not session2._session_subscribers + assert session2._session_subscribers is not session3._session_subscribers + finally: + session1.close() + session2.close() + session3.close() + + +def test_session_event_hub_property_returns_proxy(session): + """Session.event_hub property returns SessionEventHubProxy.""" + event_hub_proxy = session.event_hub + + assert isinstance( + event_hub_proxy, + ftrack_api.event.hub_proxy.SessionEventHubProxy + ) + assert event_hub_proxy._session is session + assert event_hub_proxy._event_hub is session._event_hub_impl + + +def test_proxy_delegates_attributes_to_hub(session): + """SessionEventHubProxy delegates attributes to underlying EventHub.""" + proxy = session.event_hub + hub = session._event_hub_impl + + # Test attribute delegation + assert proxy.id == hub.id + assert proxy.logger == hub.logger + assert proxy.connected == hub.connected + + +def test_proxy_tracks_session_subscribers(session): + """SessionEventHubProxy tracks subscribers for the session.""" + def callback(event): + pass + + # Subscribe via proxy + subscriber_id = session.event_hub.subscribe('topic=test', callback) + + # Should be tracked in session + assert subscriber_id in session._session_subscribers + + # Unsubscribe + session.event_hub.unsubscribe(subscriber_id) + + # Should be removed from tracking + assert subscriber_id not in session._session_subscribers + + +def test_session_close_only_unsubscribes_own_subscribers(): + """Session.close() only removes its own subscribers from shared hub.""" + session1 = ftrack_api.Session(auto_connect_event_hub=False) + session2 = ftrack_api.Session(auto_connect_event_hub=False) + + def callback1(event): + pass + + def callback2(event): + pass + + try: + # Subscribe from both sessions + subscriber_id1 = session1.event_hub.subscribe('topic=test', callback1) + subscriber_id2 = session2.event_hub.subscribe('topic=test', callback2) + + # Both should be tracked separately + assert subscriber_id1 in session1._session_subscribers + assert subscriber_id2 in session2._session_subscribers + + # Close session1 + session1.close() + + # session1's subscriber should be gone, but session2's should remain + assert subscriber_id2 in session2._session_subscribers + + finally: + if not session1._closed: + session1.close() + if not session2._closed: + session2.close() + + +def test_dedicated_hub_disconnects_on_close(session): + """Session with dedicated hub fully disconnects on close.""" + dedicated_session = ftrack_api.Session( + server_url=session.server_url, + api_user=session.api_user, + api_key=session.api_key, + auto_connect_event_hub=False, + force_new_connection=True + ) + + hub = dedicated_session._event_hub_impl + + # Close the session + dedicated_session.close() + + # The hub should have been disconnected + # (In practice we can't easily test this without mocking, + # but we verify the code path was taken) + assert dedicated_session._closed is True