Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
f905bab
Prepare for new code.
craig8 Mar 18, 2025
fb163fc
Add support for loading environment variables from a .env file and up…
craig8 Mar 18, 2025
a9f2646
Add support for environment variables in forwarder command options
craig8 Mar 18, 2025
a4d3da3
Add python-dotenv as a dependency for environment variable support
craig8 Mar 18, 2025
17731ba
remove python-dotenv from outer pyproject.toml file.
craig8 Mar 18, 2025
dc7d4e8
Bump version to 2025.3.2a1
Mar 19, 2025
eda3e73
Update gridappsd_field_bus.py
craig8 Apr 10, 2025
a8f0b75
Bump version to 2025.3.2a2
Apr 10, 2025
50d2fb1
Update gridappsd_field_bus.py
craig8 Apr 10, 2025
4bfa3d7
Bump version to 2025.3.2a3
Apr 10, 2025
03633a7
Update agents.py
craig8 Apr 10, 2025
3b3e13c
Bump version to 2025.3.2a4
Apr 10, 2025
776eb2a
Update agents.py
craig8 Apr 10, 2025
d31acb4
Bump version to 2025.3.2a5
Apr 10, 2025
f48bc2a
Update goss.py
craig8 Apr 10, 2025
b022fb9
Bump version to 2025.3.2a6
Apr 10, 2025
5713a54
added request
singha42 Apr 17, 2025
d83e997
Bump version to 2025.3.2a7
Apr 17, 2025
98438aa
Update goss.py
poorva1209 Apr 18, 2025
a75faab
Update goss.py
poorva1209 Apr 18, 2025
606045c
Update field_proxy_forwarder.py
poorva1209 Apr 18, 2025
2b8d523
Bump version to 2025.3.2a8
Apr 18, 2025
6c20a2e
Update field_proxy_forwarder.py
poorva1209 Apr 23, 2025
9f04c86
Bump version to 2025.3.2a9
Apr 23, 2025
e02fb37
Fixing simulation.py dataclass defaults to use field
afisher1 Jun 20, 2025
9591533
Fixing simulation.py dataclass defaults to use field
afisher1 Jun 20, 2025
bfc2e00
Bump version to 2025.3.2a10
Jun 24, 2025
971d463
fixing syntax errors and missing import.
afisher1 Jun 25, 2025
5ca4678
Merge branch 'develop' into bugfix/152
afisher1 Jun 25, 2025
db3b962
Update simulation.py
craig8 Jun 25, 2025
81bf0a1
Bugfix/152 bugfixes (#184)
craig8 Jun 25, 2025
4235940
Bump version to 2025.3.2a11
Jun 25, 2025
4e9b522
Bump version to 2025.3.2a12
Jun 25, 2025
add825e
Update agents.py
poorva1209 Jul 9, 2025
e3e253f
Update __init__.py
poorva1209 Jul 9, 2025
3d2beb6
Bump version to 2025.3.2a13
Jul 9, 2025
3a13360
Merge branch 'main' into releases/2025.06.0
poorva1209 Jul 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

from gridappsd_field_bus.field_interface.agents.agents import (FeederAgent, DistributedAgent,
CoordinatingAgent, SwitchAreaAgent,
SecondaryAreaAgent, SubstationAgent)
SecondaryAreaAgent, SubstationAgent, compute_req)

__all__: List[str] = ["FeederAgent", "DistributedAgent", "CoordinatingAgent"]
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
from datetime import datetime
from typing import Dict

import time
import os
from functools import wraps
import sys
import inspect
import atexit

from cimgraph.databases import ConnectionParameters
from cimgraph.databases.gridappsd import GridappsdConnection
from cimgraph.models import FeederModel
Expand All @@ -17,12 +24,18 @@
from gridappsd_field_bus.field_interface.gridappsd_field_bus import GridAPPSDMessageBus
from gridappsd_field_bus.field_interface.interfaces import (FieldMessageBus, MessageBusDefinition, MessageBusFactory)


CIM_PROFILE = None
IEC61970_301 = None
cim = None

_log = logging.getLogger(__name__)

