diff --git a/ems/build.gradle b/ems/build.gradle index 5557af3..1a99709 100644 --- a/ems/build.gradle +++ b/ems/build.gradle @@ -14,6 +14,10 @@ dependencies { api "org.jboss.resteasy:resteasy-client-api:$resteasyVersion" implementation "org.lfenergy.shapeshifter:shapeshifter-core:$shapeshifterVersion" testImplementation "io.openremote:openremote-test:$openremoteVersion" + // Standalone (shaded) WireMock so its embedded Jetty/Jackson don't clash with the manager's web stack; + // used to mock the GOPACS broker / OAuth2 / address-book HTTP endpoints (the broker send goes through + // java.net.http.HttpClient, which a JAX-RS ClientRequestFilter cannot intercept). + testImplementation "org.wiremock:wiremock-standalone:$wiremockVersion" } jar { diff --git a/ems/src/main/java/org/openremote/extension/ems/manager/gopacs/GOPACSHandler.java b/ems/src/main/java/org/openremote/extension/ems/manager/gopacs/GOPACSHandler.java index 15cfc2d..108dc6a 100644 --- a/ems/src/main/java/org/openremote/extension/ems/manager/gopacs/GOPACSHandler.java +++ b/ems/src/main/java/org/openremote/extension/ems/manager/gopacs/GOPACSHandler.java @@ -211,6 +211,52 @@ protected GOPACSHandler(String contractedEAN, String realm, String electricitySu deploy(container); } + /** + * Test-support constructor. Wires the message-processing collaborators directly and skips + * remote configuration (OAuth client, private-key file) and JAX-RS deployment, so the + * day-ahead UFTP message flow can be exercised in isolation. Not used in production wiring. + */ + protected GOPACSHandler(String contractedEAN, + String realm, + String electricitySupplierAssetId, + AssetProcessingService assetProcessingService, + AssetPredictedDatapointService assetPredictedDatapointService, + TimerService timerService, + ScheduledExecutorService scheduledExecutorService, + String privateKey) { + this.devMode = false; + this.contractedEAN = contractedEAN; + this.realm = realm; + this.electricitySupplierAssetId = electricitySupplierAssetId; + this.participants = new HashMap<>(); + + this.assetProcessingService = assetProcessingService; + this.assetPredictedDatapointService = assetPredictedDatapointService; + this.timerService = timerService; + this.scheduledExecutorService = scheduledExecutorService; + this.webService = null; + + this.gopacsBrokerUrl = ""; + this.responseDelaySeconds = 0; + this.flexOfferDelaySeconds = 0; + this.clientId = null; + this.clientSecret = null; + this.privateKey = privateKey; + + this.client = null; + this.gopacsAddressBookResource = null; + this.gopacsAuthResource = null; + this.gopacsServerResource = null; + + this.participantResolutionService = new ParticipantResolutionService(this); + this.cryptoService = new UftpCryptoService(participantResolutionService, new LazySodiumFactory(), new LazySodiumBase64Pool()); + this.uftpValidationService = new UftpValidationService(new ArrayList<>()); + this.uftpReceivedMessageService = new UftpReceivedMessageService(uftpValidationService, this); + this.uftpSendMessageService = new UftpSendMessageService(serializer, cryptoService, participantResolutionService, this, uftpValidationService); + + this.objectMapper = new ObjectMapper(); + } + protected static String getDeploymentName(String contractedEAN) { return "GOPACS: " + contractedEAN; } diff --git a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerHttpTest.groovy b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerHttpTest.groovy new file mode 100644 index 0000000..50d9df1 --- /dev/null +++ b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerHttpTest.groovy @@ -0,0 +1,391 @@ +/* + * Copyright 2026, OpenRemote Inc. + * + * See the CONTRIBUTORS.txt file in the distribution for a + * full listing of individual contributors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.openremote.extension.ems.manager.gopacs + +import com.github.tomakehurst.wiremock.WireMockServer +import com.goterl.lazysodium.LazySodiumJava +import com.goterl.lazysodium.SodiumJava +import com.goterl.lazysodium.utils.KeyPair +import jakarta.ws.rs.client.Entity +import jakarta.ws.rs.core.Response +import org.lfenergy.shapeshifter.api.USEFRoleType +import org.lfenergy.shapeshifter.api.SignedMessage +import org.lfenergy.shapeshifter.core.model.UftpParticipant +import org.openremote.container.web.WebService +import org.openremote.extension.ems.manager.EmsOptimisationService +import org.openremote.extension.ems.manager.EmsOptimisationSetupService +import org.openremote.model.ContainerService +import org.openremote.test.ManagerContainerTrait +import spock.lang.IgnoreIf +import spock.lang.Shared +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +import java.time.LocalDate + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse +import static com.github.tomakehurst.wiremock.client.WireMock.containing +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo +import static com.github.tomakehurst.wiremock.client.WireMock.get +import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor +import static com.github.tomakehurst.wiremock.client.WireMock.okJson +import static com.github.tomakehurst.wiremock.client.WireMock.post +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo +import static org.openremote.model.Constants.MASTER_REALM + +/** + * HTTP-contract integration test for {@link GOPACSHandler}, complementing the in-process message-flow + * unit test in {@code GOPACSHandlerTest}. Where that test calls {@code processRawMessage} directly and + * records outgoing payload objects, this test exercises the real transport chain: + * + * + * + * The broker send uses the JDK {@code HttpClient}, which a JAX-RS {@code ClientRequestFilter} (the style + * used by {@code EntsoeProtocolTest}) cannot intercept; hence the WireMock server, matching the approach + * shapeshifter itself uses to test {@code UftpSendMessageService}. + * + * Like the other manager-container integration tests in this repository (e.g. {@code EntsoeProtocolTest}), + * this requires a running OpenRemote dev stack and is skipped on CI. + * + * Asset mutation (predicted datapoints / attribute events) is intentionally not re-asserted here -- those + * writes are scheduled asynchronously and are already covered precisely by {@code GOPACSHandlerTest}. This + * test focuses on the HTTP contract, so a placeholder asset id is sufficient. + */ +@IgnoreIf({ System.getenv("GITHUB_ACTIONS") == "true" }) +class GOPACSHandlerHttpTest extends Specification implements ManagerContainerTrait { + + static final String CONTRACTED_EAN = "ean.871234567890123456" + static final String ASSET_ID = "0abcDEFghiJKLmnoPQRstu" + static final String AGR_DOMAIN = "openremote.io" // our AGR: recipient of inbound, sender of replies + static final String DSO_DOMAIN = "nilsgrid.net" // the DSO: sender of inbound, recipient of replies + static final LocalDate PERIOD = LocalDate.of(2026, 6, 4) + static final String FLEX_REQUEST_MESSAGE_ID = "b3030b8b-2f45-43f4-8bf3-f00ef6fd74fc" + static final String CONVERSATION_ID = "5f7c9c4d-5988-4a17-a479-2f716051fd6d" + static final String ACCESS_TOKEN = "test-access-token" + + static final String OAUTH_PATH = "/oauth/token" + static final String ADDRESS_BOOK_PATH = "/uftp-participants/v3/participants/" + DSO_DOMAIN + static final String BROKER_PATH = "/shapeshifter/api/v3/message" + + @Shared + WireMockServer wireMock + @Shared + String dsoPrivateKeyB64 + @Shared + String dsoPublicKeyB64 + @Shared + File privateKeyFile + @Shared + GOPACSHandler handler + @Shared + int serverPort + + def setupSpec() { + // Two ed25519 keypairs: the DSO signs the inbound messages (its public key is served by the + // address-book stub so the handler can verify them), and the AGR signs its outbound replies. + def lazySodium = new LazySodiumJava(new SodiumJava()) + KeyPair dso = lazySodium.cryptoSignKeypair() + dsoPrivateKeyB64 = Base64.encoder.encodeToString(dso.secretKey.asBytes) + dsoPublicKeyB64 = Base64.encoder.encodeToString(dso.publicKey.asBytes) + + KeyPair agr = lazySodium.cryptoSignKeypair() + privateKeyFile = File.createTempFile("gopacs-agr-private-key", ".txt") + privateKeyFile.deleteOnExit() + privateKeyFile.text = Base64.encoder.encodeToString(agr.secretKey.asBytes) + + wireMock = new WireMockServer(0) + wireMock.start() + } + + def cleanupSpec() { + handler?.undeploy() + wireMock?.stop() + } + + def setup() { + // Must be a plain String (not a Groovy GString): OpenRemote's Config.init casts every config value to String. + String base = "http://localhost:${wireMock.port()}".toString() + def config = defaultConfig() << [ + (GOPACSHandler.GOPACS_CLIENT_ID) : "test-client", + (GOPACSHandler.GOPACS_CLIENT_SECRET) : "test-secret", + (GOPACSHandler.GOPACS_PRIVATE_KEY_FILE) : privateKeyFile.absolutePath, + (GOPACSHandler.GOPACS_OAUTH2_URL) : base + OAUTH_PATH, + (GOPACSHandler.GOPACS_PARTICIPANT_URL) : base, + (GOPACSHandler.GOPACS_BROKER_URL) : base, + // Send replies immediately so assertions don't wait on the production response/offer delays. + (GOPACSHandler.GOPACS_RESPONSE_DELAY_SECONDS) : "0", + (GOPACSHandler.GOPACS_FLEX_OFFER_DELAY_SECONDS): "0" + ] + + // startContainer reuses the already-running container when config (ignoring the web-server port) + // and services match, so this is cheap after the first feature. + def runningContainer = startContainer(config, gopacsServices()) + + if (handler == null) { + serverPort = runningContainer.getConfig().get(WebService.OR_WEBSERVER_LISTEN_PORT) as int + + // Construct the production handler directly (real RESTEasy client, real send service, real + // JAX-RS deployment). EmsOptimisationService is excluded from the service list above so it + // cannot spin up a second handler on the same /gopacs deployment path. + handler = new GOPACSHandler(CONTRACTED_EAN, MASTER_REALM, ASSET_ID, runningContainer) + } + + // Reset cross-test state: the participant cache leaks across features otherwise, and the request + // journal / stubs must start clean for the per-feature verifications. + handler.participants.clear() + wireMock.resetAll() + stubOAuthToken(ACCESS_TOKEN) + stubAddressBook(dsoPublicKeyB64) + stubBrokerAccepts() + } + + // The manager services minus the EMS optimisation services, which would otherwise deploy their own + // GOPACS handler (on the same /gopacs path) from any persisted EmsGOPACSAsset. + private Iterable gopacsServices() { + defaultServices().findAll { + !(it instanceof EmsOptimisationService) && !(it instanceof EmsOptimisationSetupService) + } + } + + def "a signed in-scope FlexRequest POSTed to /gopacs/message is accepted and the reply is delivered to the broker with a bearer token"() { + given: "polling for the asynchronous outbound deliveries" + def conditions = new PollingConditions(timeout: 10, delay: 0.2) + + when: "a signed FlexRequest is POSTed to the deployed endpoint" + Response response = postSignedAsDso(flexRequestXml(CONTRACTED_EAN)) + + then: "the transport call succeeds" + response.statusInfo.family == Response.Status.Family.SUCCESSFUL + response.close() + + and: "verifying the signature fetched an OAuth2 token and resolved the DSO via the address book" + conditions.eventually { + wireMock.verify(postRequestedFor(urlPathEqualTo(OAUTH_PATH)) + .withRequestBody(containing("grant_type=client_credentials"))) + wireMock.verify(getRequestedFor(urlPathEqualTo(ADDRESS_BOOK_PATH)) + .withHeader("Authorization", equalTo("Bearer " + ACCESS_TOKEN))) + } + + and: "the FlexRequestResponse and FlexOffer are actually POSTed to the broker as signed XML with a bearer token" + conditions.eventually { + wireMock.verify(postRequestedFor(urlPathEqualTo(BROKER_PATH)) + .withHeader("Authorization", equalTo("Bearer " + ACCESS_TOKEN)) + .withRequestBody(containing("SignedMessage"))) + def payloads = decodedBrokerPayloads() + assert payloads.any { it.contains("FlexRequestResponse") } + assert payloads.any { it.contains("FlexOffer") } + } + } + + def "a signed in-scope FlexOrder POSTed to /gopacs/message is accepted and a FlexOrderResponse is delivered to the broker"() { + given: + def conditions = new PollingConditions(timeout: 10, delay: 0.2) + + when: "a signed FlexOrder is POSTed to the deployed endpoint" + Response response = postSignedAsDso(flexOrderXml(CONTRACTED_EAN, [4000, 8000])) + + then: "the transport call succeeds" + response.statusInfo.family == Response.Status.Family.SUCCESSFUL + response.close() + + and: "a FlexOrderResponse is POSTed to the broker as signed XML with a bearer token" + conditions.eventually { + wireMock.verify(postRequestedFor(urlPathEqualTo(BROKER_PATH)) + .withHeader("Authorization", equalTo("Bearer " + ACCESS_TOKEN)) + .withRequestBody(containing("SignedMessage"))) + assert decodedBrokerPayloads().any { it.contains("FlexOrderResponse") } + } + } + + def "a validly-signed FlexRequest for a different congestion point is accepted but produces no broker delivery"() { + when: "a signed out-of-scope FlexRequest (different EAN) is POSTed" + Response response = postSignedAsDso(flexRequestXml("ean.999999999999999999")) + + then: "the transport call still succeeds (the signed envelope is accepted)" + response.statusInfo.family == Response.Status.Family.SUCCESSFUL + response.close() + + and: "no message is ever delivered to the broker" + // The drop happens before any reply is scheduled; allow the (zero-delay) scheduler to run first. + Thread.sleep(1000) + wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH))) + } + + def "malformed transport XML is rejected with 400"() { + when: "a body that is not a SignedMessage envelope is POSTed" + Response response = postXml("") + + then: "the endpoint maps the failure to 400 Bad Request" + response.status == 400 + response.close() + + and: "nothing is delivered to the broker" + wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH))) + } + + def "a correctly-formed message whose signature does not match the resolved public key is rejected with 400"() { + given: "the address book returns a public key that does not match the DSO signing key" + def lazySodium = new LazySodiumJava(new SodiumJava()) + def wrongPublicKeyB64 = Base64.encoder.encodeToString(lazySodium.cryptoSignKeypair().publicKey.asBytes) + stubAddressBook(wrongPublicKeyB64) + + when: "a validly-signed FlexRequest is POSTed" + Response response = postSignedAsDso(flexRequestXml(CONTRACTED_EAN)) + + then: "signature verification fails and the endpoint returns 400 (the UftpConnectorException branch)" + response.status == 400 + response.close() + + and: "nothing is delivered to the broker" + wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH))) + } + + def "an OAuth2 token failure (#scenario) prevents participant resolution so the inbound message is rejected with 400"() { + given: "the token endpoint is unhealthy" + wireMock.stubFor(post(urlPathEqualTo(OAUTH_PATH)).willReturn(tokenResponse)) + + when: "a validly-signed FlexRequest is POSTed" + Response response = postSignedAsDso(flexRequestXml(CONTRACTED_EAN)) + + then: "without a bearer token the DSO cannot be resolved, verification fails and the endpoint returns 400" + response.status == 400 + response.close() + + and: "nothing is delivered to the broker" + wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH))) + + where: + scenario | tokenResponse + "500 error" | aResponse().withStatus(500) + "invalid token JSON" | okJson("not-json") + } + + def "an address-book 404 for the sender domain is rejected with 400"() { + given: "the address book does not know the DSO" + wireMock.stubFor(get(urlPathEqualTo(ADDRESS_BOOK_PATH)).willReturn(aResponse().withStatus(404))) + + when: "a validly-signed FlexRequest is POSTed" + Response response = postSignedAsDso(flexRequestXml(CONTRACTED_EAN)) + + then: "the sender cannot be resolved, verification fails and the endpoint returns 400" + response.status == 400 + response.close() + + and: "nothing is delivered to the broker" + wireMock.verify(0, postRequestedFor(urlPathEqualTo(BROKER_PATH))) + } + + def "the resolved participant is cached so a second inbound message does not trigger a second address-book lookup"() { + given: + def conditions = new PollingConditions(timeout: 10, delay: 0.2) + + when: "two signed in-scope FlexRequests are POSTed" + postSignedAsDso(flexRequestXml(CONTRACTED_EAN)).close() + conditions.eventually { + wireMock.verify(getRequestedFor(urlPathEqualTo(ADDRESS_BOOK_PATH))) + } + postSignedAsDso(flexRequestXml(CONTRACTED_EAN)).close() + // Let any (zero-delay) second lookup happen before asserting it did not. + Thread.sleep(1000) + + then: "the address book is queried exactly once -- the second message uses the cached participant" + wireMock.verify(1, getRequestedFor(urlPathEqualTo(ADDRESS_BOOK_PATH))) + } + + // ---- WireMock stubs ---- + + private void stubOAuthToken(String token) { + wireMock.stubFor(post(urlPathEqualTo(OAUTH_PATH)).willReturn( + okJson("""{"access_token":"${token}","token_type":"Bearer","expires_in":3600,"scope":"uftp"}"""))) + } + + private void stubAddressBook(String publicKeyB64) { + wireMock.stubFor(get(urlPathEqualTo(ADDRESS_BOOK_PATH)).willReturn( + okJson("""{"domain":"${DSO_DOMAIN}","publicKey":"${publicKeyB64}"}"""))) + } + + private void stubBrokerAccepts() { + wireMock.stubFor(post(urlPathEqualTo(BROKER_PATH)).willReturn(aResponse().withStatus(200))) + } + + // ---- HTTP helpers ---- + + // Signs the payload XML as the DSO and POSTs the resulting transport XML to the deployed endpoint. + private Response postSignedAsDso(String payloadXml) { + def sender = new UftpParticipant(DSO_DOMAIN, USEFRoleType.DSO) + SignedMessage signed = handler.cryptoService.signMessage(payloadXml, sender, dsoPrivateKeyB64) + return postXml(GOPACSHandler.serializer.toXml(signed)) + } + + // The broker receives SignedMessage envelopes carrying the payload in the base64 "Body" attribute. The + // decoded bytes are the libsodium-signed payload (a 64-byte signature prefixed to the payload XML), so the + // outgoing message type is still a substring of the decoded text and can be asserted on the wire. + private List decodedBrokerPayloads() { + wireMock.findAll(postRequestedFor(urlPathEqualTo(BROKER_PATH))).collect { req -> + def matcher = (req.bodyAsString =~ /Body="([^"]+)"/) + matcher ? new String(Base64.decoder.decode(matcher[0][1] as String)) : "" + } + } + + private Response postXml(String transportXml) { + // The deployment registers a realm-path-extractor filter (realmIndex 0), so the realm is the first + // segment after the /gopacs context path: /gopacs/{realm}/message -> resource @Path("message"). + createClient(null) + .target(serverUri(serverPort).path("gopacs").path(MASTER_REALM).path("message")) + .request() + .post(Entity.entity(transportXml, "text/xml")) + } + + // ---- Embedded UFTP payload fixtures (attribute-style XML, matching the example message format) ---- + + private static String flexRequestXml(String congestionPoint) { + """ + + + +""" + } + + private static String flexOrderXml(String congestionPoint, List powers) { + def isps = (0.. + """ """ + }.join("\n") + """ + +${isps} +""" + } +} diff --git a/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy new file mode 100644 index 0000000..98bdfb0 --- /dev/null +++ b/ems/src/test/groovy/org/openremote/extension/ems/manager/gopacs/GOPACSHandlerTest.groovy @@ -0,0 +1,245 @@ +/* + * Copyright 2026, OpenRemote Inc. + * + * See the CONTRIBUTORS.txt file in the distribution for a + * full listing of individual contributors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.openremote.extension.ems.manager.gopacs + +import com.goterl.lazysodium.LazySodiumJava +import com.goterl.lazysodium.SodiumJava +import com.goterl.lazysodium.utils.KeyPair +import org.lfenergy.shapeshifter.api.* +import org.lfenergy.shapeshifter.api.model.UftpParticipantInformation +import org.lfenergy.shapeshifter.core.model.OutgoingUftpMessage +import org.lfenergy.shapeshifter.core.model.UftpParticipant +import org.openremote.container.timer.TimerService +import org.openremote.manager.asset.AssetProcessingService +import org.openremote.manager.datapoint.AssetPredictedDatapointService +import spock.lang.Specification + +import java.time.LocalDate +import java.time.ZoneOffset +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit + +/** + * Exercises the day-ahead UFTP message flow handled by {@link GOPACSHandler} on the AGR (EMS) side. + * + * Real signed messages are driven through the handler's actual entry point ({@code processRawMessage}) + * using libsodium crypto -- no database, no container. The embedded payload XML matches the wire format + * of the example messages (an outbound FlexRequestResponse + its SignedMessage envelope), with the + * inbound direction being DSO (nilsgrid.net) -> AGR (openremote.io). Outbound messages -- both the + * library-generated responses and the handler-built FlexOffer -- are captured by overriding + * {@code notifyNewOutgoingMessage}. + */ +class GOPACSHandlerTest extends Specification { + + static final String CONTRACTED_EAN = "ean.871234567890123456" + static final String ASSET_ID = "0abcDEFghiJKLmnoPQRstu" + static final String AGR_DOMAIN = "openremote.io" // our AGR: recipient of inbound, sender of replies + static final String DSO_DOMAIN = "nilsgrid.net" // the DSO: sender of inbound, recipient of replies + static final LocalDate PERIOD = LocalDate.of(2026, 6, 4) + // IDs taken from the provided example payload.xml so that file is literally the expected FlexRequest reply. + static final String FLEX_REQUEST_MESSAGE_ID = "b3030b8b-2f45-43f4-8bf3-f00ef6fd74fc" + static final String CONVERSATION_ID = "5f7c9c4d-5988-4a17-a479-2f716051fd6d" + + AssetProcessingService assetProcessingService + AssetPredictedDatapointService assetPredictedDatapointService + TimerService timerService + ScheduledExecutorService executor + RecordingGOPACSHandler handler + + def setup() { + assetProcessingService = Mock(AssetProcessingService) + assetPredictedDatapointService = Mock(AssetPredictedDatapointService) + timerService = Stub(TimerService) { + getCurrentTimeMillis() >> PERIOD.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() + } + // Run every scheduled task inline so processRawMessage is fully synchronous. + executor = Stub(ScheduledExecutorService) { + schedule(_ as Runnable, _ as Long, _ as TimeUnit) >> { Runnable r, long d, TimeUnit u -> + r.run() + Stub(ScheduledFuture) + } + } + + // Generate an ed25519 keypair. The handler's crypto pool (LazySodiumBase64Pool) decodes both the + // signing secret key and the verifying public key as base64. + def lazySodium = new LazySodiumJava(new SodiumJava()) + KeyPair kp = lazySodium.cryptoSignKeypair() + String privKeyB64 = Base64.encoder.encodeToString(kp.secretKey.asBytes) + String pubB64 = Base64.encoder.encodeToString(kp.publicKey.asBytes) + + handler = new RecordingGOPACSHandler(CONTRACTED_EAN, "master", ASSET_ID, + assetProcessingService, assetPredictedDatapointService, timerService, executor, privKeyB64) + + // Pre-seed the DSO participant so inbound signature verification never makes an HTTP call. + handler.participants.put(DSO_DOMAIN, + new UftpParticipantInformation(DSO_DOMAIN, pubB64, "https://nilsgrid.net/shapeshifter/api/v3/message", true)) + } + + def "FlexRequest updates the asset from request ISPs and replies with FlexRequestResponse then FlexOffer"() { + given: "a signed in-scope FlexRequest in the example wire format" + def xml = flexRequestXml(CONTRACTED_EAN) + + when: "it is processed" + signAndProcess(xml) + + then: "predicted datapoints are written: max=importMax, min=exportMax" + 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerMaximumFlexRequest", { List dps -> + dps.size() == 2 && dps.collect { it.value as double } == [5.0d, 6.0d] + }) + 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerMinimumFlexRequest", { List dps -> + dps.size() == 2 && dps.collect { it.value as double } == [3.0d, 4.0d] + }) + + and: "a FlexRequestResponse is sent, followed by a FlexOffer for the same congestion point" + handler.sent.size() == 2 + handler.sent[0] instanceof FlexRequestResponse + handler.sent[1] instanceof FlexOffer + ((FlexOffer) handler.sent[1]).congestionPoint == CONTRACTED_EAN + !((FlexOffer) handler.sent[1]).offerOptions.isEmpty() + + and: "the emitted FlexRequestResponse matches the example payload.xml shape" + def response = (FlexRequestResponse) handler.sent[0] + response.result == AcceptedRejectedType.ACCEPTED + response.flexRequestMessageID == FLEX_REQUEST_MESSAGE_ID + response.senderDomain == AGR_DOMAIN + response.recipientDomain == DSO_DOMAIN + response.version == "3.0.0" + response.conversationID == CONVERSATION_ID + } + + def "FlexOfferResponse (#result) is handled without mutating the asset or sending a reply"() { + when: "a signed FlexOfferResponse is processed" + signAndProcess(flexOfferResponseXml(result)) + + then: "no asset mutation and no outbound message" + 0 * assetPredictedDatapointService.updateValues(_, _, _) + 0 * assetProcessingService.sendAttributeEvent(_, _) + handler.sent.isEmpty() + + where: + result << [AcceptedRejectedType.ACCEPTED, AcceptedRejectedType.REJECTED] + } + + def "FlexOrder with offtake power updates currentPower and the max-profile and replies with FlexOrderResponse"() { + when: "a signed in-scope FlexOrder with positive (offtake) power is processed" + signAndProcess(flexOrderXml(CONTRACTED_EAN, [4000, 8000])) // 4.0, 8.0 kW + + then: "current power and the offtake (max) profile are written; the feed-in (min) profile is not" + 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "currentPowerFlexRequest", { List dps -> + dps.size() == 2 && dps.collect { it.value as double } == [4.0d, 8.0d] + }) + 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerLimitMaximumProfileFlexOrder", { List dps -> + dps.size() == 2 && dps.collect { it.value as double } == [4.0d, 8.0d] + }) + 0 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerLimitMinimumProfileFlexOrder", _) + + and: "a FlexOrderResponse (Accepted) is sent back" + handler.sent.size() == 1 + handler.sent[0] instanceof FlexOrderResponse + ((FlexOrderResponse) handler.sent[0]).result == AcceptedRejectedType.ACCEPTED + } + + def "FlexOrder with feed-in power updates currentPower and the min-profile"() { + when: "a signed in-scope FlexOrder with negative (feed-in) power is processed" + signAndProcess(flexOrderXml(CONTRACTED_EAN, [-2000, -5000])) // -2.0, -5.0 kW + + then: "current power and the feed-in (min) profile are written; the offtake (max) profile is not" + 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "currentPowerFlexRequest", { List dps -> + dps.size() == 2 && dps.collect { it.value as double } == [-2.0d, -5.0d] + }) + 1 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerLimitMinimumProfileFlexOrder", { List dps -> + dps.size() == 2 && dps.collect { it.value as double } == [-2.0d, -5.0d] + }) + 0 * assetPredictedDatapointService.updateValues(ASSET_ID, "powerLimitMaximumProfileFlexOrder", _) + + and: "a FlexOrderResponse (Accepted) is sent back" + handler.sent.size() == 1 + handler.sent[0] instanceof FlexOrderResponse + ((FlexOrderResponse) handler.sent[0]).result == AcceptedRejectedType.ACCEPTED + } + + def "a validly-signed FlexRequest for a different congestion point is dropped with no mutation and no reply"() { + when: "a signed out-of-scope FlexRequest (different EAN) is processed" + signAndProcess(flexRequestXml("ean.999999999999999999")) + + then: "the asset is not mutated and nothing is sent back" + 0 * assetPredictedDatapointService.updateValues(_, _, _) + 0 * assetProcessingService.sendAttributeEvent(_, _) + handler.sent.isEmpty() + } + + // ---- Embedded UFTP payload fixtures (attribute-style XML, matching the example message format) ---- + + private static String flexRequestXml(String congestionPoint) { + """ + + + +""" + } + + private static String flexOrderXml(String congestionPoint, List powers) { + def isps = (0.. + """ """ + }.join("\n") + """ + +${isps} +""" + } + + private static String flexOfferResponseXml(AcceptedRejectedType result) { + def reason = result == AcceptedRejectedType.REJECTED ? ' RejectionReason="insufficient flexibility"' : '' + """ +""" + } + + // Signs the payload XML as the DSO and feeds the transport XML through the real entry point. + private void signAndProcess(String payloadXml) { + def sender = new UftpParticipant(DSO_DOMAIN, USEFRoleType.DSO) + SignedMessage signed = handler.cryptoService.signMessage(payloadXml, sender, handler.privateKey) + handler.processRawMessage(GOPACSHandler.serializer.toXml(signed)) + } + + // Records outbound messages (library-generated responses + handler-built FlexOffer) instead of sending them. + static class RecordingGOPACSHandler extends GOPACSHandler { + final List sent = new ArrayList<>() + + RecordingGOPACSHandler(String ean, String realm, String assetId, + AssetProcessingService aps, AssetPredictedDatapointService apds, + TimerService ts, ScheduledExecutorService exec, String privateKey) { + super(ean, realm, assetId, aps, apds, ts, exec, privateKey) + } + + @Override + void notifyNewOutgoingMessage(OutgoingUftpMessage message) { + sent.add(message.payloadMessage()) + } + } +} diff --git a/gradle.properties b/gradle.properties index 75531c3..aa28d06 100644 --- a/gradle.properties +++ b/gradle.properties @@ -20,3 +20,4 @@ jacksonVersion = 2.21.3 resteasyVersion = 6.2.16.Final shapeshifterVersion = 3.5.0 testLoggerVersion = 4.0.0 +wiremockVersion = 3.13.1