Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 250 additions & 12 deletions simpleAPI/simpleAPI_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,15 @@ def _on_message(self, client, userdata, msg):
def _transform_and_publish(self, original_topic: str, payload: str):
"""Transform original topic to simpleAPI format and publish if value changed."""
try:
log.debug(f"DEBUG: Processing original topic: {original_topic}")

# Parse the payload
parsed_payload = self._parse_payload(payload)

# Generate transformed topics
transformed_topics = self._generate_simple_topics(original_topic, parsed_payload)

log.debug(f"DEBUG: Generated {len(transformed_topics)} transformed topics: {list(transformed_topics.keys())}")

# Publish each transformed topic if value changed
for topic, value in transformed_topics.items():
Expand Down Expand Up @@ -229,6 +233,12 @@ def _generate_simple_topics(self, original_topic: str, parsed_value: Any) -> Dic
# Extract component info for ID tracking
self._track_component_ids(original_topic)

# Apply chargepoint-specific transformations
if '/chargepoint/' in simple_base:
simple_base = self._transform_chargepoint_topic(simple_base)
if simple_base is None: # Topic should be filtered out
return result

# Handle different value types
self._expand_value_to_topics(simple_base, parsed_value, result)

Expand All @@ -238,6 +248,83 @@ def _generate_simple_topics(self, original_topic: str, parsed_value: Any) -> Dic

return result

def _transform_chargepoint_topic(self, simple_base: str) -> Optional[str]:
"""Transform chargepoint topics according to simplified structure."""

log.debug(f"DEBUG: _transform_chargepoint_topic input: {simple_base}")

# FIRST: Handle connected_vehicle transformations (both /get/ and direct)
# This must happen BEFORE any other filtering
if '/connected_vehicle/config/chargemode' in simple_base:
log.debug(f"DEBUG: Found chargemode pattern, transforming: {simple_base}")
original = simple_base
simple_base = re.sub(r'/get/connected_vehicle/config/chargemode$', '/chargemode', simple_base)
log.debug(f"DEBUG: After first regex: {simple_base}")
simple_base = re.sub(r'/connected_vehicle/config/chargemode$', '/chargemode', simple_base)
log.debug(f"DEBUG: After second regex: {simple_base}")
if original != simple_base:
log.debug(f"DEBUG: Chargemode transformation successful: {original} -> {simple_base}")
elif '/connected_vehicle/info/name' in simple_base:
log.debug(f"DEBUG: Found vehicle name pattern, transforming: {simple_base}")
simple_base = re.sub(r'/get/connected_vehicle/info/name$', '/vehicle_name', simple_base)
simple_base = re.sub(r'/connected_vehicle/info/name$', '/vehicle_name', simple_base)

# Keep only config topics that are in the allowed list
if '/config/' in simple_base and not re.search(r'/(chargemode|vehicle_name)$', simple_base):
allowed_config_paths = [
'configuration/ip_address', 'configuration/duo_num', 'ev', 'name',
'type', 'template', 'connected_phases', 'phase_1',
'auto_phase_switch_hw', 'control_pilot_interruption_hw', 'id', 'ocpp_chargebox_id'
]

# Extract the config path part
config_match = re.search(r'/config/(.+)$', simple_base)
if config_match:
config_path = config_match.group(1)
if config_path in allowed_config_paths:
return simple_base # Keep this config topic

return None # Filter out other config topics

# Handle set topics with special mappings
if '/set/' in simple_base:
# set/manual_lock -> manual_lock
simple_base = re.sub(r'/set/manual_lock$', '/manual_lock', simple_base)

# set/current -> evse_current
simple_base = re.sub(r'/set/current$', '/evse_current', simple_base)

# Filter out all other set topics (like charge_template, log, etc.)
if '/set/' in simple_base:
return None

# Handle get topics - remove /get/ prefix
if '/get/' in simple_base:
simple_base = simple_base.replace('/get/', '/')

# Filter out unwanted topics - but exclude already transformed ones
if not re.search(r'/(chargemode|vehicle_name)$', simple_base):
unwanted_patterns = [
r'/connected_vehicle/(info|config|soc)/',
r'/max_evse_current$',
r'/current_branch$',
r'/current_commit$'
]

for pattern in unwanted_patterns:
if re.search(pattern, simple_base):
return None
else:
# For non-get topics, also filter out remaining connected_vehicle topics
# but exclude the ones we already transformed
if not re.search(r'/(chargemode|vehicle_name)$', simple_base):
if re.search(r'/connected_vehicle/', simple_base):
log.debug(f"DEBUG: Filtering out connected_vehicle topic: {simple_base}")
return None

log.debug(f"DEBUG: _transform_chargepoint_topic output: {simple_base}")
return simple_base

