Skip to content

Commit 88b8aee

Browse files
committed
Module refactoring
1 parent 6978f34 commit 88b8aee

File tree

7 files changed

+77
-24
lines changed

7 files changed

+77
-24
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import paho.mqtt.client as mqtt
2+
3+
class MQTTWriter:
4+
def __init__(self, broker_address:str, broker_port:int, topic:str, username:str, password:str):
5+
self.client = mqtt.Client()
6+
self.client.username_pw_set(username, password)
7+
self.client.connect(broker_address, broker_port, 60)
8+
self.topic = topic
9+
10+
def close(self):
11+
self.client.disconnect()
12+
13+
def write(self, message:str):
14+
if message and message != "":
15+
self.client.publish(self.topic, message)
16+
17+
def consume(self, item):
18+
if isinstance(item, str):
19+
self.write(item)
20+
return item
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
name: mqtt_sink
2+
module_description: "MQTT output writer"
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
11
# Database storage module
22

33
TODO: document usage
4+
5+
6+
# Local testing
7+
8+
Startup containarized infrastructure using
9+
10+
```sh
11+
docker compose -f module_compose.yaml up -d
12+
```

appslab_modules/steps/db_storage/database_storage.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def __init__(self, host:str = "db_storage", port:int = 8086):
1717
self._connect()
1818

1919
def load_default_infra(self):
20-
with open("compose.yaml", "r") as file:
20+
with open("module_compose.yaml", "r") as file:
2121
config = yaml.safe_load(file)
2222
return config
2323

@@ -27,9 +27,6 @@ def _connect(self):
2727
self.client = client
2828
self.write_api = client.write_api(write_precision=WritePrecision.MS)
2929
self.query_api = client.query_api()
30-
# Test connection
31-
query = f'from(bucket: "{self.bucket}") |> range(start: -1m)'
32-
self.query_api.query(query)
3330

3431
except Exception as e:
3532
print(f"Error: {e}")
@@ -38,7 +35,7 @@ def close(self):
3835
self.client.close()
3936

4037
class DatabasePersistence(_InfluxPersistence):
41-
def __init__(self, host:str, port:int):
38+
def __init__(self, host:str = "db_storage", port:int = 8086):
4239
super().__init__(host, port)
4340

4441
def write_sample(self, measure :str, value :any, ts :int = 0, measurement_name :str = "arduino"):
@@ -61,14 +58,14 @@ def process(self, item):
6158
return item
6259

6360
class DatabaseRetrieval(_InfluxPersistence):
64-
def __init__(self, host:str, port:int):
61+
def __init__(self, host:str = "db_storage", port:int = 8086):
6562
super().__init__(host, port)
6663

6764
def read_last_sample(self, measure :str, measurement_name :str = "arduino") -> tuple:
6865
try:
6966
query = f"""
7067
from(bucket: "{self.bucket}")
71-
|> range(start: -1d) # look back 1 day
68+
|> range(start: -1d)
7269
|> filter(fn: (r) => r["_measurement"] == "{measurement_name}")
7370
|> filter(fn: (r) => r["_field"] == "{measure}")
7471
|> last()
@@ -79,7 +76,7 @@ def read_last_sample(self, measure :str, measurement_name :str = "arduino") -> t
7976
if result:
8077
for table in result:
8178
for record in table.records:
82-
return measure, record.get_time(), record.get_value()
79+
return measure, record.get_time().isoformat(), record.get_value()
8380
else:
8481
return None
8582

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import unittest
2+
from database_storage import DatabasePersistence, DatabaseRetrieval # Assuming this is in database_storage.py
3+
4+
# NOTE: before test, start dependencies with "docker compose -f module_compose.yaml up -d"
5+
class TestDatabaseStorage(unittest.TestCase):
6+
7+
def setUp(self):
8+
"""Set up test fixtures."""
9+
self.db = DatabasePersistence(host="localhost")
10+
self.dbread = DatabaseRetrieval(host="localhost")
11+
12+
def tearDown(self):
13+
"""Clean up after tests."""
14+
self.db.close()
15+
self.dbread.close()
16+
17+
def test_write_and_read_string(self):
18+
"""Test writing and reading a string sample."""
19+
self.db.write_sample("test_measurement_str", "test_string")
20+
val = self.dbread.read_last_sample("test_measurement_str")
21+
self.assertEqual(val[2], "test_string")
22+
self.assertEqual(val[0], "test_measurement_str")
23+
24+
def test_write_and_read_integer(self):
25+
"""Test writing and reading an integer sample."""
26+
self.db.write_sample("test_measurement", 1111)
27+
self.db.write_sample("test_measurement", 1234)
28+
val = self.dbread.read_last_sample("test_measurement")
29+
self.assertEqual(val[2], 1234)
30+
31+
def test_read_nonexistent_measurement(self):
32+
"""Test reading a non-existent measurement."""
33+
val = self.dbread.read_last_sample("nonexistent_measurement")
34+
self.assertIsNone(val) #or whatever the expected behavior is in your case.
35+
36+
if __name__ == '__main__':
37+
unittest.main()

appslab_modules/steps/db_storage/test_database_storage.py

Lines changed: 0 additions & 16 deletions
This file was deleted.

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,12 @@ dependencies = [
2525
db_storage = [
2626
"influxdb_client>=1.48.0"
2727
]
28+
mqtt = [
29+
"paho_mqtt>=2.1.0"
30+
]
2831
all = [
2932
"appslab_modules[db_storage]",
33+
"appslab_modules[mqtt]",
3034
]
3135

3236
[project.urls]

0 commit comments

Comments
 (0)