Skip to content

Commit 50d0d31

Browse files
author
Greg Poirier
committed
Add SQSServiceSensor for non-polling SQS sensor
This adds a SQS Sensor with its own polling loop so that we can consume messages from one or more SQS queues as quickly as possible without relying on StackStorm to trigger a poll interval.
1 parent 777aa4a commit 50d0d31

File tree

2 files changed

+170
-0
lines changed

2 files changed

+170
-0
lines changed

sensors/sqs_service_sensor.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
"""
2+
This is generic SQS Sensor using boto3 api to fetch messages from sqs queue.
3+
After receiving a message it's content is passed as payload to a trigger 'aws.sqs_new_message'
4+
This sensor can be configured either by using config.yaml within a pack or by creating
5+
following values in datastore:
6+
- aws.input_queues (list queues as comma separated string: first_queue,second_queue)
7+
- aws.aws_access_key_id
8+
- aws.aws_secret_access_key
9+
- aws.region
10+
- aws.max_number_of_messages (must be between 1 - 10)
11+
For configuration in config.yaml with config like this
12+
setup:
13+
aws_access_key_id:
14+
aws_access_key_id:
15+
region:
16+
sqs_sensor:
17+
input_queues:
18+
- first_queue
19+
- second_queue
20+
sqs_other:
21+
max_number_of_messages: 1
22+
If any value exist in datastore it will be taken instead of any value in config.yaml
23+
"""
24+
25+
import six
26+
import json
27+
from boto3.session import Session
28+
from botocore.exceptions import ClientError
29+
from botocore.exceptions import NoRegionError
30+
from botocore.exceptions import NoCredentialsError
31+
from botocore.exceptions import EndpointConnectionError
32+
33+
from st2reactor.sensor.base import Sensor
34+
35+
36+
class AWSSQSServiceSensor(Sensor):
37+
def __init__(self, sensor_service, config=None):
38+
super(AWSSQSServiceSensor, self).__init__(sensor_service=sensor_service, config=config)
39+
40+
def setup(self):
41+
self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)
42+
43+
self.session = None
44+
self.sqs_res = None
45+
46+
def run(self):
47+
# setting SQS ServiceResource object from the parameter of datastore or configuration file
48+
self._may_setup_sqs()
49+
50+
while True:
51+
for queue in self.input_queues:
52+
msgs = self._receive_messages(queue=self._get_queue_by_name(queue),
53+
num_messages=self.max_number_of_messages)
54+
for msg in msgs:
55+
if msg:
56+
payload = {"queue": queue, "body": json.loads(msg.body)}
57+
self._sensor_service.dispatch(trigger="aws.sqs_new_message",
58+
payload=payload)
59+
msg.delete()
60+
61+
def cleanup(self):
62+
pass
63+
64+
def add_trigger(self, trigger):
65+
# This method is called when trigger is created
66+
pass
67+
68+
def update_trigger(self, trigger):
69+
# This method is called when trigger is updated
70+
pass
71+
72+
def remove_trigger(self, trigger):
73+
pass
74+
75+
def _get_config_entry(self, key, prefix=None):
76+
''' Get configuration values either from Datastore or config file. '''
77+
config = self.config
78+
if prefix:
79+
config = self._config.get(prefix, {})
80+
81+
value = self._sensor_service.get_value('aws.%s' % (key), local=False)
82+
if not value:
83+
value = config.get(key, None)
84+
85+
if not value and config.get('setup', None):
86+
value = config['setup'].get(key, None)
87+
88+
return value
89+
90+
def _may_setup_sqs(self):
91+
queues = self._get_config_entry(key='input_queues', prefix='sqs_sensor')
92+
93+
# XXX: This is a hack as from datastore we can only receive a string while
94+
# from config.yaml we can receive a list
95+
if isinstance(queues, six.string_types):
96+
self.input_queues = [x.strip() for x in queues.split(',')]
97+
elif isinstance(queues, list):
98+
self.input_queues = queues
99+
else:
100+
self.input_queues = []
101+
102+
self.aws_access_key = self._get_config_entry('aws_access_key_id')
103+
self.aws_secret_key = self._get_config_entry('aws_secret_access_key')
104+
self.aws_region = self._get_config_entry('region')
105+
106+
self.max_number_of_messages = self._get_config_entry('max_number_of_messages',
107+
prefix='sqs_other')
108+
109+
# checker configuration is update, or not
110+
def _is_same_credentials():
111+
c = self.session.get_credentials()
112+
return c is not None and \
113+
c.access_key == self.aws_access_key and \
114+
c.secret_key == self.aws_secret_key and \
115+
self.session.region_name == self.aws_region
116+
117+
if self.session is None or not _is_same_credentials():
118+
self._setup_sqs()
119+
120+
def _setup_sqs(self):
121+
''' Setup Boto3 structures '''
122+
self._logger.debug('Setting up SQS resources')
123+
self.session = Session(aws_access_key_id=self.aws_access_key,
124+
aws_secret_access_key=self.aws_secret_key,
125+
region_name=self.aws_region)
126+
127+
try:
128+
self.sqs_res = self.session.resource('sqs')
129+
except NoRegionError:
130+
self._logger.warning("The specified region '%s' is invalid", self.aws_region)
131+
132+
def _get_queue_by_name(self, queueName):
133+
''' Fetch QUEUE by it's name create new one if queue doesn't exist '''
134+
try:
135+
return self.sqs_res.get_queue_by_name(QueueName=queueName)
136+
except ClientError as e:
137+
if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
138+
self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queueName)
139+
return self.sqs_res.create_queue(QueueName=queueName)
140+
elif e.response['Error']['Code'] == 'InvalidClientTokenId':
141+
self._logger.warning("Cloudn't operate sqs because of invalid credential config")
142+
else:
143+
raise
144+
except NoCredentialsError as e:
145+
self._logger.warning("Cloudn't operate sqs because of invalid credential config")
146+
except EndpointConnectionError as e:
147+
self._logger.warning(e)
148+
149+
def _receive_messages(self, queue, num_messages, wait_time=2):
150+
''' Receive a message from queue and return it. '''
151+
if queue:
152+
return queue.receive_messages(WaitTimeSeconds=wait_time,
153+
MaxNumberOfMessages=num_messages)
154+
else:
155+
return []

sensors/sqs_service_sensor.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
---
2+
class_name: "AWSSQSServiceSensor"
3+
entry_point: "sqs_service_sensor.py"
4+
description: "Service Sensor which monitors a SQS queue for new messages"
5+
trigger_types:
6+
-
7+
name: "sqs_new_message"
8+
description: "Trigger which indicates that a new message has arrived"
9+
payload_schema:
10+
type: "object"
11+
properties:
12+
queue:
13+
type: "string"
14+
body:
15+
type: "object"

0 commit comments

Comments
 (0)