@@ -27,10 +27,9 @@ def _connect(self):
2727 self .client = client
2828 self .write_api = client .write_api (write_precision = WritePrecision .MS )
2929 self .query_api = client .query_api ()
30- #Test connection
30+ # Test connection
3131 query = f'from(bucket: "{ self .bucket } ") |> range(start: -1m)'
3232 self .query_api .query (query )
33- print ("Connection and query successful!" )
3433
3534 except Exception as e :
3635 print (f"Error: { e } " )
@@ -42,23 +41,30 @@ class DatabasePersistence(_InfluxPersistence):
4241 def __init__ (self , host :str , port :int ):
4342 super ().__init__ (host , port )
4443
45- def write_sample (self , measurement_name : str , measure :str , value :any , ts :int = 0 ):
44+ def write_sample (self , measure :str , value :any , ts :int = 0 , measurement_name : str = "arduino" ):
4645 try :
4746 if ts <= 0 :
4847 ts = int (time .time_ns () / 1000000 )
4948 point = Point (measurement_name ).field (measure , value ).time (ts , WritePrecision .MS )
5049
5150 self .write_api .write (bucket = self .bucket , record = point )
52- print ("Data written successfully!" )
5351
5452 except Exception as e :
5553 print (f"Error: { e } " )
5654
55+ def process (self , item ):
56+ if isinstance (item , dict ):
57+ for k , v in item .items ():
58+ self .write_sample (k , v )
59+
60+ # After processing, return the original item
61+ return item
62+
5763class DatabaseRetrieval (_InfluxPersistence ):
5864 def __init__ (self , host :str , port :int ):
5965 super ().__init__ (host , port )
6066
61- def read_last_sample (self , measurement_name :str , measure :str ) :
67+ def read_last_sample (self , measure :str , measurement_name :str = "arduino" ) -> tuple :
6268 try :
6369 query = f"""
6470 from(bucket: "{ self .bucket } ")
@@ -73,10 +79,24 @@ def read_last_sample(self, measurement_name :str, measure :str):
7379 if result :
7480 for table in result :
7581 for record in table .records :
76- return record .get_time (), record .get_value ()
82+ return measure , record .get_time (), record .get_value ()
7783 else :
78- print ( f"No data found for { measurement_name } and { measure } " )
84+ return None
7985
8086 except Exception as e :
8187 print (f"Error: { e } " )
88+
89+ def process (self , item ):
90+ output = {}
91+ if isinstance (item , str ):
92+ data = self .read_last_sample (item )
93+ if data :
94+ output [data [0 ]] = (data [1 ], data [2 ])
95+ elif isinstance (item , dict ):
96+ for k in item :
97+ data = self .read_last_sample (k )
98+ if data :
99+ output [data [0 ]] = (data [1 ], data [2 ])
100+
101+ return output
82102
0 commit comments