Skip to content

Commit 98aac1c

Browse files
committed
Some updates
1 parent 11dd93a commit 98aac1c

File tree

6 files changed

+80
-16
lines changed

6 files changed

+80
-16
lines changed

appslab_modules/sinks/mqtt/__init__.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,33 @@
22
import json
33
import logging
44
import time
5+
import uuid
56

67
logger = logging.getLogger(__name__)
78

8-
class MQTTWriter:
9-
def __init__(self, broker_address:str, broker_port:int, topic:str, username:str, password:str, client_id:str = "Arduino_MQTTWriter"):
9+
def generate_client_id():
10+
return "Arduino_MQTTSink-" + str(uuid.uuid4())
11+
12+
class MQTTSink:
13+
def __init__(self, broker_address:str, broker_port:int, topic:str, username:str, password:str, client_id:str = ""):
1014
self.topic = topic
11-
self.client = mqtt.Client(client_id)
12-
self.client.username_pw_set(username, password)
15+
if not client_id or client_id == "":
16+
client_id = generate_client_id()
17+
self.client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
18+
if username and password:
19+
self.client.username_pw_set(username, password)
1320

14-
def on_connect(client, userdata, flags, rc):
15-
if rc != 0:
16-
logger.error("Failed to connect, return code %d\n", rc)
21+
def on_connect(client, userdata, flags, reason_code, properties):
22+
if reason_code != 0:
23+
logger.error("Failed to connect, return code %d\n", reason_code)
1724

1825
FIRST_RECONNECT_DELAY = 1
1926
RECONNECT_RATE = 2
2027
MAX_RECONNECT_COUNT = 12
2128
MAX_RECONNECT_DELAY = 60
2229

23-
def on_disconnect(client, userdata, rc):
24-
logging.warning("Disconnected with result code: %s", rc)
30+
def on_disconnect(client, userdata, flags, reason_code, properties):
31+
logging.info("Disconnected with result code: %s", reason_code)
2532
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
2633
while reconnect_count < MAX_RECONNECT_COUNT:
2734
logging.info("Reconnecting in %d seconds...", reconnect_delay)
@@ -51,7 +58,8 @@ def close(self):
5158

5259
def write(self, message:str):
5360
if message and message != "":
54-
self.client.publish(self.topic, message)
61+
return self.client.publish(self.topic, message)
62+
return None
5563

5664
def consume(self, item):
5765
if item == None:
@@ -60,6 +68,4 @@ def consume(self, item):
6068
if isinstance(item, str):
6169
self.write(item)
6270
elif isinstance(item, dict) and len(item) > 0:
63-
self.write(json.dumps(item))
64-
65-
return item
71+
self.write(json.dumps(item))
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import unittest
2+
from __init__ import MQTTSink
3+
4+
class TestMQTTSink(unittest.TestCase):
5+
6+
def setUp(self):
7+
"""Set up test fixtures."""
8+
self.mqtt = MQTTSink(broker_address = "192.168.1.11", broker_port = 1883, topic = "/test/tp1", username = "admin", password = "admin")
9+
10+
def tearDown(self):
11+
"""Clean up after tests."""
12+
self.mqtt.close()
13+
14+
def test_write_to_topic(self):
15+
"""Test successfull write to topic."""
16+
msginfo = self.mqtt.write("Hello, World!")
17+
print(msginfo)
18+
msginfo.wait_for_publish(2)
19+
self.assertTrue(msginfo.is_published())
20+
21+
22+
if __name__ == '__main__':
23+
unittest.main()
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
2+
import logging
3+
4+
logger = logging.getLogger(__name__)
5+
6+
class LogStep:
7+
def __init__(self):
8+
pass
9+
10+
def process(self, item):
11+
if item == None:
12+
return None
13+
14+
logger.info(item)
15+
16+
return item
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
name: log_step
2+
module_description: "Logger step"

design/declarative.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,24 @@ pipe.add_sink(DBStore())
2929
pipe.process()
3030
```
3131

32-
### Adapter functions
32+
### Adapters - explicit mapping steps
33+
```python
34+
35+
def user_mapping_function(input :dic) - dic
36+
# User defined logic
37+
return input
38+
39+
pipe = Pipeline()
40+
pipe.add_source(UserInputText())
41+
pipe.add_source(JSONParser()) # To properly filter input - or
42+
pipe.add_source(ExecuteFunction(user_mapping_function)) # To let use inline a used defined function
43+
pipe.add_step(WeatherForecast())
44+
pipe.add_step(print)
45+
pipe.add_sink(DBStore())
46+
pipe.process()
47+
```
48+
49+
### Adapter functions - inlining
3350
```python
3451
pipe = Pipeline()
3552
pipe.add_source(UserInputText(), map = lambda x: x.strip())

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ dependencies = [
2323

2424
[project.optional-dependencies]
2525
db_storage = [
26-
"influxdb_client>=1.48.0"
26+
"influxdb_client>=1.48.0",
2727
]
2828
mqtt = [
29-
"paho_mqtt>=2.1.0"
29+
"paho_mqtt>=2.1.0",
3030
]
3131
all = [
3232
"appslab_modules[db_storage]",

0 commit comments

Comments
 (0)