From 8c4edc8807ae53b7cc8d01ace0b895886bed2560 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 3 Jun 2026 13:38:47 +0200 Subject: [PATCH 1/8] Add GOPACSHandler test scaffold and test-support constructor --- .../ems/manager/gopacs/GOPACSHandler.java | 46 ++++++++ .../manager/gopacs/GOPACSHandlerTest.groovy | 108 ++++++++++++++++++ 2 files changed, 154 insertions(+) create mode 100644 ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy diff --git a/ems/src/main/java/org/openremote/extension/ems/manager/gopacs/GOPACSHandler.java b/ems/src/main/java/org/openremote/extension/ems/manager/gopacs/GOPACSHandler.java index 0871d66..ba098b5 100644 --- a/ems/src/main/java/org/openremote/extension/ems/manager/gopacs/GOPACSHandler.java +++ b/ems/src/main/java/org/openremote/extension/ems/manager/gopacs/GOPACSHandler.java @@ -211,6 +211,52 @@ protected GOPACSHandler(String contractedEAN, String realm, String electricitySu deploy(container); } + /** + * Test-support constructor. Wires the message-processing collaborators directly and skips + * remote configuration (OAuth client, private-key file) and JAX-RS deployment, so the + * day-ahead UFTP message flow can be exercised in isolation. Not used in production wiring. + */ + protected GOPACSHandler(String contractedEAN, + String realm, + String electricitySupplierAssetId, + AssetProcessingService assetProcessingService, + AssetPredictedDatapointService assetPredictedDatapointService, + TimerService timerService, + ScheduledExecutorService scheduledExecutorService, + String privateKey) { + this.devMode = false; + this.contractedEAN = contractedEAN; + this.realm = realm; + this.electricitySupplierAssetId = electricitySupplierAssetId; + this.participants = new HashMap<>(); + + this.assetProcessingService = assetProcessingService; + this.assetPredictedDatapointService = assetPredictedDatapointService; + this.timerService = timerService; + this.scheduledExecutorService = scheduledExecutorService; + this.webService = null; + + this.gopacsBrokerUrl = ""; + this.responseDelaySeconds = 0; + this.flexOfferDelaySeconds = 0; + this.clientId = null; + this.clientSecret = null; + this.privateKey = privateKey; + + this.client = null; + this.gopacsAddressBookResource = null; + this.gopacsAuthResource = null; + this.gopacsServerResource = null; + + this.participantResolutionService = new ParticipantResolutionService(this); + this.cryptoService = new UftpCryptoService(participantResolutionService, new LazySodiumFactory(), new LazySodiumBase64Pool()); + this.uftpValidationService = new UftpValidationService(new ArrayList<>()); + this.uftpReceivedMessageService = new UftpReceivedMessageService(uftpValidationService, this); + this.uftpSendMessageService = new UftpSendMessageService(serializer, cryptoService, participantResolutionService, this, uftpValidationService); + + this.objectMapper = new ObjectMapper(); + } + protected static String getDeploymentName(String contractedEAN) { return "GOPACS: " + contractedEAN; } diff --git a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy new file mode 100644 index 0000000..efc1470 --- /dev/null +++ b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy @@ -0,0 +1,108 @@ +/* + * Copyright 2026, OpenRemote Inc. + * + * See the CONTRIBUTORS.txt file in the distribution for a + * full listing of individual contributors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.openremote.extension.ems.manager.gopacs + +import com.goterl.lazysodium.LazySodiumJava +import com.goterl.lazysodium.SodiumJava +import com.goterl.lazysodium.utils.KeyPair +import org.lfenergy.shapeshifter.api.* +import org.lfenergy.shapeshifter.api.model.UftpParticipantInformation +import org.lfenergy.shapeshifter.core.model.OutgoingUftpMessage +import org.lfenergy.shapeshifter.core.model.UftpParticipant +import org.openremote.manager.asset.AssetProcessingService +import org.openremote.manager.datapoint.AssetPredictedDatapointService +import org.openremote.container.timer.TimerService +import spock.lang.Specification + +import java.time.Duration +import java.time.LocalDate +import java.time.OffsetDateTime +import java.time.ZoneOffset +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit + +class GOPACSHandlerTest extends Specification { + + static final String CONTRACTED_EAN = "871234567890123456" + static final String ASSET_ID = "0abcDEFghiJKLmnoPQRstu" + static final String AGR_DOMAIN = "agr.example.com" + static final String DSO_DOMAIN = "dso.example.com" + static final String TIME_ZONE = "Europe/Amsterdam" + static final LocalDate PERIOD = LocalDate.of(2026, 6, 4) + + AssetProcessingService assetProcessingService + AssetPredictedDatapointService assetPredictedDatapointService + TimerService timerService + ScheduledExecutorService executor + RecordingGOPACSHandler handler + String pubB64 + + def setup() { + assetProcessingService = Mock(AssetProcessingService) + assetPredictedDatapointService = Mock(AssetPredictedDatapointService) + timerService = Stub(TimerService) { + getCurrentTimeMillis() >> PERIOD.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() + } + // Run every scheduled task inline so processRawMessage is fully synchronous. + executor = Stub(ScheduledExecutorService) { + schedule(_ as Runnable, _ as Long, _ as TimeUnit) >> { Runnable r, long d, TimeUnit u -> + r.run() + Stub(ScheduledFuture) + } + } + + // Generate an ed25519 keypair; hex secret signs, base64 public verifies. + def lazySodium = new LazySodiumJava(new SodiumJava()) + KeyPair kp = lazySodium.cryptoSignKeypair() + String privHex = kp.secretKey.asHexString + pubB64 = Base64.encoder.encodeToString(kp.publicKey.asBytes) + + handler = new RecordingGOPACSHandler(CONTRACTED_EAN, "master", ASSET_ID, + assetProcessingService, assetPredictedDatapointService, timerService, executor, privHex) + + // Pre-seed the DSO participant so signature verification never makes an HTTP call. + handler.participants.put(DSO_DOMAIN, + new UftpParticipantInformation(DSO_DOMAIN, pubB64, "https://dso.example.com/endpoint", true)) + } + + def "handler is constructed via the test-support constructor and is in scope for its contracted EAN"() { + expect: + handler != null + handler.isWithinContractedScope("FlexRequest", "conv-1", CONTRACTED_EAN) + !handler.isWithinContractedScope("FlexRequest", "conv-1", "999999999999999999") + } + + // ---- Test subclass: records outbound messages instead of signing/sending them ---- + static class RecordingGOPACSHandler extends GOPACSHandler { + final List sent = new ArrayList<>() + + RecordingGOPACSHandler(String ean, String realm, String assetId, + AssetProcessingService aps, AssetPredictedDatapointService apds, + TimerService ts, ScheduledExecutorService exec, String privateKey) { + super(ean, realm, assetId, aps, apds, ts, exec, privateKey) + } + + @Override + void notifyNewOutgoingMessage(OutgoingUftpMessage message) { + sent.add(message.payloadMessage()) + } + } +} From ae4f0f122dec2494d62d2205ef2decad07f41460 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 3 Jun 2026 13:42:58 +0200 Subject: [PATCH 2/8] Add GOPACSHandler FlexRequest flow test --- .../manager/gopacs/GOPACSHandlerTest.groovy | 73 ++++++++++++++++++- 1 file changed, 69 insertions(+), 4 deletions(-) diff --git a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy index efc1470..fac0dea 100644 --- a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy +++ b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy @@ -41,7 +41,7 @@ import java.util.concurrent.TimeUnit class GOPACSHandlerTest extends Specification { - static final String CONTRACTED_EAN = "871234567890123456" + static final String CONTRACTED_EAN = "ean.871234567890123456" static final String ASSET_ID = "0abcDEFghiJKLmnoPQRstu" static final String AGR_DOMAIN = "agr.example.com" static final String DSO_DOMAIN = "dso.example.com" @@ -69,14 +69,15 @@ class GOPACSHandlerTest extends Specification { } } - // Generate an ed25519 keypair; hex secret signs, base64 public verifies. + // Generate an ed25519 keypair. The handler's crypto pool (LazySodiumBase64Pool) decodes + // both the signing secret key and the verifying public key as base64. def lazySodium = new LazySodiumJava(new SodiumJava()) KeyPair kp = lazySodium.cryptoSignKeypair() - String privHex = kp.secretKey.asHexString + String privKeyB64 = Base64.encoder.encodeToString(kp.secretKey.asBytes) pubB64 = Base64.encoder.encodeToString(kp.publicKey.asBytes) handler = new RecordingGOPACSHandler(CONTRACTED_EAN, "master", ASSET_ID, - assetProcessingService, assetPredictedDatapointService, timerService, executor, privHex) + assetProcessingService, assetPredictedDatapointService, timerService, executor, privKeyB64) // Pre-seed the DSO participant so signature verification never makes an HTTP call. handler.participants.put(DSO_DOMAIN, @@ -90,6 +91,70 @@ class GOPACSHandlerTest extends Specification { !handler.isWithinContractedScope("FlexRequest", "conv-1", "999999999999999999") } + // Signs the payload as the DSO and feeds the transport XML through the real entry point. + private void signAndProcess(PayloadMessageType payload) { + def sender = new UftpParticipant(DSO_DOMAIN, USEFRoleType.DSO) + String payloadXml = GOPACSHandler.serializer.toXml(payload) + SignedMessage signed = handler.cryptoService.signMessage(payloadXml, sender, handler.privateKey) + String transportXml = GOPACSHandler.serializer.toXml(signed) + handler.processRawMessage(transportXml) + } + + private static void applyHeader(PayloadMessageType m) { + m.setVersion("3.0.0") + m.setSenderDomain(DSO_DOMAIN) + m.setRecipientDomain(AGR_DOMAIN) + m.setTimeStamp(OffsetDateTime.now(ZoneOffset.UTC)) + m.setMessageID(UUID.randomUUID().toString()) + m.setConversationID(UUID.randomUUID().toString()) + } + + private static FlexRequestISPType reqIsp(long start, long maxPower, long minPower) { + def isp = new FlexRequestISPType() + isp.setDisposition(AvailableRequestedType.REQUESTED) + isp.setStart(start) + isp.setDuration(1L) + isp.setMaxPower(maxPower) + isp.setMinPower(minPower) + return isp + } + + private static FlexRequest buildFlexRequest(String congestionPoint) { + def fr = new FlexRequest() + applyHeader(fr) + fr.setISPDuration(Duration.ofMinutes(15)) + fr.setTimeZone(TIME_ZONE) + fr.setPeriod(PERIOD) + fr.setCongestionPoint(congestionPoint) + fr.setExpirationDateTime(OffsetDateTime.now(ZoneOffset.UTC).plusHours(6)) + fr.setRevision(1L) + fr.setContractID("contract-1") + fr.getISPS().add(reqIsp(1L, 5000L, -3000L)) // importMax 5.0, exportMax 3.0 + fr.getISPS().add(reqIsp(2L, 6000L, -4000L)) // importMax 6.0, exportMax 4.0 + return fr + } + + def "FlexRequest updates the asset from request ISPs and replies with FlexRequestResponse then FlexOffer"() { + when: "a signed in-scope FlexRequest is processed" + signAndProcess(buildFlexRequest(CONTRACTED_EAN)) + + then: "predicted datapoints are written: max=importMax, min=exportMax" + 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerMaximumFlexRequest", { List dps -> + dps.size() == 2 && dps.collect { it.value as double } == [5.0d, 6.0d] + }) + 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerMinimumFlexRequest", { List dps -> + dps.size() == 2 && dps.collect { it.value as double } == [3.0d, 4.0d] + }) + + and: "a FlexRequestResponse (Accepted) is sent, followed by a FlexOffer for the same congestion point" + handler.sent.size() == 2 + handler.sent[0] instanceof FlexRequestResponse + ((FlexRequestResponse) handler.sent[0]).result == AcceptedRejectedType.ACCEPTED + handler.sent[1] instanceof FlexOffer + ((FlexOffer) handler.sent[1]).congestionPoint == CONTRACTED_EAN + !((FlexOffer) handler.sent[1]).offerOptions.isEmpty() + } + // ---- Test subclass: records outbound messages instead of signing/sending them ---- static class RecordingGOPACSHandler extends GOPACSHandler { final List sent = new ArrayList<>() From ee8fd1745c46afbd4a270401736d5fde7762113f Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 3 Jun 2026 13:43:46 +0200 Subject: [PATCH 3/8] Add GOPACSHandler FlexOfferResponse no-reply test --- .../manager/gopacs/GOPACSHandlerTest.groovy | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy index fac0dea..1853378 100644 --- a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy +++ b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy @@ -155,6 +155,30 @@ class GOPACSHandlerTest extends Specification { !((FlexOffer) handler.sent[1]).offerOptions.isEmpty() } + private static FlexOfferResponse buildFlexOfferResponse(AcceptedRejectedType result) { + def r = new FlexOfferResponse() + applyHeader(r) + r.setFlexOfferMessageID(UUID.randomUUID().toString()) + r.setResult(result) + if (result == AcceptedRejectedType.REJECTED) { + r.setRejectionReason("insufficient flexibility") + } + return r + } + + def "FlexOfferResponse (#result) is handled without mutating the asset or sending a reply"() { + when: "a signed FlexOfferResponse is processed" + signAndProcess(buildFlexOfferResponse(result)) + + then: "no asset mutation and no outbound message" + 0 * assetPredictedDatapointService.updateValues(_, _, _) + 0 * assetProcessingService.sendAttributeEvent(_, _) + handler.sent.isEmpty() + + where: + result << [AcceptedRejectedType.ACCEPTED, AcceptedRejectedType.REJECTED] + } + // ---- Test subclass: records outbound messages instead of signing/sending them ---- static class RecordingGOPACSHandler extends GOPACSHandler { final List sent = new ArrayList<>() From 7300781a4543e5c1ac996515e77f80c3359b8d0e Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 3 Jun 2026 13:44:38 +0200 Subject: [PATCH 4/8] Add GOPACSHandler FlexOrder offtake and feed-in flow tests --- .../manager/gopacs/GOPACSHandlerTest.groovy | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy index 1853378..fbb6507 100644 --- a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy +++ b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy @@ -179,6 +179,69 @@ class GOPACSHandlerTest extends Specification { result << [AcceptedRejectedType.ACCEPTED, AcceptedRejectedType.REJECTED] } + private static FlexOrderISPType orderIsp(long start, long power) { + def isp = new FlexOrderISPType() + isp.setStart(start) + isp.setDuration(1L) + isp.setPower(power) + return isp + } + + private static FlexOrder buildFlexOrder(String congestionPoint, List powers) { + def fo = new FlexOrder() + applyHeader(fo) + fo.setISPDuration(Duration.ofMinutes(15)) + fo.setTimeZone(TIME_ZONE) + fo.setPeriod(PERIOD) + fo.setCongestionPoint(congestionPoint) + fo.setFlexOfferMessageID(UUID.randomUUID().toString()) + fo.setOrderReference(UUID.randomUUID().toString()) + fo.setContractID("contract-1") + fo.setCurrency("EUR") + fo.setPrice(new BigDecimal("0.00")) + long start = 1L + powers.each { p -> fo.getISPS().add(orderIsp(start++, p)) } + return fo + } + + def "FlexOrder with offtake power updates currentPower and the max-profile and replies with FlexOrderResponse"() { + when: "a signed in-scope FlexOrder with positive (offtake) power is processed" + signAndProcess(buildFlexOrder(CONTRACTED_EAN, [4000L, 8000L])) // 4.0, 8.0 kW + + then: "current power and the offtake (max) profile are written; the feed-in (min) profile is not" + 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "currentPowerFlexRequest", { List dps -> + dps.size() == 2 && dps.collect { it.value as double } == [4.0d, 8.0d] + }) + 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerLimitMaximumProfileFlexOrder", { List dps -> + dps.size() == 2 && dps.collect { it.value as double } == [4.0d, 8.0d] + }) + 0 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerLimitMinimumProfileFlexOrder", _) + + and: "a FlexOrderResponse (Accepted) is sent back" + handler.sent.size() == 1 + handler.sent[0] instanceof FlexOrderResponse + ((FlexOrderResponse) handler.sent[0]).result == AcceptedRejectedType.ACCEPTED + } + + def "FlexOrder with feed-in power updates currentPower and the min-profile"() { + when: "a signed in-scope FlexOrder with negative (feed-in) power is processed" + signAndProcess(buildFlexOrder(CONTRACTED_EAN, [-2000L, -5000L])) // -2.0, -5.0 kW + + then: "current power and the feed-in (min) profile are written; the offtake (max) profile is not" + 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "currentPowerFlexRequest", { List dps -> + dps.size() == 2 && dps.collect { it.value as double } == [-2.0d, -5.0d] + }) + 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerLimitMinimumProfileFlexOrder", { List dps -> + dps.size() == 2 && dps.collect { it.value as double } == [-2.0d, -5.0d] + }) + 0 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerLimitMaximumProfileFlexOrder", _) + + and: "a FlexOrderResponse (Accepted) is sent back" + handler.sent.size() == 1 + handler.sent[0] instanceof FlexOrderResponse + ((FlexOrderResponse) handler.sent[0]).result == AcceptedRejectedType.ACCEPTED + } + // ---- Test subclass: records outbound messages instead of signing/sending them ---- static class RecordingGOPACSHandler extends GOPACSHandler { final List sent = new ArrayList<>() From 3f88e1538770f00814e1e925fa334cee390c9a46 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 3 Jun 2026 13:45:45 +0200 Subject: [PATCH 5/8] Add GOPACSHandler out-of-scope congestion point drop test --- .../ems/manager/gopacs/GOPACSHandlerTest.groovy | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy index fbb6507..dfcfa67 100644 --- a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy +++ b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy @@ -242,6 +242,19 @@ class GOPACSHandlerTest extends Specification { ((FlexOrderResponse) handler.sent[0]).result == AcceptedRejectedType.ACCEPTED } + def "a validly-signed FlexRequest for a different congestion point is dropped with no mutation and no reply"() { + given: "a FlexRequest whose congestion point is not the contracted EAN" + def otherEan = "ean.999999999999999999" + + when: "the signed out-of-scope FlexRequest is processed" + signAndProcess(buildFlexRequest(otherEan)) + + then: "the asset is not mutated and nothing is sent back" + 0 * assetPredictedDatapointService.updateValues(_, _, _) + 0 * assetProcessingService.sendAttributeEvent(_, _) + handler.sent.isEmpty() + } + // ---- Test subclass: records outbound messages instead of signing/sending them ---- static class RecordingGOPACSHandler extends GOPACSHandler { final List sent = new ArrayList<>() From 610007ef16c106faf30741d2a6e653f2374f1976 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 3 Jun 2026 15:36:27 +0200 Subject: [PATCH 6/8] Restyle GOPACS handler tests to embedded UFTP XML fixtures matching the example message format --- .../manager/gopacs/GOPACSHandlerTest.groovy | 192 ++++++++---------- 1 file changed, 82 insertions(+), 110 deletions(-) diff --git a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy index dfcfa67..98bdfb0 100644 --- a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy +++ b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy @@ -26,34 +26,43 @@ import org.lfenergy.shapeshifter.api.* import org.lfenergy.shapeshifter.api.model.UftpParticipantInformation import org.lfenergy.shapeshifter.core.model.OutgoingUftpMessage import org.lfenergy.shapeshifter.core.model.UftpParticipant +import org.openremote.container.timer.TimerService import org.openremote.manager.asset.AssetProcessingService import org.openremote.manager.datapoint.AssetPredictedDatapointService -import org.openremote.container.timer.TimerService import spock.lang.Specification -import java.time.Duration import java.time.LocalDate -import java.time.OffsetDateTime import java.time.ZoneOffset import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit +/** + * Exercises the day-ahead UFTP message flow handled by {@link GOPACSHandler} on the AGR (EMS) side. + * + * Real signed messages are driven through the handler's actual entry point ({@code processRawMessage}) + * using libsodium crypto -- no database, no container. The embedded payload XML matches the wire format + * of the example messages (an outbound FlexRequestResponse + its SignedMessage envelope), with the + * inbound direction being DSO (nilsgrid.net) -> AGR (openremote.io). Outbound messages -- both the + * library-generated responses and the handler-built FlexOffer -- are captured by overriding + * {@code notifyNewOutgoingMessage}. + */ class GOPACSHandlerTest extends Specification { static final String CONTRACTED_EAN = "ean.871234567890123456" static final String ASSET_ID = "0abcDEFghiJKLmnoPQRstu" - static final String AGR_DOMAIN = "agr.example.com" - static final String DSO_DOMAIN = "dso.example.com" - static final String TIME_ZONE = "Europe/Amsterdam" + static final String AGR_DOMAIN = "openremote.io" // our AGR: recipient of inbound, sender of replies + static final String DSO_DOMAIN = "nilsgrid.net" // the DSO: sender of inbound, recipient of replies static final LocalDate PERIOD = LocalDate.of(2026, 6, 4) + // IDs taken from the provided example payload.xml so that file is literally the expected FlexRequest reply. + static final String FLEX_REQUEST_MESSAGE_ID = "b3030b8b-2f45-43f4-8bf3-f00ef6fd74fc" + static final String CONVERSATION_ID = "5f7c9c4d-5988-4a17-a479-2f716051fd6d" AssetProcessingService assetProcessingService AssetPredictedDatapointService assetPredictedDatapointService TimerService timerService ScheduledExecutorService executor RecordingGOPACSHandler handler - String pubB64 def setup() { assetProcessingService = Mock(AssetProcessingService) @@ -69,74 +78,27 @@ class GOPACSHandlerTest extends Specification { } } - // Generate an ed25519 keypair. The handler's crypto pool (LazySodiumBase64Pool) decodes - // both the signing secret key and the verifying public key as base64. + // Generate an ed25519 keypair. The handler's crypto pool (LazySodiumBase64Pool) decodes both the + // signing secret key and the verifying public key as base64. def lazySodium = new LazySodiumJava(new SodiumJava()) KeyPair kp = lazySodium.cryptoSignKeypair() String privKeyB64 = Base64.encoder.encodeToString(kp.secretKey.asBytes) - pubB64 = Base64.encoder.encodeToString(kp.publicKey.asBytes) + String pubB64 = Base64.encoder.encodeToString(kp.publicKey.asBytes) handler = new RecordingGOPACSHandler(CONTRACTED_EAN, "master", ASSET_ID, assetProcessingService, assetPredictedDatapointService, timerService, executor, privKeyB64) - // Pre-seed the DSO participant so signature verification never makes an HTTP call. + // Pre-seed the DSO participant so inbound signature verification never makes an HTTP call. handler.participants.put(DSO_DOMAIN, - new UftpParticipantInformation(DSO_DOMAIN, pubB64, "https://dso.example.com/endpoint", true)) - } - - def "handler is constructed via the test-support constructor and is in scope for its contracted EAN"() { - expect: - handler != null - handler.isWithinContractedScope("FlexRequest", "conv-1", CONTRACTED_EAN) - !handler.isWithinContractedScope("FlexRequest", "conv-1", "999999999999999999") - } - - // Signs the payload as the DSO and feeds the transport XML through the real entry point. - private void signAndProcess(PayloadMessageType payload) { - def sender = new UftpParticipant(DSO_DOMAIN, USEFRoleType.DSO) - String payloadXml = GOPACSHandler.serializer.toXml(payload) - SignedMessage signed = handler.cryptoService.signMessage(payloadXml, sender, handler.privateKey) - String transportXml = GOPACSHandler.serializer.toXml(signed) - handler.processRawMessage(transportXml) - } - - private static void applyHeader(PayloadMessageType m) { - m.setVersion("3.0.0") - m.setSenderDomain(DSO_DOMAIN) - m.setRecipientDomain(AGR_DOMAIN) - m.setTimeStamp(OffsetDateTime.now(ZoneOffset.UTC)) - m.setMessageID(UUID.randomUUID().toString()) - m.setConversationID(UUID.randomUUID().toString()) - } - - private static FlexRequestISPType reqIsp(long start, long maxPower, long minPower) { - def isp = new FlexRequestISPType() - isp.setDisposition(AvailableRequestedType.REQUESTED) - isp.setStart(start) - isp.setDuration(1L) - isp.setMaxPower(maxPower) - isp.setMinPower(minPower) - return isp - } - - private static FlexRequest buildFlexRequest(String congestionPoint) { - def fr = new FlexRequest() - applyHeader(fr) - fr.setISPDuration(Duration.ofMinutes(15)) - fr.setTimeZone(TIME_ZONE) - fr.setPeriod(PERIOD) - fr.setCongestionPoint(congestionPoint) - fr.setExpirationDateTime(OffsetDateTime.now(ZoneOffset.UTC).plusHours(6)) - fr.setRevision(1L) - fr.setContractID("contract-1") - fr.getISPS().add(reqIsp(1L, 5000L, -3000L)) // importMax 5.0, exportMax 3.0 - fr.getISPS().add(reqIsp(2L, 6000L, -4000L)) // importMax 6.0, exportMax 4.0 - return fr + new UftpParticipantInformation(DSO_DOMAIN, pubB64, "https://nilsgrid.net/shapeshifter/api/v3/message", true)) } def "FlexRequest updates the asset from request ISPs and replies with FlexRequestResponse then FlexOffer"() { - when: "a signed in-scope FlexRequest is processed" - signAndProcess(buildFlexRequest(CONTRACTED_EAN)) + given: "a signed in-scope FlexRequest in the example wire format" + def xml = flexRequestXml(CONTRACTED_EAN) + + when: "it is processed" + signAndProcess(xml) then: "predicted datapoints are written: max=importMax, min=exportMax" 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerMaximumFlexRequest", { List dps -> @@ -146,29 +108,26 @@ class GOPACSHandlerTest extends Specification { dps.size() == 2 && dps.collect { it.value as double } == [3.0d, 4.0d] }) - and: "a FlexRequestResponse (Accepted) is sent, followed by a FlexOffer for the same congestion point" + and: "a FlexRequestResponse is sent, followed by a FlexOffer for the same congestion point" handler.sent.size() == 2 handler.sent[0] instanceof FlexRequestResponse - ((FlexRequestResponse) handler.sent[0]).result == AcceptedRejectedType.ACCEPTED handler.sent[1] instanceof FlexOffer ((FlexOffer) handler.sent[1]).congestionPoint == CONTRACTED_EAN !((FlexOffer) handler.sent[1]).offerOptions.isEmpty() - } - private static FlexOfferResponse buildFlexOfferResponse(AcceptedRejectedType result) { - def r = new FlexOfferResponse() - applyHeader(r) - r.setFlexOfferMessageID(UUID.randomUUID().toString()) - r.setResult(result) - if (result == AcceptedRejectedType.REJECTED) { - r.setRejectionReason("insufficient flexibility") - } - return r + and: "the emitted FlexRequestResponse matches the example payload.xml shape" + def response = (FlexRequestResponse) handler.sent[0] + response.result == AcceptedRejectedType.ACCEPTED + response.flexRequestMessageID == FLEX_REQUEST_MESSAGE_ID + response.senderDomain == AGR_DOMAIN + response.recipientDomain == DSO_DOMAIN + response.version == "3.0.0" + response.conversationID == CONVERSATION_ID } def "FlexOfferResponse (#result) is handled without mutating the asset or sending a reply"() { when: "a signed FlexOfferResponse is processed" - signAndProcess(buildFlexOfferResponse(result)) + signAndProcess(flexOfferResponseXml(result)) then: "no asset mutation and no outbound message" 0 * assetPredictedDatapointService.updateValues(_, _, _) @@ -179,34 +138,9 @@ class GOPACSHandlerTest extends Specification { result << [AcceptedRejectedType.ACCEPTED, AcceptedRejectedType.REJECTED] } - private static FlexOrderISPType orderIsp(long start, long power) { - def isp = new FlexOrderISPType() - isp.setStart(start) - isp.setDuration(1L) - isp.setPower(power) - return isp - } - - private static FlexOrder buildFlexOrder(String congestionPoint, List powers) { - def fo = new FlexOrder() - applyHeader(fo) - fo.setISPDuration(Duration.ofMinutes(15)) - fo.setTimeZone(TIME_ZONE) - fo.setPeriod(PERIOD) - fo.setCongestionPoint(congestionPoint) - fo.setFlexOfferMessageID(UUID.randomUUID().toString()) - fo.setOrderReference(UUID.randomUUID().toString()) - fo.setContractID("contract-1") - fo.setCurrency("EUR") - fo.setPrice(new BigDecimal("0.00")) - long start = 1L - powers.each { p -> fo.getISPS().add(orderIsp(start++, p)) } - return fo - } - def "FlexOrder with offtake power updates currentPower and the max-profile and replies with FlexOrderResponse"() { when: "a signed in-scope FlexOrder with positive (offtake) power is processed" - signAndProcess(buildFlexOrder(CONTRACTED_EAN, [4000L, 8000L])) // 4.0, 8.0 kW + signAndProcess(flexOrderXml(CONTRACTED_EAN, [4000, 8000])) // 4.0, 8.0 kW then: "current power and the offtake (max) profile are written; the feed-in (min) profile is not" 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "currentPowerFlexRequest", { List dps -> @@ -225,7 +159,7 @@ class GOPACSHandlerTest extends Specification { def "FlexOrder with feed-in power updates currentPower and the min-profile"() { when: "a signed in-scope FlexOrder with negative (feed-in) power is processed" - signAndProcess(buildFlexOrder(CONTRACTED_EAN, [-2000L, -5000L])) // -2.0, -5.0 kW + signAndProcess(flexOrderXml(CONTRACTED_EAN, [-2000, -5000])) // -2.0, -5.0 kW then: "current power and the feed-in (min) profile are written; the offtake (max) profile is not" 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "currentPowerFlexRequest", { List dps -> @@ -243,11 +177,8 @@ class GOPACSHandlerTest extends Specification { } def "a validly-signed FlexRequest for a different congestion point is dropped with no mutation and no reply"() { - given: "a FlexRequest whose congestion point is not the contracted EAN" - def otherEan = "ean.999999999999999999" - - when: "the signed out-of-scope FlexRequest is processed" - signAndProcess(buildFlexRequest(otherEan)) + when: "a signed out-of-scope FlexRequest (different EAN) is processed" + signAndProcess(flexRequestXml("ean.999999999999999999")) then: "the asset is not mutated and nothing is sent back" 0 * assetPredictedDatapointService.updateValues(_, _, _) @@ -255,7 +186,48 @@ class GOPACSHandlerTest extends Specification { handler.sent.isEmpty() } - // ---- Test subclass: records outbound messages instead of signing/sending them ---- + // ---- Embedded UFTP payload fixtures (attribute-style XML, matching the example message format) ---- + + private static String flexRequestXml(String congestionPoint) { + """ + + + +""" + } + + private static String flexOrderXml(String congestionPoint, List powers) { + def isps = (0.. + """ """ + }.join("\n") + """ + +${isps} +""" + } + + private static String flexOfferResponseXml(AcceptedRejectedType result) { + def reason = result == AcceptedRejectedType.REJECTED ? ' RejectionReason="insufficient flexibility"' : '' + """ +""" + } + + // Signs the payload XML as the DSO and feeds the transport XML through the real entry point. + private void signAndProcess(String payloadXml) { + def sender = new UftpParticipant(DSO_DOMAIN, USEFRoleType.DSO) + SignedMessage signed = handler.cryptoService.signMessage(payloadXml, sender, handler.privateKey) + handler.processRawMessage(GOPACSHandler.serializer.toXml(signed)) + } + + // Records outbound messages (library-generated responses + handler-built FlexOffer) instead of sending them. static class RecordingGOPACSHandler extends GOPACSHandler { final List sent = new ArrayList<>() From 086187d5fce7c4ef8f91a5665a98fa81f79a5b63 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 4 Jun 2026 14:39:24 +0200 Subject: [PATCH 7/8] Add WireMock test dependency for GOPACS HTTP-level tests The GOPACS broker delivery goes through java.net.http.HttpClient (via shapeshifter's UftpSendMessageService), which a JAX-RS ClientRequestFilter cannot intercept. WireMock (standalone, shaded to avoid clashing with the manager's web stack) provides a local HTTP server to stub and verify the broker, OAuth2 token and address-book endpoints on the wire -- the same approach shapeshifter uses to test UftpSendMessageService. --- ems/build.gradle | 4 ++++ gradle.properties | 1 + 2 files changed, 5 insertions(+) diff --git a/ems/build.gradle b/ems/build.gradle index 5557af3..1a99709 100644 --- a/ems/build.gradle +++ b/ems/build.gradle @@ -14,6 +14,10 @@ dependencies { api "org.jboss.resteasy:resteasy-client-api:$resteasyVersion" implementation "org.lfenergy.shapeshifter:shapeshifter-core:$shapeshifterVersion" testImplementation "io.openremote:openremote-test:$openremoteVersion" + // Standalone (shaded) WireMock so its embedded Jetty/Jackson don't clash with the manager's web stack; + // used to mock the GOPACS broker / OAuth2 / address-book HTTP endpoints (the broker send goes through + // java.net.http.HttpClient, which a JAX-RS ClientRequestFilter cannot intercept). + testImplementation "org.wiremock:wiremock-standalone:$wiremockVersion" } jar { diff --git a/gradle.properties b/gradle.properties index 75531c3..aa28d06 100644 --- a/gradle.properties +++ b/gradle.properties @@ -20,3 +20,4 @@ jacksonVersion = 2.21.3 resteasyVersion = 6.2.16.Final shapeshifterVersion = 3.5.0 testLoggerVersion = 4.0.0 +wiremockVersion = 3.13.1 From f69c2c92fcbae1349798c30331a3a96d1424c694 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 4 Jun 2026 14:39:24 +0200 Subject: [PATCH 8/8] Add GOPACSHandler HTTP-contract integration test Complements the in-process GOPACSHandlerTest (which calls processRawMessage directly and records outgoing payload objects) by exercising the real transport chain end to end, addressing the HTTP coverage gap raised in review: - Inbound: real POST to the deployed /gopacs/message JAX-RS resource, asserting accepted media type, JAX-RS delegation and the WebApplicationException status mapping (success vs 400). - Outbound: real broker delivery via UftpSendMessageService (over java.net.http.HttpClient), plus the OAuth2 token and address-book lookups via the handler's RESTEasy client. All three are pointed at one WireMock server, so the signed XML, synthesised broker endpoint and 'Authorization: Bearer' header are verified on the wire. Covers: in-scope FlexRequest -> FlexRequestResponse + FlexOffer broker delivery; FlexOrder -> FlexOrderResponse; out-of-scope drop (accepted, no delivery); malformed XML and signature-mismatch -> 400; OAuth2 500 / invalid token JSON and address-book 404 -> 400; and participant-lookup caching. Like the other manager-container integration tests in this repo (e.g. EntsoeProtocolTest) it requires a running OpenRemote dev stack and is skipped on CI via @IgnoreIf(GITHUB_ACTIONS). --- .../gopacs/GOPACSHandlerHttpTest.groovy | 391 ++++++++++++++++++ 1 file changed, 391 insertions(+) create mode 100644 ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerHttpTest.groovy diff --git a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerHttpTest.groovy b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerHttpTest.groovy new file mode 100644 index 0000000..50d9df1 --- /dev/null +++ b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerHttpTest.groovy @@ -0,0 +1,391 @@ +/* + * Copyright 2026, OpenRemote Inc. + * + * See the CONTRIBUTORS.txt file in the distribution for a + * full listing of individual contributors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.openremote.extension.ems.manager.gopacs + +import com.github.tomakehurst.wiremock.WireMockServer +import com.goterl.lazysodium.LazySodiumJava +import com.goterl.lazysodium.SodiumJava +import com.goterl.lazysodium.utils.KeyPair +import jakarta.ws.rs.client.Entity +import jakarta.ws.rs.core.Response +import org.lfenergy.shapeshifter.api.USEFRoleType +import org.lfenergy.shapeshifter.api.SignedMessage +import org.lfenergy.shapeshifter.core.model.UftpParticipant +import org.openremote.container.web.WebService +import org.openremote.extension.ems.manager.EmsOptimisationService +import org.openremote.extension.ems.manager.EmsOptimisationSetupService +import org.openremote.model.ContainerService +import org.openremote.test.ManagerContainerTrait +import spock.lang.IgnoreIf +import spock.lang.Shared +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +import java.time.LocalDate + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse +import static com.github.tomakehurst.wiremock.client.WireMock.containing +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo +import static com.github.tomakehurst.wiremock.client.WireMock.get +import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor +import static com.github.tomakehurst.wiremock.client.WireMock.okJson +import static com.github.tomakehurst.wiremock.client.WireMock.post +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo +import static org.openremote.model.Constants.MASTER_REALM + +/** + * HTTP-contract integration test for {@link GOPACSHandler}, complementing the in-process message-flow + * unit test in {@code GOPACSHandlerTest}. Where that test calls {@code processRawMessage} directly and + * records outgoing payload objects, this test exercises the real transport chain: + * + *
    + *
  • Inbound: a real {@code POST} to the deployed JAX-RS resource ({@code /gopacs/message}), + * asserting the accepted media type, JAX-RS delegation and {@code WebApplicationException} status + * mapping in {@link GOPACSHandler#processRawMessage}.
  • + *
  • Outbound: the real broker delivery via {@code UftpSendMessageService} (which posts over + * {@code java.net.http.HttpClient}), plus the OAuth2 token and address-book lookups via the + * handler's RESTEasy client. All three are pointed at a single WireMock server, so signed XML, + * the synthesised broker endpoint and the {@code Authorization: Bearer} header are asserted on the + * wire.
  • + *
+ * + * The broker send uses the JDK {@code HttpClient}, which a JAX-RS {@code ClientRequestFilter} (the style + * used by {@code EntsoeProtocolTest}) cannot intercept; hence the WireMock server, matching the approach + * shapeshifter itself uses to test {@code UftpSendMessageService}. + * + * Like the other manager-container integration tests in this repository (e.g. {@code EntsoeProtocolTest}), + * this requires a running OpenRemote dev stack and is skipped on CI. + * + * Asset mutation (predicted datapoints / attribute events) is intentionally not re-asserted here -- those + * writes are scheduled asynchronously and are already covered precisely by {@code GOPACSHandlerTest}. This + * test focuses on the HTTP contract, so a placeholder asset id is sufficient. + */ +@IgnoreIf({ System.getenv("GITHUB_ACTIONS") == "true" }) +class GOPACSHandlerHttpTest extends Specification implements ManagerContainerTrait { + + static final String CONTRACTED_EAN = "ean.871234567890123456" + static final String ASSET_ID = "0abcDEFghiJKLmnoPQRstu" + static final String AGR_DOMAIN = "openremote.io" // our AGR: recipient of inbound, sender of replies + static final String DSO_DOMAIN = "nilsgrid.net" // the DSO: sender of inbound, recipient of replies + static final LocalDate PERIOD = LocalDate.of(2026, 6, 4) + static final String FLEX_REQUEST_MESSAGE_ID = "b3030b8b-2f45-43f4-8bf3-f00ef6fd74fc" + static final String CONVERSATION_ID = "5f7c9c4d-5988-4a17-a479-2f716051fd6d" + static final String ACCESS_TOKEN = "test-access-token" + + static final String OAUTH_PATH = "/oauth/token" + static final String ADDRESS_BOOK_PATH = "/uftp-participants/v3/participants/" + DSO_DOMAIN + static final String BROKER_PATH = "/shapeshifter/api/v3/message" + + @Shared + WireMockServer wireMock + @Shared + String dsoPrivateKeyB64 + @Shared + String dsoPublicKeyB64 + @Shared + File privateKeyFile + @Shared + GOPACSHandler handler + @Shared + int serverPort + + def setupSpec() { + // Two ed25519 keypairs: the DSO signs the inbound messages (its public key is served by the + // address-book stub so the handler can verify them), and the AGR signs its outbound replies. + def lazySodium = new LazySodiumJava(new SodiumJava()) + KeyPair dso = lazySodium.cryptoSignKeypair() + dsoPrivateKeyB64 = Base64.encoder.encodeToString(dso.secretKey.asBytes) + dsoPublicKeyB64 = Base64.encoder.encodeToString(dso.publicKey.asBytes) + + KeyPair agr = lazySodium.cryptoSignKeypair() + privateKeyFile = File.createTempFile("gopacs-agr-private-key", ".txt") + privateKeyFile.deleteOnExit() + privateKeyFile.text = Base64.encoder.encodeToString(agr.secretKey.asBytes) + + wireMock = new WireMockServer(0) + wireMock.start() + } + + def cleanupSpec() { + handler?.undeploy() + wireMock?.stop() + } + + def setup() { + // Must be a plain String (not a Groovy GString): OpenRemote's Config.init casts every config value to String. + String base = "http://localhost:${wireMock.port()}".toString() + def config = defaultConfig() << [ + (GOPACSHandler.GOPACS_CLIENT_ID) : "test-client", + (GOPACSHandler.GOPACS_CLIENT_SECRET) : "test-secret", + (GOPACSHandler.GOPACS_PRIVATE_KEY_FILE) : privateKeyFile.absolutePath, + (GOPACSHandler.GOPACS_OAUTH2_URL) : base + OAUTH_PATH, + (GOPACSHandler.GOPACS_PARTICIPANT_URL) : base, + (GOPACSHandler.GOPACS_BROKER_URL) : base, + // Send replies immediately so assertions don't wait on the production response/offer delays. + (GOPACSHandler.GOPACS_RESPONSE_DELAY_SECONDS) : "0", + (GOPACSHandler.GOPACS_FLEX_OFFER_DELAY_SECONDS): "0" + ] + + // startContainer reuses the already-running container when config (ignoring the web-server port) + // and services match, so this is cheap after the first feature. + def runningContainer = startContainer(config, gopacsServices()) + + if (handler == null) { + serverPort = runningContainer.getConfig().get(WebService.OR_WEBSERVER_LISTEN_PORT) as int + + // Construct the production handler directly (real RESTEasy client, real send service, real + // JAX-RS deployment). EmsOptimisationService is excluded from the service list above so it + // cannot spin up a second handler on the same /gopacs deployment path. + handler = new GOPACSHandler(CONTRACTED_EAN, MASTER_REALM, ASSET_ID, runningContainer) + } + + // Reset cross-test state: the participant cache leaks across features otherwise, and the request + // journal / stubs must start clean for the per-feature verifications. + handler.participants.clear() + wireMock.resetAll() + stubOAuthToken(ACCESS_TOKEN) + stubAddressBook(dsoPublicKeyB64) + stubBrokerAccepts() + } + + // The manager services minus the EMS optimisation services, which would otherwise deploy their own + // GOPACS handler (on the same /gopacs path) from any persisted EmsGOPACSAsset. + private Iterable gopacsServices() { + defaultServices().findAll { + !(it instanceof EmsOptimisationService) && !(it instanceof EmsOptimisationSetupService) + } + } + + def "a signed in-scope FlexRequest POSTed to /gopacs/message is accepted and the reply is delivered to the broker with a bearer token"() { + given: "polling for the asynchronous outbound deliveries" + def conditions = new PollingConditions(timeout: 10, delay: 0.2) + + when: "a signed FlexRequest is POSTed to the deployed endpoint" + Response response = postSignedAsDso(flexRequestXml(CONTRACTED_EAN)) + + then: "the transport call succeeds" + response.statusInfo.family == Response.Status.Family.SUCCESSFUL + response.close() + + and: "verifying the signature fetched an OAuth2 token and resolved the DSO via the address book" + conditions.eventually { + wireMock.verify(postRequestedFor(urlPathEqualTo(OAUTH_PATH)) + .withRequestBody(containing("grant_type=client_credentials"))) + wireMock.verify(getRequestedFor(urlPathEqualTo(ADDRESS_BOOK_PATH)) + .withHeader("Authorization", equalTo("Bearer " + ACCESS_TOKEN))) + } + + and: "the FlexRequestResponse and FlexOffer are actually POSTed to the broker as signed XML with a bearer token" + conditions.eventually { + wireMock.verify(postRequestedFor(urlPathEqualTo(BROKER_PATH)) + .withHeader("Authorization", equalTo("Bearer " + ACCESS_TOKEN)) + .withRequestBody(containing("SignedMessage"))) + def payloads = decodedBrokerPayloads() + assert payloads.any { it.contains("FlexRequestResponse") } + assert payloads.any { it.contains("FlexOffer") } + } + } + + def "a signed in-scope FlexOrder POSTed to /gopacs/message is accepted and a FlexOrderResponse is delivered to the broker"() { + given: + def conditions = new PollingConditions(timeout: 10, delay: 0.2) + + when: "a signed FlexOrder is POSTed to the deployed endpoint" + Response response = postSignedAsDso(flexOrderXml(CONTRACTED_EAN, [4000, 8000])) + + then: "the transport call succeeds" + response.statusInfo.family == Response.Status.Family.SUCCESSFUL + response.close() + + and: "a FlexOrderResponse is POSTed to the broker as signed XML with a bearer token" + conditions.eventually { + wireMock.verify(postRequestedFor(urlPathEqualTo(BROKER_PATH)) + .withHeader("Authorization", equalTo("Bearer " + ACCESS_TOKEN)) + .withRequestBody(containing("SignedMessage"))) + assert decodedBrokerPayloads().any { it.contains("FlexOrderResponse") } + } + } + + def "a validly-signed FlexRequest for a different congestion point is accepted but produces no broker delivery"() { + when: "a signed out-of-scope FlexRequest (different EAN) is POSTed" + Response response = postSignedAsDso(flexRequestXml("ean.999999999999999999")) + + then: "the transport call still succeeds (the signed envelope is accepted)" + response.statusInfo.family == Response.Status.Family.SUCCESSFUL + response.close() + + and: "no message is ever delivered to the broker" + // The drop happens before any reply is scheduled; allow the (zero-delay) scheduler to run first. + Thread.sleep(1000) + wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH))) + } + + def "malformed transport XML is rejected with 400"() { + when: "a body that is not a SignedMessage envelope is POSTed" + Response response = postXml("") + + then: "the endpoint maps the failure to 400 Bad Request" + response.status == 400 + response.close() + + and: "nothing is delivered to the broker" + wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH))) + } + + def "a correctly-formed message whose signature does not match the resolved public key is rejected with 400"() { + given: "the address book returns a public key that does not match the DSO signing key" + def lazySodium = new LazySodiumJava(new SodiumJava()) + def wrongPublicKeyB64 = Base64.encoder.encodeToString(lazySodium.cryptoSignKeypair().publicKey.asBytes) + stubAddressBook(wrongPublicKeyB64) + + when: "a validly-signed FlexRequest is POSTed" + Response response = postSignedAsDso(flexRequestXml(CONTRACTED_EAN)) + + then: "signature verification fails and the endpoint returns 400 (the UftpConnectorException branch)" + response.status == 400 + response.close() + + and: "nothing is delivered to the broker" + wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH))) + } + + def "an OAuth2 token failure (#scenario) prevents participant resolution so the inbound message is rejected with 400"() { + given: "the token endpoint is unhealthy" + wireMock.stubFor(post(urlPathEqualTo(OAUTH_PATH)).willReturn(tokenResponse)) + + when: "a validly-signed FlexRequest is POSTed" + Response response = postSignedAsDso(flexRequestXml(CONTRACTED_EAN)) + + then: "without a bearer token the DSO cannot be resolved, verification fails and the endpoint returns 400" + response.status == 400 + response.close() + + and: "nothing is delivered to the broker" + wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH))) + + where: + scenario | tokenResponse + "500 error" | aResponse().withStatus(500) + "invalid token JSON" | okJson("not-json") + } + + def "an address-book 404 for the sender domain is rejected with 400"() { + given: "the address book does not know the DSO" + wireMock.stubFor(get(urlPathEqualTo(ADDRESS_BOOK_PATH)).willReturn(aResponse().withStatus(404))) + + when: "a validly-signed FlexRequest is POSTed" + Response response = postSignedAsDso(flexRequestXml(CONTRACTED_EAN)) + + then: "the sender cannot be resolved, verification fails and the endpoint returns 400" + response.status == 400 + response.close() + + and: "nothing is delivered to the broker" + wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH))) + } + + def "the resolved participant is cached so a second inbound message does not trigger a second address-book lookup"() { + given: + def conditions = new PollingConditions(timeout: 10, delay: 0.2) + + when: "two signed in-scope FlexRequests are POSTed" + postSignedAsDso(flexRequestXml(CONTRACTED_EAN)).close() + conditions.eventually { + wireMock.verify(getRequestedFor(urlPathEqualTo(ADDRESS_BOOK_PATH))) + } + postSignedAsDso(flexRequestXml(CONTRACTED_EAN)).close() + // Let any (zero-delay) second lookup happen before asserting it did not. + Thread.sleep(1000) + + then: "the address book is queried exactly once -- the second message uses the cached participant" + wireMock.verify(1, getRequestedFor(urlPathEqualTo(ADDRESS_BOOK_PATH))) + } + + // ---- WireMock stubs ---- + + private void stubOAuthToken(String token) { + wireMock.stubFor(post(urlPathEqualTo(OAUTH_PATH)).willReturn( + okJson("""{"access_token":"${token}","token_type":"Bearer","expires_in":3600,"scope":"uftp"}"""))) + } + + private void stubAddressBook(String publicKeyB64) { + wireMock.stubFor(get(urlPathEqualTo(ADDRESS_BOOK_PATH)).willReturn( + okJson("""{"domain":"${DSO_DOMAIN}","publicKey":"${publicKeyB64}"}"""))) + } + + private void stubBrokerAccepts() { + wireMock.stubFor(post(urlPathEqualTo(BROKER_PATH)).willReturn(aResponse().withStatus(200))) + } + + // ---- HTTP helpers ---- + + // Signs the payload XML as the DSO and POSTs the resulting transport XML to the deployed endpoint. + private Response postSignedAsDso(String payloadXml) { + def sender = new UftpParticipant(DSO_DOMAIN, USEFRoleType.DSO) + SignedMessage signed = handler.cryptoService.signMessage(payloadXml, sender, dsoPrivateKeyB64) + return postXml(GOPACSHandler.serializer.toXml(signed)) + } + + // The broker receives SignedMessage envelopes carrying the payload in the base64 "Body" attribute. The + // decoded bytes are the libsodium-signed payload (a 64-byte signature prefixed to the payload XML), so the + // outgoing message type is still a substring of the decoded text and can be asserted on the wire. + private List decodedBrokerPayloads() { + wireMock.findAll(postRequestedFor(urlPathEqualTo(BROKER_PATH))).collect { req -> + def matcher = (req.bodyAsString =~ /Body="([^"]+)"/) + matcher ? new String(Base64.decoder.decode(matcher[0][1] as String)) : "" + } + } + + private Response postXml(String transportXml) { + // The deployment registers a realm-path-extractor filter (realmIndex 0), so the realm is the first + // segment after the /gopacs context path: /gopacs/{realm}/message -> resource @Path("message"). + createClient(null) + .target(serverUri(serverPort).path("gopacs").path(MASTER_REALM).path("message")) + .request() + .post(Entity.entity(transportXml, "text/xml")) + } + + // ---- Embedded UFTP payload fixtures (attribute-style XML, matching the example message format) ---- + + private static String flexRequestXml(String congestionPoint) { + """ + + + +""" + } + + private static String flexOrderXml(String congestionPoint, List powers) { + def isps = (0.. + """ """ + }.join("\n") + """ + +${isps} +""" + } +}