From efc1c2baf144d65846c0402d26847e8aea040f83 Mon Sep 17 00:00:00 2001 From: Alex Bourret Date: Mon, 4 Aug 2025 23:36:02 +0700 Subject: [PATCH 1/5] add limiter to recipe --- custom-recipes/pi-system-retrieve-list/recipe.py | 10 ++++++++-- python-lib/osisoft_client.py | 12 +++++++++--- python-lib/osisoft_plugin_common.py | 3 +++ 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/custom-recipes/pi-system-retrieve-list/recipe.py b/custom-recipes/pi-system-retrieve-list/recipe.py index 88b4fbb..611dfb6 100644 --- a/custom-recipes/pi-system-retrieve-list/recipe.py +++ b/custom-recipes/pi-system-retrieve-list/recipe.py @@ -7,7 +7,8 @@ get_credentials, get_interpolated_parameters, normalize_af_path, get_combined_description, get_base_for_data_type, check_debug_mode, PerformanceTimer, get_max_count, check_must_convert_object_to_string, - convert_schema_objects_to_string, get_summary_parameters, get_advanced_parameters + convert_schema_objects_to_string, get_summary_parameters, get_advanced_parameters, + get_batch_parameters ) from osisoft_client import OSIsoftClient from osisoft_constants import OSIsoftConstants @@ -63,6 +64,8 @@ def get_step_value(item): record_boundary_type = config.get("record_boundary_type") if data_type == "RecordedData" else None summary_type, summary_duration = get_summary_parameters(config) do_duplicate_input_row = config.get("do_duplicate_input_row", False) +max_request_size, estimated_density, maximum_points_returned = get_batch_parameters(config) +max_time_to_retrieve_per_batch = estimated_density / maximum_points_returned #density per hour <- max time is in hour network_timer = PerformanceTimer() processing_timer = PerformanceTimer() @@ -150,9 +153,12 @@ def get_step_value(item): object_id=object_id, summary_type=summary_type, summary_duration=summary_duration, - endpoint_type="AF" + endpoint_type="AF", + estimated_density=estimated_density, + maximum_points_returned=maximum_points_returned ) batch_buffer_size = 0 + total_batch_time = 0 buffer = [] else: continue diff --git a/python-lib/osisoft_client.py b/python-lib/osisoft_client.py index 1fbf55c..012f4af 100644 --- a/python-lib/osisoft_client.py +++ b/python-lib/osisoft_client.py @@ -10,7 +10,8 @@ from osisoft_plugin_common import ( assert_server_url_ok, build_requests_params, is_filtered_out, is_server_throttling, escape, epoch_to_iso, - iso_to_epoch, RecordsLimit, is_iso8601, get_next_page_url, change_key_in_dict + iso_to_epoch, RecordsLimit, is_iso8601, get_next_page_url, change_key_in_dict, + BatchTimeCounter ) from osisoft_pagination import OffsetPagination from safe_logger import SafeLogger @@ -243,7 +244,10 @@ def get_rows_from_webid(self, webid, data_type, **kwargs): def get_rows_from_webids(self, input_rows, data_type, **kwargs): endpoint_type = kwargs.get("endpoint_type", "event_frames") batch_size = kwargs.get("batch_size", 500) - + estimated_density = kwargs.get("estimated_density", 500) + maximum_points_returned = kwargs.get("maximum_points_returned", 500) + max_time_to_retrieve_per_batch = maximum_points_returned / estimated_density + batch_time = BatchTimeCounter(max_time_to_retrieve_per_batch) batch_requests_parameters = [] number_processed_webids = 0 number_of_webids_to_process = len(input_rows) @@ -260,13 +264,15 @@ def get_rows_from_webids(self, input_rows, data_type, **kwargs): webid = input_row url = self.endpoint.get_data_from_webid_url(endpoint_type, data_type, webid) requests_kwargs = self.generic_get_kwargs(**kwargs) + print("ALX:requests_kwargs={}".format(requests_kwargs)) + batch_time.add(requests_kwargs.get("params", {}).get("starttime"), requests_kwargs.get("params",{}).get("endtime"), None) requests_kwargs['url'] = build_query_string(url, requests_kwargs.get("params")) web_ids.append(webid) event_start_times.append(event_start_time) event_end_times.append(event_end_time) batch_requests_parameters.append(requests_kwargs) number_processed_webids += 1 - if (len(batch_requests_parameters) >= batch_size) or (number_processed_webids == number_of_webids_to_process): + if (len(batch_requests_parameters) >= batch_size) or (number_processed_webids == number_of_webids_to_process) or batch_time.is_batch_full(): json_responses = self._batch_requests(batch_requests_parameters) batch_requests_parameters = [] response_index = 0 diff --git a/python-lib/osisoft_plugin_common.py b/python-lib/osisoft_plugin_common.py index a4aff59..e7c920c 100644 --- a/python-lib/osisoft_plugin_common.py +++ b/python-lib/osisoft_plugin_common.py @@ -619,6 +619,7 @@ def get_worst_performers(self): class BatchTimeCounter(object): def __init__(self, max_time_to_retrieve_per_batch): + logger.info("ALX:max_time_to_retrieve_per_batch:{}".format(max_time_to_retrieve_per_batch * 60 * 60)) self.max_time_to_retrieve_per_batch = max_time_to_retrieve_per_batch * 60 * 60 self.total_batch_time = 0 # 2 points /h each line @@ -636,4 +637,6 @@ def is_batch_full(self): return False def add(self, start_time, end_time, interval): + print("ALX:add time {}, {}, {}, {}".format(start_time, end_time, interval, self.total_batch_time)) self.total_batch_time += compute_time_spent(start_time, end_time, interval) + print("ALX:added time {}".format(self.total_batch_time)) From 5b9ee8f707aac1cf5d6535bb693d1979a4807012 Mon Sep 17 00:00:00 2001 From: Alex Bourret Date: Tue, 5 Aug 2025 13:54:49 +0700 Subject: [PATCH 2/5] cleaning --- custom-recipes/pi-system-retrieve-list/recipe.py | 1 - 1 file changed, 1 deletion(-) diff --git a/custom-recipes/pi-system-retrieve-list/recipe.py b/custom-recipes/pi-system-retrieve-list/recipe.py index 611dfb6..b0f3c1c 100644 --- a/custom-recipes/pi-system-retrieve-list/recipe.py +++ b/custom-recipes/pi-system-retrieve-list/recipe.py @@ -158,7 +158,6 @@ def get_step_value(item): maximum_points_returned=maximum_points_returned ) batch_buffer_size = 0 - total_batch_time = 0 buffer = [] else: continue From 11be561e3ccb8d3946cdbfe17b516badaae8e66e Mon Sep 17 00:00:00 2001 From: Alex Bourret Date: Tue, 5 Aug 2025 14:28:33 +0700 Subject: [PATCH 3/5] cleaning --- python-lib/osisoft_plugin_common.py | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/python-lib/osisoft_plugin_common.py b/python-lib/osisoft_plugin_common.py index e7c920c..2c84fc1 100644 --- a/python-lib/osisoft_plugin_common.py +++ b/python-lib/osisoft_plugin_common.py @@ -442,9 +442,9 @@ def epoch_to_iso(epoch): def iso_to_epoch(iso_timestamp): - logger.info("Converting iso timestamp '{}' to epoch".format(iso_timestamp)) + # logger.info("Converting iso timestamp '{}' to epoch".format(iso_timestamp)) if is_epoch(iso_timestamp): - logger.info("Timestamp is already epoch") + # logger.info("Timestamp is already epoch") return iso_timestamp epoch_timestamp = None try: @@ -453,7 +453,7 @@ def iso_to_epoch(iso_timestamp): except Exception: logger.error("Error when converting iso timestamp '{}' to epoch".format(iso_timestamp)) return None - logger.info("Timestamp is now '{}'".format(epoch_timestamp)) + # logger.info("Timestamp is now '{}'".format(epoch_timestamp)) return epoch_timestamp @@ -619,24 +619,19 @@ def get_worst_performers(self): class BatchTimeCounter(object): def __init__(self, max_time_to_retrieve_per_batch): - logger.info("ALX:max_time_to_retrieve_per_batch:{}".format(max_time_to_retrieve_per_batch * 60 * 60)) + logger.info("BatchTimeCounter:max_time_to_retrieve_per_batch={}s".format(max_time_to_retrieve_per_batch * 60 * 60)) self.max_time_to_retrieve_per_batch = max_time_to_retrieve_per_batch * 60 * 60 - self.total_batch_time = 0 - # 2 points /h each line - # max 1 000 000 lines back -> 500k hours max + self.total_batched_time = 0 def is_batch_full(self): - # return False if self.max_time_to_retrieve_per_batch < 0: return False - if self.total_batch_time > self.max_time_to_retrieve_per_batch: - logger.warning("batch contains {}s of request, needs to flush now".format(self.total_batch_time)) - self.total_batch_time = 0 + if self.total_batched_time > self.max_time_to_retrieve_per_batch: + logger.warning("batch contains {}s of request, needs to flush now".format(self.total_batched_time)) + self.total_batched_time = 0 return True - logger.info("Batch below time threshold") return False def add(self, start_time, end_time, interval): - print("ALX:add time {}, {}, {}, {}".format(start_time, end_time, interval, self.total_batch_time)) - self.total_batch_time += compute_time_spent(start_time, end_time, interval) - print("ALX:added time {}".format(self.total_batch_time)) + print("ALX:adding start_time={}, end_time={}, interval={}".format(start_time, end_time, interval)) + self.total_batched_time += compute_time_spent(start_time, end_time, interval) From 2337c49f0347711411ce0d3e41babfcddece40bc Mon Sep 17 00:00:00 2001 From: Alex Bourret Date: Tue, 5 Aug 2025 14:28:43 +0700 Subject: [PATCH 4/5] getting time from kwargs --- python-lib/osisoft_client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python-lib/osisoft_client.py b/python-lib/osisoft_client.py index 012f4af..8b10b9d 100644 --- a/python-lib/osisoft_client.py +++ b/python-lib/osisoft_client.py @@ -263,9 +263,11 @@ def get_rows_from_webids(self, input_rows, data_type, **kwargs): else: webid = input_row url = self.endpoint.get_data_from_webid_url(endpoint_type, data_type, webid) + start_date = kwargs.get("start_date") + end_date = kwargs.get("end_date") + interval = kwargs.get("interval") requests_kwargs = self.generic_get_kwargs(**kwargs) - print("ALX:requests_kwargs={}".format(requests_kwargs)) - batch_time.add(requests_kwargs.get("params", {}).get("starttime"), requests_kwargs.get("params",{}).get("endtime"), None) + batch_time.add(start_date, end_date, interval) requests_kwargs['url'] = build_query_string(url, requests_kwargs.get("params")) web_ids.append(webid) event_start_times.append(event_start_time) From 5ca5a62f154adcf7d84dfd798edfab334a03c875 Mon Sep 17 00:00:00 2001 From: Alex Bourret Date: Wed, 27 Aug 2025 09:31:34 +0200 Subject: [PATCH 5/5] removing dev log --- python-lib/osisoft_plugin_common.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python-lib/osisoft_plugin_common.py b/python-lib/osisoft_plugin_common.py index 2c84fc1..c8e0815 100644 --- a/python-lib/osisoft_plugin_common.py +++ b/python-lib/osisoft_plugin_common.py @@ -633,5 +633,4 @@ def is_batch_full(self): return False def add(self, start_time, end_time, interval): - print("ALX:adding start_time={}, end_time={}, interval={}".format(start_time, end_time, interval)) self.total_batched_time += compute_time_spent(start_time, end_time, interval)