diff --git a/CHANGELOG.md b/CHANGELOG.md index 200235e5a..99740055b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,20 @@ # Changelog All notable changes to this project will be documented in this file. +## [9.5.0] +### Added + - investment service intraday generation and ingestion function + - ingest images for investment service (asset logo, asset category image, news content image) +### Changed + - fix investment asset universe from create to upsert implementation + +### Fixed + - Added resttemplate client for investment service to fix request multipart/form-data to django server. + - Django & WebFlux don't work OOTB + +## [9.4.x] +### Changed + ## [9.3.0](https://github.com/Backbase/stream-services/compare/9.2.0...9.3.0) ### Changed - fix for NoSuchElementException (No value present) thrown while update data groups diff --git a/stream-dbs-clients/src/main/java/com/backbase/stream/clients/config/InvestmentClientConfig.java b/stream-dbs-clients/src/main/java/com/backbase/stream/clients/config/InvestmentClientConfig.java index 10f79e361..ae2bd2f22 100644 --- a/stream-dbs-clients/src/main/java/com/backbase/stream/clients/config/InvestmentClientConfig.java +++ b/stream-dbs-clients/src/main/java/com/backbase/stream/clients/config/InvestmentClientConfig.java @@ -5,6 +5,7 @@ import com.backbase.investment.api.service.v1.AssetUniverseApi; import com.backbase.investment.api.service.v1.AsyncBulkGroupsApi; import com.backbase.investment.api.service.v1.ClientApi; +import com.backbase.investment.api.service.v1.ContentApi; import com.backbase.investment.api.service.v1.FinancialAdviceApi; import com.backbase.investment.api.service.v1.InvestmentApi; import com.backbase.investment.api.service.v1.InvestmentProductsApi; @@ -100,6 +101,12 @@ public InvestmentApi investmentApi(ApiClient investmentApiClient) { return new InvestmentApi(investmentApiClient); } + @Bean + @ConditionalOnMissingBean + public ContentApi contentApi(ApiClient investmentApiClient) { + return new ContentApi(investmentApiClient); + } + @Bean @ConditionalOnMissingBean public PaymentsApi paymentsApi(ApiClient investmentApiClient) { diff --git a/stream-investment/investment-core/pom.xml b/stream-investment/investment-core/pom.xml index 717714ed9..58cdef470 100644 --- a/stream-investment/investment-core/pom.xml +++ b/stream-investment/investment-core/pom.xml @@ -19,6 +19,13 @@ + + com.backbase + backbase-bom + ${backbase-bom.version} + pom + import + com.backbase.buildingblocks backbase-building-blocks-release @@ -29,6 +36,11 @@ + + com.backbase.buildingblocks + api + + com.backbase.stream @@ -83,6 +95,64 @@ + + org.apache.maven.plugins + maven-dependency-plugin + 3.6.0 + + + unpack + + unpack + + generate-resources + + + + com.backbase.investment + investment-service-api + api + zip + ${project.build.directory}/yaml + true + + + **/*.yaml, **/*.json + + + + + + com.backbase.oss + boat-maven-plugin + 0.17.66 + + + generate-investment-service-api-code + + generate-rest-template-embedded + + generate-resources + + ${project.build.directory}/yaml/investment-service-api/investment-service-api-v1*.yaml + com.backbase.investment.api.service.sync.v1 + com.backbase.investment.api.service.sync.v1.model + + Etc/GMT-12=ETC_GMT_1222 + + + false + false + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java index 40d0941f7..bcdf8e8a8 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java @@ -11,7 +11,8 @@ @ConfigurationProperties(prefix = "backbase.bootstrap.ingestions.investment") public class InvestmentIngestionConfigurationProperties { - private boolean assetUniversEnabled = true; + private boolean contentEnabled = true; + private boolean assetUniverseEnabled = true; private boolean wealthEnabled = true; private int portfolioActivationPastMonths = 1; diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java new file mode 100644 index 000000000..45d6bb906 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java @@ -0,0 +1,92 @@ +package com.backbase.stream.configuration; + +import com.backbase.investment.api.service.sync.v1.AssetUniverseApi; +import com.backbase.investment.api.service.sync.v1.ContentApi; +import com.backbase.stream.investment.service.resttemplate.InvestmentRestAssetUniverseService; +import com.backbase.stream.investment.service.resttemplate.InvestmentRestNewsContentService; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import jakarta.validation.constraints.Pattern; +import lombok.Setter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; +import org.springframework.web.client.RestTemplate; + +@Setter +@Configuration +@ConditionalOnBean(InvestmentServiceConfiguration.class) +@ConfigurationProperties(prefix = "backbase.investment.communication.integration") +public class InvestmentRestServiceApiConfiguration { + + private String serviceId = "investment"; + private String serviceUrl = ""; + + @Value("${backbase.communication.http.default-scheme:http}") + @Pattern(regexp = "https?") + private String scheme; + + /** + * Configuration for Investment service REST client (ClientApi). + */ + @Bean + @ConditionalOnMissingBean + public com.backbase.investment.api.service.sync.ApiClient restInvestmentApiClient( + @Qualifier("interServiceRestTemplate") RestTemplate restTemplate, + @Qualifier("restInvestmentObjectMapper") ObjectMapper restInvestmentObjectMapper) { + + MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter(); + converter.setObjectMapper(restInvestmentObjectMapper); + + restTemplate.getMessageConverters().removeIf(m -> m instanceof MappingJackson2HttpMessageConverter); + restTemplate.getMessageConverters().add(converter); + + com.backbase.investment.api.service.sync.ApiClient apiClient = new com.backbase.investment.api.service.sync.ApiClient( + restTemplate); + apiClient.setBasePath(scheme + "://" + serviceId + serviceUrl); + return apiClient; + } + + @Bean + @Qualifier("restInvestmentObjectMapper") + public ObjectMapper restInvestmentObjectMapper(ObjectMapper objectMapper) { + ObjectMapper mapper = objectMapper.copy(); + mapper.setSerializationInclusion(Include.NON_EMPTY); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + return mapper; + } + + @Bean + @ConditionalOnMissingBean + public com.backbase.investment.api.service.sync.v1.ContentApi restContentApi( + com.backbase.investment.api.service.sync.ApiClient restInvestmentApiClient) { + return new com.backbase.investment.api.service.sync.v1.ContentApi(restInvestmentApiClient); + } + + @Bean + @ConditionalOnMissingBean + public com.backbase.investment.api.service.sync.v1.AssetUniverseApi restAssetUniverseApi( + com.backbase.investment.api.service.sync.ApiClient restInvestmentApiClient) { + return new com.backbase.investment.api.service.sync.v1.AssetUniverseApi(restInvestmentApiClient); + } + + @Bean + public InvestmentRestNewsContentService investmentNewsContentService(ContentApi restContentApi, + com.backbase.investment.api.service.sync.ApiClient restInvestmentApiClient) { + return new InvestmentRestNewsContentService(restContentApi, restInvestmentApiClient); + } + + + @Bean + public InvestmentRestAssetUniverseService investmentRestAssetUniverseService(AssetUniverseApi assetUniverseApi, + com.backbase.investment.api.service.sync.ApiClient restInvestmentApiClient) { + return new InvestmentRestAssetUniverseService(assetUniverseApi, restInvestmentApiClient); + } + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java index db61760f0..d7756091f 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java @@ -3,6 +3,7 @@ import com.backbase.investment.api.service.ApiClient; import com.backbase.investment.api.service.v1.AllocationsApi; import com.backbase.investment.api.service.v1.AssetUniverseApi; +import com.backbase.investment.api.service.v1.AsyncBulkGroupsApi; import com.backbase.investment.api.service.v1.ClientApi; import com.backbase.investment.api.service.v1.FinancialAdviceApi; import com.backbase.investment.api.service.v1.InvestmentApi; @@ -10,16 +11,20 @@ import com.backbase.investment.api.service.v1.PaymentsApi; import com.backbase.investment.api.service.v1.PortfolioApi; import com.backbase.stream.clients.autoconfigure.DbsApiClientsAutoConfiguration; -import com.backbase.stream.investment.saga.InvestmentAssetUniversSaga; +import com.backbase.stream.investment.saga.InvestmentAssetUniverseSaga; +import com.backbase.stream.investment.saga.InvestmentContentSaga; import com.backbase.stream.investment.saga.InvestmentSaga; import com.backbase.stream.investment.service.AsyncTaskService; import com.backbase.stream.investment.service.CustomIntegrationApiService; import com.backbase.stream.investment.service.InvestmentAssetPriceService; import com.backbase.stream.investment.service.InvestmentAssetUniverseService; import com.backbase.stream.investment.service.InvestmentClientService; +import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService; import com.backbase.stream.investment.service.InvestmentModelPortfolioService; import com.backbase.stream.investment.service.InvestmentPortfolioAllocationService; import com.backbase.stream.investment.service.InvestmentPortfolioService; +import com.backbase.stream.investment.service.resttemplate.InvestmentRestAssetUniverseService; +import com.backbase.stream.investment.service.resttemplate.InvestmentRestNewsContentService; import lombok.RequiredArgsConstructor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -58,8 +63,15 @@ public InvestmentPortfolioService investmentPortfolioService(PortfolioApi portfo @Bean public InvestmentAssetUniverseService investmentAssetUniverseService(AssetUniverseApi assetUniverseApi, + InvestmentRestAssetUniverseService investmentRestAssetUniverseService, CustomIntegrationApiService customIntegrationApiService) { - return new InvestmentAssetUniverseService(assetUniverseApi, customIntegrationApiService); + return new InvestmentAssetUniverseService(assetUniverseApi, investmentRestAssetUniverseService, + customIntegrationApiService); + } + + @Bean + public AsyncTaskService asyncTaskService(AsyncBulkGroupsApi asyncBulkGroupsApi) { + return new AsyncTaskService(asyncBulkGroupsApi); } @Bean @@ -73,6 +85,11 @@ public InvestmentAssetPriceService investmentAssetPriceService(AssetUniverseApi return new InvestmentAssetPriceService(assetUniverseApi); } + @Bean + public InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService(AssetUniverseApi assetUniverseApi) { + return new InvestmentIntradayAssetPriceService(assetUniverseApi); + } + @Bean public InvestmentPortfolioAllocationService investmentPortfolioAllocationService(AllocationsApi allocationsApi, AssetUniverseApi assetUniverseApi, InvestmentApi investmentApi, @@ -93,12 +110,20 @@ public InvestmentSaga investmentSaga(InvestmentClientService investmentClientSer } @Bean - public InvestmentAssetUniversSaga investmentStaticDataSaga( + public InvestmentAssetUniverseSaga investmentStaticDataSaga( InvestmentAssetUniverseService investmentAssetUniverseService, InvestmentAssetPriceService investmentAssetPriceService, + InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService, InvestmentIngestionConfigurationProperties coreConfigurationProperties) { - return new InvestmentAssetUniversSaga(investmentAssetUniverseService, investmentAssetPriceService, - coreConfigurationProperties); + return new InvestmentAssetUniverseSaga(investmentAssetUniverseService, investmentAssetPriceService, + investmentIntradayAssetPriceService, coreConfigurationProperties); + } + + @Bean + public InvestmentContentSaga investmentContentSaga( + InvestmentRestNewsContentService investmentRestNewsContentService, + InvestmentIngestionConfigurationProperties coreConfigurationProperties) { + return new InvestmentContentSaga(investmentRestNewsContentService, coreConfigurationProperties); } } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/Asset.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/Asset.java index 0ad77576e..5d9cf597f 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/Asset.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/Asset.java @@ -1,36 +1,43 @@ package com.backbase.stream.investment; -import com.backbase.investment.api.service.v1.model.AssetCategory; import com.backbase.investment.api.service.v1.model.AssetTypeEnum; import com.backbase.investment.api.service.v1.model.StatusA10Enum; import com.fasterxml.jackson.annotation.JsonProperty; -import java.net.URI; +import java.io.File; import java.util.List; import java.util.Map; import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; /** * Lightweight projection of {@link com.backbase.investment.api.service.v1.model.Asset} that keeps the DTO immutable * while providing helpers to translate to/from the generated model. */ -public record Asset( - UUID uuid, - String name, - String isin, - String ticker, - StatusA10Enum status, - String market, - String currency, +@Setter +@Getter +@NoArgsConstructor +@AllArgsConstructor +public class Asset implements AssetKey { + + private UUID uuid; + private String name; + private String isin; + private String ticker; + private StatusA10Enum status; + private String market; + private String currency; @JsonProperty("extra_data") - Map extraData, + private Map extraData; @JsonProperty("asset_type") - AssetTypeEnum assetType, - List categories, - URI logo, - String externalId, - String description, - Double defaultPrice -) implements AssetKey { + private AssetTypeEnum assetType; + private List categories; + private String externalId; + private File logo; + private String description; + private Double defaultPrice; @Override public String getIsin() { @@ -47,40 +54,4 @@ public String getCurrency() { return currency; } - /** - * Creates a record from the generated API model. - */ - public static Asset fromModel(com.backbase.investment.api.service.v1.model.Asset asset) { - if (asset == null) { - return null; - } - List categories = asset.getCategories(); - Map extraData = asset.getExtraData(); - return new Asset( - asset.getUuid(), - asset.getName(), - asset.getIsin(), - asset.getTicker(), - asset.getStatus(), - asset.getMarket(), - asset.getCurrency(), - extraData == null ? Map.of() : Map.copyOf(extraData), - asset.getAssetType(), - categories == null ? List.of() : categories.stream().map(AssetCategory::getCode).toList(), - asset.getLogo(), - asset.getExternalId(), - asset.getDescription(), - 100d - ); - } - - /** - * Ensures the record keeps defensive copies of mutable collections. - */ - public Asset { - Map safeExtraData = extraData == null ? Map.of() : Map.copyOf(extraData); - List safeCategories = categories == null ? List.of() : List.copyOf(categories); - extraData = safeExtraData; - categories = safeCategories; - } } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java index c577f1e7e..049fc3f60 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java @@ -1,6 +1,7 @@ package com.backbase.stream.investment; import com.backbase.investment.api.service.v1.model.AssetCategory; +import com.backbase.investment.api.service.v1.model.AssetCategoryRequest; import com.backbase.investment.api.service.v1.model.AssetCategoryType; import com.backbase.investment.api.service.v1.model.GroupResult; import com.backbase.investment.api.service.v1.model.Market; @@ -23,10 +24,12 @@ public class InvestmentAssetData { private List markets; private List marketSpecialDays; private List assetCategoryTypes; - private List assetCategories; + private List assetCategories; private List assets; private List assetPrices; + private List insertedAssetCategories; private List priceAsyncTasks; + private List intradayPriceAsyncTasks; public Map getPriceByAsset() { return Objects.requireNonNullElse(assetPrices, List.of()).stream() @@ -35,7 +38,7 @@ public Map getPriceByAsset() { public Map getAssetByUuid() { return Objects.requireNonNullElse(assets, List.of()).stream() - .collect(Collectors.toMap(Asset::uuid, Function.identity())); + .collect(Collectors.toMap(Asset::getUuid, Function.identity())); } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java index c4146c379..8c43a913a 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java @@ -23,7 +23,7 @@ public InvestmentAssetsTask(String unitOfWorkId, InvestmentAssetData data) { @Override public String getName() { - return "investment"; + return "investment-assets"; } public void setMarkets(List markets) { @@ -38,8 +38,8 @@ public void setAssetCategoryTypes(List assetCategoryTypes) { data.setAssetCategoryTypes(assetCategoryTypes); } - public void setAssetCategories(List assetCategories) { - data.setAssetCategories(assetCategories); + public void setInsertedAssetCategories(List assetCategories) { + data.setInsertedAssetCategories(assetCategories); } public void setAssets(List assets) { @@ -51,4 +51,8 @@ public InvestmentAssetsTask setPriceTasks(List tasks) { return this; } + public InvestmentAssetsTask setIntradayPriceTasks(List tasks) { + data.setIntradayPriceAsyncTasks(tasks); + return this; + } } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java new file mode 100644 index 000000000..eb05cbfc1 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java @@ -0,0 +1,16 @@ +package com.backbase.stream.investment; + +import com.backbase.investment.api.service.sync.v1.model.EntryCreateUpdateRequest; +import java.util.List; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode +@Data +@Builder +public class InvestmentContentData { + + private List marketNews; + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentTask.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentTask.java new file mode 100644 index 000000000..f6356ceae --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentTask.java @@ -0,0 +1,23 @@ +package com.backbase.stream.investment; + +import com.backbase.stream.worker.model.StreamTask; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode(callSuper = true) +@Data +public class InvestmentContentTask extends StreamTask { + + private final InvestmentContentData data; + + public InvestmentContentTask(String unitOfWorkId, InvestmentContentData data) { + super(unitOfWorkId); + this.data = data; + } + + @Override + public String getName() { + return "investment-content"; + } + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java index bfb687af0..c39e5648d 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java @@ -19,7 +19,7 @@ public InvestmentTask(String unitOfWorkId, InvestmentData data) { @Override public String getName() { - return "investment"; + return "investment-portfolios-clients"; } public void data(List clients) { diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/AssetWithMarketAndLatestPrice.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/AssetWithMarketAndLatestPrice.java new file mode 100644 index 000000000..12fa63165 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/AssetWithMarketAndLatestPrice.java @@ -0,0 +1,40 @@ +package com.backbase.stream.investment.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.UUID; + +/** + * Representation of an asset returned by the List Assets API when the response includes expanded + * market and latest price details and custom fields. + * Example JSON: + *
+ * {
+ *   "uuid": "123e4567-e89b-12d3-a456-426614174000",
+ *   "market": {
+ *     "code": "NYSE",
+ *     "name": "New York Stock Exchange",
+ *     "is_open": true,
+ *     "today_session_starts": "2024-01-01T09:30:00+00:00Z",
+ *     "today_session_ends": "2024-01-01T16:00:00+00:00Z",
+ *     "market_reopens": null
+ *   },
+ *   "latest_price": {
+ *     "amount": 123.45,
+ *     "datetime": "2024-01-01T15:30:00+00:00Z",
+ *     "open_price": 120.00,
+ *     "high_price": 125.00,
+ *     "low_price": 119.50,
+ *     "previous_close_price": 121.00
+ *   }
+ * }
+ * 
+ * + * @param uuid Asset identifier (mapped from JSON property `uuid`) + * @param expandedMarket Expanded market details (mapped from JSON property `market`) + * @param expandedLatestPrice Expanded latest price details (mapped from JSON property `latest_price`) + */ +public record AssetWithMarketAndLatestPrice(@JsonProperty("uuid") UUID uuid, + @JsonProperty("market") ExpandedMarket expandedMarket, + @JsonProperty("latest_price") ExpandedLatestPrice expandedLatestPrice) { + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedLatestPrice.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedLatestPrice.java new file mode 100644 index 000000000..353df06e0 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedLatestPrice.java @@ -0,0 +1,30 @@ +package com.backbase.stream.investment.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.time.OffsetDateTime; + +/** + * Representation of an expanded latest price returned by the List Assets API when the response is + * requested with expansion of latest price details and custom fields. + * Example JSON: + *
+ * "latest_price": {
+ *   "amount": 123.45,
+ *   "datetime": "2026-01-21T12:58:00.000000Z",
+ *   "open_price": 120.00,
+ *   "high_price": 125.00,
+ *   "low_price": 119.50,
+ *   "previous_close_price": 121.00
+ * }
+ * 
+ */ +public record ExpandedLatestPrice( + @JsonProperty("amount") Double amount, + @JsonProperty("datetime") OffsetDateTime datetime, + @JsonProperty("open_price") Double openPrice, + @JsonProperty("high_price") Double highPrice, + @JsonProperty("low_price") Double lowPrice, + @JsonProperty("previous_close_price") Double previousClosePrice +) { + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedMarket.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedMarket.java new file mode 100644 index 000000000..065629f21 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/ExpandedMarket.java @@ -0,0 +1,29 @@ +package com.backbase.stream.investment.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.time.OffsetDateTime; + +/** + * Representation of an expanded market returned by the List Assets API when the response is + * requested with expansion of market details and custom fields. + * Example JSON: + *
+ *"market": {
+ *            "code": "XETR",
+ *            "name": "XETRA",
+ *            "is_open": true,
+ *            "today_session_starts": "2026-01-22T09:00:00.000000Z",
+ *            "today_session_ends": "2026-01-22T17:30:00.000000Z",
+ *            "market_reopens": "2026-01-23T09:00:00.000000Z"
+ *          }
+ * 
+ * + */ +public record ExpandedMarket(@JsonProperty("code") String code, + @JsonProperty("name") String name, + @JsonProperty("is_open") Boolean isOpen, + @JsonProperty("today_session_starts") OffsetDateTime todaySessionStarts, + @JsonProperty("today_session_ends") OffsetDateTime todaySessionEnds, + @JsonProperty("market_reopens") OffsetDateTime marketReopens) { + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/PaginatedExpandedAssetList.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/PaginatedExpandedAssetList.java new file mode 100644 index 000000000..29d6f3fe1 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/PaginatedExpandedAssetList.java @@ -0,0 +1,27 @@ +package com.backbase.stream.investment.model; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode +@Data +@Builder +public class PaginatedExpandedAssetList { + + public static final String JSON_PROPERTY_COUNT = "count"; + private Integer count; + + public static final String JSON_PROPERTY_NEXT = "next"; + private URI next; + + public static final String JSON_PROPERTY_PREVIOUS = "previous"; + private URI previous; + + public static final String JSON_PROPERTY_RESULTS = "results"; + private List results = new ArrayList<>(); + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java similarity index 89% rename from stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSaga.java rename to stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java index e7ab469b7..6ad33ace7 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java @@ -1,6 +1,5 @@ package com.backbase.stream.investment.saga; -import com.backbase.investment.api.service.v1.model.AssetCategoryRequest; import com.backbase.investment.api.service.v1.model.AssetCategoryTypeRequest; import com.backbase.investment.api.service.v1.model.MarketRequest; import com.backbase.investment.api.service.v1.model.MarketSpecialDayRequest; @@ -10,6 +9,7 @@ import com.backbase.stream.investment.service.InvestmentAssetPriceService; import com.backbase.stream.investment.service.InvestmentAssetUniverseService; import com.backbase.stream.investment.service.InvestmentClientService; +import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService; import com.backbase.stream.investment.service.InvestmentPortfolioService; import com.backbase.stream.worker.StreamTaskExecutor; import com.backbase.stream.worker.model.StreamTask; @@ -47,7 +47,7 @@ */ @Slf4j @RequiredArgsConstructor -public class InvestmentAssetUniversSaga implements StreamTaskExecutor { +public class InvestmentAssetUniverseSaga implements StreamTaskExecutor { public static final String INVESTMENT = "investment-client"; public static final String OP_UPSERT = "upsert"; @@ -59,32 +59,34 @@ public class InvestmentAssetUniversSaga implements StreamTaskExecutor executeTask(InvestmentAssetsTask streamTask) { - if (!coreConfigurationProperties.isAssetUniversEnabled()) { - log.warn("Skip investment asset univers saga execution: taskId={}, taskName={}", + if (!coreConfigurationProperties.isAssetUniverseEnabled()) { + log.warn("Skip investment asset universe saga execution: taskId={}, taskName={}", streamTask.getId(), streamTask.getName()); return Mono.just(streamTask); } - log.info("Starting investment saga execution: taskId={}, taskName={}", + log.info("Starting investment asset universe saga execution: taskId={}, taskName={}", streamTask.getId(), streamTask.getName()); - return createMarkets(streamTask) - .flatMap(this::createMarketSpecialDays) - .flatMap(this::createAssetCategoryTypes) - .flatMap(this::createAssetCategories) + return upsertMarkets(streamTask) + .flatMap(this::upsertMarketSpecialDays) + .flatMap(this::upsertAssetCategoryTypes) + .flatMap(this::upsertAssetCategories) .flatMap(this::createAssets) .flatMap(this::upsertPrices) + .flatMap(this::createIntradayPrices) .doOnSuccess(completedTask -> log.info( - "Successfully completed investment saga: taskId={}, taskName={}, state={}", + "Successfully completed investment asset universe saga: taskId={}, taskName={}, state={}", completedTask.getId(), completedTask.getName(), completedTask.getState())) .doOnError(throwable -> { - log.error("Failed to execute investment saga: taskId={}, taskName={}", + log.error("Failed to execute investment asset universe saga: taskId={}, taskName={}", streamTask.getId(), streamTask.getName(), throwable); streamTask.error(INVESTMENT, OP_UPSERT, RESULT_FAILED, streamTask.getName(), streamTask.getId(), - "Investment saga failed: " + throwable.getMessage()); + "Investment asset universe saga failed: " + throwable.getMessage()); streamTask.setState(State.FAILED); }) .onErrorResume(throwable -> Mono.just(streamTask)); @@ -96,6 +98,11 @@ private Mono upsertPrices(InvestmentAssetsTask investmentT .map(investmentTask::setPriceTasks); } + private Mono createIntradayPrices(InvestmentAssetsTask investmentTask) { + return investmentIntradayAssetPriceService.ingestIntradayPrices() + .map(investmentTask::setIntradayPriceTasks); + } + /** * Rollback is not implemented for investment saga. * @@ -112,7 +119,7 @@ public Mono rollBack(InvestmentAssetsTask streamTask) { return Mono.empty(); } - public Mono createMarkets(InvestmentAssetsTask investmentTask) { + public Mono upsertMarkets(InvestmentAssetsTask investmentTask) { InvestmentAssetData investmentData = investmentTask.getData(); int marketCount = investmentData.getMarkets() != null ? investmentData.getMarkets().size() : 0; log.info("Starting investment market creation: taskId={}, marketCount={}", @@ -130,7 +137,7 @@ public Mono createMarkets(InvestmentAssetsTask investmentT // Process each market: create or get from asset universe service return Flux.fromIterable(investmentData.getMarkets()) - .flatMap(market -> assetUniverseService.getOrCreateMarket( + .flatMap(market -> assetUniverseService.upsertMarket( new MarketRequest() .code(market.getCode()) .name(market.getName()) @@ -161,7 +168,7 @@ public Mono createMarkets(InvestmentAssetsTask investmentT } /** - * Creates or upserts market special days for the investment task. + * Upserts market special days for the investment task. * *

This method processes each market special day in the task data by invoking * the asset universe service. It updates the task state and logs progress for observability. @@ -169,7 +176,7 @@ public Mono createMarkets(InvestmentAssetsTask investmentT * @param investmentTask the investment task containing market special day data * @return Mono emitting the updated investment task with market special days set */ - public Mono createMarketSpecialDays(InvestmentAssetsTask investmentTask) { + public Mono upsertMarketSpecialDays(InvestmentAssetsTask investmentTask) { InvestmentAssetData investmentData = investmentTask.getData(); int marketSpecialDayCount = investmentData.getMarketSpecialDays() != null ? investmentData.getMarketSpecialDays().size() : 0; @@ -188,7 +195,7 @@ public Mono createMarketSpecialDays(InvestmentAssetsTask i // Process each market special day: create or get from asset universe service return Flux.fromIterable(investmentData.getMarketSpecialDays()) - .flatMap(marketSpecialDay -> assetUniverseService.getOrCreateMarketSpecialDay( + .flatMap(marketSpecialDay -> assetUniverseService.upsertMarketSpecialDay( new MarketSpecialDayRequest() .date(marketSpecialDay.getDate()) .market(marketSpecialDay.getMarket()) @@ -219,7 +226,7 @@ public Mono createMarketSpecialDays(InvestmentAssetsTask i } /** - * Creates or upserts asset categories for the investment task. + * Upserts asset categories for the investment task. * *

This method processes each asset category in the task data by invoking * the asset universe service. It updates the task state and logs progress for observability. @@ -227,7 +234,7 @@ public Mono createMarketSpecialDays(InvestmentAssetsTask i * @param investmentTask the investment task containing asset category data * @return Mono emitting the updated investment task with asset categories set */ - public Mono createAssetCategories(InvestmentAssetsTask investmentTask) { + public Mono upsertAssetCategories(InvestmentAssetsTask investmentTask) { InvestmentAssetData investmentData = investmentTask.getData(); int categoryCount = investmentData.getAssetCategories() != null ? investmentData.getAssetCategories().size() : 0; @@ -245,18 +252,10 @@ public Mono createAssetCategories(InvestmentAssetsTask inv } return Flux.fromIterable(investmentData.getAssetCategories()) - .flatMap(assetCategory -> { - AssetCategoryRequest request = new AssetCategoryRequest() - .name(assetCategory.getName()) - .code(assetCategory.getCode()) - .order(assetCategory.getOrder()) - .type(assetCategory.getType()) - .description(assetCategory.getDescription()); - return assetUniverseService.createAssetCategory(request); - }) + .flatMap(assetUniverseService::upsertAssetCategory) .collectList() .map(assetCategories -> { - investmentTask.setAssetCategories(assetCategories); + investmentTask.setInsertedAssetCategories(assetCategories); investmentTask.info(INVESTMENT, OP_CREATE, RESULT_CREATED, investmentTask.getName(), investmentTask.getId(), RESULT_CREATED + " " + assetCategories.size() + " Investment Asset Categories"); @@ -274,7 +273,7 @@ public Mono createAssetCategories(InvestmentAssetsTask inv }); } - public Mono createAssetCategoryTypes(InvestmentAssetsTask investmentTask) { + public Mono upsertAssetCategoryTypes(InvestmentAssetsTask investmentTask) { InvestmentAssetData investmentData = investmentTask.getData(); int typeCount = investmentData.getAssetCategoryTypes() != null ? investmentData.getAssetCategoryTypes().size() : 0; @@ -296,7 +295,7 @@ public Mono createAssetCategoryTypes(InvestmentAssetsTask AssetCategoryTypeRequest request = new AssetCategoryTypeRequest() .name(assetCategoryType.getName()) .code(assetCategoryType.getCode()); - return assetUniverseService.createAssetCategoryType(request); + return assetUniverseService.upsertAssetCategoryType(request); }) .collectList() .map(assetCategoryTypes -> { diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java new file mode 100644 index 000000000..4021c9a7d --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentContentSaga.java @@ -0,0 +1,101 @@ +package com.backbase.stream.investment.saga; + +import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; +import com.backbase.stream.investment.InvestmentContentTask; +import com.backbase.stream.investment.service.InvestmentClientService; +import com.backbase.stream.investment.service.InvestmentPortfolioService; +import com.backbase.stream.investment.service.resttemplate.InvestmentRestNewsContentService; +import com.backbase.stream.worker.StreamTaskExecutor; +import com.backbase.stream.worker.model.StreamTask; +import com.backbase.stream.worker.model.StreamTask.State; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; + +/** + * Saga orchestrating the complete investment client ingestion workflow. + * + *

This saga implements a multi-step process for ingesting investment data: + *

    + *
  1. Upsert investment clients - Creates or updates client records
  2. + *
  3. Upsert investment products - Creates or updates portfolio products
  4. + *
  5. Upsert investment portfolios - Creates or updates portfolios with client associations
  6. + *
+ * + *

The saga uses idempotent operations to ensure safe re-execution and writes progress + * to the {@link StreamTask} history for observability. Each step builds upon the previous + * step's results, creating a complete investment setup. + * + *

Design notes: + *

    + *
  • All operations are idempotent (safe to retry)
  • + *
  • Progress is tracked via StreamTask state and history
  • + *
  • Failures are logged with complete context for debugging
  • + *
  • All reactive operations include proper success and error handlers
  • + *
+ * + * @see InvestmentClientService + * @see InvestmentPortfolioService + * @see StreamTaskExecutor + */ +@Slf4j +@RequiredArgsConstructor +public class InvestmentContentSaga implements StreamTaskExecutor { + + public static final String INVESTMENT = "investment-content"; + public static final String OP_UPSERT = "upsert"; + public static final String RESULT_FAILED = "failed"; + + private final InvestmentRestNewsContentService investmentRestNewsContentService; + private final InvestmentIngestionConfigurationProperties coreConfigurationProperties; + + @Override + public Mono executeTask(InvestmentContentTask streamTask) { + if (!coreConfigurationProperties.isContentEnabled()) { + log.warn("Skip investment content saga execution: taskId={}, taskName={}", + streamTask.getId(), streamTask.getName()); + return Mono.just(streamTask); + } + log.info("Starting investment content saga execution: taskId={}, taskName={}", + streamTask.getId(), streamTask.getName()); + log.info("Starting investment saga execution: taskId={}, taskName={}", + streamTask.getId(), streamTask.getName()); + return upsertNewsContent(streamTask) + .doOnSuccess(completedTask -> log.info( + "Successfully completed investment saga: taskId={}, taskName={}, state={}", + completedTask.getId(), completedTask.getName(), completedTask.getState())) + .doOnError(throwable -> { + log.error("Failed to execute investment saga: taskId={}, taskName={}", + streamTask.getId(), streamTask.getName(), throwable); + streamTask.error(INVESTMENT, OP_UPSERT, RESULT_FAILED, + streamTask.getName(), streamTask.getId(), + "Investment saga failed: " + throwable.getMessage()); + streamTask.setState(State.FAILED); + }) + .onErrorResume(throwable -> Mono.just(streamTask)); + } + + private Mono upsertNewsContent(InvestmentContentTask investmentContentTask) { + return investmentRestNewsContentService.upsertContent(investmentContentTask.getData().getMarketNews()) + .thenReturn(investmentContentTask); + } + + + /** + * Rollback is not implemented for investment saga. + * + *

Investment operations are idempotent and designed to be retried safely. + * Manual cleanup should be performed if necessary through the Investment Service API. + * + * @param streamTask the task to rollback + * @return null - rollback not implemented + */ + @Override + public Mono rollBack(InvestmentContentTask streamTask) { + log.warn("Rollback requested for investment saga but not implemented: taskId={}, taskName={}", + streamTask.getId(), streamTask.getName()); + return Mono.empty(); + } + +} + diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AssetMapper.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AssetMapper.java index 5734b8143..807e77c6e 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AssetMapper.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AssetMapper.java @@ -21,6 +21,7 @@ public interface AssetMapper { OASAssetRequestDataRequest map(Asset asset, Map categoryIdByCode); @Mapping(target = "categories", source = "categories", qualifiedByName = "mapCategories") + @Mapping(target = "logo", ignore = true) Asset map(com.backbase.investment.api.service.v1.model.Asset asset); @AfterMapping @@ -29,7 +30,7 @@ default void postMap(@MappingTarget OASAssetRequestDataRequest requestDataReques if (requestDataRequest == null) { return; } - requestDataRequest.setCategories(Objects.requireNonNullElse(asset.categories(), new ArrayList()) + requestDataRequest.setCategories(Objects.requireNonNullElse(asset.getCategories(), new ArrayList()) .stream().filter(Objects::nonNull).map(categoryIdByCode::get) .filter(Objects::nonNull).toList()); } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AsyncTaskService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AsyncTaskService.java index 327409408..32c3c7cba 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AsyncTaskService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/AsyncTaskService.java @@ -11,7 +11,6 @@ import reactor.core.publisher.Mono; @Slf4j -@Service @RequiredArgsConstructor public class AsyncTaskService { diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetPriceService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetPriceService.java index 10a124b52..6972625d5 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetPriceService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetPriceService.java @@ -54,9 +54,9 @@ private Mono>> generatePrices(List assets, Map priceByAsset, .orElse(defaultRandomParam); double price = Optional.ofNullable(lastPrice) .or(() -> Optional.ofNullable(configAssetPrice).map(AssetPrice::price) - .or(() -> Optional.of(a).map(Asset::defaultPrice))) + .or(() -> Optional.of(a).map(Asset::getDefaultPrice))) .orElse(DEFAULT_START_PRICE); return new RandomPriceParam(price, randomParam); } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java index e9f7d1215..eddaa88e4 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java @@ -12,6 +12,8 @@ import com.backbase.investment.api.service.v1.model.MarketSpecialDayRequest; import com.backbase.investment.api.service.v1.model.OASAssetRequestDataRequest; import com.backbase.investment.api.service.v1.model.PaginatedAssetCategoryList; +import com.backbase.stream.investment.service.resttemplate.InvestmentRestAssetUniverseService; +import java.io.File; import java.io.IOException; import java.time.LocalDate; import java.util.List; @@ -33,6 +35,7 @@ public class InvestmentAssetUniverseService { private final AssetUniverseApi assetUniverseApi; + private final InvestmentRestAssetUniverseService investmentRestAssetUniverseService; private final CustomIntegrationApiService customIntegrationApiService; private final AssetMapper assetMapper = Mappers.getMapper(AssetMapper.class); @@ -43,7 +46,7 @@ public class InvestmentAssetUniverseService { * @param marketRequest the market request details * @return Mono representing the existing or newly created market */ - public Mono getOrCreateMarket(MarketRequest marketRequest) { + public Mono upsertMarket(MarketRequest marketRequest) { log.debug("Creating market: {}", marketRequest); return assetUniverseApi.getMarket(marketRequest.getCode()) // If getMarket returns 404 NOT_FOUND, treat as "not found" and return Mono.empty() @@ -59,7 +62,17 @@ public Mono getOrCreateMarket(MarketRequest marketRequest) { .flatMap(existingMarket -> { log.info("Market already exists: {}", existingMarket.getCode()); log.debug("Market already exists: {}", existingMarket); - return Mono.just(existingMarket); + return assetUniverseApi.updateMarket(existingMarket.getCode(), marketRequest) + .doOnSuccess(updatedMarket -> log.info("Updated market: {}", updatedMarket)) + .doOnError(error -> { + if (error instanceof WebClientResponseException w) { + log.error("Error updating market: {} : HTTP {} -> {}", marketRequest.getCode(), + w.getStatusCode(), w.getResponseBodyAsString()); + } else { + log.error("Error updating market: {} : {}", marketRequest.getCode(), + error.getMessage(), error); + } + }); }) // If Mono is empty (market not found), create the market .switchIfEmpty(assetUniverseApi.createMarket(marketRequest) @@ -69,14 +82,15 @@ public Mono getOrCreateMarket(MarketRequest marketRequest) { } /** - * Gets an existing asset by its identifier, or creates it if not found (404). Handles 404 NOT_FOUND from getAsset - * by returning Mono.empty(), which triggers asset creation via switchIfEmpty. + * Gets an existing asset by its identifier, or creates it if not found (404). Handles 404 NOT_FOUND from + * getAsset by returning Mono.empty(), which triggers asset creation via switchIfEmpty. * * @param assetRequest the asset request details + * @param logo the thumbnail image * @return Mono representing the existing or newly created asset * @throws IOException if an I/O error occurs */ - public Mono getOrCreateAsset(OASAssetRequestDataRequest assetRequest) { + public Mono getOrCreateAsset(OASAssetRequestDataRequest assetRequest, File logo) { log.debug("Creating asset: {}", assetRequest); // Build a unique asset identifier using ISIN, market, and currency @@ -95,12 +109,16 @@ public Mono getOrCreateAsset(OASAssetRequestDataRequest assetRequest) { return Mono.error(error); }) // If asset exists, log and return it - .flatMap(existingAsset -> { + .map(existingAsset -> { log.info("Asset already exists with Asset Identifier : {}", assetIdentifier); - return Mono.just(existingAsset); + return existingAsset; }) + .flatMap(a -> investmentRestAssetUniverseService.setAssetLogo(a, logo) + .thenReturn(a)) // If Mono is empty (asset not found), create the asset .switchIfEmpty(customIntegrationApiService.createAsset(assetRequest) + .flatMap(a -> investmentRestAssetUniverseService.setAssetLogo(a, logo) + .thenReturn(a)) .doOnSuccess(createdAsset -> log.info("Created asset with assetIdentifier: {}", assetIdentifier)) .doOnError(error -> { if (error instanceof WebClientResponseException w) { @@ -115,13 +133,13 @@ public Mono getOrCreateAsset(OASAssetRequestDataRequest assetRequest) { } /** - * Gets an existing market special day by date and market, or creates it if not found. Handles 404 or empty results - * by creating the market special day. + * Gets an existing market special day by date and market, or creates it if not found. Handles 404 or empty + * results by creating the market special day. * * @param marketSpecialDayRequest the request containing market and date details * @return Mono\ representing the existing or newly created market special day */ - public Mono getOrCreateMarketSpecialDay(MarketSpecialDayRequest marketSpecialDayRequest) { + public Mono upsertMarketSpecialDay(MarketSpecialDayRequest marketSpecialDayRequest) { log.debug("Creating market special day: {}", marketSpecialDayRequest); LocalDate date = marketSpecialDayRequest.getDate(); @@ -141,7 +159,20 @@ public Mono getOrCreateMarketSpecialDay(MarketSpecialDayReques .findFirst(); if (matchingSpecialDay.isPresent()) { log.info("Market special day already exists for day: {}", marketSpecialDayRequest); - return Mono.just(matchingSpecialDay.get()); + return assetUniverseApi.updateMarketSpecialDay(matchingSpecialDay.get().getUuid().toString(), + marketSpecialDayRequest) + .doOnSuccess(updatedMarketSpecialDay -> + log.info("Updated market special day: {}", updatedMarketSpecialDay)) + .doOnError(error -> { + if (error instanceof WebClientResponseException w) { + log.error("Error updating market special day : {} : HTTP {} -> {}", + marketSpecialDayRequest, + w.getStatusCode(), w.getResponseBodyAsString()); + } else { + log.error("Error updating market special day {} : {}", marketSpecialDayRequest, + error.getMessage(), error); + } + }); } else { log.debug("No market special day exists for day: {}", marketSpecialDayRequest); return Mono.empty(); @@ -179,26 +210,28 @@ public Flux createAssets(List { Map categoryIdByCode = categories.stream() .collect(Collectors.toMap(AssetCategory::getCode, AssetCategory::getUuid)); - return Flux.fromIterable(assets) .flatMap(asset -> { OASAssetRequestDataRequest assetRequest = assetMapper.map(asset, categoryIdByCode); - return this.getOrCreateAsset(assetRequest).map(assetMapper::map); + return this.getOrCreateAsset(assetRequest, asset.getLogo()).map(assetMapper::map); }); }); } /** - * Gets an existing asset category by its code, or creates it if not found. Handles empty results by creating the - * asset category. + * Gets an existing asset category by its code, or creates it if not found. Handles empty results by creating + * the asset category. * * @param assetCategoryRequest the request containing asset category details * @return Mono representing the existing or newly created asset category */ - public Mono createAssetCategory(AssetCategoryRequest assetCategoryRequest) { + public Mono upsertAssetCategory(AssetCategoryRequest assetCategoryRequest) { if (assetCategoryRequest == null) { return Mono.empty(); } + File logo = assetCategoryRequest.getImage(); + // Post request cannot insert file directly, so set to null for the initial creation call + assetCategoryRequest.setImage(null); return assetUniverseApi.listAssetCategories(assetCategoryRequest.getCode(), 100, assetCategoryRequest.getName(), 0, assetCategoryRequest.getOrder(), assetCategoryRequest.getType()) .flatMap(paginatedAssetCategoryList -> { @@ -212,7 +245,18 @@ public Mono createAssetCategory(AssetCategoryRequest assetCategor .findFirst(); if (matchingCategory.isPresent()) { log.info("Asset category already exists for code: {}", assetCategoryRequest.getCode()); - return Mono.just(matchingCategory.get()); + return assetUniverseApi.updateAssetCategory(matchingCategory.get().getUuid().toString(), assetCategoryRequest) + .doOnSuccess(updatedCategory -> log.info("Updated asset category: {}", updatedCategory)) + .doOnError(error -> { + if (error instanceof WebClientResponseException w) { + log.error("Error updating asset category: {} : HTTP {} -> {}", assetCategoryRequest.getCode(), + w.getStatusCode(), w.getResponseBodyAsString()); + } else { + log.error("Error updating asset category: {} : {}", assetCategoryRequest.getCode(), + error.getMessage(), error); + } + }) + .onErrorResume(e -> Mono.empty()); } else { log.debug("No asset category exists for code: {}", assetCategoryRequest.getCode()); return Mono.empty(); @@ -232,7 +276,11 @@ public Mono createAssetCategory(AssetCategoryRequest assetCategor error.getMessage(), error); } }) - ); + ) + .flatMap(ac -> investmentRestAssetUniverseService.setAssetCategoryLogo(ac.getUuid(), logo) + .thenReturn(ac) + ) + .onErrorResume(e -> Mono.empty()); } /** @@ -242,7 +290,7 @@ public Mono createAssetCategory(AssetCategoryRequest assetCategor * @param assetCategoryTypeRequest the request containing asset category type details * @return Mono representing the existing or newly created asset category type */ - public Mono createAssetCategoryType(AssetCategoryTypeRequest assetCategoryTypeRequest) { + public Mono upsertAssetCategoryType(AssetCategoryTypeRequest assetCategoryTypeRequest) { if (assetCategoryTypeRequest == null) { return Mono.empty(); } @@ -259,7 +307,18 @@ public Mono createAssetCategoryType(AssetCategoryTypeRequest .findFirst(); if (matchingType.isPresent()) { log.info("Asset category type already exists for code: {}", assetCategoryTypeRequest.getCode()); - return Mono.just(matchingType.get()); + return assetUniverseApi.updateAssetCategoryType(matchingType.get().getUuid().toString(), assetCategoryTypeRequest) + .doOnSuccess(updatedType -> log.info("Updated asset category type: {}", updatedType)) + .doOnError(error -> { + if (error instanceof WebClientResponseException w) { + log.error("Error updating asset category type: {} : HTTP {} -> {}", + assetCategoryTypeRequest.getCode(), w.getStatusCode(), w.getResponseBodyAsString()); + } else { + log.error("Error updating asset category type: {} : {}", + assetCategoryTypeRequest.getCode(), error.getMessage(), error); + } + }) + .onErrorResume(e -> Mono.empty()); } else { log.debug("No asset category type exists for code: {}", assetCategoryTypeRequest.getCode()); return Mono.empty(); diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java new file mode 100644 index 000000000..51be1511d --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceService.java @@ -0,0 +1,253 @@ +package com.backbase.stream.investment.service; + +import com.backbase.investment.api.service.v1.AssetUniverseApi; +import com.backbase.investment.api.service.v1.model.GroupResult; +import com.backbase.investment.api.service.v1.model.OASCreatePriceRequest; +import com.backbase.investment.api.service.v1.model.TypeEnum; +import com.backbase.stream.investment.model.AssetWithMarketAndLatestPrice; +import com.backbase.stream.investment.model.PaginatedExpandedAssetList; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nonnull; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Service responsible for generating and ingesting intraday asset prices. + * + *

Responsibilities: + * - Read assets with latest prices via {@link AssetUniverseApi}. + * - Generate a series of intraday OHLC price points per asset. + * - Submit intraday prices back to the Asset API using bulk create. + * + *

Notes: + * - The generator uses a randomised model constrained to realistic percentage ranges. + * - The service is reactive and non-blocking: ingestion returns a {@link Mono} that completes + * after all async submissions are triggered. + * + */ +@Slf4j +@RequiredArgsConstructor +public class InvestmentIntradayAssetPriceService { + + private final AssetUniverseApi assetUniverseApi; + + /** + * Generates and triggers ingestion of intraday prices for all assets that have a latest price. + * + * @return a {@link Mono} that emits the combined list of results for batch operations when available. + */ + public Mono> ingestIntradayPrices() { + return generateIntradayPrices() + .map(asyncTasks -> asyncTasks.stream().flatMap(Collection::stream).toList()); + } + + /** + * Internal reactive pipeline that fetches assets and creates intraday price creation tasks. + * + * @return a {@link Mono} emitting a list of lists of created {@link GroupResult}s (one list per asset request). + */ + @Nonnull + private Mono>> generateIntradayPrices() { + log.info("Generating Intraday Prices for Assets"); + return assetUniverseApi.listAssetsWithResponseSpec( + null, null, null, null, + List.of("market","latest_price"), + null, null, + "uuid,market,latest_price", + null, null, null, null, null, + null, null, null, null + ) // Above API returns custom projection with market and latest price expanded only, hence needs a custom return type. + .bodyToMono(PaginatedExpandedAssetList.class) + .flatMap(paginatedAssetList -> { + + if (paginatedAssetList.getCount() == 0) { + log.warn("No assets found with latest prices to generate intraday prices"); + return Mono.just(List.>of()); + } + + return Flux.fromIterable(paginatedAssetList.getResults()) + .flatMap(assetWithMarketAndLatestPrice -> { + List requests = + generateIntradayPricesForAsset(assetWithMarketAndLatestPrice); + + log.debug("Generated intraday price requests: {}", requests); + + if (requests.isEmpty()) { + return Mono.empty(); + } + + return assetUniverseApi.bulkCreateIntradayAssetPrice(requests, null, null, null) + .collectList() + .doOnSuccess(created -> + log.info( + "Successfully triggered creation of {} intraday prices for asset ({})", + requests.size(), + assetWithMarketAndLatestPrice.uuid() + ) + ) + .doOnError(WebClientResponseException.class, ex -> + log.error( + "Failed to create intraday prices for asset ({}): status={}, body={}", + assetWithMarketAndLatestPrice.uuid(), + ex.getStatusCode(), + ex.getResponseBodyAsString(), + ex + ) + ) + .onErrorResume(e -> Mono.empty()); + }) + .collectList(); + }) + .doOnError(error -> { + if (error instanceof WebClientResponseException w) { + log.error( + "Error generating intraday prices for assets: HTTP {} -> {}", + w.getStatusCode(), + w.getResponseBodyAsString() + ); + } else { + log.error( + "Error generating intraday prices for assets: {}", + error.getMessage(), + error + ); + } + }); + + } + + /** + * Generate a list of intraday price create requests for a single asset. + * + *

Each request represents a 15-minute / 15-candle sequence. + * + * @param assetWithMarketAndLatestPrice the asset data with latest price information + * @return a list of {@link OASCreatePriceRequest} ready to be submitted + */ + private List generateIntradayPricesForAsset( + AssetWithMarketAndLatestPrice assetWithMarketAndLatestPrice) { + List requests = new ArrayList<>(); + + // Base previous close + Double previousClose = assetWithMarketAndLatestPrice.expandedLatestPrice().previousClosePrice(); + + // Today + OffsetDateTime todaySessionStarts = assetWithMarketAndLatestPrice.expandedMarket().todaySessionStarts(); + LocalDate today = todaySessionStarts.toLocalDate(); + LocalTime time = intradayStartTime(todaySessionStarts); + + for (int i = 0; i < 15; i++) { + // Generate intraday OHLC + Ohlc ohlc = generateIntradayOhlc(previousClose); + + // Create request + requests.add(createIntradayRequest(assetWithMarketAndLatestPrice, ohlc, previousClose, + OffsetDateTime.of(today, time, ZoneOffset.UTC))); + + // Next candle starts from this close + previousClose = ohlc.close(); + + // Move time forward by 15 minutes + time = time.plusMinutes(15); + } + return requests; + } + + private LocalTime intradayStartTime(OffsetDateTime offsetDateTime) { + return offsetDateTime.toLocalTime().plusMinutes(ThreadLocalRandom.current().nextInt(1, 15)); + } + + private OASCreatePriceRequest createIntradayRequest(AssetWithMarketAndLatestPrice asset, Ohlc ohlc, Double previousClose, + OffsetDateTime dateTime) { + + return new OASCreatePriceRequest() + .amount(ohlc.close()) + .asset(Map.of("uuid", asset.uuid().toString())) + .datetime(dateTime) + .open(ohlc.open()) + .high(ohlc.high()) + .low(ohlc.low()) + .previousClose(previousClose) + .type(TypeEnum.INTRADAY); + } + + /** + * Deterministic randomised OHLC generator based on a previous close. + * + *

Algorithm constraints: + * - Total intraday range: 2–5% (implemented as 4–9 per mille in code). + * - Opening gap: ±0.1–0.3%. + * - Candle body: 0.2–1.2%. + * - Wicks are larger than body and distributed to respect direction. + * + * @param previousClose the prior close price (must be positive) + * @return a map with keys \"open\", \"high\", \"low\", \"close\" rounded to 6 decimal places + * @throws IllegalArgumentException if previousClose is null or not positive + */ + public static Ohlc generateIntradayOhlc(Double previousClose) { + if (previousClose == null || previousClose <= 0) { + throw new IllegalArgumentException("Previous close must be positive"); + } + + ThreadLocalRandom r = ThreadLocalRandom.current(); + + // Total intraday range: 2–5% + double totalRangePct = r.nextDouble(4.0, 9.01) / 100.0; + + // Small opening gap: ±0.1–0.3% + double openGapPct = r.nextDouble(-0.3, 0.31) / 100.0; + double open = previousClose * (1 + openGapPct); + + // Direction + boolean bullish = r.nextBoolean(); + + // Candle body: 0.2–1.2% + double bodyPct = r.nextDouble(0.2, 1.21) / 100.0; + double close = bullish + ? open * (1 + bodyPct) + : open * (1 - bodyPct); + + // Wicks larger than body + double wickBudget = Math.max(totalRangePct - bodyPct, totalRangePct * 0.6); + + double upperWickPct; + double lowerWickPct; + + if (bullish) { + upperWickPct = wickBudget * r.nextDouble(0.6, 0.9); + lowerWickPct = wickBudget - upperWickPct; + } else { + lowerWickPct = wickBudget * r.nextDouble(0.6, 0.9); + upperWickPct = wickBudget - lowerWickPct; + } + + double high = Math.max(open, close) * (1 + upperWickPct); + double low = Math.min(open, close) * (1 - lowerWickPct); + + return new Ohlc(round6(open), round6(high), round6(low), round6(close)); + } + + private static double round6(double value) { + return BigDecimal + .valueOf(value) + .setScale(6, RoundingMode.HALF_UP) + .doubleValue(); + } + + public record Ohlc(double open, double high, double low, double close) {} + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java index 08ab51c2f..7aa13305b 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java @@ -260,7 +260,7 @@ private Mono getPortfolioModel(PortfolioList portfolio, List new Allocation(new ModelAsset(a.getIsin(), a.getMarket(), a.currency()), 0.2)).toList()) + .map(a -> new Allocation(new ModelAsset(a.getIsin(), a.getMarket(), a.getCurrency()), 0.2)).toList()) .build())); } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java new file mode 100644 index 000000000..112c4eecf --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java @@ -0,0 +1,120 @@ +package com.backbase.stream.investment.service.resttemplate; + +import com.backbase.investment.api.service.sync.ApiClient; +import com.backbase.investment.api.service.sync.v1.AssetUniverseApi; +import com.backbase.investment.api.service.sync.v1.model.AssetCategory; +import com.backbase.investment.api.service.v1.model.Asset; +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.core.io.FileSystemResource; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.client.HttpClientErrorException; +import reactor.core.publisher.Mono; + +@Slf4j +@RequiredArgsConstructor +public class InvestmentRestAssetUniverseService { + + private final AssetUniverseApi assetUniverseApi; + private final ApiClient apiClient; + + public Mono setAssetLogo(Asset asset, File logo) { + String assetUuid = asset.getUuid().toString(); + + if (logo == null) { + log.debug("Skipping logo attachment: assetUuid={}", assetUuid); + return Mono.just(asset); + } + + log.info( + "Starting logo attachment for asset: assetUuid={}, assetName='{}', logoFile='{}', logoSize={}", + assetUuid, asset.getName(), logo.getName(), logo.length()); + + return Mono.defer(() -> Mono.just(assetUniverseApi.patchAsset(assetUuid, null, logo))).map(patchedAsset -> { + log.info( + "Logo attached successfully to asset:assetUuid={}, assetName='{}', logoFile='{}'", assetUuid, + asset.getName(), logo.getName()); + return asset; + }).onErrorResume(throwable -> { + log.error( + "Logo attachment failed for asset:assetUuid={}, assetName='{}', logoFile='{}', errorType={}, errorMessage={}", + assetUuid, asset.getName(), logo.getName(), throwable.getClass().getSimpleName(), + throwable.getMessage(), throwable); + log.warn("Asset processing continuing without logo:assetUuid={}", assetUuid); + return Mono.just(asset); + }); + } + + public Mono setAssetCategoryLogo(UUID assetCategoryId, File logo) { + String assetCategoryUuid = assetCategoryId.toString(); + + if (logo == null) { + log.debug("Skipping logo attachment: operation=setLogo, assetCategoryUuid={}, reason=noLogoProvided", + assetCategoryUuid); + return Mono.empty(); + } + + log.info( + "Starting logo attachment for asset category: operation=setLogo, assetCategoryUuid={}, logoFile='{}', logoSize={}, action=start", + assetCategoryUuid, logo.getName(), logo.length()); + + return Mono.defer(() -> { + // verify the required parameter 'uuid' is set + if (assetCategoryUuid == null) { + throw new HttpClientErrorException(HttpStatus.BAD_REQUEST, + "Missing the required parameter 'uuid' when calling partialUpdateAssetCategory"); + } + + // create path and map variables + final Map uriVariables = new HashMap(); + uriVariables.put("uuid", assetCategoryUuid); + + final MultiValueMap localVarQueryParams = new LinkedMultiValueMap(); + final HttpHeaders localVarHeaderParams = new HttpHeaders(); + final MultiValueMap localVarCookieParams = new LinkedMultiValueMap(); + final MultiValueMap localVarFormParams = new LinkedMultiValueMap(); + + localVarFormParams.add("image", new FileSystemResource(logo)); + + final String[] localVarAccepts = {"application/json"}; + final List localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); + final String[] localVarContentTypes = {"multipart/form-data"}; + final MediaType localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); + + String[] localVarAuthNames = new String[]{}; + + ParameterizedTypeReference localReturnType = new ParameterizedTypeReference() { + }; + AssetCategory assetCategory = apiClient.invokeAPI("/service-api/v2/asset/asset-categories/{uuid}/", + HttpMethod.PATCH, uriVariables, localVarQueryParams, null, localVarHeaderParams, + localVarCookieParams, localVarFormParams, localVarAccept, localVarContentType, localVarAuthNames, + localReturnType).getBody(); + return Mono.justOrEmpty(assetCategory.getUuid()); + }).map(patchedAssetCategoryUuId -> { + log.info( + "Logo attached successfully to asset category: assetCategoryUuid={}, logoFile='{}'", + patchedAssetCategoryUuId, logo.getName()); + return patchedAssetCategoryUuId; + }).onErrorResume(throwable -> { + log.error( + "Logo attachment failed for asset category: assetCategoryUuid={}, logoFile='{}', errorType={}, errorMessage={}", + assetCategoryId, logo.getName(), throwable.getClass().getSimpleName(), throwable.getMessage(), + throwable); + log.warn( + "Asset processing continuing without logo: assetCategoryUuid={}", assetCategoryId); + return Mono.just(assetCategoryId); + }); + } + +} \ No newline at end of file diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestNewsContentService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestNewsContentService.java new file mode 100644 index 000000000..c1a732a46 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestNewsContentService.java @@ -0,0 +1,196 @@ +package com.backbase.stream.investment.service.resttemplate; + +import com.backbase.investment.api.service.sync.ApiClient; +import com.backbase.investment.api.service.sync.v1.ContentApi; +import com.backbase.investment.api.service.sync.v1.model.Entry; +import com.backbase.investment.api.service.sync.v1.model.EntryCreateUpdate; +import com.backbase.investment.api.service.sync.v1.model.EntryCreateUpdateRequest; +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.core.io.FileSystemResource; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Design notes. (see CODING_RULES_COPILOT.md) + *

    + *
  • No direct manipulation of generated API classes beyond construction & mapping
  • + *
  • Side-effecting operations are logged at info (create) or debug (patch) levels
  • + *
  • Exceptions from the underlying WebClient are propagated (caller decides retry strategy)
  • + *
  • All reactive operations include proper success and error handlers for observability
  • + *
+ */ +@Slf4j +@RequiredArgsConstructor +public class InvestmentRestNewsContentService { + + public static final int CONTENT_RETRIEVE_LIMIT = 100; + private final ContentApi contentApi; + private final ApiClient apiClient; + + /** + * Upserts a list of content entries. For each entry, checks if content with the same title exists. If exists, + * updates it; otherwise creates a new entry. Continues processing remaining entries even if individual entries + * fail. + * + * @param contentEntries List of content entries to upsert + * @return Mono that completes when all entries have been processed + */ + public Mono upsertContent(List contentEntries) { + log.info("Starting content upsert batch operation:, totalEntries={}", contentEntries.size()); + log.debug("Content upsert batch details: entries={}", contentEntries); + + return findEntriesNewContent(contentEntries).flatMap(this::upsertSingleEntry).doOnComplete( + () -> log.info("Content upsert batch completed successfully: totalEntriesProcessed={}", + contentEntries.size())).doOnError( + error -> log.error("Content upsert batch failed: totalEntries={}, errorType={}, errorMessage={}", + contentEntries.size(), error.getClass().getSimpleName(), error.getMessage(), error)).then(); + } + + /** + * Upserts a single content entry. Checks if an entry with the same title exists, and either updates the existing + * entry or creates a new one. Errors are logged and swallowed to allow processing of remaining entries. + * + * @param request The content entry to upsert + * @return Mono that completes when the entry has been processed + */ + private Mono upsertSingleEntry(EntryCreateUpdateRequest request) { + log.debug("Processing content entry: title='{}', hasThumbnail={}", request.getTitle(), + request.getThumbnail() != null); + + return createNewEntry(request) + .doOnSuccess( + result -> log.info("Content entry processed successfully: title='{}', uuid={}", request.getTitle(), + result.getUuid()) + ).doOnError(throwable -> { + if (throwable instanceof WebClientResponseException ex) { + log.error( + "Content entry processing failed with API error: title='{}', httpStatus={}, errorResponse={}", + request.getTitle(), ex.getStatusCode(), ex.getResponseBodyAsString(), ex); + } else { + log.error( + "Content entry processing failed with unexpected error: title='{}', errorType={}, errorMessage={}", + request.getTitle(), throwable.getClass().getSimpleName(), throwable.getMessage(), throwable); + } + }) + .onErrorResume(error -> { + log.warn("Skipping failed content entry in batch: title='{}', decision=skip, reason={}", + request.getTitle(), + error.getMessage()); + return Mono.empty(); + }); + } + + private Flux findEntriesNewContent(List contentEntries) { + Map entryByTitle = contentEntries.stream() + .collect(Collectors.toMap(EntryCreateUpdateRequest::getTitle, Function.identity())); + log.debug("Filtering content entries: requestedTitles={}", entryByTitle.keySet()); + + List existsNews = contentApi.listContentEntries(null, CONTENT_RETRIEVE_LIMIT, 0, null, null, null, null) + .getResults().stream().filter(Objects::nonNull).toList(); + + if (existsNews.isEmpty()) { + log.info("No existing content found in system:requestedEntries={}, existingEntries=0, newEntries={}", + entryByTitle.size(), entryByTitle.size()); + return Flux.fromIterable(entryByTitle.values()); + } + + Set existTitles = existsNews.stream().map(Entry::getTitle).collect(Collectors.toSet()); + List newEntries = contentEntries.stream() + .filter(c -> existTitles.stream().noneMatch(e -> c.getTitle().contains(e))).toList(); + + log.info( + "Content filtering completed: requestedEntries={}, existingEntriesFound={}, newEntriesToCreate={}, duplicatesSkipped={}", + entryByTitle.size(), existsNews.size(), newEntries.size(), entryByTitle.size() - newEntries.size()); + log.debug("Filtered new content titles: newTitles={}", + newEntries.stream().map(EntryCreateUpdateRequest::getTitle).collect(Collectors.toList())); + + return Flux.fromIterable(newEntries); + } + + /** + * Creates a new content entry. + * + * @param request The content data for the new entry + * @return Mono containing the created entry + */ + private Mono createNewEntry(EntryCreateUpdateRequest request) { + log.debug("Creating new content entry: title='{}', hasThumbnail={}", request.getTitle(), + request.getThumbnail() != null); + File thumbnail = request.getThumbnail(); + request.setThumbnail(null); + return Mono.defer(() -> Mono.just(contentApi.createContentEntry(request))) + .flatMap(e -> addThumbnail(e, thumbnail)) + .doOnSuccess( + created -> log.info("Content entry created successfully: title='{}', uuid={}, thumbnailAttached={}", + request.getTitle(), created.getUuid(), thumbnail != null)) + .doOnError( + error -> log.error("Content entry creation failed: title='{}', errorType={}, errorMessage={}", + request.getTitle(), error.getClass().getSimpleName(), error.getMessage(), error)) + .onErrorResume(error -> Mono.empty()); + } + + private Mono addThumbnail(EntryCreateUpdate entry, File thumbnail) { + UUID uuid = entry.getUuid(); + + if (thumbnail == null) { + log.debug("Skipping thumbnail attachment: uuid={}", uuid); + return Mono.just(entry); + } + + log.debug("Attaching thumbnail to content entry: uuid={}, thumbnailFile='{}', thumbnailSize={}", uuid, + thumbnail.getName(), thumbnail.length()); + + return Mono.defer(() -> { + // create path and map variables + Map uriVariables = new HashMap<>(); + uriVariables.put("uuid", uuid); + + MultiValueMap localVarQueryParams = new LinkedMultiValueMap<>(); + HttpHeaders localVarHeaderParams = new HttpHeaders(); + MultiValueMap localVarCookieParams = new LinkedMultiValueMap<>(); + MultiValueMap localVarFormParams = new LinkedMultiValueMap<>(); + + FileSystemResource value = new FileSystemResource(thumbnail); + localVarFormParams.add("thumbnail", value); + + final String[] localVarAccepts = {"application/json"}; + final List localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); + final String[] localVarContentTypes = {"multipart/form-data"}; + final MediaType localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); + + String[] localVarAuthNames = new String[]{}; + + ParameterizedTypeReference localReturnType = new ParameterizedTypeReference<>() { + }; + apiClient.invokeAPI("/service-api/v2/content/entries/{uuid}/", HttpMethod.PATCH, uriVariables, + localVarQueryParams, null, localVarHeaderParams, localVarCookieParams, localVarFormParams, + localVarAccept, localVarContentType, localVarAuthNames, localReturnType); + + log.info("Thumbnail attached successfully: uuid={}, thumbnailFile='{}'", uuid, thumbnail.getName()); + return Mono.just(entry); + }).doOnError(error -> log.error( + "Thumbnail attachment failed: uuid={}, thumbnailFile='{}', errorType={}, errorMessage={}", uuid, + thumbnail.getName(), error.getClass().getSimpleName(), error.getMessage(), error)).onErrorResume(error -> { + log.warn("Content entry created without thumbnail: uuid={}, reason={}", uuid, error.getMessage()); + return Mono.just(entry); + }); + } + +} diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java similarity index 95% rename from stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java rename to stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java index f6e9d3082..46848b480 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniversSagaTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java @@ -20,6 +20,7 @@ import com.backbase.stream.investment.RandomParam; import com.backbase.stream.investment.service.InvestmentAssetPriceService; import com.backbase.stream.investment.service.InvestmentAssetUniverseService; +import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService; import java.time.Duration; import java.util.Collections; import java.util.List; @@ -33,8 +34,8 @@ import reactor.test.StepVerifier; /** - * Test suite for {@link InvestmentAssetUniversSaga}, focusing on the asynchronous price ingestion workflow with polling - * and timeout behavior. + * Test suite for {@link InvestmentAssetUniverseSaga}, focusing on the asynchronous price ingestion + * workflow with polling and timeout behavior. * *

These tests verify: *

    @@ -44,7 +45,7 @@ *
  • Error propagation during price ingestion
  • *
*/ -class InvestmentAssetUniversSagaTest { +class InvestmentAssetUniverseSagaTest { @Mock private InvestmentAssetUniverseService assetUniverseService; @@ -52,21 +53,25 @@ class InvestmentAssetUniversSagaTest { @Mock private InvestmentAssetPriceService investmentAssetPriceService; + @Mock + private InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService; + @Mock private InvestmentIngestionConfigurationProperties configurationProperties; - private InvestmentAssetUniversSaga saga; + private InvestmentAssetUniverseSaga saga; @BeforeEach void setUp() { MockitoAnnotations.openMocks(this); - saga = new InvestmentAssetUniversSaga( + saga = new InvestmentAssetUniverseSaga( assetUniverseService, investmentAssetPriceService, + investmentIntradayAssetPriceService, configurationProperties ); // Enable asset universe by default - when(configurationProperties.isAssetUniversEnabled()).thenReturn(true); + when(configurationProperties.isAssetUniverseEnabled()).thenReturn(true); } @@ -236,8 +241,8 @@ void upsertPrices_error_duringIngestion() { InvestmentAssetsTask task = createTestTask(); // Mock: Markets and market special days creation succeed - when(assetUniverseService.getOrCreateMarket(any())).thenReturn(Mono.empty()); - when(assetUniverseService.getOrCreateMarketSpecialDay(any())).thenReturn(Mono.empty()); + when(assetUniverseService.upsertMarket(any())).thenReturn(Mono.empty()); + when(assetUniverseService.upsertMarketSpecialDay(any())).thenReturn(Mono.empty()); when(assetUniverseService.createAssets(anyList())).thenReturn(Flux.fromIterable(task.getData().getAssets())); // Mock: Price ingestion fails with an exception @@ -364,7 +369,7 @@ void upsertPrices_success_mixedStatuses() { * @return a configured test task */ private InvestmentAssetsTask createTestTask() { - // Create sample assets using the record constructor + // Create sample assets using the Asset constructor Asset asset1 = new Asset( UUID.randomUUID(), "Apple Inc.", @@ -376,8 +381,8 @@ private InvestmentAssetsTask createTestTask() { Collections.emptyMap(), AssetTypeEnum.STOCK, List.of("Technology"), - null, "AAPL-001", + null, "Apple Inc. Stock", 150.0 ); @@ -393,8 +398,8 @@ private InvestmentAssetsTask createTestTask() { Collections.emptyMap(), AssetTypeEnum.STOCK, List.of("Technology"), - null, "MSFT-001", + null, "Microsoft Corp. Stock", 200.0 ); diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java index 56f7f9c3d..24627ab2c 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentAssetUniverseServiceTest.java @@ -6,6 +6,9 @@ import com.backbase.investment.api.service.v1.model.Market; import com.backbase.investment.api.service.v1.model.MarketRequest; import com.backbase.investment.api.service.v1.model.OASAssetRequestDataRequest; +import com.backbase.stream.investment.service.resttemplate.InvestmentRestAssetUniverseService; +import java.io.File; +import java.nio.charset.StandardCharsets; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; @@ -19,41 +22,44 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import java.io.IOException; -import java.nio.charset.StandardCharsets; /** - * This is a custom implementations to avoid issues with Reactive and multipart requests + * This is a custom implementation to avoid issues with Reactive and multipart requests. */ class InvestmentAssetUniverseServiceTest { InvestmentAssetUniverseService service; AssetUniverseApi assetUniverseApi; ApiClient apiClient; + InvestmentRestAssetUniverseService investmentRestAssetUniverseService; CustomIntegrationApiService customIntegrationApiService; @BeforeEach void setUp() { assetUniverseApi = Mockito.mock(AssetUniverseApi.class); apiClient = Mockito.mock(ApiClient.class); - customIntegrationApiService = Mockito.spy(new CustomIntegrationApiService(apiClient)); - service = new InvestmentAssetUniverseService(assetUniverseApi, customIntegrationApiService); + investmentRestAssetUniverseService = Mockito.mock(InvestmentRestAssetUniverseService.class); + customIntegrationApiService = Mockito.mock(CustomIntegrationApiService.class); + service = new InvestmentAssetUniverseService(assetUniverseApi, + investmentRestAssetUniverseService, customIntegrationApiService); } @Test - void getOrCreateMarket_marketExists() { + void upsertMarket_marketExists() { MarketRequest request = new MarketRequest().code("US"); - Market market = new Market().code("US"); + Market market = new Market().code("US").name("Usa Market"); + Market marketUpdated = new Market().code("US").name("Usa Market Updated"); Mockito.when(assetUniverseApi.getMarket("US")).thenReturn(Mono.just(market)); - Mockito.when(assetUniverseApi.createMarket(request)).thenReturn(Mono.empty()); + Mockito.when(assetUniverseApi.createMarket(request)).thenReturn(Mono.just(market)); + Mockito.when(assetUniverseApi.updateMarket("US",request)).thenReturn(Mono.just(marketUpdated)); - StepVerifier.create(service.getOrCreateMarket(request)) - .expectNext(market) + StepVerifier.create(service.upsertMarket(request)) + .expectNext(marketUpdated) .verifyComplete(); } @Test - void getOrCreateMarket_marketNotFound_createsMarket() { + void upsertMarket_marketNotFound_createsMarket() { MarketRequest request = new MarketRequest().code("US"); Market createdMarket = new Market().code("US"); Mockito.when(assetUniverseApi.getMarket("US")) @@ -66,25 +72,25 @@ void getOrCreateMarket_marketNotFound_createsMarket() { ))); Mockito.when(assetUniverseApi.createMarket(request)).thenReturn(Mono.just(createdMarket)); - StepVerifier.create(service.getOrCreateMarket(request)) + StepVerifier.create(service.upsertMarket(request)) .expectNext(createdMarket) .verifyComplete(); } @Test - void getOrCreateMarket_otherError_propagates() { + void upsertMarket_otherError_propagates() { MarketRequest request = new MarketRequest().code("US"); Mockito.when(assetUniverseApi.getMarket("US")) .thenReturn(Mono.error(new RuntimeException("API error"))); Mockito.when(assetUniverseApi.createMarket(request)).thenReturn(Mono.empty()); - StepVerifier.create(service.getOrCreateMarket(request)) + StepVerifier.create(service.upsertMarket(request)) .expectErrorMatches(e -> e instanceof RuntimeException && e.getMessage().equals("API error")) .verify(); } @Test - void getOrCreateAsset_assetExists() throws IOException { + void getOrCreateAsset_assetExists() { OASAssetRequestDataRequest req = new OASAssetRequestDataRequest() .isin("ABC123").market("US").currency("USD"); Asset asset = new Asset().isin("ABC123"); @@ -109,17 +115,19 @@ void getOrCreateAsset_assetExists() throws IOException { Mockito.when(responseSpec.bodyToMono(ArgumentMatchers.any(ParameterizedTypeReference.class))) .thenReturn(Mono.just(asset)); - StepVerifier.create(service.getOrCreateAsset(req)) + StepVerifier.create(service.getOrCreateAsset(req, null)) .expectNext(asset) .verifyComplete(); } @Test - void getOrCreateAsset_assetNotFound_createsAsset() throws IOException { + void getOrCreateAsset_assetNotFound_createsAsset() { OASAssetRequestDataRequest req = new OASAssetRequestDataRequest() .isin("ABC123").market("US").currency("USD"); Asset createdAsset = new Asset().isin("ABC123"); String assetId = "ABC123_US_USD"; + File logo = null; + Mockito.when(assetUniverseApi.getAsset(assetId, null, null, null)) .thenReturn(Mono.error(WebClientResponseException.create( HttpStatus.NOT_FOUND.value(), @@ -146,13 +154,13 @@ void getOrCreateAsset_assetNotFound_createsAsset() throws IOException { Mockito.when(responseSpec.bodyToMono(ArgumentMatchers.any(ParameterizedTypeReference.class))) .thenReturn(Mono.just(createdAsset)); - StepVerifier.create(service.getOrCreateAsset(req)) + StepVerifier.create(service.getOrCreateAsset(req, logo)) .expectNext(createdAsset) .verifyComplete(); } @Test - void getOrCreateAsset_otherError_propagates() throws IOException { + void getOrCreateAsset_otherError_propagates() { OASAssetRequestDataRequest req = new OASAssetRequestDataRequest() .isin("ABC123").market("US").currency("USD"); String assetId = "ABC123_US_USD"; @@ -177,13 +185,13 @@ void getOrCreateAsset_otherError_propagates() throws IOException { Mockito.when(responseSpec.bodyToMono(ArgumentMatchers.any(ParameterizedTypeReference.class))) .thenReturn(Mono.error(new RuntimeException("API error"))); - StepVerifier.create(service.getOrCreateAsset(req)) + StepVerifier.create(service.getOrCreateAsset(req, null)) .expectErrorMatches(e -> e instanceof RuntimeException && e.getMessage().equals("API error")) .verify(); } @Test - void getOrCreateAsset_createAssetFails_propagates() throws IOException { + void getOrCreateAsset_createAssetFails_propagates() { OASAssetRequestDataRequest req = new OASAssetRequestDataRequest() .isin("ABC123").market("US").currency("USD"); String assetId = "ABC123_US_USD"; @@ -213,22 +221,20 @@ void getOrCreateAsset_createAssetFails_propagates() throws IOException { Mockito.when(responseSpec.bodyToMono(ArgumentMatchers.any(ParameterizedTypeReference.class))) .thenReturn(Mono.error(new RuntimeException("Create asset failed"))); - StepVerifier.create(service.getOrCreateAsset(req)) + StepVerifier.create(service.getOrCreateAsset(req, null)) .expectErrorMatches(e -> e instanceof RuntimeException && e.getMessage().equals("Create asset failed")) .verify(); } @Test void getOrCreateAsset_nullRequest_returnsError() { - StepVerifier.create(Mono.defer(() -> { - return service.getOrCreateAsset(null); - })) + StepVerifier.create(Mono.defer(() -> service.getOrCreateAsset(null, null))) .expectError(NullPointerException.class) .verify(); } @Test - void getOrCreateAsset_emptyMonoFromCreateAsset() throws IOException { + void getOrCreateAsset_emptyMonoFromCreateAsset() { OASAssetRequestDataRequest req = new OASAssetRequestDataRequest() .isin("ABC123").market("US").currency("USD"); String assetId = "ABC123_US_USD"; @@ -258,7 +264,7 @@ void getOrCreateAsset_emptyMonoFromCreateAsset() throws IOException { Mockito.when(responseSpec.bodyToMono(ArgumentMatchers.any(ParameterizedTypeReference.class))) .thenReturn(Mono.empty()); - StepVerifier.create(service.getOrCreateAsset(req)) + StepVerifier.create(service.getOrCreateAsset(req, null)) .expectComplete() .verify(); } diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java new file mode 100644 index 000000000..1180ab768 --- /dev/null +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentIntradayAssetPriceServiceTest.java @@ -0,0 +1,54 @@ +package com.backbase.stream.investment.service; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService.Ohlc; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; + +/** + * Unit tests covering deterministic parts of the intraday price generator. + * + *

Focus is on {@link InvestmentIntradayAssetPriceService#generateIntradayOhlc(Double)} which is + * deterministic given a provided Random seed; tests assert structural invariants and rounding. + */ +class InvestmentIntradayAssetPriceServiceTest { + + @Test + void generateIntradayOhlc_shouldValidateInput() { + assertThatThrownBy(() -> InvestmentIntradayAssetPriceService.generateIntradayOhlc(null)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> InvestmentIntradayAssetPriceService.generateIntradayOhlc(0.0)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> InvestmentIntradayAssetPriceService.generateIntradayOhlc(-1.0)) + .isInstanceOf(IllegalArgumentException.class); + } + + @RepeatedTest(10) + void generateIntradayOhlc_shouldProduceValidOhlcStructure() { + double previous = 100.0; + Ohlc ohlc = InvestmentIntradayAssetPriceService.generateIntradayOhlc(previous); + + double open = ohlc.open(); + double high = ohlc.high(); + double low = ohlc.low(); + double close = ohlc.close(); + + // Basic sanity + assertThat(open).isGreaterThan(0.0); + assertThat(high).isGreaterThan(0.0); + assertThat(low).isGreaterThan(0.0); + assertThat(close).isGreaterThan(0.0); + + // High must be >= max(open, close), low must be <= min(open, close) + assertThat(high).isGreaterThanOrEqualTo(Math.max(open, close)); + assertThat(low).isLessThanOrEqualTo(Math.min(open, close)); + + // Values should be rounded to 6 decimal places: value * 1e6 should be near-integer + assertThat(Math.abs(Math.round(open * 1_000_000.0) - open * 1_000_000.0)).isLessThan(1e-6); + assertThat(Math.abs(Math.round(high * 1_000_000.0) - high * 1_000_000.0)).isLessThan(1e-6); + assertThat(Math.abs(Math.round(low * 1_000_000.0) - low * 1_000_000.0)).isLessThan(1e-6); + assertThat(Math.abs(Math.round(close * 1_000_000.0) - close * 1_000_000.0)).isLessThan(1e-6); + } +} \ No newline at end of file diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentRestNewsContentServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentRestNewsContentServiceTest.java new file mode 100644 index 000000000..3748f2e60 --- /dev/null +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentRestNewsContentServiceTest.java @@ -0,0 +1,45 @@ +package com.backbase.stream.investment.service; + +import com.backbase.investment.api.service.sync.ApiClient; +import com.backbase.investment.api.service.sync.v1.ContentApi; +import com.backbase.investment.api.service.v1.model.EntryCreateUpdate; +import com.backbase.investment.api.service.v1.model.EntryCreateUpdateRequest; +import com.backbase.investment.api.service.v1.model.PaginatedEntryList; +import com.backbase.stream.investment.service.resttemplate.InvestmentRestNewsContentService; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +class InvestmentRestNewsContentServiceTest { + + private ContentApi contentApi; + private ApiClient apiClient; + private InvestmentRestNewsContentService service; + + @BeforeEach + void setUp() { + contentApi = Mockito.mock(ContentApi.class); + apiClient = Mockito.mock(ApiClient.class); + service = new InvestmentRestNewsContentService(contentApi, apiClient); + } + + @Test + void upsertContent_createsNewEntry_whenNotExists() { + // Given + EntryCreateUpdateRequest request = new EntryCreateUpdateRequest() + .title("New Article") + .excerpt("Excerpt") + .tags(List.of("tag1")); + + PaginatedEntryList emptyList = new PaginatedEntryList() + .count(0) + .results(List.of()); + + EntryCreateUpdate created = new EntryCreateUpdate(); + + } + + +} +