Skip to content

Commit eb655e4

Browse files
committed
added json parser step
1 parent 88b8aee commit eb655e4

File tree

5 files changed

+92
-3
lines changed

5 files changed

+92
-3
lines changed
Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,65 @@
11
import paho.mqtt.client as mqtt
2+
import json
3+
import logging
4+
import time
5+
6+
logger = logging.getLogger(__name__)
27

38
class MQTTWriter:
4-
def __init__(self, broker_address:str, broker_port:int, topic:str, username:str, password:str):
5-
self.client = mqtt.Client()
9+
def __init__(self, broker_address:str, broker_port:int, topic:str, username:str, password:str, client_id:str = "Arduino_MQTTWriter"):
10+
self.topic = topic
11+
self.client = mqtt.Client(client_id)
612
self.client.username_pw_set(username, password)
13+
14+
def on_connect(client, userdata, flags, rc):
15+
if rc != 0:
16+
logger.error("Failed to connect, return code %d\n", rc)
17+
18+
FIRST_RECONNECT_DELAY = 1
19+
RECONNECT_RATE = 2
20+
MAX_RECONNECT_COUNT = 12
21+
MAX_RECONNECT_DELAY = 60
22+
23+
def on_disconnect(client, userdata, rc):
24+
logging.warning("Disconnected with result code: %s", rc)
25+
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
26+
while reconnect_count < MAX_RECONNECT_COUNT:
27+
logging.info("Reconnecting in %d seconds...", reconnect_delay)
28+
time.sleep(reconnect_delay)
29+
30+
try:
31+
client.reconnect()
32+
logging.info("Reconnected successfully!")
33+
return
34+
except Exception as err:
35+
logging.error("%s. Reconnect failed. Retrying...", err)
36+
37+
reconnect_delay *= RECONNECT_RATE
38+
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
39+
reconnect_count += 1
40+
logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
41+
42+
self.client.on_connect = on_connect
43+
self.client.on_disconnect = on_disconnect
44+
745
self.client.connect(broker_address, broker_port, 60)
8-
self.topic = topic
46+
self.client.loop_start()
947

1048
def close(self):
49+
self.client.loop_stop()
1150
self.client.disconnect()
1251

1352
def write(self, message:str):
1453
if message and message != "":
1554
self.client.publish(self.topic, message)
1655

1756
def consume(self, item):
57+
if item == None:
58+
return None
59+
1860
if isinstance(item, str):
1961
self.write(item)
62+
elif isinstance(item, dict) and len(item) > 0:
63+
self.write(json.dumps(item))
64+
2065
return item

appslab_modules/steps/db_storage/test/test_database_storage.py renamed to appslab_modules/steps/db_storage/test_database_storage.py

File renamed without changes.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import json
2+
import logging
3+
4+
logger = logging.getLogger(__name__)
5+
6+
class JSONParser:
7+
def __init__(self, silent:bool = False):
8+
self.silent = silent
9+
10+
def parse(self, item:str) -> dict:
11+
try:
12+
return json.loads(item)
13+
except Exception as e:
14+
if self.silent == False:
15+
logger.error(f"Error parsing content: {e}")
16+
return None
17+
18+
def process(self, item):
19+
if isinstance(item, str):
20+
return self.parse(item)
21+
22+
return item # No processing needed
23+
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
name: json_parser
2+
module_description: "JSON parser module. Parses JSON data and outputs it as a dictionary. Drop strings that cannot be converted to JSON."
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import unittest
2+
from __init__ import JSONParser
3+
4+
class TestJsonParser(unittest.TestCase):
5+
6+
def test_successful_parser(self):
7+
"""Test successfull parse."""
8+
ps = JSONParser()
9+
out = ps.parse('{"test_key": "test_val"}')
10+
self.assertEqual(out["test_key"], "test_val")
11+
12+
def test_drop_data_parser(self):
13+
"""Test parse of not valid json data."""
14+
ps = JSONParser(silent=True)
15+
out = ps.parse('not json text')
16+
self.assertEqual(out,None)
17+
18+
if __name__ == '__main__':
19+
unittest.main()

0 commit comments

Comments
 (0)