def _track_component_ids(self, topic: str):
"""Track component IDs to determine lowest IDs."""
# Pattern to match topics with numeric IDs
Expand Down Expand Up @@ -266,8 +353,16 @@ def _expand_value_to_topics(self, base_topic: str, value: Any, result: Dict[str,
new_topic = f"{base_topic}/{i}"
self._expand_value_to_topics(new_topic, val, result)
else:
# Raw value
result[base_topic] = value
# Raw value - apply chargepoint transformations to final topics
final_topic = base_topic
if '/chargepoint/' in base_topic:
transformed = self._transform_chargepoint_topic(base_topic)
if transformed is not None:
final_topic = transformed
else:
return # Topic should be filtered out

result[final_topic] = value

def _generate_simplified_topics(self, simple_topic: str, parsed_value: Any) -> Dict[str, Any]:
"""Generate topics without IDs for components with lowest IDs."""
Expand All @@ -285,6 +380,13 @@ def _generate_simplified_topics(self, simple_topic: str, parsed_value: Any) -> D
# Check if this is the lowest ID for this component type
if component_type in self.lowest_ids and self.lowest_ids[component_type] == component_id:
simplified_base = f"openWB/simpleAPI/{component_type}/{remaining_path}"

# Apply chargepoint transformations to simplified topics as well
if component_type == 'chargepoint':
simplified_base = self._transform_chargepoint_topic(simplified_base)
if simplified_base is None:
return result

self._expand_value_to_topics(simplified_base, parsed_value, result)

return result
Expand Down Expand Up @@ -328,28 +430,64 @@ def _cache_charge_template(self, topic: str, payload: str):
def _handle_write_operation(self, topic: str, payload: str):
"""Handle write operations from simpleAPI set topics."""
try:
# Skip processing if payload is empty (cleared set topic)
if not payload or payload.strip() == "":
log.debug(f"Skipping empty set topic: {topic}")
return

log.info(f"Write operation: {topic} = {payload}")

# Parse the set topic to extract operation details
topic_parts = topic.replace('openWB/simpleAPI/set/', '').split('/')

if len(topic_parts) < 2:
topic_remainder = topic.replace('openWB/simpleAPI/set/', '')

# Check for instant_charging_limit operations first (can be with or without chargepoint ID)
if 'instant_charging_limit_soc' in topic_remainder:
success = self._handle_instant_charging_limit_soc_operation(payload)
if success:
self._clear_set_topic(topic)
return
elif 'instant_charging_limit_amount' in topic_remainder:
success = self._handle_instant_charging_limit_amount_operation(payload)
if success:
self._clear_set_topic(topic)
return
elif 'instant_charging_limit' in topic_remainder:
success = self._handle_instant_charging_limit_operation(payload)
if success:
self._clear_set_topic(topic)
return

topic_parts = topic_remainder.split('/')

if len(topic_parts) < 1:
log.error(f"Invalid set topic format: {topic}")
return

operation = topic_parts[0]

if operation == 'chargepoint':
self._handle_chargepoint_operation(topic_parts, payload)
success = self._handle_chargepoint_operation(topic_parts, payload)
if success:
self._clear_set_topic(topic)
elif operation == 'bat_mode':
self._handle_bat_mode_operation(payload)
success = self._handle_bat_mode_operation(payload)
if success:
self._clear_set_topic(topic)
else:
log.error(f"Unknown operation: {operation}")

except Exception as e:
log.error(f"Error handling write operation {topic}: {e}")

def _handle_chargepoint_operation(self, topic_parts: list, payload: str):
def _clear_set_topic(self, topic: str):
"""Clear a set topic after successful operation."""
try:
self.client.publish(topic, "", qos=0, retain=True)
log.debug(f"Cleared set topic: {topic}")
except Exception as e:
log.error(f"Failed to clear set topic {topic}: {e}")

def _handle_chargepoint_operation(self, topic_parts: list, payload: str) -> bool:
"""Handle chargepoint-specific write operations."""
# Determine chargepoint ID
if len(topic_parts) >= 3 and topic_parts[1].isdigit():
Expand All @@ -363,10 +501,10 @@ def _handle_chargepoint_operation(self, topic_parts: list, payload: str):
parameter = topic_parts[1]
else:
log.error("No chargepoint ID found and no lowest ID available")
return
return False
else:
log.error(f"Invalid chargepoint topic format: {'/'.join(topic_parts)}")
return
return False

log.debug(f"Chargepoint operation: ID={chargepoint_id}, parameter={parameter}, value={payload}")

Expand All @@ -384,6 +522,9 @@ def _handle_chargepoint_operation(self, topic_parts: list, payload: str):
self._set_chargepoint_lock(chargepoint_id, payload)
else:
log.error(f"Unknown chargepoint parameter: {parameter}")
return False

return True

def _get_charge_template(self, chargepoint_id: str) -> Optional[Dict[str, Any]]:
"""Get charge_template for a chargepoint, either from cache or request it."""
Expand Down Expand Up @@ -507,17 +648,114 @@ def _set_chargepoint_lock(self, chargepoint_id: str, lock_state: str):
except Exception as e:
log.error(f"Error setting chargepoint lock: {e}")

def _handle_bat_mode_operation(self, payload: str):
def _handle_bat_mode_operation(self, payload: str) -> bool:
"""Handle battery mode operation."""
valid_modes = ['min_soc_bat_mode', 'ev_mode', 'bat_mode']

if payload not in valid_modes:
log.error(f"Invalid bat_mode: {payload}. Valid values: {valid_modes}")
return
return False

target_topic = "openWB/set/general/chargemode_config/pv_charging/bat_mode"
self.client.publish(target_topic, payload, qos=0, retain=True)
log.info(f"Set bat_mode to {payload}")
return True

def _handle_instant_charging_limit_operation(self, payload: str) -> bool:
"""Handle instant charging limit type operation."""
valid_limits = ['none', 'soc', 'amount']

if payload not in valid_limits:
log.error(f"Invalid instant_charging_limit: {payload}. Valid values: {valid_limits}")
return False

# Get chargepoint ID (use lowest if not specified)
if 'chargepoint' in self.lowest_ids:
chargepoint_id = str(self.lowest_ids['chargepoint'])
else:
log.error("No chargepoint ID found for instant_charging_limit operation")
return False

charge_template = self._get_charge_template(chargepoint_id)
if charge_template is None:
log.error(f"No charge_template available for chargepoint {chargepoint_id}")
return False

# Modify the instant_charging.limit.selected value
charge_template['chargemode']['instant_charging']['limit']['selected'] = payload

# Publish the modified template
target_topic = f"openWB/set/chargepoint/{chargepoint_id}/set/charge_template"
self._publish_json(target_topic, charge_template)
log.info(f"Set instant_charging_limit to {payload} for chargepoint {chargepoint_id}")
return True

def _handle_instant_charging_limit_soc_operation(self, payload: str) -> bool:
"""Handle instant charging limit SoC operation."""
try:
soc_value = int(payload)
if soc_value < 0 or soc_value > 100:
log.error(f"Invalid SoC value: {soc_value}. Must be between 0 and 100")
return False
except ValueError:
log.error(f"Invalid SoC value: {payload}. Must be an integer")
return False

# Get chargepoint ID (use lowest if not specified)
if 'chargepoint' in self.lowest_ids:
chargepoint_id = str(self.lowest_ids['chargepoint'])
else:
log.error("No chargepoint ID found for instant_charging_limit_soc operation")
return False

charge_template = self._get_charge_template(chargepoint_id)
if charge_template is None:
log.error(f"No charge_template available for chargepoint {chargepoint_id}")
return False

# Modify the instant_charging.limit.soc value
charge_template['chargemode']['instant_charging']['limit']['soc'] = soc_value

# Publish the modified template
target_topic = f"openWB/set/chargepoint/{chargepoint_id}/set/charge_template"
self._publish_json(target_topic, charge_template)
log.info(f"Set instant_charging_limit_soc to {soc_value}% for chargepoint {chargepoint_id}")
return True

def _handle_instant_charging_limit_amount_operation(self, payload: str) -> bool:
"""Handle instant charging limit amount operation."""
try:
amount_value = int(payload)
if amount_value < 1 or amount_value > 50:
log.error(f"Invalid amount value: {amount_value}. Must be between 1 and 50")
return False

# Convert to internal value (multiply by 1000)
internal_amount = amount_value * 1000
except ValueError:
log.error(f"Invalid amount value: {payload}. Must be an integer")
return False

# Get chargepoint ID (use lowest if not specified)
if 'chargepoint' in self.lowest_ids:
chargepoint_id = str(self.lowest_ids['chargepoint'])
else:
log.error("No chargepoint ID found for instant_charging_limit_amount operation")
return False

charge_template = self._get_charge_template(chargepoint_id)
if charge_template is None:
log.error(f"No charge_template available for chargepoint {chargepoint_id}")
return False

# Modify the instant_charging.limit.amount value
charge_template['chargemode']['instant_charging']['limit']['amount'] = internal_amount

# Publish the modified template
target_topic = f"openWB/set/chargepoint/{chargepoint_id}/set/charge_template"
self._publish_json(target_topic, charge_template)
log.info(f"Set instant_charging_limit_amount to {amount_value} kWh ({internal_amount} Wh) for chargepoint {chargepoint_id}")
return True

def _publish_json(self, topic: str, data: Dict[str, Any]):
"""Publish JSON data to a topic."""
Expand Down