decorator_logger = logging.getLogger("decorator_logger")
decorator_logger.setLevel(logging.INFO)
file_handler = logging.FileHandler("compute_req_log.txt") # Log file name
formatter = logging.Formatter('[COMPUTE_REQ] %(asctime)s - %(message)s')
file_handler.setFormatter(formatter)
decorator_logger.addHandler(file_handler)

def set_cim_profile(cim_profile: str, iec61970_301: int):
global CIM_PROFILE
Expand All @@ -41,6 +54,147 @@ class AgentRegistrationDetails:
upstream_message_bus_id: FieldMessageBus.id
downstream_message_bus_id: FieldMessageBus.id

@atexit.register
def call_counter_report():
decorator_logger.info("Function call counts summary:")
for func_name, count in function_call_counts.items():
decorator_logger.info(f"{func_name} was called {count} time(s)")

@atexit.register
def message_size_report():
decorator_logger.info("Total message size summary:")
for func_name, total_size in message_size_totals.items():
decorator_logger.info(f"{func_name} total message size: {total_size} bytes")

def compute_req(cls):
functions = [
'__init__',
#'on_measurement',
'on_upstream_message',
'on_downstream_message',
'on_request',
'publish_upstream',
'publish_downstream',
'send_control_command'
]

def call_counter(func):
name = func.__qualname__

@wraps(func)
def wrapper(*args, **kwargs):
if args[0].agent_id+'.'+name not in function_call_counts:
function_call_counts[args[0].agent_id+'.'+name] = 0
function_call_counts[args[0].agent_id+'.'+name] += 1
#decorator_logger.info(f"{name} called {function_call_counts[name]} times")
return func(*args, **kwargs)
return wrapper

def timed(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
end = time.perf_counter()
class_name = args[0].__class__.__name__ if args else ""
if func.__name__ == '__init__':
decorator_logger.info(f"{class_name}.{func.__name__}.{args[0].agent_id} took: {end - start:.6f} seconds")
return result
return wrapper

def get_deep_size(func):
@wraps(func)
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)

def deep_size(obj, seen=None):
if seen is None:
seen = set()
obj_id = id(obj)
if obj_id in seen:
return 0
seen.add(obj_id)
size = sys.getsizeof(obj)
if isinstance(obj, dict):
size += sum(deep_size(k, seen) + deep_size(v, seen) for k, v in obj.items())
elif isinstance(obj, (list, tuple, set, frozenset)):
size += sum(deep_size(i, seen) for i in obj)
elif hasattr(obj, '__dict__'):
for attr_name, attr_value in vars(obj).items():
if attr_name in ['feeder_area', 'switch_area', 'secondary_area']:
continue
size += deep_size(attr_value, seen)
elif hasattr(obj, '__slots__'):
size += sum(deep_size(getattr(obj, slot), seen) for slot in obj.__slots__ if hasattr(obj, slot))
return size

self = args[0]
obj_size = deep_size(self)
decorator_logger.info(f"{self.__class__.__name__}.{func.__name__}.{args[0].agent_id} size is: {obj_size} bytes")

return result
return wrapper

def get_graph_size(func):
@wraps(func)
def wrapper(*args, **kwargs):
self = args[0]
result = func(*args, **kwargs)
area_names = ['feeder_area', 'switch_area', 'secondary_area']
area_found = False
for name in area_names:
area_dict = getattr(self, name, None)
if area_dict is not None and hasattr(area_dict, 'graph'):
graph_keys = [key.__name__ for key in list(area_dict.graph.keys())]
size = len(area_dict.graph.keys())
decorator_logger.info(f"{self.__class__.__name__}.{func.__name__}.{args[0].agent_id} length of graph: {size}")
decorator_logger.info(f"{self.__class__.__name__}.{name}.{args[0].agent_id} graph keys: {graph_keys}")
area_found = True
break

if not area_found:
decorator_logger.error(f"{class_name}.{func.__name__}.{args[0].agent_id} No area dictionary (feeder/switch/secondary) found in {self.__class__.__name__}")
return result
return wrapper

def log_message_size(func):
name = func.__qualname__

@wraps(func)
def wrapper(*args, **kwargs):
sig = inspect.signature(func)
bound_args = sig.bind(*args, **kwargs)
bound_args.apply_defaults()

