From 032530c310077ae49b543937a05a601e18cead3d Mon Sep 17 00:00:00 2001 From: Luis Fernando Gomes Date: Wed, 10 Jun 2020 14:34:40 -0300 Subject: [PATCH] Refactor publisher error handing --- queue_manager/pubsub_publisher.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/queue_manager/pubsub_publisher.py b/queue_manager/pubsub_publisher.py index a5b96b3..5488043 100644 --- a/queue_manager/pubsub_publisher.py +++ b/queue_manager/pubsub_publisher.py @@ -14,11 +14,11 @@ from . import QueuePublisher try: - from google.api_core.exceptions import GoogleAPICallError + from google.api_core.exceptions import NotFound from google.cloud import pubsub_v1 from google.oauth2.service_account import Credentials except ImportError: - raise Exception("You need to install google-cloud-pubsub") + raise Exception('You need to install google-cloud-pubsub') logger = logging.getLogger(__name__) @@ -28,7 +28,7 @@ class PubsubPublisher(QueuePublisher): _assertion_ttl = 30 scope = 'https://www.googleapis.com/auth/pubsub' - def __init__(self, project_id, service_account, topic_name="ping"): + def __init__(self, project_id, service_account, topic_name='ping'): logger.debug('Init PubsubPublisher ...') self.project_id = project_id self.service_account = service_account @@ -49,19 +49,15 @@ def __init__(self, project_id, service_account, topic_name="ping"): def assert_topic(self, topic_name): if time() - self._assertion_ttl < self._last_assertion[topic_name]: return - self._last_assertion[topic_name] = time() + self._last_assertion[topic_name] = 0 try: logger.debug('Getting topic %s', topic_name) self.client.get_topic(topic_name) logger.debug('Nice, topic already exists %s', topic_name) - except GoogleAPICallError as error: - if error.code == 404: - logger.info('Topic doesnt exist, creating a new topic %s', topic_name) - self.client.create_topic(topic_name) - else: - self._last_assertion[topic_name] = 0 - logger.error('An error occurred while getting the topic %s, reason: %s', topic_name, error) - raise error + self._last_assertion[topic_name] = time() + except NotFound: + logger.warning('Topic doesnt exist, creating a new topic %s', topic_name) + self.client.create_topic(topic_name) def _get_credentials(self): if isinstance(self.service_account, Mapping): @@ -75,9 +71,9 @@ def setup_client(self): def ping(self): self.assert_topic(self.topic_name_ping) - response = self.client.publish(topic=self.topic_name_ping, data=b"OK") + response = self.client.publish(topic=self.topic_name_ping, data=b'OK') message_id = response.result(timeout=5000) - logger.debug(f"Response from PubSub: {message_id}") + logger.debug('Response from PubSub: %s', message_id) return message_id def publish_message(self, message, message_properties=None):