diff --git a/src/components/database/ElasticSearchDatabase.ts b/src/components/database/ElasticSearchDatabase.ts index f43f039ff..84caea3ab 100644 --- a/src/components/database/ElasticSearchDatabase.ts +++ b/src/components/database/ElasticSearchDatabase.ts @@ -391,7 +391,8 @@ export class ElasticsearchOrderDatabase extends AbstractOrderDatabase { await this.provider.index({ index: this.getSchema().index, id: orderId, - body: document + body: document, + refresh: 'wait_for' }) return document } catch (error) { @@ -441,7 +442,8 @@ export class ElasticsearchOrderDatabase extends AbstractOrderDatabase { body: { doc: document, doc_as_upsert: true - } + }, + refresh: 'wait_for' }) return document } catch (error) { @@ -457,7 +459,8 @@ export class ElasticsearchOrderDatabase extends AbstractOrderDatabase { try { await this.provider.delete({ index: this.getSchema().index, - id: orderId + id: orderId, + refresh: 'wait_for' }) return { id: orderId } } catch (error) { @@ -582,7 +585,8 @@ export class ElasticsearchDdoDatabase extends AbstractDdoDatabase { const response = await this.client.index({ index: schema.index, id: ddo.id, - body: ddo + body: ddo, + refresh: 'wait_for' }) return response } else { @@ -654,7 +658,8 @@ export class ElasticsearchDdoDatabase extends AbstractDdoDatabase { id: ddo.id, body: { doc: ddo - } + }, + refresh: 'wait_for' }) // make sure we do not have different responses 4 between DBs // do the same thing on other methods @@ -693,7 +698,8 @@ export class ElasticsearchDdoDatabase extends AbstractDdoDatabase { try { const response = await this.client.delete({ index: schema.index, - id + id, + refresh: 'wait_for' }) isDeleted = response.result === 'deleted' if (isDeleted) { diff --git a/src/index.ts b/src/index.ts index 532dc363d..119445706 100644 --- a/src/index.ts +++ b/src/index.ts @@ -218,3 +218,10 @@ if (config.hasHttp) { // Call the function to schedule the cron job to delete old logs scheduleCronJobs(oceanNode) } + +process.on('unhandledRejection', (reason) => { + console.log({ reason }) +}) +process.on('uncaughtException', (reason) => { + console.log({ reason }) +}) diff --git a/src/test/integration/algorithmsAccess.test.ts b/src/test/integration/algorithmsAccess.test.ts index 12cc2e114..068bfedad 100644 --- a/src/test/integration/algorithmsAccess.test.ts +++ b/src/test/integration/algorithmsAccess.test.ts @@ -22,7 +22,10 @@ import { import { Database } from '../../components/database/index.js' import { OceanNode } from '../../OceanNode.js' import { OceanNodeConfig } from '../../@types/OceanNode.js' -import { OceanIndexer } from '../../components/Indexer/index.js' +import { + OceanIndexer, + INDEXER_CRAWLING_EVENT_EMITTER +} from '../../components/Indexer/index.js' import { Readable } from 'stream' import { waitToIndex } from './testUtils.js' import { streamToObject } from '../../utils/util.js' @@ -109,7 +112,16 @@ describe('Trusted algorithms Flow', () => { ) config = await getConfiguration(true) dbconn = await Database.init(config.dbConfig) - oceanNode = await OceanNode.getInstance(config, dbconn, null, null, null) + oceanNode = await OceanNode.getInstance( + config, + dbconn, + null, + null, + null, + null, + null, + true + ) indexer = new OceanIndexer( dbconn, config.indexingNetworks, @@ -499,6 +511,7 @@ describe('Trusted algorithms Flow', () => { }) after(async () => { await tearDownEnvironment(previousConfiguration) - indexer.stopAllChainIndexers() + INDEXER_CRAWLING_EVENT_EMITTER.removeAllListeners() + await indexer.stopAllChainIndexers() }) }) diff --git a/src/test/integration/auth.test.ts b/src/test/integration/auth.test.ts index 2a0672032..03460550b 100644 --- a/src/test/integration/auth.test.ts +++ b/src/test/integration/auth.test.ts @@ -48,7 +48,16 @@ describe('Auth Token Integration Tests', () => { config = await getConfiguration(true) database = await Database.init(config.dbConfig) - oceanNode = await OceanNode.getInstance(config, database) + oceanNode = await OceanNode.getInstance( + config, + database, + null, + null, + null, + null, + null, + true + ) provider = new JsonRpcProvider(mockSupportedNetworks['8996'].rpc) diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index 2dbe382c8..bb9733905 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -35,7 +35,10 @@ import { import { Database } from '../../components/database/index.js' import { OceanNode } from '../../OceanNode.js' import { OceanNodeConfig } from '../../@types/OceanNode.js' -import { OceanIndexer } from '../../components/Indexer/index.js' +import { + OceanIndexer, + INDEXER_CRAWLING_EVENT_EMITTER +} from '../../components/Indexer/index.js' import { Readable } from 'stream' import { expectedTimeoutFailure, waitToIndex } from './testUtils.js' import { getEventFromTx, streamToObject } from '../../utils/util.js' @@ -2025,7 +2028,8 @@ describe('Compute', () => { after(async () => { await tearDownEnvironment(previousConfiguration) - indexer.stopAllChainIndexers() + INDEXER_CRAWLING_EVENT_EMITTER.removeAllListeners() + await indexer.stopAllChainIndexers() }) }) diff --git a/src/test/integration/configAdmin.test.ts b/src/test/integration/configAdmin.test.ts index e971d5508..9b0fc5952 100644 --- a/src/test/integration/configAdmin.test.ts +++ b/src/test/integration/configAdmin.test.ts @@ -53,7 +53,16 @@ describe('Config Admin Endpoints Integration Tests', () => { config = await getConfiguration(true) database = await Database.init(config.dbConfig) - oceanNode = OceanNode.getInstance(config, database) + oceanNode = OceanNode.getInstance( + config, + database, + null, + null, + null, + null, + null, + true + ) }) after(async () => { diff --git a/src/test/integration/configDatabase.test.ts b/src/test/integration/configDatabase.test.ts index 401075947..d80e45974 100644 --- a/src/test/integration/configDatabase.test.ts +++ b/src/test/integration/configDatabase.test.ts @@ -1,7 +1,10 @@ import { Database } from '../../components/database/index.js' import { OceanNode } from '../../OceanNode.js' import { expect, assert } from 'chai' -import { OceanIndexer } from '../../components/Indexer/index.js' +import { + OceanIndexer, + INDEXER_CRAWLING_EVENT_EMITTER +} from '../../components/Indexer/index.js' import { buildEnvOverrideConfig, getMockSupportedNetworks, @@ -62,7 +65,16 @@ describe('Config Database', () => { assert(initialVersionNull.value === null, 'Initial version should be null') }) - const oceanNode = await OceanNode.getInstance(await getConfiguration(true), database) + const oceanNode = await OceanNode.getInstance( + await getConfiguration(true), + database, + null, + null, + null, + null, + null, + true + ) oceanIndexer = new OceanIndexer( database, getMockSupportedNetworks(), @@ -106,7 +118,8 @@ describe('Config Database', () => { assert(version.value === updatedVersion, `Version should be ${updatedVersion}`) }) after(async () => { - oceanIndexer.stopAllChainIndexers() + INDEXER_CRAWLING_EVENT_EMITTER.removeAllListeners() + await oceanIndexer.stopAllChainIndexers() await tearDownEnvironment(previousConfiguration) }) }) diff --git a/src/test/integration/credentials.test.ts b/src/test/integration/credentials.test.ts index d2bd88d56..ff60cc0f9 100644 --- a/src/test/integration/credentials.test.ts +++ b/src/test/integration/credentials.test.ts @@ -16,7 +16,10 @@ import { expect, assert } from 'chai' import { JsonRpcProvider, Signer, ethers, Contract, EventLog } from 'ethers' import { Database } from '../../components/database/index.js' -import { OceanIndexer } from '../../components/Indexer/index.js' +import { + OceanIndexer, + INDEXER_CRAWLING_EVENT_EMITTER +} from '../../components/Indexer/index.js' import { OceanNode } from '../../OceanNode.js' import { RPCS, SupportedNetwork } from '../../@types/blockchain.js' import { streamToObject } from '../../utils/util.js' @@ -139,7 +142,16 @@ describe('[Credentials Flow] - Should run a complete node flow.', () => { config = await getConfiguration(true) // Force reload the configuration const database = await Database.init(config.dbConfig) - oceanNode = OceanNode.getInstance(config, database) + oceanNode = OceanNode.getInstance( + config, + database, + null, + null, + null, + null, + null, + true + ) const indexer = new OceanIndexer( database, config.indexingNetworks, @@ -630,6 +642,7 @@ describe('[Credentials Flow] - Should run a complete node flow.', () => { after(async () => { await tearDownEnvironment(previousConfiguration) - oceanNode.getIndexer().stopAllChainIndexers() + INDEXER_CRAWLING_EVENT_EMITTER.removeAllListeners() + await oceanNode.getIndexer().stopAllChainIndexers() }) }) diff --git a/src/test/integration/download.test.ts b/src/test/integration/download.test.ts index 1b5194950..6ba09043a 100644 --- a/src/test/integration/download.test.ts +++ b/src/test/integration/download.test.ts @@ -1,7 +1,10 @@ import { expect, assert } from 'chai' import { JsonRpcProvider, Signer, ethers } from 'ethers' import { Database } from '../../components/database/index.js' -import { OceanIndexer } from '../../components/Indexer/index.js' +import { + OceanIndexer, + INDEXER_CRAWLING_EVENT_EMITTER +} from '../../components/Indexer/index.js' import { OceanNode } from '../../OceanNode.js' import { RPCS } from '../../@types/blockchain.js' import { streamToString, streamToObject } from '../../utils/util.js' @@ -89,7 +92,16 @@ describe('[Download Flow] - Should run a complete node flow.', () => { config = await getConfiguration(true) // Force reload the configuration database = await Database.init(config.dbConfig) - oceanNode = await OceanNode.getInstance(config, database) + oceanNode = await OceanNode.getInstance( + config, + database, + null, + null, + null, + null, + null, + true + ) indexer = new OceanIndexer( database, config.indexingNetworks, @@ -327,6 +339,7 @@ describe('[Download Flow] - Should run a complete node flow.', () => { }) after(async () => { await tearDownEnvironment(previousConfiguration) - indexer.stopAllChainIndexers() + INDEXER_CRAWLING_EVENT_EMITTER.removeAllListeners() + await indexer.stopAllChainIndexers() }) }) diff --git a/src/test/integration/encryptDecryptDDO.test.ts b/src/test/integration/encryptDecryptDDO.test.ts index cfaffb163..f05b8a5ce 100644 --- a/src/test/integration/encryptDecryptDDO.test.ts +++ b/src/test/integration/encryptDecryptDDO.test.ts @@ -38,7 +38,10 @@ import { import { DecryptDDOCommand } from '../../@types/commands.js' import { EncryptMethod } from '../../@types/fileObject.js' import { homedir } from 'os' -import { OceanIndexer } from '../../components/Indexer/index.js' +import { + OceanIndexer, + INDEXER_CRAWLING_EVENT_EMITTER +} from '../../components/Indexer/index.js' describe('Should encrypt and decrypt DDO', () => { let database: Database @@ -104,7 +107,16 @@ describe('Should encrypt and decrypt DDO', () => { ) const config = await getConfiguration() database = await Database.init(config.dbConfig) - oceanNode = OceanNode.getInstance(config, database) + oceanNode = OceanNode.getInstance( + config, + database, + null, + null, + null, + null, + null, + true + ) // will be used later indexer = new OceanIndexer( database, @@ -392,6 +404,7 @@ describe('Should encrypt and decrypt DDO', () => { after(async () => { await tearDownEnvironment(previousConfiguration) - indexer.stopAllChainIndexers() + INDEXER_CRAWLING_EVENT_EMITTER.removeAllListeners() + await indexer.stopAllChainIndexers() }) }) diff --git a/src/test/integration/encryptFile.test.ts b/src/test/integration/encryptFile.test.ts index aa55e3ab5..8bc98bbca 100644 --- a/src/test/integration/encryptFile.test.ts +++ b/src/test/integration/encryptFile.test.ts @@ -34,7 +34,16 @@ describe('Encrypt File', () => { ) config = await getConfiguration(true) // Force reload the configuration const dbconn = await Database.init(config.dbConfig) - oceanNode = await OceanNode.getInstance(config, dbconn) + oceanNode = await OceanNode.getInstance( + config, + dbconn, + null, + null, + null, + null, + null, + true + ) anotherConsumerWallet = new ethers.Wallet( '0xef4b441145c1d0f3b4bc6d61d29f5c6e502359481152f869247c7a4244d45209' ) diff --git a/src/test/integration/getJobs.test.ts b/src/test/integration/getJobs.test.ts index 0f405a66b..6b6cfc730 100644 --- a/src/test/integration/getJobs.test.ts +++ b/src/test/integration/getJobs.test.ts @@ -74,7 +74,16 @@ describe('GetJobsHandler integration', () => { previousConfiguration = await setupEnvironment(TEST_ENV_CONFIG_FILE) const config = await getConfiguration(true) db = await Database.init(config.dbConfig) - oceanNode = await OceanNode.getInstance(config, db) + oceanNode = await OceanNode.getInstance( + config, + db, + null, + null, + null, + null, + null, + true + ) handler = new GetJobsHandler(oceanNode) diff --git a/src/test/integration/indexer.test.ts b/src/test/integration/indexer.test.ts index 1c83d4b42..0b4912d09 100644 --- a/src/test/integration/indexer.test.ts +++ b/src/test/integration/indexer.test.ts @@ -16,7 +16,10 @@ import ERC721Template from '@oceanprotocol/contracts/artifacts/contracts/templat import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' with { type: 'json' } import { Database } from '../../components/database/index.js' import { DatabaseFactory } from '../../components/database/DatabaseFactory.js' -import { OceanIndexer } from '../../components/Indexer/index.js' +import { + OceanIndexer, + INDEXER_CRAWLING_EVENT_EMITTER +} from '../../components/Indexer/index.js' import { RPCS } from '../../@types/blockchain.js' import { getEventFromTx, streamToObject } from '../../utils/util.js' import { waitToIndex, expectedTimeoutFailure } from './testUtils.js' @@ -100,7 +103,7 @@ describe('Indexer stores a new metadata events and orders.', () => { const config = await getConfiguration(true) database = await Database.init(config.dbConfig) - oceanNode = OceanNode.getInstance(config, database) + oceanNode = OceanNode.getInstance(config, database, null, null, null, null, null, true) indexer = new OceanIndexer( database, mockSupportedNetworks, @@ -656,6 +659,7 @@ describe('Indexer stores a new metadata events and orders.', () => { after(async () => { await tearDownEnvironment(previousConfiguration) - indexer.stopAllChainIndexers() + INDEXER_CRAWLING_EVENT_EMITTER.removeAllListeners() + await indexer.stopAllChainIndexers() }) }) diff --git a/src/test/integration/operationsDashboard.test.ts b/src/test/integration/operationsDashboard.test.ts index 8f102b741..a70f7d8bd 100644 --- a/src/test/integration/operationsDashboard.test.ts +++ b/src/test/integration/operationsDashboard.test.ts @@ -117,7 +117,8 @@ describe('Should test admin operations', () => { null, null, keyManager, - blockchainRegistry + blockchainRegistry, + true ) indexer = new OceanIndexer( dbconn, @@ -417,6 +418,6 @@ describe('Should test admin operations', () => { after(async () => { await tearDownEnvironment(previousConfiguration) INDEXER_CRAWLING_EVENT_EMITTER.removeAllListeners() - indexer.stopAllChainIndexers() + await indexer.stopAllChainIndexers() }) }) diff --git a/src/test/integration/pricing.test.ts b/src/test/integration/pricing.test.ts index fed4f9d15..1e66a2abb 100644 --- a/src/test/integration/pricing.test.ts +++ b/src/test/integration/pricing.test.ts @@ -15,7 +15,10 @@ import ERC721Template from '@oceanprotocol/contracts/artifacts/contracts/templat import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' with { type: 'json' } import Dispenser from '@oceanprotocol/contracts/artifacts/contracts/pools/dispenser/Dispenser.sol/Dispenser.json' with { type: 'json' } import { Database } from '../../components/database/index.js' -import { OceanIndexer } from '../../components/Indexer/index.js' +import { + OceanIndexer, + INDEXER_CRAWLING_EVENT_EMITTER +} from '../../components/Indexer/index.js' import { RPCS } from '../../@types/blockchain.js' import { getEventFromTx } from '../../utils/util.js' import { waitToIndex, expectedTimeoutFailure } from './testUtils.js' @@ -84,7 +87,7 @@ describe('Publish pricing scehmas and assert ddo stats - FRE & Dispenser', () => const config = await getConfiguration(true) database = await Database.init(config.dbConfig) - oceanNode = await OceanNode.getInstance() + oceanNode = OceanNode.getInstance(config, database, null, null, null, null, null, true) indexer = new OceanIndexer( database, mockSupportedNetworks, @@ -376,6 +379,7 @@ describe('Publish pricing scehmas and assert ddo stats - FRE & Dispenser', () => }) after(async () => { await tearDownEnvironment(previousConfiguration) - indexer.stopAllChainIndexers() + INDEXER_CRAWLING_EVENT_EMITTER.removeAllListeners() + await indexer.stopAllChainIndexers() }) }) diff --git a/src/test/integration/transactionValidation.test.ts b/src/test/integration/transactionValidation.test.ts index 651be6af9..a73a6bad6 100644 --- a/src/test/integration/transactionValidation.test.ts +++ b/src/test/integration/transactionValidation.test.ts @@ -11,7 +11,10 @@ import { } from '../../utils/address.js' import { publishAsset, orderAsset, reOrderAsset } from '../utils/assets.js' import { RPCS } from '../../@types/blockchain.js' -import { OceanIndexer } from '../../components/Indexer/index.js' +import { + OceanIndexer, + INDEXER_CRAWLING_EVENT_EMITTER +} from '../../components/Indexer/index.js' import { OceanNode } from '../../OceanNode.js' import { OceanNodeConfig } from '../../@types/OceanNode.js' import { ENVIRONMENT_VARIABLES, EVENTS, getConfiguration } from '../../utils/index.js' @@ -69,7 +72,16 @@ describe('validateOrderTransaction Function with Orders', () => { config = await getConfiguration(true) // Force reload the configuration const dbconn = await Database.init(config.dbConfig) - oceanNode = await OceanNode.getInstance(config, dbconn) + oceanNode = await OceanNode.getInstance( + config, + dbconn, + null, + null, + null, + null, + null, + true + ) indexer = new OceanIndexer( dbconn, config.indexingNetworks, @@ -239,6 +251,7 @@ describe('validateOrderTransaction Function with Orders', () => { }) after(async () => { await tearDownEnvironment(previousConfiguration) - indexer.stopAllChainIndexers() + INDEXER_CRAWLING_EVENT_EMITTER.removeAllListeners() + await indexer.stopAllChainIndexers() }) }) diff --git a/src/test/unit/download.test.ts b/src/test/unit/download.test.ts index da45d1d8d..2c8810c9f 100644 --- a/src/test/unit/download.test.ts +++ b/src/test/unit/download.test.ts @@ -20,6 +20,7 @@ import { AssetUtils, isConfidentialChainDDO } from '../../utils/asset.js' import { DEVELOPMENT_CHAIN_ID, KNOWN_CONFIDENTIAL_EVMS } from '../../utils/address.js' import { DDO } from '@oceanprotocol/ddo-js' import { Wallet, ethers } from 'ethers' +import { KeyManager } from '../../components/KeyManager/index.js' let envOverrides: OverrideEnvConfig[] let config: OceanNodeConfig @@ -35,8 +36,18 @@ describe('Should validate files structure for download', () => { ) envOverrides = await setupEnvironment(TEST_ENV_CONFIG_FILE, envOverrides) config = await getConfiguration(true) + const keyManager = new KeyManager(config) db = await Database.init(config.dbConfig) - oceanNode = OceanNode.getInstance(config, db) + oceanNode = OceanNode.getInstance( + config, + db, + null, + null, + null, + keyManager, + null, + true + ) consumerAccount = new Wallet(process.env.PRIVATE_KEY) })