2929 upgrade /5 ,
3030 takeover /7 ]).
3131
32- -define (APP , rabbitmq_web_ocpp ).
33-
3432-ifdef (TEST ).
3533-define (SILENT_CLOSE_DELAY , 10 ).
3634-else .
@@ -74,70 +72,47 @@ init(Req, Opts) ->
7472 % % Retrieve the vhost and client_id from URL path first
7573 Vhost = cowboy_req :binding (vhost , Req ),
7674 ClientId = cowboy_req :binding (client_id , Req ),
75+ {PeerIp , _PeerPort } = cowboy_req :peer (Req ),
7776
7877 case {Vhost , ClientId } of
7978 {<<>>, _ } ->
8079 {ok , cowboy_req :reply (404 , #{}, <<" Vhost not specified" >>, Req ), # state {}};
8180 {_ , <<>>} ->
8281 {ok , cowboy_req :reply (404 , #{}, <<" Client ID not specified" >>, Req ), # state {}};
83- {V1 , CId } when V1 =:= <<>> orelse CId =:= <<>> ->
84- {ok , cowboy_req :reply (404 , #{}, <<" Invalid Vhost or Client ID" >>, Req ), # state {}};
85- {V2 , CId } ->
86- % % ClientId is valid, now check subprotocol
87- case cowboy_req :parse_header (<<" sec-websocket-protocol" >>, Req ) of
88- undefined ->
89- no_supported_sub_protocol (undefined , ClientId , Req );
90- ProtocolList ->
91- % % Map protocols to their atom representations, filtering out unsupported ones
92- SupportedProtos = [{Proto , ? OCPP_PROTO_TO_ATOM (Proto )} ||
93- Proto <- ProtocolList ,
94- ? OCPP_PROTO_TO_ATOM (Proto ) =/= undefined ],
95-
96- case SupportedProtos of
97- [] ->
98- no_supported_sub_protocol (ProtocolList , ClientId , Req );
99- [{MatchingProtocol , ProtocolVer }|_ ] ->
100- % % First supported protocol is selected (preserving client preference order)
101- Req1 = cowboy_req :set_resp_header (<<" sec-websocket-protocol" >>,
102- MatchingProtocol , Req ),
103- AuthHd = cowboy_req :header (<<" authorization" >>, Req , <<>>),
104- case AuthHd of
105- <<>> ->
106- % % No Authorization header, request credentials
107- Headers = #{<<" www-authenticate" >> => <<" Basic realm=\" RabbitMQ\" " >>},
108- {ok , cowboy_req :reply (401 , Headers , <<" Unauthorized" >>, Req ), # state {}};
109- _ ->
110- case cow_http_hd :parse_authorization (AuthHd ) of
111- {basic , Username , Password } ->
112- % % Perform authentication check here
113- case rabbit_access_control :check_user_login (
114- Username , [{password , Password }]) of
115- {ok , User } ->
116- State0 = # state {socket = maps :get (proxy_header , Req , undefined ),
117- proto_ver = ProtocolVer ,
118- vhost = V2 ,
119- user = User ,
120- client_id = CId },
121- WsOpts0 = proplists :get_value (ws_opts , Opts , #{}),
122- IdleTimeoutMs = maps :get (idle_timeout , WsOpts0 , ? DEFAULT_IDLE_TIMEOUT_MS ),
123- WsOpts = maps :merge (#{compress => true , idle_timeout => IdleTimeoutMs }, WsOpts0 ),
124- IdleTimeoutS = case IdleTimeoutMs of
125- infinity -> 0 ;
126- Ms -> Ms div 1000
127- end ,
128- State = State0 # state {idle_timeout = IdleTimeoutS },
129- {? MODULE , Req1 , State , WsOpts };
130- {error , _Reason } ->
131- Headers = #{<<" www-authenticate" >> => <<" Basic realm=\" RabbitMQ\" " >>},
132- {ok , cowboy_req :reply (401 , Headers , <<" Unauthorized" >>, Req ), # state {}}
133- end ;
134- _ ->
135- % % Invalid Authorization header format
136- Headers = #{<<" www-authenticate" >> => <<" Basic realm=\" RabbitMQ\" " >>},
137- {ok , cowboy_req :reply (401 , Headers , <<" Unauthorized" >>, Req ), # state {}}
138- end
139- end
140- end
82+ _ ->
83+ {Username0 , Password0 } = basic_auth_creds (Req ),
84+ SslLoginName = none ,
85+ Result = maybe
86+ ok ?= check_vhost_exists (Vhost , ClientId , PeerIp ),
87+ ok ?= check_vhost_alive (Vhost ),
88+ {ProtoVer , Req1 } ?= pick_protocol (Req , ClientId ),
89+ {ok , Username1 , Password1 } ?= check_credentials (Username0 , Password0 , SslLoginName , PeerIp ),
90+ {ok , User0 } ?= check_user_login (Vhost , Username1 , Password1 , ClientId , PeerIp ),
91+ AuthzCtx = #{<<" client_id" >> => ClientId , <<" protocol" >> => <<" ocpp" >>},
92+ ok ?= check_vhost_access (Vhost , User0 , ClientId , PeerIp , AuthzCtx ),
93+ {ok , Req1 , Vhost , ClientId , User0 , ProtoVer }
94+ end ,
95+ case Result of
96+ {ok , Req2 , V2 , CId , User , ProtocolVer } ->
97+ ProxyInfo = maps :get (proxy_header , Req2 , undefined ),
98+ WsOpts0 = proplists :get_value (ws_opts , Opts , #{}),
99+ IdleMs = maps :get (idle_timeout , WsOpts0 , ? DEFAULT_IDLE_TIMEOUT_MS ),
100+ WsOpts = maps :merge (#{compress => true , idle_timeout => IdleMs }, WsOpts0 ),
101+ IdleSec = case IdleMs of infinity -> 0 ; Ms -> Ms div 1000 end ,
102+ State = # state {socket = ProxyInfo , proto_ver = ProtocolVer , vhost = V2 ,
103+ user = User , client_id = CId , idle_timeout = IdleSec },
104+ {? MODULE , Req2 , State , WsOpts };
105+ {error , bad_vhost } ->
106+ {ok , cowboy_req :reply (404 , #{}, <<" Invalid Vhost" >>, Req ), # state {}};
107+ {error , vhost_down } ->
108+ {ok , cowboy_req :reply (503 , #{}, <<" Vhost is down" >>, Req ), # state {}};
109+ {error , invalid_subprotocol } ->
110+ {ok , cowboy_req :reply (400 , #{<<" connection" >> => <<" close" >>},
111+ <<" Unsupported or missing OCPP subprotocol" >>, Req ), # state {}};
112+ {error , _ } ->
113+ {ok , cowboy_req :reply (401 ,
114+ #{<<" www-authenticate" >> => <<" Basic realm=\" OCPP\" " >>},
115+ <<" Unauthorized" >>, Req ), # state {}}
141116 end
142117 end .
143118
@@ -156,8 +131,8 @@ info(Pid, Items) ->
156131 Res .
157132-spec websocket_init (state ()) ->
158133 {cowboy_websocket :commands (), state ()} |
159- {cowboy_websocket :commands (), state () , hibernate }.
160- websocket_init (State0 = # state {socket = Socket , vhost = Vhost , client_id = ClientId , proto_ver = ProtoVer }) ->
134+ {cowboy_websocket :commands (), state , hibernate }.
135+ websocket_init (State0 = # state {socket = Socket , vhost = Vhost , client_id = ClientId , user = User , proto_ver = ProtoVer }) ->
161136 logger :set_process_metadata (#{domain => ? RMQLOG_DOMAIN_CONN ++ [web_ocpp ]}),
162137 case rabbit_net :connection_string (Socket , inbound ) of
163138 {ok , ConnStr } ->
@@ -166,8 +141,9 @@ websocket_init(State0 = #state{socket = Socket, vhost = Vhost, client_id = Clien
166141 State1 = State0 # state {conn_name = ConnName },
167142 State2 = rabbit_event :init_stats_timer (State1 , # state .stats_timer ),
168143 % Inside `init` of the processor "connection_created" is called for management UI to show the connection
169- case rabbit_web_ocpp_processor :init (Vhost , ClientId , ProtoVer , rabbit_net :unwrap_socket (Socket ),
170- ConnName , fun send_reply /1 ) of
144+ case rabbit_web_ocpp_processor :init (Vhost , ClientId , ProtoVer ,
145+ rabbit_net :unwrap_socket (Socket ),
146+ ConnName , User , fun send_reply /1 ) of
171147 {ok , ProcState } ->
172148 ? LOG_INFO (" Accepted Web OCPP connection ~ts for client ID ~ts " ,
173149 [ConnName , ClientId ]),
@@ -321,7 +297,7 @@ websocket_info(Msg, State) ->
321297 {[], State , hibernate }.
322298
323299terminate (Reason , _Request , # state {conn_name = ConnName ,
324- proc_state = PState , % Can be undefined if init failed
300+ proc_state = PState , % Can be undefined if init crashed
325301 client_id = ClientId } = State ) ->
326302 ? LOG_INFO (" Web OCPP closing connection ~ts for client ID ~p " , [ConnName , ClientId ]),
327303 maybe_emit_stats (State ),
@@ -331,7 +307,12 @@ terminate(Reason, _Request, #state{conn_name = ConnName,
331307 _ ->
332308 Infos = infos (? EVENT_KEYS , State ),
333309 rabbit_web_ocpp_processor :terminate (Reason , Infos , PState )
334- end .
310+ end ;
311+
312+ terminate (Reason , _Request , Opts ) ->
313+ % % Fallback clause when init crashed before state record was established
314+ ? LOG_INFO (" Web OCPP closing connection. Reason: ~p Opts: ~p " , [Reason , Opts ]),
315+ ok .
335316
336317% % Internal.
337318
@@ -348,6 +329,35 @@ ssl_login_name(Sock) ->
348329 nossl -> none
349330 end .
350331
332+ pick_protocol (Req , ClientId ) ->
333+ % % The client MUST include a valid ocpp version in the list of
334+ % % WebSocket Sub Protocols it offers [OCPP 1.6 JSON spec §3.1.2].
335+ case cowboy_req :parse_header (<<" sec-websocket-protocol" >>, Req ) of
336+ undefined ->
337+ ? LOG_ERROR (" Web OCPP: missing subprotocol list for client ~p " , [ClientId ]),
338+ {error , invalid_subprotocol };
339+ ProtoList ->
340+ case [ {P , ? OCPP_PROTO_TO_ATOM (P )} || P <- ProtoList ,
341+ ? OCPP_PROTO_TO_ATOM (P ) =/= undefined ] of
342+ [] ->
343+ ? LOG_ERROR (" Web OCPP: no supported ocppX.X subprotocol in ~p for client ~p " ,
344+ [ProtoList , ClientId ]),
345+ {error , invalid_subprotocol };
346+ [{Matched , Ver }|_ ] ->
347+ {Ver , cowboy_req :set_resp_header (<<" sec-websocket-protocol" >>, Matched , Req )}
348+ end
349+ end .
350+
351+ basic_auth_creds (Req ) ->
352+ case cowboy_req :header (<<" authorization" >>, Req , <<>>) of
353+ <<>> -> {undefined , undefined };
354+ H ->
355+ case cow_http_hd :parse_authorization (H ) of
356+ {basic , U , P } -> {U , P };
357+ _ -> {undefined , undefined }
358+ end
359+ end .
360+
351361check_credentials (Username , Password , SslLoginName , PeerIp ) ->
352362 case creds (Username , Password , SslLoginName ) of
353363 {ok , _ , _ } = Ok ->
@@ -369,7 +379,7 @@ check_credentials(Username, Password, SslLoginName, PeerIp) ->
369379creds (User , Pass , SSLLoginName ) ->
370380 CredentialsProvided = User =/= undefined orelse Pass =/= undefined ,
371381 ValidCredentials = is_binary (User ) andalso is_binary (Pass ) andalso Pass =/= <<>>,
372- { ok , TLSAuth } = application :get_env (? APP_NAME , ssl_cert_login ),
382+ TLSAuth = application :get_env (? APP_NAME , ssl_cert_login , false ),
373383 SSLLoginProvided = TLSAuth =:= true andalso SSLLoginName =/= none ,
374384
375385 case {CredentialsProvided , ValidCredentials , SSLLoginProvided } of
@@ -380,11 +390,11 @@ creds(User, Pass, SSLLoginName) ->
380390 % % Either username or password is provided
381391 {invalid_creds , {User , Pass }};
382392 {false , false , true } ->
383- % % rabbitmq_mqtt .ssl_cert_login is true. SSL user name provided.
393+ % % rabbitmq_web_ocpp .ssl_cert_login is true. SSL user name provided.
384394 % % Authenticating using username only.
385395 {ok , SSLLoginName , none };
386396 {false , false , false } ->
387- { ok , AllowAnon } = application :get_env (? APP_NAME , allow_anonymous ),
397+ AllowAnon = application :get_env (? APP_NAME , allow_anonymous , false ),
388398 case AllowAnon of
389399 true ->
390400 case rabbit_auth_mechanism_anonymous :credentials () of
@@ -400,19 +410,57 @@ creds(User, Pass, SSLLoginName) ->
400410 nocreds
401411 end .
402412
413+ check_vhost_exists (Vhost , UsernameForLog , PeerIp ) ->
414+ case rabbit_vhost :exists (Vhost ) of
415+ true -> ok ;
416+ false ->
417+ ? LOG_ERROR (" OCPP connection failed: vhost '~s ' does not exist" , [Vhost ]),
418+ auth_attempt_failed (PeerIp , UsernameForLog ),
419+ {error , bad_vhost }
420+ end .
421+
422+ check_vhost_alive (Vhost ) ->
423+ case rabbit_vhost_sup_sup :is_vhost_alive (Vhost ) of
424+ true -> ok ;
425+ false ->
426+ ? LOG_ERROR (" OCPP connection failed: vhost '~s ' is down" , [Vhost ]),
427+ {error , vhost_down }
428+ end .
429+
430+ check_vhost_access (Vhost , User , _ClientId , PeerIp , AuthzCtx ) ->
431+ try rabbit_access_control :check_vhost_access (User , Vhost , {ip , PeerIp }, AuthzCtx ) of
432+ ok -> ok
433+ catch exit :# amqp_error {name = not_allowed , explanation = Msg } ->
434+ ? LOG_ERROR (" OCPP vhost access refused for user '~s ' to vhost '~s ': ~s " ,
435+ [User # user .username , Vhost , Msg ]),
436+ auth_attempt_failed (PeerIp , User # user .username ),
437+ {error , access_refused }
438+ end .
439+
440+ check_user_login (Vhost , Username , Password , ClientId , PeerIp ) ->
441+ % % For OCPP, Password might be 'none' if using cert auth or no auth
442+ EffectivePassword = case Password of none -> <<>> ; _ -> Password end ,
443+ AuthProps = [{vhost , Vhost }, {client_id , ClientId }, {password , EffectivePassword }],
444+ case rabbit_access_control :check_user_login (Username , AuthProps ) of
445+ {ok , User = # user {username = RabbitUser }} ->
446+ notify_auth_result (user_authentication_success , RabbitUser , PeerIp ),
447+ {ok , User };
448+ {refused , UserForLog , Msg , Args } ->
449+ ? LOG_ERROR (" OCPP login failed for user '~s ': " ++ Msg , [UserForLog | Args ]),
450+ notify_auth_result (user_authentication_failure , UserForLog , PeerIp ),
451+ auth_attempt_failed (PeerIp , UserForLog ),
452+ {error , authentication_failure }
453+ end .
454+
455+ notify_auth_result (Event , Username , PeerIp ) ->
456+ rabbit_event :notify (Event , [{name , Username }, {peer_id , PeerIp },
457+ {connection_type , network }, {protocol , ocpp }]).
458+
403459-spec auth_attempt_failed (inet :ip_address (), binary ()) -> ok .
404460auth_attempt_failed (PeerIp , Username ) ->
405461 rabbit_core_metrics :auth_attempt_failed (PeerIp , Username , ocpp ),
406462 timer :sleep (? SILENT_CLOSE_DELAY ).
407463
408- no_supported_sub_protocol (Protocol , ClientId , Req ) ->
409- % % The client MUST include a valid ocpp version in the list of
410- % % WebSocket Sub Protocols it offers [OCPP 1.6 JSON spec §3.1.2].
411- ? LOG_ERROR (" Web OCPP: Invalid 'ocppX.X' version included in client (~p ) offered subprotocols: ~tp " , [ClientId , Protocol ]),
412- {ok ,
413- cowboy_req :reply (400 , #{<<" connection" >> => <<" close" >>}, Req ),
414- # state {}}.
415-
416464% % Allow DISCONNECT packet to be sent to client before closing the connection.
417465defer_close (CloseStatusCode ) ->
418466 self () ! {stop , CloseStatusCode , server_initiated_disconnect },
0 commit comments