Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
# Changelog
All notable changes to this project will be documented in this file.

## [9.5.0]
### Added
- investment service intraday generation and ingestion function
### Changed
- fix investment asset universe from create to upsert implementation

## [9.4.x]
### Changed

## [9.3.0](https://github.com/Backbase/stream-services/compare/9.2.0...9.3.0)
### Changed
- fix for NoSuchElementException (No value present) thrown while update data groups
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.backbase.stream.investment.service.InvestmentAssetPriceService;
import com.backbase.stream.investment.service.InvestmentAssetUniverseService;
import com.backbase.stream.investment.service.InvestmentClientService;
import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService;
import com.backbase.stream.investment.service.InvestmentModelPortfolioService;
import com.backbase.stream.investment.service.InvestmentPortfolioAllocationService;
import com.backbase.stream.investment.service.InvestmentPortfolioService;
Expand Down Expand Up @@ -73,6 +74,11 @@ public InvestmentAssetPriceService investmentAssetPriceService(AssetUniverseApi
return new InvestmentAssetPriceService(assetUniverseApi);
}

@Bean
public InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService(AssetUniverseApi assetUniverseApi) {
return new InvestmentIntradayAssetPriceService(assetUniverseApi);
}

@Bean
public InvestmentPortfolioAllocationService investmentPortfolioAllocationService(AllocationsApi allocationsApi,
AssetUniverseApi assetUniverseApi, InvestmentApi investmentApi,
Expand All @@ -96,9 +102,10 @@ public InvestmentSaga investmentSaga(InvestmentClientService investmentClientSer
public InvestmentAssetUniversSaga investmentStaticDataSaga(
InvestmentAssetUniverseService investmentAssetUniverseService,
InvestmentAssetPriceService investmentAssetPriceService,
InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService,
InvestmentIngestionConfigurationProperties coreConfigurationProperties) {
return new InvestmentAssetUniversSaga(investmentAssetUniverseService, investmentAssetPriceService,
coreConfigurationProperties);
investmentIntradayAssetPriceService, coreConfigurationProperties);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class InvestmentAssetData {
private List<Asset> assets;
private List<AssetPrice> assetPrices;
private List<GroupResult> priceAsyncTasks;
private List<GroupResult> intradayPriceAsyncTasks;

public Map<String, AssetPrice> getPriceByAsset() {
return Objects.requireNonNullElse(assetPrices, List.<AssetPrice>of()).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,8 @@ public InvestmentAssetsTask setPriceTasks(List<GroupResult> tasks) {
return this;
}

public InvestmentAssetsTask setIntradayPriceTasks(List<GroupResult> tasks) {
data.setIntradayPriceAsyncTasks(tasks);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.backbase.stream.investment.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.UUID;

/**
* Representation of an asset returned by the List Assets API when the response includes expanded
* market and latest price details and custom fields.
* Example JSON:
* <pre>
* {
* "uuid": "123e4567-e89b-12d3-a456-426614174000",
* "market": {
* "code": "NYSE",
* "name": "New York Stock Exchange",
* "is_open": true,
* "today_session_starts": "2024-01-01T09:30:00+00:00Z",
* "today_session_ends": "2024-01-01T16:00:00+00:00Z",
* "market_reopens": null
* },
* "latest_price": {
* "amount": 123.45,
* "datetime": "2024-01-01T15:30:00+00:00Z",
* "open_price": 120.00,
* "high_price": 125.00,
* "low_price": 119.50,
* "previous_close_price": 121.00
* }
* }
* </pre>
*
* @param uuid Asset identifier (mapped from JSON property `uuid`)
* @param expandedMarket Expanded market details (mapped from JSON property `market`)
* @param expandedLatestPrice Expanded latest price details (mapped from JSON property `latest_price`)
*/
public record AssetWithMarketAndLatestPrice(@JsonProperty("uuid") UUID uuid,
@JsonProperty("market") ExpandedMarket expandedMarket,
@JsonProperty("latest_price") ExpandedLatestPrice expandedLatestPrice) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.backbase.stream.investment.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.OffsetDateTime;

/**
* Representation of an expanded latest price returned by the List Assets API when the response is
* requested with expansion of latest price details and custom fields.
* Example JSON:
* <pre>
* "latest_price": {
* "amount": 123.45,
* "datetime": "2026-01-21T12:58:00.000000Z",
* "open_price": 120.00,
* "high_price": 125.00,
* "low_price": 119.50,
* "previous_close_price": 121.00
* }
* </pre>
*/
public record ExpandedLatestPrice(
@JsonProperty("amount") Double amount,
@JsonProperty("datetime") OffsetDateTime datetime,
@JsonProperty("open_price") Double openPrice,
@JsonProperty("high_price") Double highPrice,
@JsonProperty("low_price") Double lowPrice,
@JsonProperty("previous_close_price") Double previousClosePrice
) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.backbase.stream.investment.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.OffsetDateTime;

/**
* Representation of an expanded market returned by the List Assets API when the response is
* requested with expansion of market details and custom fields.
* Example JSON:
* <pre>
*"market": {
* "code": "XETR",
* "name": "XETRA",
* "is_open": true,
* "today_session_starts": "2026-01-22T09:00:00.000000Z",
* "today_session_ends": "2026-01-22T17:30:00.000000Z",
* "market_reopens": "2026-01-23T09:00:00.000000Z"
* }
* </pre>
*
*/
public record ExpandedMarket(@JsonProperty("code") String code,
@JsonProperty("name") String name,
@JsonProperty("is_open") Boolean isOpen,
@JsonProperty("today_session_starts") OffsetDateTime todaySessionStarts,
@JsonProperty("today_session_ends") OffsetDateTime todaySessionEnds,
@JsonProperty("market_reopens") OffsetDateTime marketReopens) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.backbase.stream.investment.model;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;

@EqualsAndHashCode
@Data
@Builder
public class PaginatedExpandedAssetList {

public static final String JSON_PROPERTY_COUNT = "count";
private Integer count;

public static final String JSON_PROPERTY_NEXT = "next";
private URI next;

public static final String JSON_PROPERTY_PREVIOUS = "previous";
private URI previous;

public static final String JSON_PROPERTY_RESULTS = "results";
private List<AssetWithMarketAndLatestPrice> results = new ArrayList<>();

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.backbase.stream.investment.service.InvestmentAssetPriceService;
import com.backbase.stream.investment.service.InvestmentAssetUniverseService;
import com.backbase.stream.investment.service.InvestmentClientService;
import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService;
import com.backbase.stream.investment.service.InvestmentPortfolioService;
import com.backbase.stream.worker.StreamTaskExecutor;
import com.backbase.stream.worker.model.StreamTask;
Expand Down Expand Up @@ -59,32 +60,34 @@ public class InvestmentAssetUniversSaga implements StreamTaskExecutor<Investment

private final InvestmentAssetUniverseService assetUniverseService;
private final InvestmentAssetPriceService investmentAssetPriceService;
private final InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService;
private final InvestmentIngestionConfigurationProperties coreConfigurationProperties;

@Override
public Mono<InvestmentAssetsTask> executeTask(InvestmentAssetsTask streamTask) {
if (!coreConfigurationProperties.isAssetUniversEnabled()) {
log.warn("Skip investment asset univers saga execution: taskId={}, taskName={}",
log.warn("Skip investment asset universe saga execution: taskId={}, taskName={}",
streamTask.getId(), streamTask.getName());
return Mono.just(streamTask);
}
log.info("Starting investment saga execution: taskId={}, taskName={}",
log.info("Starting investment asset universe saga execution: taskId={}, taskName={}",
streamTask.getId(), streamTask.getName());
return createMarkets(streamTask)
.flatMap(this::createMarketSpecialDays)
.flatMap(this::createAssetCategoryTypes)
.flatMap(this::createAssetCategories)
return upsertMarkets(streamTask)
.flatMap(this::upsertMarketSpecialDays)
.flatMap(this::upsertAssetCategoryTypes)
.flatMap(this::upsertAssetCategories)
.flatMap(this::createAssets)
.flatMap(this::upsertPrices)
.flatMap(this::createIntradayPrices)
.doOnSuccess(completedTask -> log.info(
"Successfully completed investment saga: taskId={}, taskName={}, state={}",
"Successfully completed investment asset universe saga: taskId={}, taskName={}, state={}",
completedTask.getId(), completedTask.getName(), completedTask.getState()))
.doOnError(throwable -> {
log.error("Failed to execute investment saga: taskId={}, taskName={}",
log.error("Failed to execute investment asset universe saga: taskId={}, taskName={}",
streamTask.getId(), streamTask.getName(), throwable);
streamTask.error(INVESTMENT, OP_UPSERT, RESULT_FAILED,
streamTask.getName(), streamTask.getId(),
"Investment saga failed: " + throwable.getMessage());
"Investment asset universe saga failed: " + throwable.getMessage());
streamTask.setState(State.FAILED);
})
.onErrorResume(throwable -> Mono.just(streamTask));
Expand All @@ -96,6 +99,11 @@ private Mono<InvestmentAssetsTask> upsertPrices(InvestmentAssetsTask investmentT
.map(investmentTask::setPriceTasks);
}

private Mono<InvestmentAssetsTask> createIntradayPrices(InvestmentAssetsTask investmentTask) {
return investmentIntradayAssetPriceService.ingestIntradayPrices()
.map(investmentTask::setIntradayPriceTasks);
}

/**
* Rollback is not implemented for investment saga.
*
Expand All @@ -112,7 +120,7 @@ public Mono<InvestmentAssetsTask> rollBack(InvestmentAssetsTask streamTask) {
return Mono.empty();
}

public Mono<InvestmentAssetsTask> createMarkets(InvestmentAssetsTask investmentTask) {
public Mono<InvestmentAssetsTask> upsertMarkets(InvestmentAssetsTask investmentTask) {
InvestmentAssetData investmentData = investmentTask.getData();
int marketCount = investmentData.getMarkets() != null ? investmentData.getMarkets().size() : 0;
log.info("Starting investment market creation: taskId={}, marketCount={}",
Expand All @@ -130,7 +138,7 @@ public Mono<InvestmentAssetsTask> createMarkets(InvestmentAssetsTask investmentT

// Process each market: create or get from asset universe service
return Flux.fromIterable(investmentData.getMarkets())
.flatMap(market -> assetUniverseService.getOrCreateMarket(
.flatMap(market -> assetUniverseService.upsertMarket(
new MarketRequest()
.code(market.getCode())
.name(market.getName())
Expand Down Expand Up @@ -161,15 +169,15 @@ public Mono<InvestmentAssetsTask> createMarkets(InvestmentAssetsTask investmentT
}

/**
* Creates or upserts market special days for the investment task.
* Upserts market special days for the investment task.
*
* <p>This method processes each market special day in the task data by invoking
* the asset universe service. It updates the task state and logs progress for observability.
*
* @param investmentTask the investment task containing market special day data
* @return Mono emitting the updated investment task with market special days set
*/
public Mono<InvestmentAssetsTask> createMarketSpecialDays(InvestmentAssetsTask investmentTask) {
public Mono<InvestmentAssetsTask> upsertMarketSpecialDays(InvestmentAssetsTask investmentTask) {
InvestmentAssetData investmentData = investmentTask.getData();
int marketSpecialDayCount =
investmentData.getMarketSpecialDays() != null ? investmentData.getMarketSpecialDays().size() : 0;
Expand All @@ -188,7 +196,7 @@ public Mono<InvestmentAssetsTask> createMarketSpecialDays(InvestmentAssetsTask i

// Process each market special day: create or get from asset universe service
return Flux.fromIterable(investmentData.getMarketSpecialDays())
.flatMap(marketSpecialDay -> assetUniverseService.getOrCreateMarketSpecialDay(
.flatMap(marketSpecialDay -> assetUniverseService.upsertMarketSpecialDay(
new MarketSpecialDayRequest()
.date(marketSpecialDay.getDate())
.market(marketSpecialDay.getMarket())
Expand Down Expand Up @@ -219,15 +227,15 @@ public Mono<InvestmentAssetsTask> createMarketSpecialDays(InvestmentAssetsTask i
}

/**
* Creates or upserts asset categories for the investment task.
* Upserts asset categories for the investment task.
*
* <p>This method processes each asset category in the task data by invoking
* the asset universe service. It updates the task state and logs progress for observability.
*
* @param investmentTask the investment task containing asset category data
* @return Mono emitting the updated investment task with asset categories set
*/
public Mono<InvestmentAssetsTask> createAssetCategories(InvestmentAssetsTask investmentTask) {
public Mono<InvestmentAssetsTask> upsertAssetCategories(InvestmentAssetsTask investmentTask) {
InvestmentAssetData investmentData = investmentTask.getData();
int categoryCount =
investmentData.getAssetCategories() != null ? investmentData.getAssetCategories().size() : 0;
Expand All @@ -252,7 +260,7 @@ public Mono<InvestmentAssetsTask> createAssetCategories(InvestmentAssetsTask inv
.order(assetCategory.getOrder())
.type(assetCategory.getType())
.description(assetCategory.getDescription());
return assetUniverseService.createAssetCategory(request);
return assetUniverseService.upsertAssetCategory(request);
})
.collectList()
.map(assetCategories -> {
Expand All @@ -274,7 +282,7 @@ public Mono<InvestmentAssetsTask> createAssetCategories(InvestmentAssetsTask inv
});
}

public Mono<InvestmentAssetsTask> createAssetCategoryTypes(InvestmentAssetsTask investmentTask) {
public Mono<InvestmentAssetsTask> upsertAssetCategoryTypes(InvestmentAssetsTask investmentTask) {
InvestmentAssetData investmentData = investmentTask.getData();
int typeCount =
investmentData.getAssetCategoryTypes() != null ? investmentData.getAssetCategoryTypes().size() : 0;
Expand All @@ -296,7 +304,7 @@ public Mono<InvestmentAssetsTask> createAssetCategoryTypes(InvestmentAssetsTask
AssetCategoryTypeRequest request = new AssetCategoryTypeRequest()
.name(assetCategoryType.getName())
.code(assetCategoryType.getCode());
return assetUniverseService.createAssetCategoryType(request);
return assetUniverseService.upsertAssetCategoryType(request);
})
.collectList()
.map(assetCategoryTypes -> {
Expand Down
Loading
Loading