createMarkets(InvestmentAssetsTask investmentT
}
/**
- * Creates or upserts market special days for the investment task.
+ * Upserts market special days for the investment task.
*
* This method processes each market special day in the task data by invoking
* the asset universe service. It updates the task state and logs progress for observability.
@@ -169,7 +176,7 @@ public Mono createMarkets(InvestmentAssetsTask investmentT
* @param investmentTask the investment task containing market special day data
* @return Mono emitting the updated investment task with market special days set
*/
- public Mono createMarketSpecialDays(InvestmentAssetsTask investmentTask) {
+ public Mono upsertMarketSpecialDays(InvestmentAssetsTask investmentTask) {
InvestmentAssetData investmentData = investmentTask.getData();
int marketSpecialDayCount =
investmentData.getMarketSpecialDays() != null ? investmentData.getMarketSpecialDays().size() : 0;
@@ -188,7 +195,7 @@ public Mono createMarketSpecialDays(InvestmentAssetsTask i
// Process each market special day: create or get from asset universe service
return Flux.fromIterable(investmentData.getMarketSpecialDays())
- .flatMap(marketSpecialDay -> assetUniverseService.getOrCreateMarketSpecialDay(
+ .flatMap(marketSpecialDay -> assetUniverseService.upsertMarketSpecialDay(
new MarketSpecialDayRequest()
.date(marketSpecialDay.getDate())
.market(marketSpecialDay.getMarket())
@@ -219,7 +226,7 @@ public Mono createMarketSpecialDays(InvestmentAssetsTask i
}
/**
- * Creates or upserts asset categories for the investment task.
+ * Upserts asset categories for the investment task.
*
* This method processes each asset category in the task data by invoking
* the asset universe service. It updates the task state and logs progress for observability.
@@ -227,7 +234,7 @@ public Mono createMarketSpecialDays(InvestmentAssetsTask i
* @param investmentTask the investment task containing asset category data
* @return Mono emitting the updated investment task with asset categories set
*/
- public Mono createAssetCategories(InvestmentAssetsTask investmentTask) {
+ public Mono upsertAssetCategories(InvestmentAssetsTask investmentTask) {
InvestmentAssetData investmentData = investmentTask.getData();
int categoryCount =
investmentData.getAssetCategories() != null ? investmentData.getAssetCategories().size() : 0;
@@ -245,18 +252,10 @@ public Mono createAssetCategories(InvestmentAssetsTask inv
}
return Flux.fromIterable(investmentData.getAssetCategories())
- .flatMap(assetCategory -> {
- AssetCategoryRequest request = new AssetCategoryRequest()
- .name(assetCategory.getName())
- .code(assetCategory.getCode())
- .order(assetCategory.getOrder())
- .type(assetCategory.getType())
- .description(assetCategory.getDescription());
- return assetUniverseService.createAssetCategory(request);
- })
+ .flatMap(assetUniverseService::upsertAssetCategory)
.collectList()
.map(assetCategories -> {
- investmentTask.setAssetCategories(assetCategories);
+ investmentTask.setInsertedAssetCategories(assetCategories);
investmentTask.info(INVESTMENT, OP_CREATE, RESULT_CREATED, investmentTask.getName(),
investmentTask.getId(),
RESULT_CREATED + " " + assetCategories.size() + " Investment Asset Categories");
@@ -274,7 +273,7 @@ public Mono createAssetCategories(InvestmentAssetsTask inv
});
}
- public Mono createAssetCategoryTypes(InvestmentAssetsTask investmentTask) {
+ public Mono upsertAssetCategoryTypes(InvestmentAssetsTask investmentTask) {
InvestmentAssetData investmentData = investmentTask.getData();
int typeCount =
investmentData.getAssetCategoryTypes() != null ? investmentData.getAssetCategoryTypes().size() : 0;
@@ -296,7 +295,7 @@ public Mono createAssetCategoryTypes(InvestmentAssetsTask
AssetCategoryTypeRequest request = new AssetCategoryTypeRequest()
.name(assetCategoryType.getName())
.code(assetCategoryType.getCode());
- return assetUniverseService.createAssetCategoryType(request);
+ return assetUniverseService.upsertAssetCategoryType(request);
})
.collectList()
.map(assetCategoryTypes -> {
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java
new file mode 100644
index 000000000..4021c9a7d
--- /dev/null
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java
@@ -0,0 +1,101 @@
+package com.backbase.stream.investment.saga;
+
+import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties;
+import com.backbase.stream.investment.InvestmentContentTask;
+import com.backbase.stream.investment.service.InvestmentClientService;
+import com.backbase.stream.investment.service.InvestmentPortfolioService;
+import com.backbase.stream.investment.service.resttemplate.InvestmentRestNewsContentService;
+import com.backbase.stream.worker.StreamTaskExecutor;
+import com.backbase.stream.worker.model.StreamTask;
+import com.backbase.stream.worker.model.StreamTask.State;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Mono;
+
+/**
+ * Saga orchestrating the complete investment client ingestion workflow.
+ *
+ * This saga implements a multi-step process for ingesting investment data:
+ *
+ * - Upsert investment clients - Creates or updates client records
+ * - Upsert investment products - Creates or updates portfolio products
+ * - Upsert investment portfolios - Creates or updates portfolios with client associations
+ *
+ *
+ * The saga uses idempotent operations to ensure safe re-execution and writes progress
+ * to the {@link StreamTask} history for observability. Each step builds upon the previous
+ * step's results, creating a complete investment setup.
+ *
+ *
Design notes:
+ *
+ * - All operations are idempotent (safe to retry)
+ * - Progress is tracked via StreamTask state and history
+ * - Failures are logged with complete context for debugging
+ * - All reactive operations include proper success and error handlers
+ *
+ *
+ * @see InvestmentClientService
+ * @see InvestmentPortfolioService
+ * @see StreamTaskExecutor
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class InvestmentContentSaga implements StreamTaskExecutor {
+
+ public static final String INVESTMENT = "investment-content";
+ public static final String OP_UPSERT = "upsert";
+ public static final String RESULT_FAILED = "failed";
+
+ private final InvestmentRestNewsContentService investmentRestNewsContentService;
+ private final InvestmentIngestionConfigurationProperties coreConfigurationProperties;
+
+ @Override
+ public Mono executeTask(InvestmentContentTask streamTask) {
+ if (!coreConfigurationProperties.isContentEnabled()) {
+ log.warn("Skip investment content saga execution: taskId={}, taskName={}",
+ streamTask.getId(), streamTask.getName());
+ return Mono.just(streamTask);
+ }
+ log.info("Starting investment content saga execution: taskId={}, taskName={}",
+ streamTask.getId(), streamTask.getName());
+ log.info("Starting investment saga execution: taskId={}, taskName={}",
+ streamTask.getId(), streamTask.getName());
+ return upsertNewsContent(streamTask)
+ .doOnSuccess(completedTask -> log.info(
+ "Successfully completed investment saga: taskId={}, taskName={}, state={}",
+ completedTask.getId(), completedTask.getName(), completedTask.getState()))
+ .doOnError(throwable -> {
+ log.error("Failed to execute investment saga: taskId={}, taskName={}",
+ streamTask.getId(), streamTask.getName(), throwable);
+ streamTask.error(INVESTMENT, OP_UPSERT, RESULT_FAILED,
+ streamTask.getName(), streamTask.getId(),
+ "Investment saga failed: " + throwable.getMessage());
+ streamTask.setState(State.FAILED);
+ })
+ .onErrorResume(throwable -> Mono.just(streamTask));
+ }
+
+ private Mono upsertNewsContent(InvestmentContentTask investmentContentTask) {
+ return investmentRestNewsContentService.upsertContent(investmentContentTask.getData().getMarketNews())
+ .thenReturn(investmentContentTask);
+ }
+
+
+ /**
+ * Rollback is not implemented for investment saga.
+ *
+ * Investment operations are idempotent and designed to be retried safely.
+ * Manual cleanup should be performed if necessary through the Investment Service API.
+ *
+ * @param streamTask the task to rollback
+ * @return null - rollback not implemented
+ */
+ @Override
+ public Mono rollBack(InvestmentContentTask streamTask) {
+ log.warn("Rollback requested for investment saga but not implemented: taskId={}, taskName={}",
+ streamTask.getId(), streamTask.getName());
+ return Mono.empty();
+ }
+
+}
+
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AssetMapper.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AssetMapper.java
index 5734b8143..807e77c6e 100644
--- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AssetMapper.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AssetMapper.java
@@ -21,6 +21,7 @@ public interface AssetMapper {
OASAssetRequestDataRequest map(Asset asset, Map categoryIdByCode);
@Mapping(target = "categories", source = "categories", qualifiedByName = "mapCategories")
+ @Mapping(target = "logo", ignore = true)
Asset map(com.backbase.investment.api.service.v1.model.Asset asset);
@AfterMapping
@@ -29,7 +30,7 @@ default void postMap(@MappingTarget OASAssetRequestDataRequest requestDataReques
if (requestDataRequest == null) {
return;
}
- requestDataRequest.setCategories(Objects.requireNonNullElse(asset.categories(), new ArrayList())
+ requestDataRequest.setCategories(Objects.requireNonNullElse(asset.getCategories(), new ArrayList())
.stream().filter(Objects::nonNull).map(categoryIdByCode::get)
.filter(Objects::nonNull).toList());
}
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AsyncTaskService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AsyncTaskService.java
index 327409408..32c3c7cba 100644
--- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AsyncTaskService.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AsyncTaskService.java
@@ -11,7 +11,6 @@
import reactor.core.publisher.Mono;
@Slf4j
-@Service
@RequiredArgsConstructor
public class AsyncTaskService {
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetPriceService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetPriceService.java
index 10a124b52..6972625d5 100644
--- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetPriceService.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetPriceService.java
@@ -54,9 +54,9 @@ private Mono>> generatePrices(List assets, Map priceByAsset,
.orElse(defaultRandomParam);
double price = Optional.ofNullable(lastPrice)
.or(() -> Optional.ofNullable(configAssetPrice).map(AssetPrice::price)
- .or(() -> Optional.of(a).map(Asset::defaultPrice)))
+ .or(() -> Optional.of(a).map(Asset::getDefaultPrice)))
.orElse(DEFAULT_START_PRICE);
return new RandomPriceParam(price, randomParam);
}
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java
index e9f7d1215..eddaa88e4 100644
--- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java
@@ -12,6 +12,8 @@
import com.backbase.investment.api.service.v1.model.MarketSpecialDayRequest;
import com.backbase.investment.api.service.v1.model.OASAssetRequestDataRequest;
import com.backbase.investment.api.service.v1.model.PaginatedAssetCategoryList;
+import com.backbase.stream.investment.service.resttemplate.InvestmentRestAssetUniverseService;
+import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.util.List;
@@ -33,6 +35,7 @@
public class InvestmentAssetUniverseService {
private final AssetUniverseApi assetUniverseApi;
+ private final InvestmentRestAssetUniverseService investmentRestAssetUniverseService;
private final CustomIntegrationApiService customIntegrationApiService;
private final AssetMapper assetMapper = Mappers.getMapper(AssetMapper.class);
@@ -43,7 +46,7 @@ public class InvestmentAssetUniverseService {
* @param marketRequest the market request details
* @return Mono representing the existing or newly created market
*/
- public Mono getOrCreateMarket(MarketRequest marketRequest) {
+ public Mono upsertMarket(MarketRequest marketRequest) {
log.debug("Creating market: {}", marketRequest);
return assetUniverseApi.getMarket(marketRequest.getCode())
// If getMarket returns 404 NOT_FOUND, treat as "not found" and return Mono.empty()
@@ -59,7 +62,17 @@ public Mono getOrCreateMarket(MarketRequest marketRequest) {
.flatMap(existingMarket -> {
log.info("Market already exists: {}", existingMarket.getCode());
log.debug("Market already exists: {}", existingMarket);
- return Mono.just(existingMarket);
+ return assetUniverseApi.updateMarket(existingMarket.getCode(), marketRequest)
+ .doOnSuccess(updatedMarket -> log.info("Updated market: {}", updatedMarket))
+ .doOnError(error -> {
+ if (error instanceof WebClientResponseException w) {
+ log.error("Error updating market: {} : HTTP {} -> {}", marketRequest.getCode(),
+ w.getStatusCode(), w.getResponseBodyAsString());
+ } else {
+ log.error("Error updating market: {} : {}", marketRequest.getCode(),
+ error.getMessage(), error);
+ }
+ });
})
// If Mono is empty (market not found), create the market
.switchIfEmpty(assetUniverseApi.createMarket(marketRequest)
@@ -69,14 +82,15 @@ public Mono getOrCreateMarket(MarketRequest marketRequest) {
}
/**
- * Gets an existing asset by its identifier, or creates it if not found (404). Handles 404 NOT_FOUND from getAsset
- * by returning Mono.empty(), which triggers asset creation via switchIfEmpty.
+ * Gets an existing asset by its identifier, or creates it if not found (404). Handles 404 NOT_FOUND from
+ * getAsset by returning Mono.empty(), which triggers asset creation via switchIfEmpty.
*
* @param assetRequest the asset request details
+ * @param logo the thumbnail image
* @return Mono representing the existing or newly created asset
* @throws IOException if an I/O error occurs
*/
- public Mono getOrCreateAsset(OASAssetRequestDataRequest assetRequest) {
+ public Mono getOrCreateAsset(OASAssetRequestDataRequest assetRequest, File logo) {
log.debug("Creating asset: {}", assetRequest);
// Build a unique asset identifier using ISIN, market, and currency
@@ -95,12 +109,16 @@ public Mono getOrCreateAsset(OASAssetRequestDataRequest assetRequest) {
return Mono.error(error);
})
// If asset exists, log and return it
- .flatMap(existingAsset -> {
+ .map(existingAsset -> {
log.info("Asset already exists with Asset Identifier : {}", assetIdentifier);
- return Mono.just(existingAsset);
+ return existingAsset;
})
+ .flatMap(a -> investmentRestAssetUniverseService.setAssetLogo(a, logo)
+ .thenReturn(a))
// If Mono is empty (asset not found), create the asset
.switchIfEmpty(customIntegrationApiService.createAsset(assetRequest)
+ .flatMap(a -> investmentRestAssetUniverseService.setAssetLogo(a, logo)
+ .thenReturn(a))
.doOnSuccess(createdAsset -> log.info("Created asset with assetIdentifier: {}", assetIdentifier))
.doOnError(error -> {
if (error instanceof WebClientResponseException w) {
@@ -115,13 +133,13 @@ public Mono getOrCreateAsset(OASAssetRequestDataRequest assetRequest) {
}
/**
- * Gets an existing market special day by date and market, or creates it if not found. Handles 404 or empty results
- * by creating the market special day.
+ * Gets an existing market special day by date and market, or creates it if not found. Handles 404 or empty
+ * results by creating the market special day.
*
* @param marketSpecialDayRequest the request containing market and date details
* @return Mono\ representing the existing or newly created market special day
*/
- public Mono getOrCreateMarketSpecialDay(MarketSpecialDayRequest marketSpecialDayRequest) {
+ public Mono upsertMarketSpecialDay(MarketSpecialDayRequest marketSpecialDayRequest) {
log.debug("Creating market special day: {}", marketSpecialDayRequest);
LocalDate date = marketSpecialDayRequest.getDate();
@@ -141,7 +159,20 @@ public Mono getOrCreateMarketSpecialDay(MarketSpecialDayReques
.findFirst();
if (matchingSpecialDay.isPresent()) {
log.info("Market special day already exists for day: {}", marketSpecialDayRequest);
- return Mono.just(matchingSpecialDay.get());
+ return assetUniverseApi.updateMarketSpecialDay(matchingSpecialDay.get().getUuid().toString(),
+ marketSpecialDayRequest)
+ .doOnSuccess(updatedMarketSpecialDay ->
+ log.info("Updated market special day: {}", updatedMarketSpecialDay))
+ .doOnError(error -> {
+ if (error instanceof WebClientResponseException w) {
+ log.error("Error updating market special day : {} : HTTP {} -> {}",
+ marketSpecialDayRequest,
+ w.getStatusCode(), w.getResponseBodyAsString());
+ } else {
+ log.error("Error updating market special day {} : {}", marketSpecialDayRequest,
+ error.getMessage(), error);
+ }
+ });
} else {
log.debug("No market special day exists for day: {}", marketSpecialDayRequest);
return Mono.empty();
@@ -179,26 +210,28 @@ public Flux createAssets(List {
Map categoryIdByCode = categories.stream()
.collect(Collectors.toMap(AssetCategory::getCode, AssetCategory::getUuid));
-
return Flux.fromIterable(assets)
.flatMap(asset -> {
OASAssetRequestDataRequest assetRequest = assetMapper.map(asset, categoryIdByCode);
- return this.getOrCreateAsset(assetRequest).map(assetMapper::map);
+ return this.getOrCreateAsset(assetRequest, asset.getLogo()).map(assetMapper::map);
});
});
}
/**
- * Gets an existing asset category by its code, or creates it if not found. Handles empty results by creating the
- * asset category.
+ * Gets an existing asset category by its code, or creates it if not found. Handles empty results by creating
+ * the asset category.
*
* @param assetCategoryRequest the request containing asset category details
* @return Mono representing the existing or newly created asset category
*/
- public Mono createAssetCategory(AssetCategoryRequest assetCategoryRequest) {
+ public Mono upsertAssetCategory(AssetCategoryRequest assetCategoryRequest) {
if (assetCategoryRequest == null) {
return Mono.empty();
}
+ File logo = assetCategoryRequest.getImage();
+ // Post request cannot insert file directly, so set to null for the initial creation call
+ assetCategoryRequest.setImage(null);
return assetUniverseApi.listAssetCategories(assetCategoryRequest.getCode(), 100,
assetCategoryRequest.getName(), 0, assetCategoryRequest.getOrder(), assetCategoryRequest.getType())
.flatMap(paginatedAssetCategoryList -> {
@@ -212,7 +245,18 @@ public Mono createAssetCategory(AssetCategoryRequest assetCategor
.findFirst();
if (matchingCategory.isPresent()) {
log.info("Asset category already exists for code: {}", assetCategoryRequest.getCode());
- return Mono.just(matchingCategory.get());
+ return assetUniverseApi.updateAssetCategory(matchingCategory.get().getUuid().toString(), assetCategoryRequest)
+ .doOnSuccess(updatedCategory -> log.info("Updated asset category: {}", updatedCategory))
+ .doOnError(error -> {
+ if (error instanceof WebClientResponseException w) {
+ log.error("Error updating asset category: {} : HTTP {} -> {}", assetCategoryRequest.getCode(),
+ w.getStatusCode(), w.getResponseBodyAsString());
+ } else {
+ log.error("Error updating asset category: {} : {}", assetCategoryRequest.getCode(),
+ error.getMessage(), error);
+ }
+ })
+ .onErrorResume(e -> Mono.empty());
} else {
log.debug("No asset category exists for code: {}", assetCategoryRequest.getCode());
return Mono.empty();
@@ -232,7 +276,11 @@ public Mono createAssetCategory(AssetCategoryRequest assetCategor
error.getMessage(), error);
}
})
- );
+ )
+ .flatMap(ac -> investmentRestAssetUniverseService.setAssetCategoryLogo(ac.getUuid(), logo)
+ .thenReturn(ac)
+ )
+ .onErrorResume(e -> Mono.empty());
}
/**
@@ -242,7 +290,7 @@ public Mono createAssetCategory(AssetCategoryRequest assetCategor
* @param assetCategoryTypeRequest the request containing asset category type details
* @return Mono representing the existing or newly created asset category type
*/
- public Mono createAssetCategoryType(AssetCategoryTypeRequest assetCategoryTypeRequest) {
+ public Mono upsertAssetCategoryType(AssetCategoryTypeRequest assetCategoryTypeRequest) {
if (assetCategoryTypeRequest == null) {
return Mono.empty();
}
@@ -259,7 +307,18 @@ public Mono createAssetCategoryType(AssetCategoryTypeRequest
.findFirst();
if (matchingType.isPresent()) {
log.info("Asset category type already exists for code: {}", assetCategoryTypeRequest.getCode());
- return Mono.just(matchingType.get());
+ return assetUniverseApi.updateAssetCategoryType(matchingType.get().getUuid().toString(), assetCategoryTypeRequest)
+ .doOnSuccess(updatedType -> log.info("Updated asset category type: {}", updatedType))
+ .doOnError(error -> {
+ if (error instanceof WebClientResponseException w) {
+ log.error("Error updating asset category type: {} : HTTP {} -> {}",
+ assetCategoryTypeRequest.getCode(), w.getStatusCode(), w.getResponseBodyAsString());
+ } else {
+ log.error("Error updating asset category type: {} : {}",
+ assetCategoryTypeRequest.getCode(), error.getMessage(), error);
+ }
+ })
+ .onErrorResume(e -> Mono.empty());
} else {
log.debug("No asset category type exists for code: {}", assetCategoryTypeRequest.getCode());
return Mono.empty();
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java
new file mode 100644
index 000000000..51be1511d
--- /dev/null
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java
@@ -0,0 +1,253 @@
+package com.backbase.stream.investment.service;
+
+import com.backbase.investment.api.service.v1.AssetUniverseApi;
+import com.backbase.investment.api.service.v1.model.GroupResult;
+import com.backbase.investment.api.service.v1.model.OASCreatePriceRequest;
+import com.backbase.investment.api.service.v1.model.TypeEnum;
+import com.backbase.stream.investment.model.AssetWithMarketAndLatestPrice;
+import com.backbase.stream.investment.model.PaginatedExpandedAssetList;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nonnull;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Service responsible for generating and ingesting intraday asset prices.
+ *
+ * Responsibilities:
+ * - Read assets with latest prices via {@link AssetUniverseApi}.
+ * - Generate a series of intraday OHLC price points per asset.
+ * - Submit intraday prices back to the Asset API using bulk create.
+ *
+ *
Notes:
+ * - The generator uses a randomised model constrained to realistic percentage ranges.
+ * - The service is reactive and non-blocking: ingestion returns a {@link Mono} that completes
+ * after all async submissions are triggered.
+ *
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class InvestmentIntradayAssetPriceService {
+
+ private final AssetUniverseApi assetUniverseApi;
+
+ /**
+ * Generates and triggers ingestion of intraday prices for all assets that have a latest price.
+ *
+ * @return a {@link Mono} that emits the combined list of results for batch operations when available.
+ */
+ public Mono> ingestIntradayPrices() {
+ return generateIntradayPrices()
+ .map(asyncTasks -> asyncTasks.stream().flatMap(Collection::stream).toList());
+ }
+
+ /**
+ * Internal reactive pipeline that fetches assets and creates intraday price creation tasks.
+ *
+ * @return a {@link Mono} emitting a list of lists of created {@link GroupResult}s (one list per asset request).
+ */
+ @Nonnull
+ private Mono>> generateIntradayPrices() {
+ log.info("Generating Intraday Prices for Assets");
+ return assetUniverseApi.listAssetsWithResponseSpec(
+ null, null, null, null,
+ List.of("market","latest_price"),
+ null, null,
+ "uuid,market,latest_price",
+ null, null, null, null, null,
+ null, null, null, null
+ ) // Above API returns custom projection with market and latest price expanded only, hence needs a custom return type.
+ .bodyToMono(PaginatedExpandedAssetList.class)
+ .flatMap(paginatedAssetList -> {
+
+ if (paginatedAssetList.getCount() == 0) {
+ log.warn("No assets found with latest prices to generate intraday prices");
+ return Mono.just(List.>of());
+ }
+
+ return Flux.fromIterable(paginatedAssetList.getResults())
+ .flatMap(assetWithMarketAndLatestPrice -> {
+ List requests =
+ generateIntradayPricesForAsset(assetWithMarketAndLatestPrice);
+
+ log.debug("Generated intraday price requests: {}", requests);
+
+ if (requests.isEmpty()) {
+ return Mono.empty();
+ }
+
+ return assetUniverseApi.bulkCreateIntradayAssetPrice(requests, null, null, null)
+ .collectList()
+ .doOnSuccess(created ->
+ log.info(
+ "Successfully triggered creation of {} intraday prices for asset ({})",
+ requests.size(),
+ assetWithMarketAndLatestPrice.uuid()
+ )
+ )
+ .doOnError(WebClientResponseException.class, ex ->
+ log.error(
+ "Failed to create intraday prices for asset ({}): status={}, body={}",
+ assetWithMarketAndLatestPrice.uuid(),
+ ex.getStatusCode(),
+ ex.getResponseBodyAsString(),
+ ex
+ )
+ )
+ .onErrorResume(e -> Mono.empty());
+ })
+ .collectList();
+ })
+ .doOnError(error -> {
+ if (error instanceof WebClientResponseException w) {
+ log.error(
+ "Error generating intraday prices for assets: HTTP {} -> {}",
+ w.getStatusCode(),
+ w.getResponseBodyAsString()
+ );
+ } else {
+ log.error(
+ "Error generating intraday prices for assets: {}",
+ error.getMessage(),
+ error
+ );
+ }
+ });
+
+ }
+
+ /**
+ * Generate a list of intraday price create requests for a single asset.
+ *
+ * Each request represents a 15-minute / 15-candle sequence.
+ *
+ * @param assetWithMarketAndLatestPrice the asset data with latest price information
+ * @return a list of {@link OASCreatePriceRequest} ready to be submitted
+ */
+ private List generateIntradayPricesForAsset(
+ AssetWithMarketAndLatestPrice assetWithMarketAndLatestPrice) {
+ List requests = new ArrayList<>();
+
+ // Base previous close
+ Double previousClose = assetWithMarketAndLatestPrice.expandedLatestPrice().previousClosePrice();
+
+ // Today
+ OffsetDateTime todaySessionStarts = assetWithMarketAndLatestPrice.expandedMarket().todaySessionStarts();
+ LocalDate today = todaySessionStarts.toLocalDate();
+ LocalTime time = intradayStartTime(todaySessionStarts);
+
+ for (int i = 0; i < 15; i++) {
+ // Generate intraday OHLC
+ Ohlc ohlc = generateIntradayOhlc(previousClose);
+
+ // Create request
+ requests.add(createIntradayRequest(assetWithMarketAndLatestPrice, ohlc, previousClose,
+ OffsetDateTime.of(today, time, ZoneOffset.UTC)));
+
+ // Next candle starts from this close
+ previousClose = ohlc.close();
+
+ // Move time forward by 15 minutes
+ time = time.plusMinutes(15);
+ }
+ return requests;
+ }
+
+ private LocalTime intradayStartTime(OffsetDateTime offsetDateTime) {
+ return offsetDateTime.toLocalTime().plusMinutes(ThreadLocalRandom.current().nextInt(1, 15));
+ }
+
+ private OASCreatePriceRequest createIntradayRequest(AssetWithMarketAndLatestPrice asset, Ohlc ohlc, Double previousClose,
+ OffsetDateTime dateTime) {
+
+ return new OASCreatePriceRequest()
+ .amount(ohlc.close())
+ .asset(Map.of("uuid", asset.uuid().toString()))
+ .datetime(dateTime)
+ .open(ohlc.open())
+ .high(ohlc.high())
+ .low(ohlc.low())
+ .previousClose(previousClose)
+ .type(TypeEnum.INTRADAY);
+ }
+
+ /**
+ * Deterministic randomised OHLC generator based on a previous close.
+ *
+ * Algorithm constraints:
+ * - Total intraday range: 2–5% (implemented as 4–9 per mille in code).
+ * - Opening gap: ±0.1–0.3%.
+ * - Candle body: 0.2–1.2%.
+ * - Wicks are larger than body and distributed to respect direction.
+ *
+ * @param previousClose the prior close price (must be positive)
+ * @return a map with keys \"open\", \"high\", \"low\", \"close\" rounded to 6 decimal places
+ * @throws IllegalArgumentException if previousClose is null or not positive
+ */
+ public static Ohlc generateIntradayOhlc(Double previousClose) {
+ if (previousClose == null || previousClose <= 0) {
+ throw new IllegalArgumentException("Previous close must be positive");
+ }
+
+ ThreadLocalRandom r = ThreadLocalRandom.current();
+
+ // Total intraday range: 2–5%
+ double totalRangePct = r.nextDouble(4.0, 9.01) / 100.0;
+
+ // Small opening gap: ±0.1–0.3%
+ double openGapPct = r.nextDouble(-0.3, 0.31) / 100.0;
+ double open = previousClose * (1 + openGapPct);
+
+ // Direction
+ boolean bullish = r.nextBoolean();
+
+ // Candle body: 0.2–1.2%
+ double bodyPct = r.nextDouble(0.2, 1.21) / 100.0;
+ double close = bullish
+ ? open * (1 + bodyPct)
+ : open * (1 - bodyPct);
+
+ // Wicks larger than body
+ double wickBudget = Math.max(totalRangePct - bodyPct, totalRangePct * 0.6);
+
+ double upperWickPct;
+ double lowerWickPct;
+
+ if (bullish) {
+ upperWickPct = wickBudget * r.nextDouble(0.6, 0.9);
+ lowerWickPct = wickBudget - upperWickPct;
+ } else {
+ lowerWickPct = wickBudget * r.nextDouble(0.6, 0.9);
+ upperWickPct = wickBudget - lowerWickPct;
+ }
+
+ double high = Math.max(open, close) * (1 + upperWickPct);
+ double low = Math.min(open, close) * (1 - lowerWickPct);
+
+ return new Ohlc(round6(open), round6(high), round6(low), round6(close));
+ }
+
+ private static double round6(double value) {
+ return BigDecimal
+ .valueOf(value)
+ .setScale(6, RoundingMode.HALF_UP)
+ .doubleValue();
+ }
+
+ public record Ohlc(double open, double high, double low, double close) {}
+
+}
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java
index 08ab51c2f..7aa13305b 100644
--- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java
@@ -260,7 +260,7 @@ private Mono getPortfolioModel(PortfolioList portfolio, List new Allocation(new ModelAsset(a.getIsin(), a.getMarket(), a.currency()), 0.2)).toList())
+ .map(a -> new Allocation(new ModelAsset(a.getIsin(), a.getMarket(), a.getCurrency()), 0.2)).toList())
.build()));
}
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java
new file mode 100644
index 000000000..112c4eecf
--- /dev/null
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java
@@ -0,0 +1,120 @@
+package com.backbase.stream.investment.service.resttemplate;
+
+import com.backbase.investment.api.service.sync.ApiClient;
+import com.backbase.investment.api.service.sync.v1.AssetUniverseApi;
+import com.backbase.investment.api.service.sync.v1.model.AssetCategory;
+import com.backbase.investment.api.service.v1.model.Asset;
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.web.client.HttpClientErrorException;
+import reactor.core.publisher.Mono;
+
+@Slf4j
+@RequiredArgsConstructor
+public class InvestmentRestAssetUniverseService {
+
+ private final AssetUniverseApi assetUniverseApi;
+ private final ApiClient apiClient;
+
+ public Mono setAssetLogo(Asset asset, File logo) {
+ String assetUuid = asset.getUuid().toString();
+
+ if (logo == null) {
+ log.debug("Skipping logo attachment: assetUuid={}", assetUuid);
+ return Mono.just(asset);
+ }
+
+ log.info(
+ "Starting logo attachment for asset: assetUuid={}, assetName='{}', logoFile='{}', logoSize={}",
+ assetUuid, asset.getName(), logo.getName(), logo.length());
+
+ return Mono.defer(() -> Mono.just(assetUniverseApi.patchAsset(assetUuid, null, logo))).map(patchedAsset -> {
+ log.info(
+ "Logo attached successfully to asset:assetUuid={}, assetName='{}', logoFile='{}'", assetUuid,
+ asset.getName(), logo.getName());
+ return asset;
+ }).onErrorResume(throwable -> {
+ log.error(
+ "Logo attachment failed for asset:assetUuid={}, assetName='{}', logoFile='{}', errorType={}, errorMessage={}",
+ assetUuid, asset.getName(), logo.getName(), throwable.getClass().getSimpleName(),
+ throwable.getMessage(), throwable);
+ log.warn("Asset processing continuing without logo:assetUuid={}", assetUuid);
+ return Mono.just(asset);
+ });
+ }
+
+ public Mono setAssetCategoryLogo(UUID assetCategoryId, File logo) {
+ String assetCategoryUuid = assetCategoryId.toString();
+
+ if (logo == null) {
+ log.debug("Skipping logo attachment: operation=setLogo, assetCategoryUuid={}, reason=noLogoProvided",
+ assetCategoryUuid);
+ return Mono.empty();
+ }
+
+ log.info(
+ "Starting logo attachment for asset category: operation=setLogo, assetCategoryUuid={}, logoFile='{}', logoSize={}, action=start",
+ assetCategoryUuid, logo.getName(), logo.length());
+
+ return Mono.defer(() -> {
+ // verify the required parameter 'uuid' is set
+ if (assetCategoryUuid == null) {
+ throw new HttpClientErrorException(HttpStatus.BAD_REQUEST,
+ "Missing the required parameter 'uuid' when calling partialUpdateAssetCategory");
+ }
+
+ // create path and map variables
+ final Map uriVariables = new HashMap();
+ uriVariables.put("uuid", assetCategoryUuid);
+
+ final MultiValueMap localVarQueryParams = new LinkedMultiValueMap();
+ final HttpHeaders localVarHeaderParams = new HttpHeaders();
+ final MultiValueMap localVarCookieParams = new LinkedMultiValueMap();
+ final MultiValueMap localVarFormParams = new LinkedMultiValueMap();
+
+ localVarFormParams.add("image", new FileSystemResource(logo));
+
+ final String[] localVarAccepts = {"application/json"};
+ final List localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);
+ final String[] localVarContentTypes = {"multipart/form-data"};
+ final MediaType localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);
+
+ String[] localVarAuthNames = new String[]{};
+
+ ParameterizedTypeReference localReturnType = new ParameterizedTypeReference() {
+ };
+ AssetCategory assetCategory = apiClient.invokeAPI("/service-api/v2/asset/asset-categories/{uuid}/",
+ HttpMethod.PATCH, uriVariables, localVarQueryParams, null, localVarHeaderParams,
+ localVarCookieParams, localVarFormParams, localVarAccept, localVarContentType, localVarAuthNames,
+ localReturnType).getBody();
+ return Mono.justOrEmpty(assetCategory.getUuid());
+ }).map(patchedAssetCategoryUuId -> {
+ log.info(
+ "Logo attached successfully to asset category: assetCategoryUuid={}, logoFile='{}'",
+ patchedAssetCategoryUuId, logo.getName());
+ return patchedAssetCategoryUuId;
+ }).onErrorResume(throwable -> {
+ log.error(
+ "Logo attachment failed for asset category: assetCategoryUuid={}, logoFile='{}', errorType={}, errorMessage={}",
+ assetCategoryId, logo.getName(), throwable.getClass().getSimpleName(), throwable.getMessage(),
+ throwable);
+ log.warn(
+ "Asset processing continuing without logo: assetCategoryUuid={}", assetCategoryId);
+ return Mono.just(assetCategoryId);
+ });
+ }
+
+}
\ No newline at end of file
diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestNewsContentService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestNewsContentService.java
new file mode 100644
index 000000000..c1a732a46
--- /dev/null
+++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestNewsContentService.java
@@ -0,0 +1,196 @@
+package com.backbase.stream.investment.service.resttemplate;
+
+import com.backbase.investment.api.service.sync.ApiClient;
+import com.backbase.investment.api.service.sync.v1.ContentApi;
+import com.backbase.investment.api.service.sync.v1.model.Entry;
+import com.backbase.investment.api.service.sync.v1.model.EntryCreateUpdate;
+import com.backbase.investment.api.service.sync.v1.model.EntryCreateUpdateRequest;
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Design notes. (see CODING_RULES_COPILOT.md)
+ *
+ * - No direct manipulation of generated API classes beyond construction & mapping
+ * - Side-effecting operations are logged at info (create) or debug (patch) levels
+ * - Exceptions from the underlying WebClient are propagated (caller decides retry strategy)
+ * - All reactive operations include proper success and error handlers for observability
+ *
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class InvestmentRestNewsContentService {
+
+ public static final int CONTENT_RETRIEVE_LIMIT = 100;
+ private final ContentApi contentApi;
+ private final ApiClient apiClient;
+
+ /**
+ * Upserts a list of content entries. For each entry, checks if content with the same title exists. If exists,
+ * updates it; otherwise creates a new entry. Continues processing remaining entries even if individual entries
+ * fail.
+ *
+ * @param contentEntries List of content entries to upsert
+ * @return Mono that completes when all entries have been processed
+ */
+ public Mono upsertContent(List contentEntries) {
+ log.info("Starting content upsert batch operation:, totalEntries={}", contentEntries.size());
+ log.debug("Content upsert batch details: entries={}", contentEntries);
+
+ return findEntriesNewContent(contentEntries).flatMap(this::upsertSingleEntry).doOnComplete(
+ () -> log.info("Content upsert batch completed successfully: totalEntriesProcessed={}",
+ contentEntries.size())).doOnError(
+ error -> log.error("Content upsert batch failed: totalEntries={}, errorType={}, errorMessage={}",
+ contentEntries.size(), error.getClass().getSimpleName(), error.getMessage(), error)).then();
+ }
+
+ /**
+ * Upserts a single content entry. Checks if an entry with the same title exists, and either updates the existing
+ * entry or creates a new one. Errors are logged and swallowed to allow processing of remaining entries.
+ *
+ * @param request The content entry to upsert
+ * @return Mono that completes when the entry has been processed
+ */
+ private Mono upsertSingleEntry(EntryCreateUpdateRequest request) {
+ log.debug("Processing content entry: title='{}', hasThumbnail={}", request.getTitle(),
+ request.getThumbnail() != null);
+
+ return createNewEntry(request)
+ .doOnSuccess(
+ result -> log.info("Content entry processed successfully: title='{}', uuid={}", request.getTitle(),
+ result.getUuid())
+ ).doOnError(throwable -> {
+ if (throwable instanceof WebClientResponseException ex) {
+ log.error(
+ "Content entry processing failed with API error: title='{}', httpStatus={}, errorResponse={}",
+ request.getTitle(), ex.getStatusCode(), ex.getResponseBodyAsString(), ex);
+ } else {
+ log.error(
+ "Content entry processing failed with unexpected error: title='{}', errorType={}, errorMessage={}",
+ request.getTitle(), throwable.getClass().getSimpleName(), throwable.getMessage(), throwable);
+ }
+ })
+ .onErrorResume(error -> {
+ log.warn("Skipping failed content entry in batch: title='{}', decision=skip, reason={}",
+ request.getTitle(),
+ error.getMessage());
+ return Mono.empty();
+ });
+ }
+
+ private Flux findEntriesNewContent(List contentEntries) {
+ Map entryByTitle = contentEntries.stream()
+ .collect(Collectors.toMap(EntryCreateUpdateRequest::getTitle, Function.identity()));
+ log.debug("Filtering content entries: requestedTitles={}", entryByTitle.keySet());
+
+ List existsNews = contentApi.listContentEntries(null, CONTENT_RETRIEVE_LIMIT, 0, null, null, null, null)
+ .getResults().stream().filter(Objects::nonNull).toList();
+
+ if (existsNews.isEmpty()) {
+ log.info("No existing content found in system:requestedEntries={}, existingEntries=0, newEntries={}",
+ entryByTitle.size(), entryByTitle.size());
+ return Flux.fromIterable(entryByTitle.values());
+ }
+
+ Set existTitles = existsNews.stream().map(Entry::getTitle).collect(Collectors.toSet());
+ List newEntries = contentEntries.stream()
+ .filter(c -> existTitles.stream().noneMatch(e -> c.getTitle().contains(e))).toList();
+
+ log.info(
+ "Content filtering completed: requestedEntries={}, existingEntriesFound={}, newEntriesToCreate={}, duplicatesSkipped={}",
+ entryByTitle.size(), existsNews.size(), newEntries.size(), entryByTitle.size() - newEntries.size());
+ log.debug("Filtered new content titles: newTitles={}",
+ newEntries.stream().map(EntryCreateUpdateRequest::getTitle).collect(Collectors.toList()));
+
+ return Flux.fromIterable(newEntries);
+ }
+
+ /**
+ * Creates a new content entry.
+ *
+ * @param request The content data for the new entry
+ * @return Mono containing the created entry
+ */
+ private Mono createNewEntry(EntryCreateUpdateRequest request) {
+ log.debug("Creating new content entry: title='{}', hasThumbnail={}", request.getTitle(),
+ request.getThumbnail() != null);
+ File thumbnail = request.getThumbnail();
+ request.setThumbnail(null);
+ return Mono.defer(() -> Mono.just(contentApi.createContentEntry(request)))
+ .flatMap(e -> addThumbnail(e, thumbnail))
+ .doOnSuccess(
+ created -> log.info("Content entry created successfully: title='{}', uuid={}, thumbnailAttached={}",
+ request.getTitle(), created.getUuid(), thumbnail != null))
+ .doOnError(
+ error -> log.error("Content entry creation failed: title='{}', errorType={}, errorMessage={}",
+ request.getTitle(), error.getClass().getSimpleName(), error.getMessage(), error))
+ .onErrorResume(error -> Mono.empty());
+ }
+
+ private Mono addThumbnail(EntryCreateUpdate entry, File thumbnail) {
+ UUID uuid = entry.getUuid();
+
+ if (thumbnail == null) {
+ log.debug("Skipping thumbnail attachment: uuid={}", uuid);
+ return Mono.just(entry);
+ }
+
+ log.debug("Attaching thumbnail to content entry: uuid={}, thumbnailFile='{}', thumbnailSize={}", uuid,
+ thumbnail.getName(), thumbnail.length());
+
+ return Mono.defer(() -> {
+ // create path and map variables
+ Map uriVariables = new HashMap<>();
+ uriVariables.put("uuid", uuid);
+
+ MultiValueMap localVarQueryParams = new LinkedMultiValueMap<>();
+ HttpHeaders localVarHeaderParams = new HttpHeaders();
+ MultiValueMap localVarCookieParams = new LinkedMultiValueMap<>();
+ MultiValueMap localVarFormParams = new LinkedMultiValueMap<>();
+
+ FileSystemResource value = new FileSystemResource(thumbnail);
+ localVarFormParams.add("thumbnail", value);
+
+ final String[] localVarAccepts = {"application/json"};
+ final List localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);
+ final String[] localVarContentTypes = {"multipart/form-data"};
+ final MediaType localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);
+
+ String[] localVarAuthNames = new String[]{};
+
+ ParameterizedTypeReference localReturnType = new ParameterizedTypeReference<>() {
+ };
+ apiClient.invokeAPI("/service-api/v2/content/entries/{uuid}/", HttpMethod.PATCH, uriVariables,
+ localVarQueryParams, null, localVarHeaderParams, localVarCookieParams, localVarFormParams,
+ localVarAccept, localVarContentType, localVarAuthNames, localReturnType);
+
+ log.info("Thumbnail attached successfully: uuid={}, thumbnailFile='{}'", uuid, thumbnail.getName());
+ return Mono.just(entry);
+ }).doOnError(error -> log.error(
+ "Thumbnail attachment failed: uuid={}, thumbnailFile='{}', errorType={}, errorMessage={}", uuid,
+ thumbnail.getName(), error.getClass().getSimpleName(), error.getMessage(), error)).onErrorResume(error -> {
+ log.warn("Content entry created without thumbnail: uuid={}, reason={}", uuid, error.getMessage());
+ return Mono.just(entry);
+ });
+ }
+
+}
diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java
similarity index 95%
rename from stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java
rename to stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java
index f6e9d3082..46848b480 100644
--- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java
+++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java
@@ -20,6 +20,7 @@
import com.backbase.stream.investment.RandomParam;
import com.backbase.stream.investment.service.InvestmentAssetPriceService;
import com.backbase.stream.investment.service.InvestmentAssetUniverseService;
+import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@@ -33,8 +34,8 @@
import reactor.test.StepVerifier;
/**
- * Test suite for {@link InvestmentAssetUniversSaga}, focusing on the asynchronous price ingestion workflow with polling
- * and timeout behavior.
+ * Test suite for {@link InvestmentAssetUniverseSaga}, focusing on the asynchronous price ingestion
+ * workflow with polling and timeout behavior.
*
* These tests verify:
*
@@ -44,7 +45,7 @@
* - Error propagation during price ingestion
*
*/
-class InvestmentAssetUniversSagaTest {
+class InvestmentAssetUniverseSagaTest {
@Mock
private InvestmentAssetUniverseService assetUniverseService;
@@ -52,21 +53,25 @@ class InvestmentAssetUniversSagaTest {
@Mock
private InvestmentAssetPriceService investmentAssetPriceService;
+ @Mock
+ private InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService;
+
@Mock
private InvestmentIngestionConfigurationProperties configurationProperties;
- private InvestmentAssetUniversSaga saga;
+ private InvestmentAssetUniverseSaga saga;
@BeforeEach
void setUp() {
MockitoAnnotations.openMocks(this);
- saga = new InvestmentAssetUniversSaga(
+ saga = new InvestmentAssetUniverseSaga(
assetUniverseService,
investmentAssetPriceService,
+ investmentIntradayAssetPriceService,
configurationProperties
);
// Enable asset universe by default
- when(configurationProperties.isAssetUniversEnabled()).thenReturn(true);
+ when(configurationProperties.isAssetUniverseEnabled()).thenReturn(true);
}
@@ -236,8 +241,8 @@ void upsertPrices_error_duringIngestion() {
InvestmentAssetsTask task = createTestTask();
// Mock: Markets and market special days creation succeed
- when(assetUniverseService.getOrCreateMarket(any())).thenReturn(Mono.empty());
- when(assetUniverseService.getOrCreateMarketSpecialDay(any())).thenReturn(Mono.empty());
+ when(assetUniverseService.upsertMarket(any())).thenReturn(Mono.empty());
+ when(assetUniverseService.upsertMarketSpecialDay(any())).thenReturn(Mono.empty());
when(assetUniverseService.createAssets(anyList())).thenReturn(Flux.fromIterable(task.getData().getAssets()));
// Mock: Price ingestion fails with an exception
@@ -364,7 +369,7 @@ void upsertPrices_success_mixedStatuses() {
* @return a configured test task
*/
private InvestmentAssetsTask createTestTask() {
- // Create sample assets using the record constructor
+ // Create sample assets using the Asset constructor
Asset asset1 = new Asset(
UUID.randomUUID(),
"Apple Inc.",
@@ -376,8 +381,8 @@ private InvestmentAssetsTask createTestTask() {
Collections.emptyMap(),
AssetTypeEnum.STOCK,
List.of("Technology"),
- null,
"AAPL-001",
+ null,
"Apple Inc. Stock",
150.0
);
@@ -393,8 +398,8 @@ private InvestmentAssetsTask createTestTask() {
Collections.emptyMap(),
AssetTypeEnum.STOCK,
List.of("Technology"),
- null,
"MSFT-001",
+ null,
"Microsoft Corp. Stock",
200.0
);
diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java
index 56f7f9c3d..24627ab2c 100644
--- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java
+++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java
@@ -6,6 +6,9 @@
import com.backbase.investment.api.service.v1.model.Market;
import com.backbase.investment.api.service.v1.model.MarketRequest;
import com.backbase.investment.api.service.v1.model.OASAssetRequestDataRequest;
+import com.backbase.stream.investment.service.resttemplate.InvestmentRestAssetUniverseService;
+import java.io.File;
+import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
@@ -19,41 +22,44 @@
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
/**
- * This is a custom implementations to avoid issues with Reactive and multipart requests
+ * This is a custom implementation to avoid issues with Reactive and multipart requests.
*/
class InvestmentAssetUniverseServiceTest {
InvestmentAssetUniverseService service;
AssetUniverseApi assetUniverseApi;
ApiClient apiClient;
+ InvestmentRestAssetUniverseService investmentRestAssetUniverseService;
CustomIntegrationApiService customIntegrationApiService;
@BeforeEach
void setUp() {
assetUniverseApi = Mockito.mock(AssetUniverseApi.class);
apiClient = Mockito.mock(ApiClient.class);
- customIntegrationApiService = Mockito.spy(new CustomIntegrationApiService(apiClient));
- service = new InvestmentAssetUniverseService(assetUniverseApi, customIntegrationApiService);
+ investmentRestAssetUniverseService = Mockito.mock(InvestmentRestAssetUniverseService.class);
+ customIntegrationApiService = Mockito.mock(CustomIntegrationApiService.class);
+ service = new InvestmentAssetUniverseService(assetUniverseApi,
+ investmentRestAssetUniverseService, customIntegrationApiService);
}
@Test
- void getOrCreateMarket_marketExists() {
+ void upsertMarket_marketExists() {
MarketRequest request = new MarketRequest().code("US");
- Market market = new Market().code("US");
+ Market market = new Market().code("US").name("Usa Market");
+ Market marketUpdated = new Market().code("US").name("Usa Market Updated");
Mockito.when(assetUniverseApi.getMarket("US")).thenReturn(Mono.just(market));
- Mockito.when(assetUniverseApi.createMarket(request)).thenReturn(Mono.empty());
+ Mockito.when(assetUniverseApi.createMarket(request)).thenReturn(Mono.just(market));
+ Mockito.when(assetUniverseApi.updateMarket("US",request)).thenReturn(Mono.just(marketUpdated));
- StepVerifier.create(service.getOrCreateMarket(request))
- .expectNext(market)
+ StepVerifier.create(service.upsertMarket(request))
+ .expectNext(marketUpdated)
.verifyComplete();
}
@Test
- void getOrCreateMarket_marketNotFound_createsMarket() {
+ void upsertMarket_marketNotFound_createsMarket() {
MarketRequest request = new MarketRequest().code("US");
Market createdMarket = new Market().code("US");
Mockito.when(assetUniverseApi.getMarket("US"))
@@ -66,25 +72,25 @@ void getOrCreateMarket_marketNotFound_createsMarket() {
)));
Mockito.when(assetUniverseApi.createMarket(request)).thenReturn(Mono.just(createdMarket));
- StepVerifier.create(service.getOrCreateMarket(request))
+ StepVerifier.create(service.upsertMarket(request))
.expectNext(createdMarket)
.verifyComplete();
}
@Test
- void getOrCreateMarket_otherError_propagates() {
+ void upsertMarket_otherError_propagates() {
MarketRequest request = new MarketRequest().code("US");
Mockito.when(assetUniverseApi.getMarket("US"))
.thenReturn(Mono.error(new RuntimeException("API error")));
Mockito.when(assetUniverseApi.createMarket(request)).thenReturn(Mono.empty());
- StepVerifier.create(service.getOrCreateMarket(request))
+ StepVerifier.create(service.upsertMarket(request))
.expectErrorMatches(e -> e instanceof RuntimeException && e.getMessage().equals("API error"))
.verify();
}
@Test
- void getOrCreateAsset_assetExists() throws IOException {
+ void getOrCreateAsset_assetExists() {
OASAssetRequestDataRequest req = new OASAssetRequestDataRequest()
.isin("ABC123").market("US").currency("USD");
Asset asset = new Asset().isin("ABC123");
@@ -109,17 +115,19 @@ void getOrCreateAsset_assetExists() throws IOException {
Mockito.when(responseSpec.bodyToMono(ArgumentMatchers.any(ParameterizedTypeReference.class)))
.thenReturn(Mono.just(asset));
- StepVerifier.create(service.getOrCreateAsset(req))
+ StepVerifier.create(service.getOrCreateAsset(req, null))
.expectNext(asset)
.verifyComplete();
}
@Test
- void getOrCreateAsset_assetNotFound_createsAsset() throws IOException {
+ void getOrCreateAsset_assetNotFound_createsAsset() {
OASAssetRequestDataRequest req = new OASAssetRequestDataRequest()
.isin("ABC123").market("US").currency("USD");
Asset createdAsset = new Asset().isin("ABC123");
String assetId = "ABC123_US_USD";
+ File logo = null;
+
Mockito.when(assetUniverseApi.getAsset(assetId, null, null, null))
.thenReturn(Mono.error(WebClientResponseException.create(
HttpStatus.NOT_FOUND.value(),
@@ -146,13 +154,13 @@ void getOrCreateAsset_assetNotFound_createsAsset() throws IOException {
Mockito.when(responseSpec.bodyToMono(ArgumentMatchers.any(ParameterizedTypeReference.class)))
.thenReturn(Mono.just(createdAsset));
- StepVerifier.create(service.getOrCreateAsset(req))
+ StepVerifier.create(service.getOrCreateAsset(req, logo))
.expectNext(createdAsset)
.verifyComplete();
}
@Test
- void getOrCreateAsset_otherError_propagates() throws IOException {
+ void getOrCreateAsset_otherError_propagates() {
OASAssetRequestDataRequest req = new OASAssetRequestDataRequest()
.isin("ABC123").market("US").currency("USD");
String assetId = "ABC123_US_USD";
@@ -177,13 +185,13 @@ void getOrCreateAsset_otherError_propagates() throws IOException {
Mockito.when(responseSpec.bodyToMono(ArgumentMatchers.any(ParameterizedTypeReference.class)))
.thenReturn(Mono.error(new RuntimeException("API error")));
- StepVerifier.create(service.getOrCreateAsset(req))
+ StepVerifier.create(service.getOrCreateAsset(req, null))
.expectErrorMatches(e -> e instanceof RuntimeException && e.getMessage().equals("API error"))
.verify();
}
@Test
- void getOrCreateAsset_createAssetFails_propagates() throws IOException {
+ void getOrCreateAsset_createAssetFails_propagates() {
OASAssetRequestDataRequest req = new OASAssetRequestDataRequest()
.isin("ABC123").market("US").currency("USD");
String assetId = "ABC123_US_USD";
@@ -213,22 +221,20 @@ void getOrCreateAsset_createAssetFails_propagates() throws IOException {
Mockito.when(responseSpec.bodyToMono(ArgumentMatchers.any(ParameterizedTypeReference.class)))
.thenReturn(Mono.error(new RuntimeException("Create asset failed")));
- StepVerifier.create(service.getOrCreateAsset(req))
+ StepVerifier.create(service.getOrCreateAsset(req, null))
.expectErrorMatches(e -> e instanceof RuntimeException && e.getMessage().equals("Create asset failed"))
.verify();
}
@Test
void getOrCreateAsset_nullRequest_returnsError() {
- StepVerifier.create(Mono.defer(() -> {
- return service.getOrCreateAsset(null);
- }))
+ StepVerifier.create(Mono.defer(() -> service.getOrCreateAsset(null, null)))
.expectError(NullPointerException.class)
.verify();
}
@Test
- void getOrCreateAsset_emptyMonoFromCreateAsset() throws IOException {
+ void getOrCreateAsset_emptyMonoFromCreateAsset() {
OASAssetRequestDataRequest req = new OASAssetRequestDataRequest()
.isin("ABC123").market("US").currency("USD");
String assetId = "ABC123_US_USD";
@@ -258,7 +264,7 @@ void getOrCreateAsset_emptyMonoFromCreateAsset() throws IOException {
Mockito.when(responseSpec.bodyToMono(ArgumentMatchers.any(ParameterizedTypeReference.class)))
.thenReturn(Mono.empty());
- StepVerifier.create(service.getOrCreateAsset(req))
+ StepVerifier.create(service.getOrCreateAsset(req, null))
.expectComplete()
.verify();
}
diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java
new file mode 100644
index 000000000..1180ab768
--- /dev/null
+++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java
@@ -0,0 +1,54 @@
+package com.backbase.stream.investment.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService.Ohlc;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests covering deterministic parts of the intraday price generator.
+ *
+ * Focus is on {@link InvestmentIntradayAssetPriceService#generateIntradayOhlc(Double)} which is
+ * deterministic given a provided Random seed; tests assert structural invariants and rounding.
+ */
+class InvestmentIntradayAssetPriceServiceTest {
+
+ @Test
+ void generateIntradayOhlc_shouldValidateInput() {
+ assertThatThrownBy(() -> InvestmentIntradayAssetPriceService.generateIntradayOhlc(null))
+ .isInstanceOf(IllegalArgumentException.class);
+ assertThatThrownBy(() -> InvestmentIntradayAssetPriceService.generateIntradayOhlc(0.0))
+ .isInstanceOf(IllegalArgumentException.class);
+ assertThatThrownBy(() -> InvestmentIntradayAssetPriceService.generateIntradayOhlc(-1.0))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @RepeatedTest(10)
+ void generateIntradayOhlc_shouldProduceValidOhlcStructure() {
+ double previous = 100.0;
+ Ohlc ohlc = InvestmentIntradayAssetPriceService.generateIntradayOhlc(previous);
+
+ double open = ohlc.open();
+ double high = ohlc.high();
+ double low = ohlc.low();
+ double close = ohlc.close();
+
+ // Basic sanity
+ assertThat(open).isGreaterThan(0.0);
+ assertThat(high).isGreaterThan(0.0);
+ assertThat(low).isGreaterThan(0.0);
+ assertThat(close).isGreaterThan(0.0);
+
+ // High must be >= max(open, close), low must be <= min(open, close)
+ assertThat(high).isGreaterThanOrEqualTo(Math.max(open, close));
+ assertThat(low).isLessThanOrEqualTo(Math.min(open, close));
+
+ // Values should be rounded to 6 decimal places: value * 1e6 should be near-integer
+ assertThat(Math.abs(Math.round(open * 1_000_000.0) - open * 1_000_000.0)).isLessThan(1e-6);
+ assertThat(Math.abs(Math.round(high * 1_000_000.0) - high * 1_000_000.0)).isLessThan(1e-6);
+ assertThat(Math.abs(Math.round(low * 1_000_000.0) - low * 1_000_000.0)).isLessThan(1e-6);
+ assertThat(Math.abs(Math.round(close * 1_000_000.0) - close * 1_000_000.0)).isLessThan(1e-6);
+ }
+}
\ No newline at end of file
diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentRestNewsContentServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentRestNewsContentServiceTest.java
new file mode 100644
index 000000000..3748f2e60
--- /dev/null
+++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentRestNewsContentServiceTest.java
@@ -0,0 +1,45 @@
+package com.backbase.stream.investment.service;
+
+import com.backbase.investment.api.service.sync.ApiClient;
+import com.backbase.investment.api.service.sync.v1.ContentApi;
+import com.backbase.investment.api.service.v1.model.EntryCreateUpdate;
+import com.backbase.investment.api.service.v1.model.EntryCreateUpdateRequest;
+import com.backbase.investment.api.service.v1.model.PaginatedEntryList;
+import com.backbase.stream.investment.service.resttemplate.InvestmentRestNewsContentService;
+import java.util.List;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class InvestmentRestNewsContentServiceTest {
+
+ private ContentApi contentApi;
+ private ApiClient apiClient;
+ private InvestmentRestNewsContentService service;
+
+ @BeforeEach
+ void setUp() {
+ contentApi = Mockito.mock(ContentApi.class);
+ apiClient = Mockito.mock(ApiClient.class);
+ service = new InvestmentRestNewsContentService(contentApi, apiClient);
+ }
+
+ @Test
+ void upsertContent_createsNewEntry_whenNotExists() {
+ // Given
+ EntryCreateUpdateRequest request = new EntryCreateUpdateRequest()
+ .title("New Article")
+ .excerpt("Excerpt")
+ .tags(List.of("tag1"));
+
+ PaginatedEntryList emptyList = new PaginatedEntryList()
+ .count(0)
+ .results(List.of());
+
+ EntryCreateUpdate created = new EntryCreateUpdate();
+
+ }
+
+
+}
+