Skip to content

Commit c3e5e8b

Browse files
committed
add python example, add CP message routing key
1 parent 1c009c6 commit c3e5e8b

File tree

7 files changed

+237
-15
lines changed

7 files changed

+237
-15
lines changed

README.md

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,22 @@ RabbitMQ already excels at message durability and routing, but traditionally the
3535
## Installation
3636

3737
This plugin works only with modern versions of RabbitMQ 4.x based on AMQP 1.0.
38+
You can [build from source](https://www.rabbitmq.com/plugin-development.html) or you can download the latest release build from GitHub. Then, unzip and place the `rabbitmq_web_ocpp-4.x.x.ez` file into your `/etc/rabbitmq/plugins/` folder.
3839
Like all plugins, it [must be enabled](https://www.rabbitmq.com/plugins.html) before it can be used:
3940

4041
``` bash
4142
# this might require sudo
4243
rabbitmq-plugins enable rabbitmq_web_ocpp
4344
```
4445

45-
## Documentation
46-
47-
For all configuration options, please refer to the nearly identical plugin, [RabbitMQ Web MQTT guide](https://www.rabbitmq.com/web-mqtt.html).
48-
46+
Detailed instructions on how to install a plugin into RabbitMQ broker can be found [here](https://www.rabbitmq.com/plugins.html#installing-plugins).
4947

50-
## Building From Source
48+
Note that release branches (`v4.1.x` vs. `main`) and target RabbitMQ version need to be taken into account
49+
when building plugins from source.
5150

52-
* [Generic plugin build instructions](https://www.rabbitmq.com/plugin-development.html)
53-
* Instructions on [how to install a plugin into RabbitMQ broker](https://www.rabbitmq.com/plugins.html#installing-plugins)
51+
## Documentation
5452

55-
Note that release branches (`stable` vs. `master`) and target RabbitMQ version need to be taken into account
56-
when building plugins from source.
53+
For all configuration options, please refer to the nearly identical plugin, [RabbitMQ Web MQTT guide](https://www.rabbitmq.com/web-mqtt.html).
5754

5855
## Enterprise-Grade Hosting & SLA Support
5956

examples/python/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
__pycache__/

examples/python/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# OCPP AMQP Server
2+
3+
This directory contains Python examples for handling OCPP (Open Charge Point Protocol) messages using RabbitMQ native OCPP to AMQP plugin. It uses the popular python OCPP lib, [mobilityhouse/ocpp](https://github.com/mobilityhouse/ocpp).
4+
5+
## Files
6+
7+
- `server.py` - CSMS / OCPP server
8+
- `amqp_charge_point.py` - Overrides the original ChargePoint class
9+
- `requirements.txt` - Python dependencies
10+
11+
## Key Differences from traditional WebSocket Approach
12+
13+
- Uses RabbitMQ message broker for communication
14+
- Messages are persistent and can survive connection drops
15+
- Better scalability and load distribution
16+
- Supports complex routing patterns
17+
- Can handle multiple charge points efficiently
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import asyncio
2+
from datetime import datetime
3+
4+
from aio_pika import Message, Queue, Exchange
5+
from ocpp.v16 import ChargePoint as cp
6+
7+
8+
class AmqpChargePoint(cp):
9+
"""
10+
An OCPP ChargePoint implementation that is tightly integrated with AMQP.
11+
12+
This class directly consumes from an AMQP queue and publishes responses
13+
to an exchange, making it completely self-contained.
14+
"""
15+
16+
def __init__(self, queue: Queue, exchange: Exchange, **kwargs):
17+
"""
18+
Args:
19+
queue (aio_pika.Queue): The AMQP queue to consume messages from
20+
exchange (aio_pika.Exchange): The AMQP exchange to publish responses to
21+
"""
22+
# Pass None as the id since we'll extract it dynamically from routing keys
23+
super().__init__(None, self, **kwargs)
24+
self.queue = queue
25+
self.exchange = exchange
26+
27+
async def _send(self, message: str):
28+
"""
29+
Overrides the base `_send` method to publish the message to the
30+
AMQP exchange using the current charge point ID from the routing key.
31+
"""
32+
33+
self.logger.info("%s: send %s", self.id, message)
34+
msg = Message(
35+
body=message.encode(),
36+
content_type='application/json',
37+
timestamp=int(datetime.now().timestamp()),
38+
)
39+
await self.exchange.publish(msg, routing_key=self.id)
40+
41+
async def start(self):
42+
"""
43+
Start consuming from the AMQP queue and process messages directly.
44+
Completely overrides the base class implementation.
45+
"""
46+
async with self.queue.iterator() as queue_iter:
47+
async for message in queue_iter:
48+
# Acknowledge the message automatically after processing
49+
async with message.process(requeue=False):
50+
try:
51+
# Extract routing key and charge point ID
52+
routing_key = getattr(message, 'reply_to', '') or ''
53+
if not routing_key:
54+
self.logger.warning("Received message without reply_to set")
55+
continue
56+
self.id = routing_key
57+
58+
# Decode message body
59+
body = message.body.decode()
60+
61+
# Process the message directly
62+
self.logger.info("%s: receive message %s", routing_key, body)
63+
await self.route_message(body)
64+
65+
except Exception as e:
66+
self.logger.exception("Error processing message: %s", e)

examples/python/requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ocpp==2.0.0
2+
aio-pika==9.5.7

examples/python/server.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import asyncio
2+
import logging
3+
from datetime import datetime, timezone
4+
5+
import aio_pika
6+
from aio_pika import Message, ExchangeType
7+
8+
from ocpp.routing import on
9+
from ocpp.v16 import call_result
10+
from ocpp.v16.enums import Action, RegistrationStatus
11+
12+
from amqp_charge_point import AmqpChargePoint
13+
14+
EXCHANGE_NAME = 'amq.topic'
15+
QUEUE_NAME = 'ocpp.worker'
16+
17+
# Configure logging
18+
logging.basicConfig(level=logging.INFO)
19+
logger = logging.getLogger(__name__)
20+
21+
22+
class MyChargePoint(AmqpChargePoint):
23+
"""
24+
Extends the AmqpChargePoint to add application-specific message handlers.
25+
"""
26+
@on(Action.boot_notification)
27+
async def on_boot_notification(
28+
self, charge_point_vendor, charge_point_model, **kwargs
29+
):
30+
logger.info(
31+
"#### Boot notification from vendor %s, model %s",
32+
charge_point_vendor,
33+
charge_point_model,
34+
)
35+
return call_result.BootNotification(
36+
current_time=datetime.now(tz=timezone.utc).isoformat(),
37+
interval=10,
38+
status=RegistrationStatus.accepted,
39+
)
40+
41+
@on(Action.status_notification)
42+
async def on_status_notification(
43+
self,
44+
status,
45+
connector_id,
46+
error_code,
47+
**kwargs,
48+
):
49+
logger.info(
50+
"#### Status notification from connector %s: status=%s, error=%s",
51+
connector_id,
52+
status,
53+
error_code,
54+
)
55+
return call_result.StatusNotification()
56+
57+
@on(Action.heartbeat)
58+
async def on_heartbeat(self, **kwargs):
59+
return call_result.Heartbeat(
60+
current_time=datetime.now(tz=timezone.utc).isoformat()
61+
)
62+
63+
async def main():
64+
"""Main async entry: connect to RabbitMQ and start the OCPP charge point."""
65+
connection = None
66+
67+
try:
68+
connection = await aio_pika.connect_robust(
69+
host='localhost',
70+
port=5672,
71+
virtualhost='/',
72+
login='guest',
73+
password='guest',
74+
)
75+
76+
channel = await connection.channel()
77+
exchange = await channel.declare_exchange(EXCHANGE_NAME, ExchangeType.TOPIC, durable=True)
78+
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
79+
80+
# Add bindings to route messages to our queue
81+
# Bind to receive messages for all message types (wildcard pattern)
82+
await queue.bind(exchange, routing_key='ocpp16.#')
83+
84+
# Create and start the charge point
85+
charge_point = MyChargePoint(queue, exchange)
86+
87+
logger.info('Listening on queue "%s" and publishing to exchange "%s"', QUEUE_NAME, EXCHANGE_NAME)
88+
89+
# Start the charge point (this will run forever)
90+
await charge_point.start()
91+
92+
except asyncio.CancelledError:
93+
logger.info('Shutting down...')
94+
except Exception:
95+
logger.exception('Fatal error in main')
96+
finally:
97+
if connection:
98+
await connection.close()
99+
100+
101+
if __name__ == '__main__':
102+
print('Demo OCPP Central System (AMQP-backed, Stateless Worker)')
103+
print('Publishing responses to exchange: amq.topic with routing_key = <chargePointId>')
104+
print()
105+
106+
asyncio.run(main())

src/rabbit_web_ocpp_processor.erl

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -178,15 +178,20 @@ process_incoming(McOcpp = #ocpp_msg{},
178178
%% 2. Validate Exchange
179179
case rabbit_exchange:lookup(ExchangeName) of
180180
{ok, Exchange} ->
181-
%% 3. Prepare annotations and initialize Message Container (mc) with mc_ocpp type
182-
Anns = #{?ANN_EXCHANGE => ?DEFAULT_EXCHANGE_NAME},
181+
%% 3. Generate structured routing key (protocolver.actionname.req/conf)
182+
RoutingKey = generate_routing_key(McOcpp, State),
183+
184+
%% 4. Prepare annotations and initialize Message Container (mc) with mc_ocpp type
185+
Anns = #{?ANN_EXCHANGE => ?DEFAULT_EXCHANGE_NAME,
186+
?ANN_ROUTING_KEYS => [RoutingKey]},
183187
case mc:init(mc_ocpp, McOcpp, Anns, #{}) of
184188
McMsg ->
185-
?LOG_DEBUG("Created message container for ClientId ~ts: ~tp", [ClientId, McMsg]),
186-
%% 4. Trace (optional)
189+
?LOG_DEBUG("Created message container for ClientId ~ts with routing key ~ts: ~tp",
190+
[ClientId, RoutingKey, McMsg]),
191+
%% 5. Trace (optional)
187192
rabbit_trace:tap_in(McMsg, [ExchangeName], ConnName, User#user.username, TraceState),
188193

189-
%% 5. Publish to the exchange with the ClientId as routing key
194+
%% 6. Publish to the exchange with the structured routing key
190195
case rabbit_exchange:route(Exchange, McMsg, #{}) of
191196
ok ->
192197
?LOG_INFO("Message routed successfully for ClientId ~ts", [ClientId]),
@@ -383,6 +388,34 @@ terminate(Reason, Infos, _State = #state{queue_states = QStates,
383388

384389
%% --- Internal Functions ---
385390

391+
%% @doc Generates a structured routing key in the format: ocpp.protocolver.actionname.req/conf
392+
%% Examples: "ocpp.ocpp16.BootNotification.req", "ocpp.ocpp20.Heartbeat.conf", "ocpp.ocpp201.StatusNotification.req"
393+
-spec generate_routing_key(#ocpp_msg{}, state()) -> binary().
394+
generate_routing_key(#ocpp_msg{msg_type = MsgType, action = Action},
395+
#state{cfg = #cfg{proto_ver = ProtoVer}}) ->
396+
%% Convert protocol version atom to binary string
397+
ProtoVerBin = atom_to_binary(ProtoVer, utf8),
398+
399+
%% Handle action name (may be undefined for responses)
400+
ActionBin = case Action of
401+
undefined -> <<"response">>; % For CALLRESULT/CALLERROR without action
402+
ActionName when is_binary(ActionName) -> ActionName;
403+
_ -> <<"unknown">>
404+
end,
405+
406+
%% Determine message direction (req/conf/error)
407+
MsgTypeBin = case MsgType of
408+
?OCPP_MESSAGE_TYPE_CALL -> <<"req">>; % Request from charge point
409+
?OCPP_MESSAGE_TYPE_SEND -> <<"req">>; % Request in OCPP 2.1
410+
?OCPP_MESSAGE_TYPE_CALLRESULT -> <<"conf">>; % Response/confirmation
411+
?OCPP_MESSAGE_TYPE_CALLERROR -> <<"error">>; % Error response
412+
?OCPP_MESSAGE_TYPE_CALLRESULTERROR -> <<"error">>; % Error in OCPP 2.1
413+
_ -> <<"unknown">>
414+
end,
415+
416+
%% Construct routing key: ocpp.protocolver.actionname.req|conf|error
417+
<<ProtoVerBin/binary, ".", ActionBin/binary, ".", MsgTypeBin/binary>>.
418+
386419
%% @doc Extracts relevant metadata from a parsed OCPP message list.
387420
% -spec extract_ocpp_metadata(list()) -> {ok, MsgId :: binary(), Action :: binary() | undefined} | {error, term()}.
388421
% extract_ocpp_metadata([?OCPP_MESSAGE_TYPE_CALL, MsgId, Action, _Payload]) when is_binary(MsgId), is_binary(Action) -> {ok, MsgId, Action};
@@ -552,7 +585,7 @@ consume_from_queue(State = #state{cfg = #cfg{queue_name = QName, client_id = Cli
552585
limiter_active => false,
553586
mode => {simple_prefetch, Prefetch},
554587
consumer_tag => ?CONSUMER_TAG,
555-
exclusive_consume => false, % Allow other consumers? (Usually false for OCPP)
588+
exclusive_consume => true, % Allow other consumers? (Usually false for OCPP)
556589
args => [],
557590
ok_msg => undefined,
558591
acting_user => User#user.username},

0 commit comments

Comments
 (0)