if 'message' in bound_args.arguments:
msg = bound_args.arguments['message']
size = sys.getsizeof(msg)
if args[0].agent_id+'.'+name not in message_size_totals:
message_size_totals[args[0].agent_id+'.'+name] = 0
message_size_totals[args[0].agent_id+'.'+name] += size

if 'differenceBuilder' in bound_args.arguments:
msg = bound_args.arguments['differenceBuilder']
size = sys.getsizeof(msg)
if args[0].agent_id+'.'+name not in message_size_totals:
message_size_totals[args[0].agent_id+'.'+name] = 0
message_size_totals[args[0].agent_id+'.'+name] += size

return func(*args, **kwargs)
return wrapper

# Decorate the relevant functions
for attr_name in functions:
if hasattr(cls, attr_name):
original_func = getattr(cls, attr_name)
if callable(original_func):
if attr_name == '__init__':
decorated = get_deep_size(get_graph_size(timed(original_func)))
else:
decorated = call_counter(log_message_size(timed(original_func)))
setattr(cls, attr_name, decorated)

return cls

class DistributedAgent:

Expand All @@ -61,10 +215,7 @@ def __init__(self,
self.simulation_id = simulation_id
self.context = None

# TODO: Change params and connection to local connection
self.params = ConnectionParameters(cim_profile=CIM_PROFILE, iec61970_301=IEC61970_301)

self.connection = GridappsdConnection(self.params)
self.connection = GridappsdConnection()
self.connection.cim_profile = cim_profile

self.app_id = agent_config['app_id']
Expand All @@ -79,14 +230,10 @@ def __init__(self,
self.agent_area_dict = agent_area_dict

if upstream_message_bus_def is not None:
if upstream_message_bus_def.is_ot_bus:
self.upstream_message_bus = MessageBusFactory.create(upstream_message_bus_def)
# else:
# self.upstream_message_bus = VolttronMessageBus(upstream_message_bus_def)

self.upstream_message_bus = MessageBusFactory.create(upstream_message_bus_def)

if downstream_message_bus_def is not None:
if downstream_message_bus_def.is_ot_bus:
self.downstream_message_bus = MessageBusFactory.create(downstream_message_bus_def)
self.downstream_message_bus = MessageBusFactory.create(downstream_message_bus_def)

if self.downstream_message_bus is None and self.upstream_message_bus is None:
raise ValueError("Must have at least a downstream and/or upstream message bus specified")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
from gridappsd import GridAPPSD
from gridappsd import topics

from cimgraph.databases import GridappsdConnection, BlazegraphConnection
from cimgraph.models import BusBranchModel, FeederModel

import os
import cimgraph.utils as utils
import cimgraph.data_profile.cimhub_ufls as cim

REQUEST_FIELD = ".".join((topics.PROCESS_PREFIX, "request.field"))

class FieldListener:
Expand All @@ -16,7 +23,7 @@ def __init__(self, ot_connection: GridAPPSD, proxy_connection: stomp.Connection)
def on_message(self, headers, message):
"Receives messages coming from Proxy bus (e.g. ARTEMIS) and forwards to OT bus"
try:
print(f"Received message at Proxy: {message}")
print(f"Received message at Proxy. destination: {headers['destination']}, message: {headers}")

if headers["destination"] == topics.field_output_topic():
self.ot_connection.send(topics.field_output_topic(), message)
Expand All @@ -29,8 +36,11 @@ def on_message(self, headers, message):
request_type = request_data.get("request_type")
if request_type == "get_context":
response = self.ot_connection.get_response(headers["destination"],message)
self.proxy_connection.send(headers["reply_to"],response)

self.proxy_connection.send(headers["reply-to"],response)
elif request_type == "start_publishing":
response = self.ot_connection.get_response(headers["destination"],message)
self.proxy_connection.send(headers["reply-to"],json.dumps(response))

else:
print(f"Unrecognized message received by Proxy: {message}")

Expand All @@ -43,7 +53,7 @@ class FieldProxyForwarder:
when direct connection is not possible.
"""

def __init__(self, connection_url: str, username: str, password: str):
def __init__(self, connection_url: str, username: str, password: str, mrid :str):

#Connect to OT
self.ot_connection = GridAPPSD()
Expand All @@ -52,27 +62,60 @@ def __init__(self, connection_url: str, username: str, password: str):
self.broker_url = connection_url
self.username = username
self.password = password
self.proxy_connection = stomp.Connection([(self.broker_url.split(":")[0], int(self.broker_url.split(":")[1]))],keepalive=True)
self.proxy_connection = stomp.Connection([(self.broker_url.split(":")[0], int(self.broker_url.split(":")[1]))],keepalive=True, heartbeats=(10000,10000))
self.proxy_connection.set_listener('', FieldListener(self.ot_connection, self.proxy_connection))
self.proxy_connection.connect(self.username, self.password, wait=True)

print('Connected to Proxy')



#Subscribe to messages from field
self.proxy_connection.subscribe(destination=topics.BASE_FIELD_TOPIC+'.*', id=1, ack="auto")

self.proxy_connection.subscribe(destination='goss.gridappsd.process.request.*', id=2, ack="auto")

#Subscribe to messages on OT bus
self.ot_connection.subscribe(topics.field_input_topic(), self.on_message_from_ot)



os.environ['CIMG_CIM_PROFILE'] = 'cimhub_ufls'
os.environ['CIMG_URL'] = 'http://localhost:8889/bigdata/namespace/kb/sparql'
os.environ['CIMG_DATABASE'] = 'powergridmodel'
os.environ['CIMG_NAMESPACE'] = 'http://iec.ch/TC57/CIM100#'
os.environ['CIMG_IEC61970_301'] = '8'
os.environ['CIMG_USE_UNITS'] = 'False'

self.database = BlazegraphConnection()
distribution_area = cim.DistributionArea(mRID=mrid)
self.network = BusBranchModel(
connection=self.database,
container=distribution_area,
distributed=False)
self.network.get_all_edges(cim.DistributionArea)
self.network.get_all_edges(cim.Substation)

for substation in self.network.graph.get(cim.Substation,{}).values():
print(f'Subscribing to Substation: /topic/goss.gridappsd.field.{substation.mRID}')
self.ot_connection.subscribe('/topic/goss.gridappsd.field.'+substation.mRID, self.on_message_from_ot)



#self.ot_connection.subscribe(topics.BASE_FIELD_TOPIC, self.on_message_from_ot)


def on_message_from_ot(self, headers, message):

"Receives messages coming from OT bus (GridAPPS-D) and forwards to Proxy bus"
try:
print(f"Received message from OT: {message}")

if headers["destination"] == topics.field_input_topic():
self.proxy_connection.send(topics.field_input_topic(), message)
self.proxy_connection.send(topics.field_input_topic(),json.dumps(message))

elif 'goss.gridappsd.field' in headers["destination"]:

self.proxy_connection.send(headers["destination"],json.dumps(message))
else:
print(f"Unrecognized message received by OT: {message}")

Expand All @@ -86,12 +129,14 @@ def on_message_from_ot(self, headers, message):
parser.add_argument("username")
parser.add_argument("passwd")
parser.add_argument("connection_url")
parser.add_argument("mrid")
opts = parser.parse_args()
proxy_connection_url = opts.connection_url
proxy_username = opts.username
proxy_password = opts.passwd
mrid = opts.mrid

proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password)
proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password, mrid)

while True:
time.sleep(0.1)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def __init__(self, definition: MessageBusDefinition):
self._user = definition.connection_args["GRIDAPPSD_USER"]
self._password = definition.connection_args["GRIDAPPSD_PASSWORD"]
self._address = definition.connection_args["GRIDAPPSD_ADDRESS"]
self._use_auth_token = definition.connection_args.get("GRIDAPPSD_USE_TOKEN_AUTH", False)

self.gridappsd_obj = None

Expand All @@ -29,7 +30,7 @@ def connect(self):
"""
Connect to the concrete message bus that implements this interface.
"""
self.gridappsd_obj = GridAPPSD()
self.gridappsd_obj = GridAPPSD(use_auth_token=self._use_auth_token)

def subscribe(self, topic, callback):
if self.gridappsd_obj is not None:
Expand Down
Loading
Loading