diff --git a/ems/build.gradle b/ems/build.gradle
index 5557af3..1a99709 100644
--- a/ems/build.gradle
+++ b/ems/build.gradle
@@ -14,6 +14,10 @@ dependencies {
api "org.jboss.resteasy:resteasy-client-api:$resteasyVersion"
implementation "org.lfenergy.shapeshifter:shapeshifter-core:$shapeshifterVersion"
testImplementation "io.openremote:openremote-test:$openremoteVersion"
+ // Standalone (shaded) WireMock so its embedded Jetty/Jackson don't clash with the manager's web stack;
+ // used to mock the GOPACS broker / OAuth2 / address-book HTTP endpoints (the broker send goes through
+ // java.net.http.HttpClient, which a JAX-RS ClientRequestFilter cannot intercept).
+ testImplementation "org.wiremock:wiremock-standalone:$wiremockVersion"
}
jar {
diff --git a/ems/src/main/java/org/openremote/extension/ems/manager/gopacs/GOPACSHandler.java b/ems/src/main/java/org/openremote/extension/ems/manager/gopacs/GOPACSHandler.java
index 15cfc2d..108dc6a 100644
--- a/ems/src/main/java/org/openremote/extension/ems/manager/gopacs/GOPACSHandler.java
+++ b/ems/src/main/java/org/openremote/extension/ems/manager/gopacs/GOPACSHandler.java
@@ -211,6 +211,52 @@ protected GOPACSHandler(String contractedEAN, String realm, String electricitySu
deploy(container);
}
+ /**
+ * Test-support constructor. Wires the message-processing collaborators directly and skips
+ * remote configuration (OAuth client, private-key file) and JAX-RS deployment, so the
+ * day-ahead UFTP message flow can be exercised in isolation. Not used in production wiring.
+ */
+ protected GOPACSHandler(String contractedEAN,
+ String realm,
+ String electricitySupplierAssetId,
+ AssetProcessingService assetProcessingService,
+ AssetPredictedDatapointService assetPredictedDatapointService,
+ TimerService timerService,
+ ScheduledExecutorService scheduledExecutorService,
+ String privateKey) {
+ this.devMode = false;
+ this.contractedEAN = contractedEAN;
+ this.realm = realm;
+ this.electricitySupplierAssetId = electricitySupplierAssetId;
+ this.participants = new HashMap<>();
+
+ this.assetProcessingService = assetProcessingService;
+ this.assetPredictedDatapointService = assetPredictedDatapointService;
+ this.timerService = timerService;
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.webService = null;
+
+ this.gopacsBrokerUrl = "";
+ this.responseDelaySeconds = 0;
+ this.flexOfferDelaySeconds = 0;
+ this.clientId = null;
+ this.clientSecret = null;
+ this.privateKey = privateKey;
+
+ this.client = null;
+ this.gopacsAddressBookResource = null;
+ this.gopacsAuthResource = null;
+ this.gopacsServerResource = null;
+
+ this.participantResolutionService = new ParticipantResolutionService(this);
+ this.cryptoService = new UftpCryptoService(participantResolutionService, new LazySodiumFactory(), new LazySodiumBase64Pool());
+ this.uftpValidationService = new UftpValidationService(new ArrayList<>());
+ this.uftpReceivedMessageService = new UftpReceivedMessageService(uftpValidationService, this);
+ this.uftpSendMessageService = new UftpSendMessageService(serializer, cryptoService, participantResolutionService, this, uftpValidationService);
+
+ this.objectMapper = new ObjectMapper();
+ }
+
protected static String getDeploymentName(String contractedEAN) {
return "GOPACS: " + contractedEAN;
}
diff --git a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerHttpTest.groovy b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerHttpTest.groovy
new file mode 100644
index 0000000..50d9df1
--- /dev/null
+++ b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerHttpTest.groovy
@@ -0,0 +1,391 @@
+/*
+ * Copyright 2026, OpenRemote Inc.
+ *
+ * See the CONTRIBUTORS.txt file in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.openremote.extension.ems.manager.gopacs
+
+import com.github.tomakehurst.wiremock.WireMockServer
+import com.goterl.lazysodium.LazySodiumJava
+import com.goterl.lazysodium.SodiumJava
+import com.goterl.lazysodium.utils.KeyPair
+import jakarta.ws.rs.client.Entity
+import jakarta.ws.rs.core.Response
+import org.lfenergy.shapeshifter.api.USEFRoleType
+import org.lfenergy.shapeshifter.api.SignedMessage
+import org.lfenergy.shapeshifter.core.model.UftpParticipant
+import org.openremote.container.web.WebService
+import org.openremote.extension.ems.manager.EmsOptimisationService
+import org.openremote.extension.ems.manager.EmsOptimisationSetupService
+import org.openremote.model.ContainerService
+import org.openremote.test.ManagerContainerTrait
+import spock.lang.IgnoreIf
+import spock.lang.Shared
+import spock.lang.Specification
+import spock.util.concurrent.PollingConditions
+
+import java.time.LocalDate
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse
+import static com.github.tomakehurst.wiremock.client.WireMock.containing
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo
+import static com.github.tomakehurst.wiremock.client.WireMock.get
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor
+import static com.github.tomakehurst.wiremock.client.WireMock.okJson
+import static com.github.tomakehurst.wiremock.client.WireMock.post
+import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo
+import static org.openremote.model.Constants.MASTER_REALM
+
+/**
+ * HTTP-contract integration test for {@link GOPACSHandler}, complementing the in-process message-flow
+ * unit test in {@code GOPACSHandlerTest}. Where that test calls {@code processRawMessage} directly and
+ * records outgoing payload objects, this test exercises the real transport chain:
+ *
+ *
+ * - Inbound: a real {@code POST} to the deployed JAX-RS resource ({@code /gopacs/message}),
+ * asserting the accepted media type, JAX-RS delegation and {@code WebApplicationException} status
+ * mapping in {@link GOPACSHandler#processRawMessage}.
+ * - Outbound: the real broker delivery via {@code UftpSendMessageService} (which posts over
+ * {@code java.net.http.HttpClient}), plus the OAuth2 token and address-book lookups via the
+ * handler's RESTEasy client. All three are pointed at a single WireMock server, so signed XML,
+ * the synthesised broker endpoint and the {@code Authorization: Bearer} header are asserted on the
+ * wire.
+ *
+ *
+ * The broker send uses the JDK {@code HttpClient}, which a JAX-RS {@code ClientRequestFilter} (the style
+ * used by {@code EntsoeProtocolTest}) cannot intercept; hence the WireMock server, matching the approach
+ * shapeshifter itself uses to test {@code UftpSendMessageService}.
+ *
+ * Like the other manager-container integration tests in this repository (e.g. {@code EntsoeProtocolTest}),
+ * this requires a running OpenRemote dev stack and is skipped on CI.
+ *
+ * Asset mutation (predicted datapoints / attribute events) is intentionally not re-asserted here -- those
+ * writes are scheduled asynchronously and are already covered precisely by {@code GOPACSHandlerTest}. This
+ * test focuses on the HTTP contract, so a placeholder asset id is sufficient.
+ */
+@IgnoreIf({ System.getenv("GITHUB_ACTIONS") == "true" })
+class GOPACSHandlerHttpTest extends Specification implements ManagerContainerTrait {
+
+ static final String CONTRACTED_EAN = "ean.871234567890123456"
+ static final String ASSET_ID = "0abcDEFghiJKLmnoPQRstu"
+ static final String AGR_DOMAIN = "openremote.io" // our AGR: recipient of inbound, sender of replies
+ static final String DSO_DOMAIN = "nilsgrid.net" // the DSO: sender of inbound, recipient of replies
+ static final LocalDate PERIOD = LocalDate.of(2026, 6, 4)
+ static final String FLEX_REQUEST_MESSAGE_ID = "b3030b8b-2f45-43f4-8bf3-f00ef6fd74fc"
+ static final String CONVERSATION_ID = "5f7c9c4d-5988-4a17-a479-2f716051fd6d"
+ static final String ACCESS_TOKEN = "test-access-token"
+
+ static final String OAUTH_PATH = "/oauth/token"
+ static final String ADDRESS_BOOK_PATH = "/uftp-participants/v3/participants/" + DSO_DOMAIN
+ static final String BROKER_PATH = "/shapeshifter/api/v3/message"
+
+ @Shared
+ WireMockServer wireMock
+ @Shared
+ String dsoPrivateKeyB64
+ @Shared
+ String dsoPublicKeyB64
+ @Shared
+ File privateKeyFile
+ @Shared
+ GOPACSHandler handler
+ @Shared
+ int serverPort
+
+ def setupSpec() {
+ // Two ed25519 keypairs: the DSO signs the inbound messages (its public key is served by the
+ // address-book stub so the handler can verify them), and the AGR signs its outbound replies.
+ def lazySodium = new LazySodiumJava(new SodiumJava())
+ KeyPair dso = lazySodium.cryptoSignKeypair()
+ dsoPrivateKeyB64 = Base64.encoder.encodeToString(dso.secretKey.asBytes)
+ dsoPublicKeyB64 = Base64.encoder.encodeToString(dso.publicKey.asBytes)
+
+ KeyPair agr = lazySodium.cryptoSignKeypair()
+ privateKeyFile = File.createTempFile("gopacs-agr-private-key", ".txt")
+ privateKeyFile.deleteOnExit()
+ privateKeyFile.text = Base64.encoder.encodeToString(agr.secretKey.asBytes)
+
+ wireMock = new WireMockServer(0)
+ wireMock.start()
+ }
+
+ def cleanupSpec() {
+ handler?.undeploy()
+ wireMock?.stop()
+ }
+
+ def setup() {
+ // Must be a plain String (not a Groovy GString): OpenRemote's Config.init casts every config value to String.
+ String base = "http://localhost:${wireMock.port()}".toString()
+ def config = defaultConfig() << [
+ (GOPACSHandler.GOPACS_CLIENT_ID) : "test-client",
+ (GOPACSHandler.GOPACS_CLIENT_SECRET) : "test-secret",
+ (GOPACSHandler.GOPACS_PRIVATE_KEY_FILE) : privateKeyFile.absolutePath,
+ (GOPACSHandler.GOPACS_OAUTH2_URL) : base + OAUTH_PATH,
+ (GOPACSHandler.GOPACS_PARTICIPANT_URL) : base,
+ (GOPACSHandler.GOPACS_BROKER_URL) : base,
+ // Send replies immediately so assertions don't wait on the production response/offer delays.
+ (GOPACSHandler.GOPACS_RESPONSE_DELAY_SECONDS) : "0",
+ (GOPACSHandler.GOPACS_FLEX_OFFER_DELAY_SECONDS): "0"
+ ]
+
+ // startContainer reuses the already-running container when config (ignoring the web-server port)
+ // and services match, so this is cheap after the first feature.
+ def runningContainer = startContainer(config, gopacsServices())
+
+ if (handler == null) {
+ serverPort = runningContainer.getConfig().get(WebService.OR_WEBSERVER_LISTEN_PORT) as int
+
+ // Construct the production handler directly (real RESTEasy client, real send service, real
+ // JAX-RS deployment). EmsOptimisationService is excluded from the service list above so it
+ // cannot spin up a second handler on the same /gopacs deployment path.
+ handler = new GOPACSHandler(CONTRACTED_EAN, MASTER_REALM, ASSET_ID, runningContainer)
+ }
+
+ // Reset cross-test state: the participant cache leaks across features otherwise, and the request
+ // journal / stubs must start clean for the per-feature verifications.
+ handler.participants.clear()
+ wireMock.resetAll()
+ stubOAuthToken(ACCESS_TOKEN)
+ stubAddressBook(dsoPublicKeyB64)
+ stubBrokerAccepts()
+ }
+
+ // The manager services minus the EMS optimisation services, which would otherwise deploy their own
+ // GOPACS handler (on the same /gopacs path) from any persisted EmsGOPACSAsset.
+ private Iterable gopacsServices() {
+ defaultServices().findAll {
+ !(it instanceof EmsOptimisationService) && !(it instanceof EmsOptimisationSetupService)
+ }
+ }
+
+ def "a signed in-scope FlexRequest POSTed to /gopacs/message is accepted and the reply is delivered to the broker with a bearer token"() {
+ given: "polling for the asynchronous outbound deliveries"
+ def conditions = new PollingConditions(timeout: 10, delay: 0.2)
+
+ when: "a signed FlexRequest is POSTed to the deployed endpoint"
+ Response response = postSignedAsDso(flexRequestXml(CONTRACTED_EAN))
+
+ then: "the transport call succeeds"
+ response.statusInfo.family == Response.Status.Family.SUCCESSFUL
+ response.close()
+
+ and: "verifying the signature fetched an OAuth2 token and resolved the DSO via the address book"
+ conditions.eventually {
+ wireMock.verify(postRequestedFor(urlPathEqualTo(OAUTH_PATH))
+ .withRequestBody(containing("grant_type=client_credentials")))
+ wireMock.verify(getRequestedFor(urlPathEqualTo(ADDRESS_BOOK_PATH))
+ .withHeader("Authorization", equalTo("Bearer " + ACCESS_TOKEN)))
+ }
+
+ and: "the FlexRequestResponse and FlexOffer are actually POSTed to the broker as signed XML with a bearer token"
+ conditions.eventually {
+ wireMock.verify(postRequestedFor(urlPathEqualTo(BROKER_PATH))
+ .withHeader("Authorization", equalTo("Bearer " + ACCESS_TOKEN))
+ .withRequestBody(containing("SignedMessage")))
+ def payloads = decodedBrokerPayloads()
+ assert payloads.any { it.contains("FlexRequestResponse") }
+ assert payloads.any { it.contains("FlexOffer") }
+ }
+ }
+
+ def "a signed in-scope FlexOrder POSTed to /gopacs/message is accepted and a FlexOrderResponse is delivered to the broker"() {
+ given:
+ def conditions = new PollingConditions(timeout: 10, delay: 0.2)
+
+ when: "a signed FlexOrder is POSTed to the deployed endpoint"
+ Response response = postSignedAsDso(flexOrderXml(CONTRACTED_EAN, [4000, 8000]))
+
+ then: "the transport call succeeds"
+ response.statusInfo.family == Response.Status.Family.SUCCESSFUL
+ response.close()
+
+ and: "a FlexOrderResponse is POSTed to the broker as signed XML with a bearer token"
+ conditions.eventually {
+ wireMock.verify(postRequestedFor(urlPathEqualTo(BROKER_PATH))
+ .withHeader("Authorization", equalTo("Bearer " + ACCESS_TOKEN))
+ .withRequestBody(containing("SignedMessage")))
+ assert decodedBrokerPayloads().any { it.contains("FlexOrderResponse") }
+ }
+ }
+
+ def "a validly-signed FlexRequest for a different congestion point is accepted but produces no broker delivery"() {
+ when: "a signed out-of-scope FlexRequest (different EAN) is POSTed"
+ Response response = postSignedAsDso(flexRequestXml("ean.999999999999999999"))
+
+ then: "the transport call still succeeds (the signed envelope is accepted)"
+ response.statusInfo.family == Response.Status.Family.SUCCESSFUL
+ response.close()
+
+ and: "no message is ever delivered to the broker"
+ // The drop happens before any reply is scheduled; allow the (zero-delay) scheduler to run first.
+ Thread.sleep(1000)
+ wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH)))
+ }
+
+ def "malformed transport XML is rejected with 400"() {
+ when: "a body that is not a SignedMessage envelope is POSTed"
+ Response response = postXml("")
+
+ then: "the endpoint maps the failure to 400 Bad Request"
+ response.status == 400
+ response.close()
+
+ and: "nothing is delivered to the broker"
+ wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH)))
+ }
+
+ def "a correctly-formed message whose signature does not match the resolved public key is rejected with 400"() {
+ given: "the address book returns a public key that does not match the DSO signing key"
+ def lazySodium = new LazySodiumJava(new SodiumJava())
+ def wrongPublicKeyB64 = Base64.encoder.encodeToString(lazySodium.cryptoSignKeypair().publicKey.asBytes)
+ stubAddressBook(wrongPublicKeyB64)
+
+ when: "a validly-signed FlexRequest is POSTed"
+ Response response = postSignedAsDso(flexRequestXml(CONTRACTED_EAN))
+
+ then: "signature verification fails and the endpoint returns 400 (the UftpConnectorException branch)"
+ response.status == 400
+ response.close()
+
+ and: "nothing is delivered to the broker"
+ wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH)))
+ }
+
+ def "an OAuth2 token failure (#scenario) prevents participant resolution so the inbound message is rejected with 400"() {
+ given: "the token endpoint is unhealthy"
+ wireMock.stubFor(post(urlPathEqualTo(OAUTH_PATH)).willReturn(tokenResponse))
+
+ when: "a validly-signed FlexRequest is POSTed"
+ Response response = postSignedAsDso(flexRequestXml(CONTRACTED_EAN))
+
+ then: "without a bearer token the DSO cannot be resolved, verification fails and the endpoint returns 400"
+ response.status == 400
+ response.close()
+
+ and: "nothing is delivered to the broker"
+ wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH)))
+
+ where:
+ scenario | tokenResponse
+ "500 error" | aResponse().withStatus(500)
+ "invalid token JSON" | okJson("not-json")
+ }
+
+ def "an address-book 404 for the sender domain is rejected with 400"() {
+ given: "the address book does not know the DSO"
+ wireMock.stubFor(get(urlPathEqualTo(ADDRESS_BOOK_PATH)).willReturn(aResponse().withStatus(404)))
+
+ when: "a validly-signed FlexRequest is POSTed"
+ Response response = postSignedAsDso(flexRequestXml(CONTRACTED_EAN))
+
+ then: "the sender cannot be resolved, verification fails and the endpoint returns 400"
+ response.status == 400
+ response.close()
+
+ and: "nothing is delivered to the broker"
+ wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH)))
+ }
+
+ def "the resolved participant is cached so a second inbound message does not trigger a second address-book lookup"() {
+ given:
+ def conditions = new PollingConditions(timeout: 10, delay: 0.2)
+
+ when: "two signed in-scope FlexRequests are POSTed"
+ postSignedAsDso(flexRequestXml(CONTRACTED_EAN)).close()
+ conditions.eventually {
+ wireMock.verify(getRequestedFor(urlPathEqualTo(ADDRESS_BOOK_PATH)))
+ }
+ postSignedAsDso(flexRequestXml(CONTRACTED_EAN)).close()
+ // Let any (zero-delay) second lookup happen before asserting it did not.
+ Thread.sleep(1000)
+
+ then: "the address book is queried exactly once -- the second message uses the cached participant"
+ wireMock.verify(1, getRequestedFor(urlPathEqualTo(ADDRESS_BOOK_PATH)))
+ }
+
+ // ---- WireMock stubs ----
+
+ private void stubOAuthToken(String token) {
+ wireMock.stubFor(post(urlPathEqualTo(OAUTH_PATH)).willReturn(
+ okJson("""{"access_token":"${token}","token_type":"Bearer","expires_in":3600,"scope":"uftp"}""")))
+ }
+
+ private void stubAddressBook(String publicKeyB64) {
+ wireMock.stubFor(get(urlPathEqualTo(ADDRESS_BOOK_PATH)).willReturn(
+ okJson("""{"domain":"${DSO_DOMAIN}","publicKey":"${publicKeyB64}"}""")))
+ }
+
+ private void stubBrokerAccepts() {
+ wireMock.stubFor(post(urlPathEqualTo(BROKER_PATH)).willReturn(aResponse().withStatus(200)))
+ }
+
+ // ---- HTTP helpers ----
+
+ // Signs the payload XML as the DSO and POSTs the resulting transport XML to the deployed endpoint.
+ private Response postSignedAsDso(String payloadXml) {
+ def sender = new UftpParticipant(DSO_DOMAIN, USEFRoleType.DSO)
+ SignedMessage signed = handler.cryptoService.signMessage(payloadXml, sender, dsoPrivateKeyB64)
+ return postXml(GOPACSHandler.serializer.toXml(signed))
+ }
+
+ // The broker receives SignedMessage envelopes carrying the payload in the base64 "Body" attribute. The
+ // decoded bytes are the libsodium-signed payload (a 64-byte signature prefixed to the payload XML), so the
+ // outgoing message type is still a substring of the decoded text and can be asserted on the wire.
+ private List decodedBrokerPayloads() {
+ wireMock.findAll(postRequestedFor(urlPathEqualTo(BROKER_PATH))).collect { req ->
+ def matcher = (req.bodyAsString =~ /Body="([^"]+)"/)
+ matcher ? new String(Base64.decoder.decode(matcher[0][1] as String)) : ""
+ }
+ }
+
+ private Response postXml(String transportXml) {
+ // The deployment registers a realm-path-extractor filter (realmIndex 0), so the realm is the first
+ // segment after the /gopacs context path: /gopacs/{realm}/message -> resource @Path("message").
+ createClient(null)
+ .target(serverUri(serverPort).path("gopacs").path(MASTER_REALM).path("message"))
+ .request()
+ .post(Entity.entity(transportXml, "text/xml"))
+ }
+
+ // ---- Embedded UFTP payload fixtures (attribute-style XML, matching the example message format) ----
+
+ private static String flexRequestXml(String congestionPoint) {
+ """
+
+
+
+"""
+ }
+
+ private static String flexOrderXml(String congestionPoint, List powers) {
+ def isps = (0..
+ """ """
+ }.join("\n")
+ """
+
+${isps}
+"""
+ }
+}
diff --git a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy
new file mode 100644
index 0000000..98bdfb0
--- /dev/null
+++ b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2026, OpenRemote Inc.
+ *
+ * See the CONTRIBUTORS.txt file in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.openremote.extension.ems.manager.gopacs
+
+import com.goterl.lazysodium.LazySodiumJava
+import com.goterl.lazysodium.SodiumJava
+import com.goterl.lazysodium.utils.KeyPair
+import org.lfenergy.shapeshifter.api.*
+import org.lfenergy.shapeshifter.api.model.UftpParticipantInformation
+import org.lfenergy.shapeshifter.core.model.OutgoingUftpMessage
+import org.lfenergy.shapeshifter.core.model.UftpParticipant
+import org.openremote.container.timer.TimerService
+import org.openremote.manager.asset.AssetProcessingService
+import org.openremote.manager.datapoint.AssetPredictedDatapointService
+import spock.lang.Specification
+
+import java.time.LocalDate
+import java.time.ZoneOffset
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.ScheduledFuture
+import java.util.concurrent.TimeUnit
+
+/**
+ * Exercises the day-ahead UFTP message flow handled by {@link GOPACSHandler} on the AGR (EMS) side.
+ *
+ * Real signed messages are driven through the handler's actual entry point ({@code processRawMessage})
+ * using libsodium crypto -- no database, no container. The embedded payload XML matches the wire format
+ * of the example messages (an outbound FlexRequestResponse + its SignedMessage envelope), with the
+ * inbound direction being DSO (nilsgrid.net) -> AGR (openremote.io). Outbound messages -- both the
+ * library-generated responses and the handler-built FlexOffer -- are captured by overriding
+ * {@code notifyNewOutgoingMessage}.
+ */
+class GOPACSHandlerTest extends Specification {
+
+ static final String CONTRACTED_EAN = "ean.871234567890123456"
+ static final String ASSET_ID = "0abcDEFghiJKLmnoPQRstu"
+ static final String AGR_DOMAIN = "openremote.io" // our AGR: recipient of inbound, sender of replies
+ static final String DSO_DOMAIN = "nilsgrid.net" // the DSO: sender of inbound, recipient of replies
+ static final LocalDate PERIOD = LocalDate.of(2026, 6, 4)
+ // IDs taken from the provided example payload.xml so that file is literally the expected FlexRequest reply.
+ static final String FLEX_REQUEST_MESSAGE_ID = "b3030b8b-2f45-43f4-8bf3-f00ef6fd74fc"
+ static final String CONVERSATION_ID = "5f7c9c4d-5988-4a17-a479-2f716051fd6d"
+
+ AssetProcessingService assetProcessingService
+ AssetPredictedDatapointService assetPredictedDatapointService
+ TimerService timerService
+ ScheduledExecutorService executor
+ RecordingGOPACSHandler handler
+
+ def setup() {
+ assetProcessingService = Mock(AssetProcessingService)
+ assetPredictedDatapointService = Mock(AssetPredictedDatapointService)
+ timerService = Stub(TimerService) {
+ getCurrentTimeMillis() >> PERIOD.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()
+ }
+ // Run every scheduled task inline so processRawMessage is fully synchronous.
+ executor = Stub(ScheduledExecutorService) {
+ schedule(_ as Runnable, _ as Long, _ as TimeUnit) >> { Runnable r, long d, TimeUnit u ->
+ r.run()
+ Stub(ScheduledFuture)
+ }
+ }
+
+ // Generate an ed25519 keypair. The handler's crypto pool (LazySodiumBase64Pool) decodes both the
+ // signing secret key and the verifying public key as base64.
+ def lazySodium = new LazySodiumJava(new SodiumJava())
+ KeyPair kp = lazySodium.cryptoSignKeypair()
+ String privKeyB64 = Base64.encoder.encodeToString(kp.secretKey.asBytes)
+ String pubB64 = Base64.encoder.encodeToString(kp.publicKey.asBytes)
+
+ handler = new RecordingGOPACSHandler(CONTRACTED_EAN, "master", ASSET_ID,
+ assetProcessingService, assetPredictedDatapointService, timerService, executor, privKeyB64)
+
+ // Pre-seed the DSO participant so inbound signature verification never makes an HTTP call.
+ handler.participants.put(DSO_DOMAIN,
+ new UftpParticipantInformation(DSO_DOMAIN, pubB64, "https://nilsgrid.net/shapeshifter/api/v3/message", true))
+ }
+
+ def "FlexRequest updates the asset from request ISPs and replies with FlexRequestResponse then FlexOffer"() {
+ given: "a signed in-scope FlexRequest in the example wire format"
+ def xml = flexRequestXml(CONTRACTED_EAN)
+
+ when: "it is processed"
+ signAndProcess(xml)
+
+ then: "predicted datapoints are written: max=importMax, min=exportMax"
+ 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerMaximumFlexRequest", { List dps ->
+ dps.size() == 2 && dps.collect { it.value as double } == [5.0d, 6.0d]
+ })
+ 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerMinimumFlexRequest", { List dps ->
+ dps.size() == 2 && dps.collect { it.value as double } == [3.0d, 4.0d]
+ })
+
+ and: "a FlexRequestResponse is sent, followed by a FlexOffer for the same congestion point"
+ handler.sent.size() == 2
+ handler.sent[0] instanceof FlexRequestResponse
+ handler.sent[1] instanceof FlexOffer
+ ((FlexOffer) handler.sent[1]).congestionPoint == CONTRACTED_EAN
+ !((FlexOffer) handler.sent[1]).offerOptions.isEmpty()
+
+ and: "the emitted FlexRequestResponse matches the example payload.xml shape"
+ def response = (FlexRequestResponse) handler.sent[0]
+ response.result == AcceptedRejectedType.ACCEPTED
+ response.flexRequestMessageID == FLEX_REQUEST_MESSAGE_ID
+ response.senderDomain == AGR_DOMAIN
+ response.recipientDomain == DSO_DOMAIN
+ response.version == "3.0.0"
+ response.conversationID == CONVERSATION_ID
+ }
+
+ def "FlexOfferResponse (#result) is handled without mutating the asset or sending a reply"() {
+ when: "a signed FlexOfferResponse is processed"
+ signAndProcess(flexOfferResponseXml(result))
+
+ then: "no asset mutation and no outbound message"
+ 0 * assetPredictedDatapointService.updateValues(_, _, _)
+ 0 * assetProcessingService.sendAttributeEvent(_, _)
+ handler.sent.isEmpty()
+
+ where:
+ result << [AcceptedRejectedType.ACCEPTED, AcceptedRejectedType.REJECTED]
+ }
+
+ def "FlexOrder with offtake power updates currentPower and the max-profile and replies with FlexOrderResponse"() {
+ when: "a signed in-scope FlexOrder with positive (offtake) power is processed"
+ signAndProcess(flexOrderXml(CONTRACTED_EAN, [4000, 8000])) // 4.0, 8.0 kW
+
+ then: "current power and the offtake (max) profile are written; the feed-in (min) profile is not"
+ 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "currentPowerFlexRequest", { List dps ->
+ dps.size() == 2 && dps.collect { it.value as double } == [4.0d, 8.0d]
+ })
+ 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerLimitMaximumProfileFlexOrder", { List dps ->
+ dps.size() == 2 && dps.collect { it.value as double } == [4.0d, 8.0d]
+ })
+ 0 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerLimitMinimumProfileFlexOrder", _)
+
+ and: "a FlexOrderResponse (Accepted) is sent back"
+ handler.sent.size() == 1
+ handler.sent[0] instanceof FlexOrderResponse
+ ((FlexOrderResponse) handler.sent[0]).result == AcceptedRejectedType.ACCEPTED
+ }
+
+ def "FlexOrder with feed-in power updates currentPower and the min-profile"() {
+ when: "a signed in-scope FlexOrder with negative (feed-in) power is processed"
+ signAndProcess(flexOrderXml(CONTRACTED_EAN, [-2000, -5000])) // -2.0, -5.0 kW
+
+ then: "current power and the feed-in (min) profile are written; the offtake (max) profile is not"
+ 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "currentPowerFlexRequest", { List dps ->
+ dps.size() == 2 && dps.collect { it.value as double } == [-2.0d, -5.0d]
+ })
+ 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerLimitMinimumProfileFlexOrder", { List dps ->
+ dps.size() == 2 && dps.collect { it.value as double } == [-2.0d, -5.0d]
+ })
+ 0 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerLimitMaximumProfileFlexOrder", _)
+
+ and: "a FlexOrderResponse (Accepted) is sent back"
+ handler.sent.size() == 1
+ handler.sent[0] instanceof FlexOrderResponse
+ ((FlexOrderResponse) handler.sent[0]).result == AcceptedRejectedType.ACCEPTED
+ }
+
+ def "a validly-signed FlexRequest for a different congestion point is dropped with no mutation and no reply"() {
+ when: "a signed out-of-scope FlexRequest (different EAN) is processed"
+ signAndProcess(flexRequestXml("ean.999999999999999999"))
+
+ then: "the asset is not mutated and nothing is sent back"
+ 0 * assetPredictedDatapointService.updateValues(_, _, _)
+ 0 * assetProcessingService.sendAttributeEvent(_, _)
+ handler.sent.isEmpty()
+ }
+
+ // ---- Embedded UFTP payload fixtures (attribute-style XML, matching the example message format) ----
+
+ private static String flexRequestXml(String congestionPoint) {
+ """
+
+
+
+"""
+ }
+
+ private static String flexOrderXml(String congestionPoint, List powers) {
+ def isps = (0..
+ """ """
+ }.join("\n")
+ """
+
+${isps}
+"""
+ }
+
+ private static String flexOfferResponseXml(AcceptedRejectedType result) {
+ def reason = result == AcceptedRejectedType.REJECTED ? ' RejectionReason="insufficient flexibility"' : ''
+ """
+"""
+ }
+
+ // Signs the payload XML as the DSO and feeds the transport XML through the real entry point.
+ private void signAndProcess(String payloadXml) {
+ def sender = new UftpParticipant(DSO_DOMAIN, USEFRoleType.DSO)
+ SignedMessage signed = handler.cryptoService.signMessage(payloadXml, sender, handler.privateKey)
+ handler.processRawMessage(GOPACSHandler.serializer.toXml(signed))
+ }
+
+ // Records outbound messages (library-generated responses + handler-built FlexOffer) instead of sending them.
+ static class RecordingGOPACSHandler extends GOPACSHandler {
+ final List sent = new ArrayList<>()
+
+ RecordingGOPACSHandler(String ean, String realm, String assetId,
+ AssetProcessingService aps, AssetPredictedDatapointService apds,
+ TimerService ts, ScheduledExecutorService exec, String privateKey) {
+ super(ean, realm, assetId, aps, apds, ts, exec, privateKey)
+ }
+
+ @Override
+ void notifyNewOutgoingMessage(OutgoingUftpMessage extends PayloadMessageType> message) {
+ sent.add(message.payloadMessage())
+ }
+ }
+}
diff --git a/gradle.properties b/gradle.properties
index 75531c3..aa28d06 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -20,3 +20,4 @@ jacksonVersion = 2.21.3
resteasyVersion = 6.2.16.Final
shapeshifterVersion = 3.5.0
testLoggerVersion = 4.0.0
+wiremockVersion = 3.13.1