From b91f0b19cecf55d1b62f935a0d8a72958dec956d Mon Sep 17 00:00:00 2001 From: pawana_backbase Date: Tue, 20 Jan 2026 22:00:03 +0530 Subject: [PATCH 1/3] Generating Intraday Prices for the Assets --- .../InvestmentServiceConfiguration.java | 9 +- .../stream/investment/AssetLatestPrice.java | 7 + .../investment/InvestmentAssetData.java | 1 + .../investment/InvestmentAssetsTask.java | 4 + .../stream/investment/LatestPrice.java | 15 ++ .../PaginatedExpandedAssetList.java | 27 ++ .../saga/InvestmentAssetUniversSaga.java | 38 +-- .../InvestmentAssetUniverseService.java | 54 +++- .../InvestmentIntradayAssetPriceService.java | 253 ++++++++++++++++++ .../saga/InvestmentAssetUniversSagaTest.java | 5 + ...vestmentIntradayAssetPriceServiceTest.java | 56 ++++ 11 files changed, 449 insertions(+), 20 deletions(-) create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/AssetLatestPrice.java create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/LatestPrice.java create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/PaginatedExpandedAssetList.java create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java create mode 100644 stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java index db61760f0..0b1e48d96 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java @@ -17,6 +17,7 @@ import com.backbase.stream.investment.service.InvestmentAssetPriceService; import com.backbase.stream.investment.service.InvestmentAssetUniverseService; import com.backbase.stream.investment.service.InvestmentClientService; +import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService; import com.backbase.stream.investment.service.InvestmentModelPortfolioService; import com.backbase.stream.investment.service.InvestmentPortfolioAllocationService; import com.backbase.stream.investment.service.InvestmentPortfolioService; @@ -73,6 +74,11 @@ public InvestmentAssetPriceService investmentAssetPriceService(AssetUniverseApi return new InvestmentAssetPriceService(assetUniverseApi); } + @Bean + public InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService(AssetUniverseApi assetUniverseApi) { + return new InvestmentIntradayAssetPriceService(assetUniverseApi); + } + @Bean public InvestmentPortfolioAllocationService investmentPortfolioAllocationService(AllocationsApi allocationsApi, AssetUniverseApi assetUniverseApi, InvestmentApi investmentApi, @@ -96,9 +102,10 @@ public InvestmentSaga investmentSaga(InvestmentClientService investmentClientSer public InvestmentAssetUniversSaga investmentStaticDataSaga( InvestmentAssetUniverseService investmentAssetUniverseService, InvestmentAssetPriceService investmentAssetPriceService, + InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService, InvestmentIngestionConfigurationProperties coreConfigurationProperties) { return new InvestmentAssetUniversSaga(investmentAssetUniverseService, investmentAssetPriceService, - coreConfigurationProperties); + investmentIntradayAssetPriceService, coreConfigurationProperties); } } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/AssetLatestPrice.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/AssetLatestPrice.java new file mode 100644 index 000000000..29d507e05 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/AssetLatestPrice.java @@ -0,0 +1,7 @@ +package com.backbase.stream.investment; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.UUID; + +public record AssetLatestPrice(@JsonProperty("uuid") UUID uuid, + @JsonProperty("latest_price") LatestPrice latestPrice) {} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java index c577f1e7e..6da6ccb5b 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java @@ -27,6 +27,7 @@ public class InvestmentAssetData { private List assets; private List assetPrices; private List priceAsyncTasks; + private List intradayPriceAsyncTasks; public Map getPriceByAsset() { return Objects.requireNonNullElse(assetPrices, List.of()).stream() diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java index c4146c379..e1583fa73 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java @@ -51,4 +51,8 @@ public InvestmentAssetsTask setPriceTasks(List tasks) { return this; } + public InvestmentAssetsTask setIntradayPriceTasks(List tasks) { + data.setIntradayPriceAsyncTasks(tasks); + return this; + } } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/LatestPrice.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/LatestPrice.java new file mode 100644 index 000000000..93ee660fc --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/LatestPrice.java @@ -0,0 +1,15 @@ +package com.backbase.stream.investment; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.time.OffsetDateTime; + +public record LatestPrice( + @JsonProperty("amount") Double amount, + @JsonProperty("datetime") OffsetDateTime datetime, + @JsonProperty("open_price") Double openPrice, + @JsonProperty("high_price") Double highPrice, + @JsonProperty("low_price") Double lowPrice, + @JsonProperty("previous_close_price") Double previousClosePrice +) { + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/PaginatedExpandedAssetList.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/PaginatedExpandedAssetList.java new file mode 100644 index 000000000..207444820 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/PaginatedExpandedAssetList.java @@ -0,0 +1,27 @@ +package com.backbase.stream.investment; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode +@Data +@Builder +public class PaginatedExpandedAssetList { + + public static final String JSON_PROPERTY_COUNT = "count"; + private Integer count; + + public static final String JSON_PROPERTY_NEXT = "next"; + private URI next; + + public static final String JSON_PROPERTY_PREVIOUS = "previous"; + private URI previous; + + public static final String JSON_PROPERTY_RESULTS = "results"; + private List results = new ArrayList<>(); + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSaga.java index e7ab469b7..9fea4f4d5 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSaga.java @@ -10,6 +10,7 @@ import com.backbase.stream.investment.service.InvestmentAssetPriceService; import com.backbase.stream.investment.service.InvestmentAssetUniverseService; import com.backbase.stream.investment.service.InvestmentClientService; +import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService; import com.backbase.stream.investment.service.InvestmentPortfolioService; import com.backbase.stream.worker.StreamTaskExecutor; import com.backbase.stream.worker.model.StreamTask; @@ -59,32 +60,34 @@ public class InvestmentAssetUniversSaga implements StreamTaskExecutor executeTask(InvestmentAssetsTask streamTask) { if (!coreConfigurationProperties.isAssetUniversEnabled()) { - log.warn("Skip investment asset univers saga execution: taskId={}, taskName={}", + log.warn("Skip investment asset universe saga execution: taskId={}, taskName={}", streamTask.getId(), streamTask.getName()); return Mono.just(streamTask); } - log.info("Starting investment saga execution: taskId={}, taskName={}", + log.info("Starting investment asset universe saga execution: taskId={}, taskName={}", streamTask.getId(), streamTask.getName()); - return createMarkets(streamTask) - .flatMap(this::createMarketSpecialDays) - .flatMap(this::createAssetCategoryTypes) - .flatMap(this::createAssetCategories) + return upsertMarkets(streamTask) + .flatMap(this::upsertMarketSpecialDays) + .flatMap(this::upsertAssetCategoryTypes) + .flatMap(this::upsertAssetCategories) .flatMap(this::createAssets) .flatMap(this::upsertPrices) + .flatMap(this::upsertIntradayPrices) .doOnSuccess(completedTask -> log.info( - "Successfully completed investment saga: taskId={}, taskName={}, state={}", + "Successfully completed investment asset universe saga: taskId={}, taskName={}, state={}", completedTask.getId(), completedTask.getName(), completedTask.getState())) .doOnError(throwable -> { - log.error("Failed to execute investment saga: taskId={}, taskName={}", + log.error("Failed to execute investment asset universe saga: taskId={}, taskName={}", streamTask.getId(), streamTask.getName(), throwable); streamTask.error(INVESTMENT, OP_UPSERT, RESULT_FAILED, streamTask.getName(), streamTask.getId(), - "Investment saga failed: " + throwable.getMessage()); + "Investment asset universe saga failed: " + throwable.getMessage()); streamTask.setState(State.FAILED); }) .onErrorResume(throwable -> Mono.just(streamTask)); @@ -96,6 +99,11 @@ private Mono upsertPrices(InvestmentAssetsTask investmentT .map(investmentTask::setPriceTasks); } + private Mono upsertIntradayPrices(InvestmentAssetsTask investmentTask) { + return investmentIntradayAssetPriceService.ingestIntradayPrices() + .map(investmentTask::setIntradayPriceTasks); + } + /** * Rollback is not implemented for investment saga. * @@ -112,7 +120,7 @@ public Mono rollBack(InvestmentAssetsTask streamTask) { return Mono.empty(); } - public Mono createMarkets(InvestmentAssetsTask investmentTask) { + public Mono upsertMarkets(InvestmentAssetsTask investmentTask) { InvestmentAssetData investmentData = investmentTask.getData(); int marketCount = investmentData.getMarkets() != null ? investmentData.getMarkets().size() : 0; log.info("Starting investment market creation: taskId={}, marketCount={}", @@ -161,7 +169,7 @@ public Mono createMarkets(InvestmentAssetsTask investmentT } /** - * Creates or upserts market special days for the investment task. + * Upserts market special days for the investment task. * *

This method processes each market special day in the task data by invoking * the asset universe service. It updates the task state and logs progress for observability. @@ -169,7 +177,7 @@ public Mono createMarkets(InvestmentAssetsTask investmentT * @param investmentTask the investment task containing market special day data * @return Mono emitting the updated investment task with market special days set */ - public Mono createMarketSpecialDays(InvestmentAssetsTask investmentTask) { + public Mono upsertMarketSpecialDays(InvestmentAssetsTask investmentTask) { InvestmentAssetData investmentData = investmentTask.getData(); int marketSpecialDayCount = investmentData.getMarketSpecialDays() != null ? investmentData.getMarketSpecialDays().size() : 0; @@ -219,7 +227,7 @@ public Mono createMarketSpecialDays(InvestmentAssetsTask i } /** - * Creates or upserts asset categories for the investment task. + * Upserts asset categories for the investment task. * *

This method processes each asset category in the task data by invoking * the asset universe service. It updates the task state and logs progress for observability. @@ -227,7 +235,7 @@ public Mono createMarketSpecialDays(InvestmentAssetsTask i * @param investmentTask the investment task containing asset category data * @return Mono emitting the updated investment task with asset categories set */ - public Mono createAssetCategories(InvestmentAssetsTask investmentTask) { + public Mono upsertAssetCategories(InvestmentAssetsTask investmentTask) { InvestmentAssetData investmentData = investmentTask.getData(); int categoryCount = investmentData.getAssetCategories() != null ? investmentData.getAssetCategories().size() : 0; @@ -274,7 +282,7 @@ public Mono createAssetCategories(InvestmentAssetsTask inv }); } - public Mono createAssetCategoryTypes(InvestmentAssetsTask investmentTask) { + public Mono upsertAssetCategoryTypes(InvestmentAssetsTask investmentTask) { InvestmentAssetData investmentData = investmentTask.getData(); int typeCount = investmentData.getAssetCategoryTypes() != null ? investmentData.getAssetCategoryTypes().size() : 0; diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java index e9f7d1215..5da838f77 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java @@ -59,7 +59,17 @@ public Mono getOrCreateMarket(MarketRequest marketRequest) { .flatMap(existingMarket -> { log.info("Market already exists: {}", existingMarket.getCode()); log.debug("Market already exists: {}", existingMarket); - return Mono.just(existingMarket); + return assetUniverseApi.updateMarket(existingMarket.getCode(), marketRequest) + .doOnSuccess(updatedMarket -> log.info("Updated market: {}", updatedMarket)) + .doOnError(error -> { + if (error instanceof WebClientResponseException w) { + log.error("Error updating market: {} : HTTP {} -> {}", marketRequest.getCode(), + w.getStatusCode(), w.getResponseBodyAsString()); + } else { + log.error("Error updating market: {} : {}", marketRequest.getCode(), + error.getMessage(), error); + } + }); }) // If Mono is empty (market not found), create the market .switchIfEmpty(assetUniverseApi.createMarket(marketRequest) @@ -141,7 +151,20 @@ public Mono getOrCreateMarketSpecialDay(MarketSpecialDayReques .findFirst(); if (matchingSpecialDay.isPresent()) { log.info("Market special day already exists for day: {}", marketSpecialDayRequest); - return Mono.just(matchingSpecialDay.get()); + return assetUniverseApi.updateMarketSpecialDay(matchingSpecialDay.get().getUuid().toString(), + marketSpecialDayRequest) + .doOnSuccess(updatedMarketSpecialDay -> + log.info("Updated market special day: {}", updatedMarketSpecialDay)) + .doOnError(error -> { + if (error instanceof WebClientResponseException w) { + log.error("Error updating market special day : {} : HTTP {} -> {}", + marketSpecialDayRequest, + w.getStatusCode(), w.getResponseBodyAsString()); + } else { + log.error("Error updating market special day {} : {}", marketSpecialDayRequest, + error.getMessage(), error); + } + }); } else { log.debug("No market special day exists for day: {}", marketSpecialDayRequest); return Mono.empty(); @@ -212,7 +235,18 @@ public Mono createAssetCategory(AssetCategoryRequest assetCategor .findFirst(); if (matchingCategory.isPresent()) { log.info("Asset category already exists for code: {}", assetCategoryRequest.getCode()); - return Mono.just(matchingCategory.get()); + return assetUniverseApi.updateAssetCategory(matchingCategory.get().getUuid().toString(), assetCategoryRequest) + .doOnSuccess(updatedCategory -> log.info("Updated asset category: {}", updatedCategory)) + .doOnError(error -> { + if (error instanceof WebClientResponseException w) { + log.error("Error updating asset category: {} : HTTP {} -> {}", assetCategoryRequest.getCode(), + w.getStatusCode(), w.getResponseBodyAsString()); + } else { + log.error("Error updating asset category: {} : {}", assetCategoryRequest.getCode(), + error.getMessage(), error); + } + }) + .onErrorResume(e -> Mono.empty()); } else { log.debug("No asset category exists for code: {}", assetCategoryRequest.getCode()); return Mono.empty(); @@ -232,6 +266,7 @@ public Mono createAssetCategory(AssetCategoryRequest assetCategor error.getMessage(), error); } }) + .onErrorResume(e -> Mono.empty()) ); } @@ -259,7 +294,18 @@ public Mono createAssetCategoryType(AssetCategoryTypeRequest .findFirst(); if (matchingType.isPresent()) { log.info("Asset category type already exists for code: {}", assetCategoryTypeRequest.getCode()); - return Mono.just(matchingType.get()); + return assetUniverseApi.updateAssetCategoryType(matchingType.get().getUuid().toString(), assetCategoryTypeRequest) + .doOnSuccess(updatedType -> log.info("Updated asset category type: {}", updatedType)) + .doOnError(error -> { + if (error instanceof WebClientResponseException w) { + log.error("Error updating asset category type: {} : HTTP {} -> {}", + assetCategoryTypeRequest.getCode(), w.getStatusCode(), w.getResponseBodyAsString()); + } else { + log.error("Error updating asset category type: {} : {}", + assetCategoryTypeRequest.getCode(), error.getMessage(), error); + } + }) + .onErrorResume(e -> Mono.empty()); } else { log.debug("No asset category type exists for code: {}", assetCategoryTypeRequest.getCode()); return Mono.empty(); diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java new file mode 100644 index 000000000..4f153409d --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java @@ -0,0 +1,253 @@ +package com.backbase.stream.investment.service; + +import com.backbase.investment.api.service.v1.AssetUniverseApi; +import com.backbase.investment.api.service.v1.model.GroupResult; +import com.backbase.investment.api.service.v1.model.OASCreatePriceRequest; +import com.backbase.investment.api.service.v1.model.TypeEnum; +import com.backbase.stream.investment.AssetLatestPrice; +import com.backbase.stream.investment.PaginatedExpandedAssetList; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nonnull; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Service responsible for generating and ingesting intraday asset prices. + * + *

Responsibilities: + * - Read assets with latest prices via {@link AssetUniverseApi}. + * - Generate a series of intraday OHLC price points per asset. + * - Submit intraday prices back to the Asset API using bulk create. + * + *

Notes: + * - The generator uses a randomised model constrained to realistic percentage ranges. + * - The service is reactive and non-blocking: ingestion returns a {@link Mono} that completes + * after all async submissions are triggered. + * + */ +@Slf4j +@RequiredArgsConstructor +public class InvestmentIntradayAssetPriceService { + + private final AssetUniverseApi assetUniverseApi; + + /** + * Generates and triggers ingestion of intraday prices for all assets that have a latest price. + * + * @return a {@link Mono} that emits the combined list of results for batch operations when available. + */ + public Mono> ingestIntradayPrices() { + return generateIntradayPrices() + .map(asyncTasks -> asyncTasks.stream().flatMap(Collection::stream).toList()); + } + + /** + * Internal reactive pipeline that fetches assets and creates intraday price creation tasks. + * + * @return a {@link Mono} emitting a list of lists of created {@link GroupResult}s (one list per asset request). + */ + @Nonnull + private Mono>> generateIntradayPrices() { + log.info("Generating Intraday Prices for Assets"); + return assetUniverseApi.listAssetsWithResponseSpec( + null, null, null, null, + Collections.singletonList("latest_price"), + null, null, + "uuid,latest_price", + null, null, null, null, null, + null, null, null, null + ) + .bodyToMono(PaginatedExpandedAssetList.class) + .flatMap(paginatedAssetList -> { + + if (paginatedAssetList.getCount() == 0) { + log.warn("No assets found with latest prices to generate intraday prices"); + return Mono.just(List.>of()); + } + + return Flux.fromIterable(paginatedAssetList.getResults()) + .flatMap(assetLatestPrice -> { + List requests = + generateIntradayPricesForAsset(assetLatestPrice); + + log.debug("Generated intraday price requests: {}", requests); + + if (requests.isEmpty()) { + return Mono.empty(); + } + + return assetUniverseApi.bulkCreateIntradayAssetPrice(requests, null, null, null) + .collectList() + .doOnSuccess(created -> + log.info( + "Successfully triggered creation of {} intraday prices for asset ({})", + requests.size(), + assetLatestPrice.uuid() + ) + ) + .doOnError(WebClientResponseException.class, ex -> + log.error( + "Failed to create intraday prices for asset ({}): status={}, body={}", + assetLatestPrice.uuid(), + ex.getStatusCode(), + ex.getResponseBodyAsString(), + ex + ) + ) + .onErrorResume(e -> Mono.empty()); + }) + .collectList(); + }) + .doOnError(error -> { + if (error instanceof WebClientResponseException w) { + log.error( + "Error generating intraday prices for assets: HTTP {} -> {}", + w.getStatusCode(), + w.getResponseBodyAsString() + ); + } else { + log.error( + "Error generating intraday prices for assets: {}", + error.getMessage(), + error + ); + } + }); + + } + + /** + * Generate a list of intraday price create requests for a single asset. + * + *

Each request represents a 10-minute / 15-candle sequence starting around 10:30 UTC. + * + * @param assetLatestPrice the asset data with latest price information + * @return a list of {@link OASCreatePriceRequest} ready to be submitted + */ + private List generateIntradayPricesForAsset(AssetLatestPrice assetLatestPrice) { + List requests = new ArrayList<>(); + + // Base previous close + Double previousClose = assetLatestPrice.latestPrice().previousClosePrice(); + + // Today, starting at 10:30 + LocalDate today = LocalDate.now(ZoneOffset.UTC); + LocalTime time = LocalTime.of(10, 30).plusMinutes(ThreadLocalRandom.current().nextInt(0, 10)); + + for (int i = 0; i < 15; i++) { + + // Generate intraday OHLC + Map ohlc = generateIntradayOhlc(previousClose); + + OASCreatePriceRequest oasCreatePriceRequest = new OASCreatePriceRequest(); + + try { + oasCreatePriceRequest.amount(ohlc.get("close")); + oasCreatePriceRequest.asset(Map.of("uuid", assetLatestPrice.uuid().toString())); + oasCreatePriceRequest.datetime( + OffsetDateTime.of(today, time, ZoneOffset.UTC) + ); + oasCreatePriceRequest.open(ohlc.get("open")); + oasCreatePriceRequest.high(ohlc.get("high")); + oasCreatePriceRequest.low(ohlc.get("low")); + oasCreatePriceRequest.previousClose(previousClose); + oasCreatePriceRequest.type(TypeEnum.INTRADAY); + } catch (NoSuchMethodError ignored) { + log.debug("AssetLatestPrice: {}", assetLatestPrice); + log.warn("Failed to map intraday price for asset: {}", assetLatestPrice.uuid(), ignored); + } + + requests.add(oasCreatePriceRequest); + + // Next candle starts from this close + previousClose = ohlc.get("close"); + + // Move time forward by 15 minutes + time = time.plusMinutes(10); + } + return requests; + } + + /** + * Deterministic randomised OHLC generator based on a previous close. + * + *

Algorithm constraints: + * - Total intraday range: 2–5% (implemented as 4–9 per mille in code). + * - Opening gap: ±0.1–0.3%. + * - Candle body: 0.2–1.2%. + * - Wicks are larger than body and distributed to respect direction. + * + * @param previousClose the prior close price (must be positive) + * @return a map with keys \"open\", \"high\", \"low\", \"close\" rounded to 6 decimal places + * @throws IllegalArgumentException if previousClose is null or not positive + */ + public static Map generateIntradayOhlc(Double previousClose) { + if (previousClose == null || previousClose <= 0) { + throw new IllegalArgumentException("Previous close must be positive"); + } + + ThreadLocalRandom r = ThreadLocalRandom.current(); + + // Total intraday range: 2–5% + double totalRangePct = r.nextDouble(4.0, 9.01) / 100.0; + + // Small opening gap: ±0.1–0.3% + double openGapPct = r.nextDouble(-0.3, 0.31) / 100.0; + double open = previousClose * (1 + openGapPct); + + // Direction + boolean bullish = r.nextBoolean(); + + // Candle body: 0.2–1.2% + double bodyPct = r.nextDouble(0.2, 1.21) / 100.0; + double close = bullish + ? open * (1 + bodyPct) + : open * (1 - bodyPct); + + // Wicks larger than body + double wickBudget = Math.max(totalRangePct - bodyPct, totalRangePct * 0.6); + + double upperWickPct; + double lowerWickPct; + + if (bullish) { + upperWickPct = wickBudget * r.nextDouble(0.6, 0.9); + lowerWickPct = wickBudget - upperWickPct; + } else { + lowerWickPct = wickBudget * r.nextDouble(0.6, 0.9); + upperWickPct = wickBudget - lowerWickPct; + } + + double high = Math.max(open, close) * (1 + upperWickPct); + double low = Math.min(open, close) * (1 - lowerWickPct); + + return Map.of( + "open", round6(open), + "high", round6(high), + "low", round6(low), + "close", round6(close) + ); + } + + private static double round6(double value) { + return BigDecimal + .valueOf(value) + .setScale(6, RoundingMode.HALF_UP) + .doubleValue(); + } + +} diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java index f6e9d3082..bbe382b2f 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java @@ -20,6 +20,7 @@ import com.backbase.stream.investment.RandomParam; import com.backbase.stream.investment.service.InvestmentAssetPriceService; import com.backbase.stream.investment.service.InvestmentAssetUniverseService; +import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService; import java.time.Duration; import java.util.Collections; import java.util.List; @@ -52,6 +53,9 @@ class InvestmentAssetUniversSagaTest { @Mock private InvestmentAssetPriceService investmentAssetPriceService; + @Mock + private InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService; + @Mock private InvestmentIngestionConfigurationProperties configurationProperties; @@ -63,6 +67,7 @@ void setUp() { saga = new InvestmentAssetUniversSaga( assetUniverseService, investmentAssetPriceService, + investmentIntradayAssetPriceService, configurationProperties ); // Enable asset universe by default diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java new file mode 100644 index 000000000..77c44a9a2 --- /dev/null +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java @@ -0,0 +1,56 @@ +package com.backbase.stream.investment.service; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.Map; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; + +/** + * Unit tests covering deterministic parts of the intraday price generator. + * + *

Focus is on {@link InvestmentIntradayAssetPriceService#generateIntradayOhlc(Double)} which is + * deterministic given a provided Random seed; tests assert structural invariants and rounding. + */ +class InvestmentIntradayAssetPriceServiceTest { + + @Test + void generateIntradayOhlc_shouldValidateInput() { + assertThatThrownBy(() -> InvestmentIntradayAssetPriceService.generateIntradayOhlc(null)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> InvestmentIntradayAssetPriceService.generateIntradayOhlc(0.0)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> InvestmentIntradayAssetPriceService.generateIntradayOhlc(-1.0)) + .isInstanceOf(IllegalArgumentException.class); + } + + @RepeatedTest(10) + void generateIntradayOhlc_shouldProduceValidOhlcStructure() { + double previous = 100.0; + Map ohlc = InvestmentIntradayAssetPriceService.generateIntradayOhlc(previous); + + assertThat(ohlc).containsKeys("open", "high", "low", "close"); + + double open = ohlc.get("open"); + double high = ohlc.get("high"); + double low = ohlc.get("low"); + double close = ohlc.get("close"); + + // Basic sanity + assertThat(open).isGreaterThan(0.0); + assertThat(high).isGreaterThan(0.0); + assertThat(low).isGreaterThan(0.0); + assertThat(close).isGreaterThan(0.0); + + // High must be >= max(open, close), low must be <= min(open, close) + assertThat(high).isGreaterThanOrEqualTo(Math.max(open, close)); + assertThat(low).isLessThanOrEqualTo(Math.min(open, close)); + + // Values should be rounded to 6 decimal places: value * 1e6 should be near-integer + assertThat(Math.abs(Math.round(open * 1_000_000.0) - open * 1_000_000.0)).isLessThan(1e-6); + assertThat(Math.abs(Math.round(high * 1_000_000.0) - high * 1_000_000.0)).isLessThan(1e-6); + assertThat(Math.abs(Math.round(low * 1_000_000.0) - low * 1_000_000.0)).isLessThan(1e-6); + assertThat(Math.abs(Math.round(close * 1_000_000.0) - close * 1_000_000.0)).isLessThan(1e-6); + } +} \ No newline at end of file From 8a034c8e36ff8bab1e38314c965660f6460dfab6 Mon Sep 17 00:00:00 2001 From: pawana_backbase Date: Fri, 23 Jan 2026 13:49:51 +0530 Subject: [PATCH 2/3] Addressing review comments --- .../stream/investment/AssetLatestPrice.java | 7 -- .../stream/investment/LatestPrice.java | 15 --- .../model/AssetWithMarketAndLatestPrice.java | 40 ++++++++ .../investment/model/ExpandedLatestPrice.java | 30 ++++++ .../investment/model/ExpandedMarket.java | 29 ++++++ .../PaginatedExpandedAssetList.java | 4 +- .../saga/InvestmentAssetUniversSaga.java | 12 +-- .../InvestmentAssetUniverseService.java | 8 +- .../InvestmentIntradayAssetPriceService.java | 94 +++++++++---------- .../saga/InvestmentAssetUniversSagaTest.java | 4 +- .../InvestmentAssetUniverseServiceTest.java | 31 +++--- ...vestmentIntradayAssetPriceServiceTest.java | 14 ++- 12 files changed, 182 insertions(+), 106 deletions(-) delete mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/AssetLatestPrice.java delete mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/LatestPrice.java create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/AssetWithMarketAndLatestPrice.java create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedLatestPrice.java create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedMarket.java rename stream-investment/investment-core/src/main/java/com/backbase/stream/investment/{ => model}/PaginatedExpandedAssetList.java (82%) diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/AssetLatestPrice.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/AssetLatestPrice.java deleted file mode 100644 index 29d507e05..000000000 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/AssetLatestPrice.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.backbase.stream.investment; - -import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.UUID; - -public record AssetLatestPrice(@JsonProperty("uuid") UUID uuid, - @JsonProperty("latest_price") LatestPrice latestPrice) {} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/LatestPrice.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/LatestPrice.java deleted file mode 100644 index 93ee660fc..000000000 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/LatestPrice.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.backbase.stream.investment; - -import com.fasterxml.jackson.annotation.JsonProperty; -import java.time.OffsetDateTime; - -public record LatestPrice( - @JsonProperty("amount") Double amount, - @JsonProperty("datetime") OffsetDateTime datetime, - @JsonProperty("open_price") Double openPrice, - @JsonProperty("high_price") Double highPrice, - @JsonProperty("low_price") Double lowPrice, - @JsonProperty("previous_close_price") Double previousClosePrice -) { - -} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/AssetWithMarketAndLatestPrice.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/AssetWithMarketAndLatestPrice.java new file mode 100644 index 000000000..12fa63165 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/AssetWithMarketAndLatestPrice.java @@ -0,0 +1,40 @@ +package com.backbase.stream.investment.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.UUID; + +/** + * Representation of an asset returned by the List Assets API when the response includes expanded + * market and latest price details and custom fields. + * Example JSON: + *

+ * {
+ *   "uuid": "123e4567-e89b-12d3-a456-426614174000",
+ *   "market": {
+ *     "code": "NYSE",
+ *     "name": "New York Stock Exchange",
+ *     "is_open": true,
+ *     "today_session_starts": "2024-01-01T09:30:00+00:00Z",
+ *     "today_session_ends": "2024-01-01T16:00:00+00:00Z",
+ *     "market_reopens": null
+ *   },
+ *   "latest_price": {
+ *     "amount": 123.45,
+ *     "datetime": "2024-01-01T15:30:00+00:00Z",
+ *     "open_price": 120.00,
+ *     "high_price": 125.00,
+ *     "low_price": 119.50,
+ *     "previous_close_price": 121.00
+ *   }
+ * }
+ * 
+ * + * @param uuid Asset identifier (mapped from JSON property `uuid`) + * @param expandedMarket Expanded market details (mapped from JSON property `market`) + * @param expandedLatestPrice Expanded latest price details (mapped from JSON property `latest_price`) + */ +public record AssetWithMarketAndLatestPrice(@JsonProperty("uuid") UUID uuid, + @JsonProperty("market") ExpandedMarket expandedMarket, + @JsonProperty("latest_price") ExpandedLatestPrice expandedLatestPrice) { + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedLatestPrice.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedLatestPrice.java new file mode 100644 index 000000000..353df06e0 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedLatestPrice.java @@ -0,0 +1,30 @@ +package com.backbase.stream.investment.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.time.OffsetDateTime; + +/** + * Representation of an expanded latest price returned by the List Assets API when the response is + * requested with expansion of latest price details and custom fields. + * Example JSON: + *
+ * "latest_price": {
+ *   "amount": 123.45,
+ *   "datetime": "2026-01-21T12:58:00.000000Z",
+ *   "open_price": 120.00,
+ *   "high_price": 125.00,
+ *   "low_price": 119.50,
+ *   "previous_close_price": 121.00
+ * }
+ * 
+ */ +public record ExpandedLatestPrice( + @JsonProperty("amount") Double amount, + @JsonProperty("datetime") OffsetDateTime datetime, + @JsonProperty("open_price") Double openPrice, + @JsonProperty("high_price") Double highPrice, + @JsonProperty("low_price") Double lowPrice, + @JsonProperty("previous_close_price") Double previousClosePrice +) { + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedMarket.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedMarket.java new file mode 100644 index 000000000..065629f21 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedMarket.java @@ -0,0 +1,29 @@ +package com.backbase.stream.investment.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.time.OffsetDateTime; + +/** + * Representation of an expanded market returned by the List Assets API when the response is + * requested with expansion of market details and custom fields. + * Example JSON: + *
+ *"market": {
+ *            "code": "XETR",
+ *            "name": "XETRA",
+ *            "is_open": true,
+ *            "today_session_starts": "2026-01-22T09:00:00.000000Z",
+ *            "today_session_ends": "2026-01-22T17:30:00.000000Z",
+ *            "market_reopens": "2026-01-23T09:00:00.000000Z"
+ *          }
+ * 
+ * + */ +public record ExpandedMarket(@JsonProperty("code") String code, + @JsonProperty("name") String name, + @JsonProperty("is_open") Boolean isOpen, + @JsonProperty("today_session_starts") OffsetDateTime todaySessionStarts, + @JsonProperty("today_session_ends") OffsetDateTime todaySessionEnds, + @JsonProperty("market_reopens") OffsetDateTime marketReopens) { + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/PaginatedExpandedAssetList.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/PaginatedExpandedAssetList.java similarity index 82% rename from stream-investment/investment-core/src/main/java/com/backbase/stream/investment/PaginatedExpandedAssetList.java rename to stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/PaginatedExpandedAssetList.java index 207444820..29d6f3fe1 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/PaginatedExpandedAssetList.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/PaginatedExpandedAssetList.java @@ -1,4 +1,4 @@ -package com.backbase.stream.investment; +package com.backbase.stream.investment.model; import java.net.URI; import java.util.ArrayList; @@ -22,6 +22,6 @@ public class PaginatedExpandedAssetList { private URI previous; public static final String JSON_PROPERTY_RESULTS = "results"; - private List results = new ArrayList<>(); + private List results = new ArrayList<>(); } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSaga.java index 9fea4f4d5..f990d1624 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSaga.java @@ -78,7 +78,7 @@ public Mono executeTask(InvestmentAssetsTask streamTask) { .flatMap(this::upsertAssetCategories) .flatMap(this::createAssets) .flatMap(this::upsertPrices) - .flatMap(this::upsertIntradayPrices) + .flatMap(this::createIntradayPrices) .doOnSuccess(completedTask -> log.info( "Successfully completed investment asset universe saga: taskId={}, taskName={}, state={}", completedTask.getId(), completedTask.getName(), completedTask.getState())) @@ -99,7 +99,7 @@ private Mono upsertPrices(InvestmentAssetsTask investmentT .map(investmentTask::setPriceTasks); } - private Mono upsertIntradayPrices(InvestmentAssetsTask investmentTask) { + private Mono createIntradayPrices(InvestmentAssetsTask investmentTask) { return investmentIntradayAssetPriceService.ingestIntradayPrices() .map(investmentTask::setIntradayPriceTasks); } @@ -138,7 +138,7 @@ public Mono upsertMarkets(InvestmentAssetsTask investmentT // Process each market: create or get from asset universe service return Flux.fromIterable(investmentData.getMarkets()) - .flatMap(market -> assetUniverseService.getOrCreateMarket( + .flatMap(market -> assetUniverseService.upsertMarket( new MarketRequest() .code(market.getCode()) .name(market.getName()) @@ -196,7 +196,7 @@ public Mono upsertMarketSpecialDays(InvestmentAssetsTask i // Process each market special day: create or get from asset universe service return Flux.fromIterable(investmentData.getMarketSpecialDays()) - .flatMap(marketSpecialDay -> assetUniverseService.getOrCreateMarketSpecialDay( + .flatMap(marketSpecialDay -> assetUniverseService.upsertMarketSpecialDay( new MarketSpecialDayRequest() .date(marketSpecialDay.getDate()) .market(marketSpecialDay.getMarket()) @@ -260,7 +260,7 @@ public Mono upsertAssetCategories(InvestmentAssetsTask inv .order(assetCategory.getOrder()) .type(assetCategory.getType()) .description(assetCategory.getDescription()); - return assetUniverseService.createAssetCategory(request); + return assetUniverseService.upsertAssetCategory(request); }) .collectList() .map(assetCategories -> { @@ -304,7 +304,7 @@ public Mono upsertAssetCategoryTypes(InvestmentAssetsTask AssetCategoryTypeRequest request = new AssetCategoryTypeRequest() .name(assetCategoryType.getName()) .code(assetCategoryType.getCode()); - return assetUniverseService.createAssetCategoryType(request); + return assetUniverseService.upsertAssetCategoryType(request); }) .collectList() .map(assetCategoryTypes -> { diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java index 5da838f77..7fa2fd1cf 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java @@ -43,7 +43,7 @@ public class InvestmentAssetUniverseService { * @param marketRequest the market request details * @return Mono representing the existing or newly created market */ - public Mono getOrCreateMarket(MarketRequest marketRequest) { + public Mono upsertMarket(MarketRequest marketRequest) { log.debug("Creating market: {}", marketRequest); return assetUniverseApi.getMarket(marketRequest.getCode()) // If getMarket returns 404 NOT_FOUND, treat as "not found" and return Mono.empty() @@ -131,7 +131,7 @@ public Mono getOrCreateAsset(OASAssetRequestDataRequest assetRequest) { * @param marketSpecialDayRequest the request containing market and date details * @return Mono\ representing the existing or newly created market special day */ - public Mono getOrCreateMarketSpecialDay(MarketSpecialDayRequest marketSpecialDayRequest) { + public Mono upsertMarketSpecialDay(MarketSpecialDayRequest marketSpecialDayRequest) { log.debug("Creating market special day: {}", marketSpecialDayRequest); LocalDate date = marketSpecialDayRequest.getDate(); @@ -218,7 +218,7 @@ public Flux createAssets(List representing the existing or newly created asset category */ - public Mono createAssetCategory(AssetCategoryRequest assetCategoryRequest) { + public Mono upsertAssetCategory(AssetCategoryRequest assetCategoryRequest) { if (assetCategoryRequest == null) { return Mono.empty(); } @@ -277,7 +277,7 @@ public Mono createAssetCategory(AssetCategoryRequest assetCategor * @param assetCategoryTypeRequest the request containing asset category type details * @return Mono representing the existing or newly created asset category type */ - public Mono createAssetCategoryType(AssetCategoryTypeRequest assetCategoryTypeRequest) { + public Mono upsertAssetCategoryType(AssetCategoryTypeRequest assetCategoryTypeRequest) { if (assetCategoryTypeRequest == null) { return Mono.empty(); } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java index 4f153409d..51be1511d 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java @@ -4,8 +4,8 @@ import com.backbase.investment.api.service.v1.model.GroupResult; import com.backbase.investment.api.service.v1.model.OASCreatePriceRequest; import com.backbase.investment.api.service.v1.model.TypeEnum; -import com.backbase.stream.investment.AssetLatestPrice; -import com.backbase.stream.investment.PaginatedExpandedAssetList; +import com.backbase.stream.investment.model.AssetWithMarketAndLatestPrice; +import com.backbase.stream.investment.model.PaginatedExpandedAssetList; import java.math.BigDecimal; import java.math.RoundingMode; import java.time.LocalDate; @@ -65,12 +65,12 @@ private Mono>> generateIntradayPrices() { log.info("Generating Intraday Prices for Assets"); return assetUniverseApi.listAssetsWithResponseSpec( null, null, null, null, - Collections.singletonList("latest_price"), + List.of("market","latest_price"), null, null, - "uuid,latest_price", + "uuid,market,latest_price", null, null, null, null, null, null, null, null, null - ) + ) // Above API returns custom projection with market and latest price expanded only, hence needs a custom return type. .bodyToMono(PaginatedExpandedAssetList.class) .flatMap(paginatedAssetList -> { @@ -80,9 +80,9 @@ private Mono>> generateIntradayPrices() { } return Flux.fromIterable(paginatedAssetList.getResults()) - .flatMap(assetLatestPrice -> { + .flatMap(assetWithMarketAndLatestPrice -> { List requests = - generateIntradayPricesForAsset(assetLatestPrice); + generateIntradayPricesForAsset(assetWithMarketAndLatestPrice); log.debug("Generated intraday price requests: {}", requests); @@ -96,13 +96,13 @@ private Mono>> generateIntradayPrices() { log.info( "Successfully triggered creation of {} intraday prices for asset ({})", requests.size(), - assetLatestPrice.uuid() + assetWithMarketAndLatestPrice.uuid() ) ) .doOnError(WebClientResponseException.class, ex -> log.error( "Failed to create intraday prices for asset ({}): status={}, body={}", - assetLatestPrice.uuid(), + assetWithMarketAndLatestPrice.uuid(), ex.getStatusCode(), ex.getResponseBodyAsString(), ex @@ -133,55 +133,58 @@ private Mono>> generateIntradayPrices() { /** * Generate a list of intraday price create requests for a single asset. * - *

Each request represents a 10-minute / 15-candle sequence starting around 10:30 UTC. + *

Each request represents a 15-minute / 15-candle sequence. * - * @param assetLatestPrice the asset data with latest price information + * @param assetWithMarketAndLatestPrice the asset data with latest price information * @return a list of {@link OASCreatePriceRequest} ready to be submitted */ - private List generateIntradayPricesForAsset(AssetLatestPrice assetLatestPrice) { + private List generateIntradayPricesForAsset( + AssetWithMarketAndLatestPrice assetWithMarketAndLatestPrice) { List requests = new ArrayList<>(); // Base previous close - Double previousClose = assetLatestPrice.latestPrice().previousClosePrice(); + Double previousClose = assetWithMarketAndLatestPrice.expandedLatestPrice().previousClosePrice(); - // Today, starting at 10:30 - LocalDate today = LocalDate.now(ZoneOffset.UTC); - LocalTime time = LocalTime.of(10, 30).plusMinutes(ThreadLocalRandom.current().nextInt(0, 10)); + // Today + OffsetDateTime todaySessionStarts = assetWithMarketAndLatestPrice.expandedMarket().todaySessionStarts(); + LocalDate today = todaySessionStarts.toLocalDate(); + LocalTime time = intradayStartTime(todaySessionStarts); for (int i = 0; i < 15; i++) { - // Generate intraday OHLC - Map ohlc = generateIntradayOhlc(previousClose); - - OASCreatePriceRequest oasCreatePriceRequest = new OASCreatePriceRequest(); - - try { - oasCreatePriceRequest.amount(ohlc.get("close")); - oasCreatePriceRequest.asset(Map.of("uuid", assetLatestPrice.uuid().toString())); - oasCreatePriceRequest.datetime( - OffsetDateTime.of(today, time, ZoneOffset.UTC) - ); - oasCreatePriceRequest.open(ohlc.get("open")); - oasCreatePriceRequest.high(ohlc.get("high")); - oasCreatePriceRequest.low(ohlc.get("low")); - oasCreatePriceRequest.previousClose(previousClose); - oasCreatePriceRequest.type(TypeEnum.INTRADAY); - } catch (NoSuchMethodError ignored) { - log.debug("AssetLatestPrice: {}", assetLatestPrice); - log.warn("Failed to map intraday price for asset: {}", assetLatestPrice.uuid(), ignored); - } - - requests.add(oasCreatePriceRequest); + Ohlc ohlc = generateIntradayOhlc(previousClose); + + // Create request + requests.add(createIntradayRequest(assetWithMarketAndLatestPrice, ohlc, previousClose, + OffsetDateTime.of(today, time, ZoneOffset.UTC))); // Next candle starts from this close - previousClose = ohlc.get("close"); + previousClose = ohlc.close(); // Move time forward by 15 minutes - time = time.plusMinutes(10); + time = time.plusMinutes(15); } return requests; } + private LocalTime intradayStartTime(OffsetDateTime offsetDateTime) { + return offsetDateTime.toLocalTime().plusMinutes(ThreadLocalRandom.current().nextInt(1, 15)); + } + + private OASCreatePriceRequest createIntradayRequest(AssetWithMarketAndLatestPrice asset, Ohlc ohlc, Double previousClose, + OffsetDateTime dateTime) { + + return new OASCreatePriceRequest() + .amount(ohlc.close()) + .asset(Map.of("uuid", asset.uuid().toString())) + .datetime(dateTime) + .open(ohlc.open()) + .high(ohlc.high()) + .low(ohlc.low()) + .previousClose(previousClose) + .type(TypeEnum.INTRADAY); + } + /** * Deterministic randomised OHLC generator based on a previous close. * @@ -195,7 +198,7 @@ private List generateIntradayPricesForAsset(AssetLatestPr * @return a map with keys \"open\", \"high\", \"low\", \"close\" rounded to 6 decimal places * @throws IllegalArgumentException if previousClose is null or not positive */ - public static Map generateIntradayOhlc(Double previousClose) { + public static Ohlc generateIntradayOhlc(Double previousClose) { if (previousClose == null || previousClose <= 0) { throw new IllegalArgumentException("Previous close must be positive"); } @@ -235,12 +238,7 @@ public static Map generateIntradayOhlc(Double previousClose) { double high = Math.max(open, close) * (1 + upperWickPct); double low = Math.min(open, close) * (1 - lowerWickPct); - return Map.of( - "open", round6(open), - "high", round6(high), - "low", round6(low), - "close", round6(close) - ); + return new Ohlc(round6(open), round6(high), round6(low), round6(close)); } private static double round6(double value) { @@ -250,4 +248,6 @@ private static double round6(double value) { .doubleValue(); } + public record Ohlc(double open, double high, double low, double close) {} + } diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java index bbe382b2f..9c9e4bf42 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java @@ -241,8 +241,8 @@ void upsertPrices_error_duringIngestion() { InvestmentAssetsTask task = createTestTask(); // Mock: Markets and market special days creation succeed - when(assetUniverseService.getOrCreateMarket(any())).thenReturn(Mono.empty()); - when(assetUniverseService.getOrCreateMarketSpecialDay(any())).thenReturn(Mono.empty()); + when(assetUniverseService.upsertMarket(any())).thenReturn(Mono.empty()); + when(assetUniverseService.upsertMarketSpecialDay(any())).thenReturn(Mono.empty()); when(assetUniverseService.createAssets(anyList())).thenReturn(Flux.fromIterable(task.getData().getAssets())); // Mock: Price ingestion fails with an exception diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java index 56f7f9c3d..05a28d87f 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java @@ -19,7 +19,6 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import java.io.IOException; import java.nio.charset.StandardCharsets; /** @@ -41,19 +40,21 @@ void setUp() { } @Test - void getOrCreateMarket_marketExists() { + void upsertMarket_marketExists() { MarketRequest request = new MarketRequest().code("US"); - Market market = new Market().code("US"); + Market market = new Market().code("US").name("Usa Market"); + Market marketUpdated = new Market().code("US").name("Usa Market Updated"); Mockito.when(assetUniverseApi.getMarket("US")).thenReturn(Mono.just(market)); - Mockito.when(assetUniverseApi.createMarket(request)).thenReturn(Mono.empty()); + Mockito.when(assetUniverseApi.createMarket(request)).thenReturn(Mono.just(market)); + Mockito.when(assetUniverseApi.updateMarket("US",request)).thenReturn(Mono.just(marketUpdated)); - StepVerifier.create(service.getOrCreateMarket(request)) - .expectNext(market) + StepVerifier.create(service.upsertMarket(request)) + .expectNext(marketUpdated) .verifyComplete(); } @Test - void getOrCreateMarket_marketNotFound_createsMarket() { + void upsertMarket_marketNotFound_createsMarket() { MarketRequest request = new MarketRequest().code("US"); Market createdMarket = new Market().code("US"); Mockito.when(assetUniverseApi.getMarket("US")) @@ -66,25 +67,25 @@ void getOrCreateMarket_marketNotFound_createsMarket() { ))); Mockito.when(assetUniverseApi.createMarket(request)).thenReturn(Mono.just(createdMarket)); - StepVerifier.create(service.getOrCreateMarket(request)) + StepVerifier.create(service.upsertMarket(request)) .expectNext(createdMarket) .verifyComplete(); } @Test - void getOrCreateMarket_otherError_propagates() { + void upsertMarket_otherError_propagates() { MarketRequest request = new MarketRequest().code("US"); Mockito.when(assetUniverseApi.getMarket("US")) .thenReturn(Mono.error(new RuntimeException("API error"))); Mockito.when(assetUniverseApi.createMarket(request)).thenReturn(Mono.empty()); - StepVerifier.create(service.getOrCreateMarket(request)) + StepVerifier.create(service.upsertMarket(request)) .expectErrorMatches(e -> e instanceof RuntimeException && e.getMessage().equals("API error")) .verify(); } @Test - void getOrCreateAsset_assetExists() throws IOException { + void getOrCreateAsset_assetExists() { OASAssetRequestDataRequest req = new OASAssetRequestDataRequest() .isin("ABC123").market("US").currency("USD"); Asset asset = new Asset().isin("ABC123"); @@ -115,7 +116,7 @@ void getOrCreateAsset_assetExists() throws IOException { } @Test - void getOrCreateAsset_assetNotFound_createsAsset() throws IOException { + void getOrCreateAsset_assetNotFound_createsAsset() { OASAssetRequestDataRequest req = new OASAssetRequestDataRequest() .isin("ABC123").market("US").currency("USD"); Asset createdAsset = new Asset().isin("ABC123"); @@ -152,7 +153,7 @@ void getOrCreateAsset_assetNotFound_createsAsset() throws IOException { } @Test - void getOrCreateAsset_otherError_propagates() throws IOException { + void getOrCreateAsset_otherError_propagates() { OASAssetRequestDataRequest req = new OASAssetRequestDataRequest() .isin("ABC123").market("US").currency("USD"); String assetId = "ABC123_US_USD"; @@ -183,7 +184,7 @@ void getOrCreateAsset_otherError_propagates() throws IOException { } @Test - void getOrCreateAsset_createAssetFails_propagates() throws IOException { + void getOrCreateAsset_createAssetFails_propagates() { OASAssetRequestDataRequest req = new OASAssetRequestDataRequest() .isin("ABC123").market("US").currency("USD"); String assetId = "ABC123_US_USD"; @@ -228,7 +229,7 @@ void getOrCreateAsset_nullRequest_returnsError() { } @Test - void getOrCreateAsset_emptyMonoFromCreateAsset() throws IOException { + void getOrCreateAsset_emptyMonoFromCreateAsset() { OASAssetRequestDataRequest req = new OASAssetRequestDataRequest() .isin("ABC123").market("US").currency("USD"); String assetId = "ABC123_US_USD"; diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java index 77c44a9a2..1180ab768 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java @@ -3,7 +3,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import java.util.Map; +import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService.Ohlc; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; @@ -28,14 +28,12 @@ void generateIntradayOhlc_shouldValidateInput() { @RepeatedTest(10) void generateIntradayOhlc_shouldProduceValidOhlcStructure() { double previous = 100.0; - Map ohlc = InvestmentIntradayAssetPriceService.generateIntradayOhlc(previous); + Ohlc ohlc = InvestmentIntradayAssetPriceService.generateIntradayOhlc(previous); - assertThat(ohlc).containsKeys("open", "high", "low", "close"); - - double open = ohlc.get("open"); - double high = ohlc.get("high"); - double low = ohlc.get("low"); - double close = ohlc.get("close"); + double open = ohlc.open(); + double high = ohlc.high(); + double low = ohlc.low(); + double close = ohlc.close(); // Basic sanity assertThat(open).isGreaterThan(0.0); From f9b0122227e7d29c2b2396ed23b1bf85dbb5b05e Mon Sep 17 00:00:00 2001 From: Roman Kniazevych Date: Fri, 23 Jan 2026 11:05:34 +0200 Subject: [PATCH 3/3] Update CHANGELOG.md --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 200235e5a..dff0bf569 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,15 @@ # Changelog All notable changes to this project will be documented in this file. +## [9.5.0] +### Added + - investment service intraday generation and ingestion function +### Changed + - fix investment asset universe from create to upsert implementation + +## [9.4.x] +### Changed + ## [9.3.0](https://github.com/Backbase/stream-services/compare/9.2.0...9.3.0) ### Changed - fix for NoSuchElementException (No value present) thrown while update data groups