From eeaabb29bb138f6fc6a9a739501b9aa3887858fc Mon Sep 17 00:00:00 2001 From: Thomas Flament Date: Sun, 1 Feb 2026 16:58:53 +0100 Subject: [PATCH 1/5] replication: Fetch object again if locations were stripped Issue: BB-491 Signed-off-by: Thomas Flament --- .../replication/tasks/ReplicateObject.js | 44 +++++++++++-------- .../functional/replication/queueProcessor.js | 41 ++++++++++++++++- 2 files changed, 65 insertions(+), 20 deletions(-) diff --git a/extensions/replication/tasks/ReplicateObject.js b/extensions/replication/tasks/ReplicateObject.js index 27cc68cf9..34a1d2496 100644 --- a/extensions/replication/tasks/ReplicateObject.js +++ b/extensions/replication/tasks/ReplicateObject.js @@ -398,19 +398,6 @@ class ReplicateObject extends BackbeatTask { }); } - _checkSourceReplication(sourceEntry, log, cb) { - this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => { - if (err) { - return cb(err); - } - const status = refreshedEntry.getReplicationSiteStatus(this.site); - if (status === 'COMPLETED') { - return cb(errorAlreadyCompleted); - } - return cb(); - }); - } - _getAndPutData(sourceEntry, destEntry, log, cb) { log.debug('replicating data', { entry: sourceEntry.getLogInfo() }); if (sourceEntry.getLocation().some(part => { @@ -825,7 +812,8 @@ class ReplicateObject extends BackbeatTask { }); } - processQueueEntry(sourceEntry, kafkaEntry, done) { + processQueueEntry(_sourceEntry, kafkaEntry, done) { + let sourceEntry = _sourceEntry; const log = this.logger.newRequestLogger(); const destEntry = sourceEntry.toReplicaEntry(this.site); @@ -866,12 +854,30 @@ class ReplicateObject extends BackbeatTask { this._setTargetAccountMd(destEntry, targetRole, log, next); }, next => { - if (!mdOnly && - sourceEntry.getContentLength() / 1000000 >= - this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB) { - return this._checkSourceReplication(sourceEntry, log, next); + if (mdOnly) { + return next(); } - return next(); + const isLargeObject = sourceEntry.getContentLength() / 1000000 >= + this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB; + const isLocationStripped = sourceEntry.getContentLength() > 0 && sourceEntry.getLocation().length === 0; + if (!isLargeObject && !isLocationStripped) { + return next(); + } + return this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => { + if (err) { + return next(err); + } + const status = refreshedEntry.getReplicationSiteStatus(this.site); + if (status === 'COMPLETED') { + log.info('replication already completed, skipping', { + entry: sourceEntry.getLogInfo(), + }); + return next(errorAlreadyCompleted); + } + // Reassign sourceEntry to use fresh metadata + sourceEntry = refreshedEntry; + return next(); + }); }, // Get data from source bucket and put it on the target bucket next => { diff --git a/tests/functional/replication/queueProcessor.js b/tests/functional/replication/queueProcessor.js index 5a0f36a84..39938de9a 100644 --- a/tests/functional/replication/queueProcessor.js +++ b/tests/functional/replication/queueProcessor.js @@ -1031,7 +1031,46 @@ describe('queue processor functional tests with mocking', () => { }); }); - it('should not check object MD if size is smaller than sourceCheckIfSizeGreaterThanMB', done => { + it('should fetch object MD from S3 for location data when missing', done => { + // Create a Kafka entry with its location[] field stripped + const originalKafkaEntry = s3mock.getParam('kafkaEntry'); + const kafkaValue = JSON.parse(originalKafkaEntry.value); + const kafkaData = JSON.parse(kafkaValue.value); + delete kafkaData.location; + kafkaValue.value = JSON.stringify(kafkaData); + const kafkaEntry = { + ...originalKafkaEntry, + value: JSON.stringify(kafkaValue), + }; + + s3mock.setParam('contentLength', 1000); + let checkMdCalled = false; + s3mock.setParam('routes.source.s3.getMetadata.handler', + (req, url, query, res) => { + checkMdCalled = true; + s3mock.resetParam('routes.source.s3.getMetadata.handler'); + s3mock._getMetadataSource(req, url, query, res); + }, { _static: true }); + + async.parallel([ + done => { + s3mock.onPutSourceMd = done; + }, + done => queueProcessorSF.processReplicationEntry( + kafkaEntry, err => { + assert.ifError(err); + assert.strictEqual(s3mock.hasPutTargetData, true); + assert(s3mock.hasPutTargetMd); + assert(checkMdCalled); + done(); + }), + ], () => { + s3mock.resetParam('contentLength'); + done(); + }); + }); + + it('should not check object MD for small objects with location', done => { s3mock.setParam('contentLength', 1); let checkMdCalled = false; s3mock.setParam('routes.source.s3.getMetadata.handler', From 2dc2bbd8541905a20a3af7084993b998bb04571c Mon Sep 17 00:00:00 2001 From: Thomas Flament Date: Sun, 1 Feb 2026 16:59:31 +0100 Subject: [PATCH 2/5] oplog: Strip locations when the array is large This saves space on the Kafka side while preventing a refetch on most (small) objects. Issue: BB-491 Signed-off-by: Thomas Flament --- conf/config.json | 1 + extensions/oplogPopulator/OplogPopulator.js | 6 +-- .../OplogPopulatorConfigValidator.js | 1 + .../MultipleBucketsPipelineFactory.js | 31 +++++++----- .../pipeline/PipelineFactory.js | 48 ++++++++++++++++++- .../pipeline/WildcardPipelineFactory.js | 33 ++++++++----- .../OplogPopulatorConfigValidator.js | 27 +++++++++++ .../MultipleBucketsPipelineFactory.js | 12 +++-- .../pipeline/WildcardPipelineFactory.js | 22 +++++++-- 9 files changed, 146 insertions(+), 35 deletions(-) create mode 100644 tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js diff --git a/conf/config.json b/conf/config.json index 46c590d60..96cf56857 100644 --- a/conf/config.json +++ b/conf/config.json @@ -304,6 +304,7 @@ "kafkaConnectHost": "127.0.0.1", "kafkaConnectPort": 8083, "numberOfConnectors": 1, + "locationStrippingThreshold": 500, "probeServer": { "bindAddress": "0.0.0.0", "port": 8556 diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js index dd733374f..b9a16b97f 100644 --- a/extensions/oplogPopulator/OplogPopulator.js +++ b/extensions/oplogPopulator/OplogPopulator.js @@ -335,7 +335,7 @@ class OplogPopulator { if (this._config.numberOfConnectors === 0) { // If the number of connector is set to 0, then we // use a single connector to listen to the whole DB. - pipelineFactory = new WildcardPipelineFactory(); + pipelineFactory = new WildcardPipelineFactory(this._config.locationStrippingThreshold); strategy = new UniqueConnector({ logger: this._logger, }); @@ -345,7 +345,7 @@ class OplogPopulator { // words, we cannot alter an existing pipeline. In this // case, the strategy is to allow a maximum of one // bucket per kafka connector. - pipelineFactory = new MultipleBucketsPipelineFactory(); + pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold); strategy = new ImmutableConnector({ logger: this._logger, }); @@ -354,7 +354,7 @@ class OplogPopulator { // kafka connector. However, we want to proactively // ensure that the pipeline will be accepted by // mongodb. - pipelineFactory = new MultipleBucketsPipelineFactory(); + pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold); strategy = new LeastFullConnector({ logger: this._logger, }); diff --git a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js index cb2d1b00b..9f53d52fe 100644 --- a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js +++ b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js @@ -6,6 +6,7 @@ const joiSchema = joi.object({ kafkaConnectHost: joi.string().required(), kafkaConnectPort: joi.number().required(), numberOfConnectors: joi.number().required().min(0), + locationStrippingThreshold: joi.number().default(5), prefix: joi.string().optional(), probeServer: probeServerJoi.default(), connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'), diff --git a/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js b/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js index 5202d83fa..d0f25da40 100644 --- a/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js @@ -8,6 +8,16 @@ const PipelineFactory = require('./PipelineFactory'); * given a list of buckets. */ class MultipleBucketsPipelineFactory extends PipelineFactory { + /** + * @constructor + * @param {number} locationStrippingThreshold threshold for stripping location data + */ + constructor(locationStrippingThreshold) { + super(locationStrippingThreshold); + // getPipeline is used standalone later, make sure its `this` reference binds to us. + this.getPipeline = this.getPipeline.bind(this); + } + /** * Checks if an existing pipeline is valid against the current * factory. @@ -22,24 +32,21 @@ class MultipleBucketsPipelineFactory extends PipelineFactory { } /** - * Makes new connector pipeline that includes - * buckets assigned to this connector. + * Makes connector pipeline stage, that includes buckets assigned to this connector. * @param {string[] | undefined} buckets buckets assigned to this connector - * @returns {string} new connector pipeline + * @returns {object} connector pipeline stage */ - getPipeline(buckets) { + getPipelineStage(buckets) { if (!buckets || !buckets.length) { - return JSON.stringify([]); + return null; } - return JSON.stringify([ - { - $match: { - 'ns.coll': { - $in: buckets, - } + return { + $match: { + 'ns.coll': { + $in: buckets, } } - ]); + }; } } diff --git a/extensions/oplogPopulator/pipeline/PipelineFactory.js b/extensions/oplogPopulator/pipeline/PipelineFactory.js index 8306a0207..f4767c293 100644 --- a/extensions/oplogPopulator/pipeline/PipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/PipelineFactory.js @@ -7,6 +7,14 @@ const { wildCardForAllBuckets } = require('../constants'); * @classdesc base class for the pipeline factories */ class PipelineFactory { + /** + * @constructor + * @param {number} locationStrippingThreshold threshold for stripping location data + */ + constructor(locationStrippingThreshold) { + this._locationStrippingThreshold = locationStrippingThreshold; + } + /** * Checks if an existing pipeline is valid against the current * factory. @@ -55,7 +63,45 @@ class PipelineFactory { * @param {string[] | undefined} buckets buckets assigned to this connector * @returns {string} new connector pipeline */ - getPipeline(buckets) { // eslint-disable-line no-unused-vars + getPipeline(buckets) { + const stage = this.getPipelineStage(buckets); + if (!stage) { + return JSON.stringify([]); + } + const pipeline = [ + stage, + ]; + if (this._locationStrippingThreshold >= 0) { + pipeline.push({ + $set: { + 'fullDocument.value.location': + this._locationStrippingExpression('fullDocument.value.location'), + 'updateDescription.updatedFields.value.location': + this._locationStrippingExpression('updateDescription.updatedFields.value.location'), + } + }); + } + return JSON.stringify(pipeline); + } + + _locationStrippingExpression(fieldPath) { + return { + $switch: { + branches: [ + { case: { $not: [{ $isArray: `$${fieldPath}` }] }, then: '$$REMOVE' }, + { case: { $gte: [{ $size: `$${fieldPath}` }, this._locationStrippingThreshold] }, then: '$$REMOVE' }, + ], + default: `$${fieldPath}`, + } + }; + } + + /** + * Makes connector pipeline stage, to then be used by getPipeline. + * @param {string[] | undefined} buckets buckets assigned to this connector + * @returns {object} connector pipeline stage + */ + getPipelineStage(buckets) { // eslint-disable-line no-unused-vars throw errors.NotImplemented; } } diff --git a/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js b/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js index 14eec3542..4e8e35994 100644 --- a/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js @@ -9,6 +9,16 @@ const PipelineFactory = require('./PipelineFactory'); * that listens to all buckets. */ class WildcardPipelineFactory extends PipelineFactory { + /** + * @constructor + * @param {number} locationStrippingThreshold threshold for stripping location data + */ + constructor(locationStrippingThreshold) { + super(locationStrippingThreshold); + // getPipeline is used standalone later, make sure its `this` reference binds to us. + this.getPipeline = this.getPipeline.bind(this); + } + /** * Checks if an existing pipeline is valid against the current * factory. @@ -23,23 +33,20 @@ class WildcardPipelineFactory extends PipelineFactory { } /** - * Create a pipeline for the connector, to listen to all - * non-special collections. + * Makes connector pipeline stage, to listen to all non-special collections. * @param {string[] | undefined} buckets buckets assigned to this connector - * @returns {string} new connector pipeline + * @returns {object} connector pipeline stage */ - getPipeline(buckets) { // eslint-disable-line no-unused-vars - return JSON.stringify([ - { - $match: { - 'ns.coll': { - $not: { - $regex: `^(${constants.mpuBucketPrefix}|__).*`, - }, - } + getPipelineStage(buckets) { // eslint-disable-line no-unused-vars + return { + $match: { + 'ns.coll': { + $not: { + $regex: `^(${constants.mpuBucketPrefix}|__).*`, + }, } } - ]); + }; } } diff --git a/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js b/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js new file mode 100644 index 000000000..991d25809 --- /dev/null +++ b/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js @@ -0,0 +1,27 @@ +const assert = require('assert'); +const { OplogPopulatorConfigJoiSchema} = require('../../../extensions/oplogPopulator/OplogPopulatorConfigValidator'); + +const defaultConfig = { + topic: 'backbeat-oplog', + kafkaConnectHost: '127.0.0.1', + kafkaConnectPort: 8083, + numberOfConnectors: 1, + probeServer: { + bindAddress: '0.0.0.0', + port: 8556, + }, +}; + +describe('OplogPopulatorConfigValidator', () => { + describe('locationStrippingThreshold validation', () => { + it('should accept valid threshold', () => { + const config = { + ...defaultConfig, + locationStrippingThreshold: 50, + }; + const result = OplogPopulatorConfigJoiSchema.validate(config); + assert.ifError(result.error); + assert.strictEqual(result.value.locationStrippingThreshold, 50); + }); + }); +}); diff --git a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js index a9b68c77a..50084c251 100644 --- a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js @@ -4,7 +4,7 @@ const MultipleBucketsPipelineFactory = const { constants } = require('arsenal'); describe('MultipleBucketsPipelineFactory', () => { - const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(); + const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(200); describe('isValid', () => { it('should detect a valid list of buckets', () => { @@ -45,10 +45,16 @@ describe('MultipleBucketsPipelineFactory', () => { assert.strictEqual(result, '[]'); }); - it('should return the pipeline with buckets', () => { + it('should return the pipeline with buckets and location stripping', () => { const buckets = ['bucket1', 'bucket2']; const result = multipleBucketsPipelineFactory.getPipeline(buckets); - assert.strictEqual(result, '[{"$match":{"ns.coll":{"$in":["bucket1","bucket2"]}}}]'); + const pipeline = JSON.parse(result); + + assert.strictEqual(pipeline.length, 2); + assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1','bucket2']}}}); + assert(pipeline[1].$set['fullDocument.value.location']); + assert(pipeline[1].$set['updateDescription.updatedFields.value.location']); + assert(result.includes('200')); }); }); diff --git a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js index 01a191f66..d906840fe 100644 --- a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js @@ -3,7 +3,7 @@ const WildcardPipelineFactory = require('../../../../extensions/oplogPopulator/p const { constants } = require('arsenal'); describe('WildcardPipelineFactory', () => { - const wildcardPipelineFactory = new WildcardPipelineFactory(); + const wildcardPipelineFactory = new WildcardPipelineFactory(200); describe('isValid', () => { it('should detect a wildcard', () => { @@ -32,10 +32,26 @@ describe('WildcardPipelineFactory', () => { }); describe('getPipeline', () => { - it('should return the pipeline with wildcard', () => { + it('should return the pipeline with buckets and location stripping', () => { const buckets = ['bucket1', 'bucket2']; const result = wildcardPipelineFactory.getPipeline(buckets); - assert.strictEqual(result, '[{"$match":{"ns.coll":{"$not":{"$regex":"^(mpuShadowBucket|__).*"}}}}]'); + const pipeline = JSON.parse(result); + + assert.strictEqual(pipeline.length, 2); + assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$not:{$regex:'^(mpuShadowBucket|__).*'}}}}); + assert(pipeline[1].$set['fullDocument.value.location']); + assert(pipeline[1].$set['updateDescription.updatedFields.value.location']); + assert(result.includes('200')); + }); + + it('should return the pipeline with buckets and no location stripping if disabled', () => { + const wildcardPipelineFactoryNoStripping = new WildcardPipelineFactory(-1); + + const buckets = ['bucket1', 'bucket2']; + const result = wildcardPipelineFactoryNoStripping.getPipeline(buckets); + const pipeline = JSON.parse(result); + + assert.strictEqual(pipeline.length, 1); }); }); From 636f27a5a12d4fd6d59d15c272bc50488765593e Mon Sep 17 00:00:00 2001 From: Thomas Flament Date: Thu, 12 Feb 2026 18:40:07 +0100 Subject: [PATCH 3/5] oplog: Update pipeline on Mongo >= 6.0 if match stage is unchanged Issue: BB-491 Signed-off-by: Thomas Flament --- .../oplogPopulator/modules/Connector.js | 21 ++++++++++- .../modules/ConnectorsManager.js | 4 +- .../pipeline/PipelineFactory.js | 8 ++-- tests/unit/oplogPopulator/Connector.js | 37 +++++++++++++++++++ .../unit/oplogPopulator/ConnectorsManager.js | 26 +++++++++++-- 5 files changed, 86 insertions(+), 10 deletions(-) diff --git a/extensions/oplogPopulator/modules/Connector.js b/extensions/oplogPopulator/modules/Connector.js index 9e0977c71..c296b380d 100644 --- a/extensions/oplogPopulator/modules/Connector.js +++ b/extensions/oplogPopulator/modules/Connector.js @@ -314,10 +314,14 @@ class Connector extends EventEmitter { * That is why we use updateConnectorConfig() instead of * updateConnectorPipeline() * @param {boolean} [doUpdate=false] updates connector if true + * @param {boolean} [updateSupported=true] whether full pipeline updates + * (including match stage changes) are supported. When false, the update + * is only applied if the match stage has not changed, to protect resume + * tokens on MongoDB >= 6.0. * @returns {Promise|boolean} connector did update * @throws {InternalError} */ - async updatePipeline(doUpdate = false) { + async updatePipeline(doUpdate = false, updateSupported = true) { // Only update when buckets changed and when not already updating if (!this._state.bucketsGotModified || this._state.isUpdating) { return false; @@ -325,6 +329,21 @@ class Connector extends EventEmitter { this._config.pipeline = this._getPipeline([...this._buckets]); try { if (doUpdate && this._isRunning) { + if (!updateSupported) { + const currentConfig = await this._kafkaConnect.getConnectorConfig(this._name); + const currentPipeline = currentConfig.pipeline + ? JSON.parse(currentConfig.pipeline) : []; + const newPipeline = JSON.parse(this._config.pipeline); + const currentMatch = JSON.stringify(currentPipeline[0]?.$match); + const newMatch = JSON.stringify(newPipeline[0]?.$match); + if (currentMatch !== newMatch) { + this._logger.info('Skipping pipeline update: match stage differs', { + method: 'Connector.updatePipeline', + connector: this._name, + }); + return false; + } + } const timeBeforeUpdate = Date.now(); this._state.isUpdating = true; this.emit(constants.connectorUpdatedEvent, this); diff --git a/extensions/oplogPopulator/modules/ConnectorsManager.js b/extensions/oplogPopulator/modules/ConnectorsManager.js index c15ca1355..8c50ef331 100644 --- a/extensions/oplogPopulator/modules/ConnectorsManager.js +++ b/extensions/oplogPopulator/modules/ConnectorsManager.js @@ -267,8 +267,8 @@ class ConnectorsManager extends EventEmitter { connector: connector.name, }); return true; - } else if (connector.isRunning && this._allocationStrategy.canUpdate()) { - return connector.updatePipeline(true); + } else if (connector.isRunning) { + return connector.updatePipeline(true, this._allocationStrategy.canUpdate()); } return false; diff --git a/extensions/oplogPopulator/pipeline/PipelineFactory.js b/extensions/oplogPopulator/pipeline/PipelineFactory.js index f4767c293..e435fdf49 100644 --- a/extensions/oplogPopulator/pipeline/PipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/PipelineFactory.js @@ -84,14 +84,14 @@ class PipelineFactory { return JSON.stringify(pipeline); } - _locationStrippingExpression(fieldPath) { + _locationStrippingExpression(field) { return { $switch: { branches: [ - { case: { $not: [{ $isArray: `$${fieldPath}` }] }, then: '$$REMOVE' }, - { case: { $gte: [{ $size: `$${fieldPath}` }, this._locationStrippingThreshold] }, then: '$$REMOVE' }, + { case: { $not: [{ $isArray: `$${field}` }] }, then: '$$REMOVE' }, + { case: { $gte: [{ $size: `$${field}` }, this._locationStrippingThreshold] }, then: '$$REMOVE' }, ], - default: `$${fieldPath}`, + default: `$${field}`, } }; } diff --git a/tests/unit/oplogPopulator/Connector.js b/tests/unit/oplogPopulator/Connector.js index 7e9ef8f18..b6ddf2ce2 100644 --- a/tests/unit/oplogPopulator/Connector.js +++ b/tests/unit/oplogPopulator/Connector.js @@ -257,6 +257,43 @@ describe('Connector', () => { assert(pipelineStub.calledOnceWith([])); assert(updateStub.notCalled); }); + + it('should update when updateSupported is false and match stages are the same', async () => { + const matchStage = { $match: { 'ns.coll': { $in: ['bucket1'] } } }; + const newPipeline = JSON.stringify([matchStage, { $set: { 'fullDocument.value.location': {} } }]); + connector._state.bucketsGotModified = true; + connector._state.isUpdating = false; + connector._isRunning = true; + sinon.stub(connector, '_getPipeline').returns(newPipeline); + sinon.stub(connector._kafkaConnect, 'getConnectorConfig').resolves({ + pipeline: JSON.stringify([matchStage]), + }); + const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig') + .resolves(); + const didUpdate = await connector.updatePipeline(true, false); + assert.strictEqual(didUpdate, true); + assert(updateStub.calledOnce); + }); + + it('should skip update when updateSupported is false and match stages differ', async () => { + const newPipeline = JSON.stringify([ + { $match: { 'ns.coll': { $in: ['bucket1'] } } }, + ]); + connector._state.bucketsGotModified = true; + connector._state.isUpdating = false; + connector._isRunning = true; + sinon.stub(connector, '_getPipeline').returns(newPipeline); + sinon.stub(connector._kafkaConnect, 'getConnectorConfig').resolves({ + pipeline: JSON.stringify([ + { $match: { 'ns.coll': { $in: ['different-bucket'] } } }, + ]), + }); + const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig') + .resolves(); + const didUpdate = await connector.updatePipeline(true, false); + assert.strictEqual(didUpdate, false); + assert(updateStub.notCalled); + }); }); describe('getConfigSizeInBytes', () => { diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js index 45e587147..732a34652 100644 --- a/tests/unit/oplogPopulator/ConnectorsManager.js +++ b/tests/unit/oplogPopulator/ConnectorsManager.js @@ -285,14 +285,34 @@ describe('ConnectorsManager', () => { assert(connectorDeleteStub.notCalled); }); - it('should do nothing if the strategy does not allow to update', async () => { + it('should update non-match stages when canUpdate is false and match stages are the same', async () => { connector1._isRunning = true; - connector1._state.bucketsGotModified = false; + connector1._state.bucketsGotModified = true; connector1._buckets = new Set(['bucket1']); sinon.stub(connectorsManager._allocationStrategy, 'canUpdate') - .resolves(false); + .returns(false); + sinon.stub(connector1._kafkaConnect, 'getConnectorConfig').resolves({ + pipeline: JSON.stringify([{ $match: { 'ns.coll': { $in: ['bucket1'] } } }]), + }); + const updated = await connectorsManager._spawnOrDestroyConnector(connector1); + assert.strictEqual(updated, true); + assert(connectorUpdateStub.calledOnce); + assert(connectorCreateStub.notCalled); + assert(connectorDeleteStub.notCalled); + }); + + it('should skip update when canUpdate is false and match stages differ', async () => { + connector1._isRunning = true; + connector1._state.bucketsGotModified = true; + connector1._buckets = new Set(['bucket1']); + sinon.stub(connectorsManager._allocationStrategy, 'canUpdate') + .returns(false); + sinon.stub(connector1._kafkaConnect, 'getConnectorConfig').resolves({ + pipeline: JSON.stringify([{ $match: { 'ns.coll': { $in: ['different-bucket'] } } }]), + }); const updated = await connectorsManager._spawnOrDestroyConnector(connector1); assert.strictEqual(updated, false); + assert(connectorUpdateStub.notCalled); assert(connectorCreateStub.notCalled); assert(connectorDeleteStub.notCalled); }); From bf4ac4cfa20ba5c442ec884823b7e51042e794bc Mon Sep 17 00:00:00 2001 From: Thomas Flament Date: Tue, 24 Feb 2026 12:06:21 +0100 Subject: [PATCH 4/5] oplog: Use local _liveBuckets instead of unnecessary pipeline fetch Issue: BB-491 --- .../oplogPopulator/modules/Connector.js | 14 ++--- tests/unit/oplogPopulator/Connector.js | 63 ++++++++++++++----- 2 files changed, 53 insertions(+), 24 deletions(-) diff --git a/extensions/oplogPopulator/modules/Connector.js b/extensions/oplogPopulator/modules/Connector.js index c296b380d..c3f09577b 100644 --- a/extensions/oplogPopulator/modules/Connector.js +++ b/extensions/oplogPopulator/modules/Connector.js @@ -45,6 +45,7 @@ class Connector extends EventEmitter { this._name = params.name; this._config = params.config; this._buckets = new Set(params.buckets); + this._liveBuckets = new Set(params.buckets); this._isRunning = params.isRunning; this._getPipeline = params.getPipeline; this._state = { @@ -166,6 +167,7 @@ class Connector extends EventEmitter { config: this._config, }); this._isRunning = true; + this._liveBuckets = new Set(this._buckets); } catch (err) { this._logger.error('Error while spawning connector', { method: 'Connector.spawn', @@ -193,6 +195,7 @@ class Connector extends EventEmitter { this.emit(constants.connectorUpdatedEvent, this); await this._kafkaConnect.deleteConnector(this._name); this._isRunning = false; + this._liveBuckets = new Set(); // resetting the resume point to set a new one on creation of the connector delete this._config['startup.mode.timestamp.start.at.operation.time']; } catch (err) { @@ -330,13 +333,9 @@ class Connector extends EventEmitter { try { if (doUpdate && this._isRunning) { if (!updateSupported) { - const currentConfig = await this._kafkaConnect.getConnectorConfig(this._name); - const currentPipeline = currentConfig.pipeline - ? JSON.parse(currentConfig.pipeline) : []; - const newPipeline = JSON.parse(this._config.pipeline); - const currentMatch = JSON.stringify(currentPipeline[0]?.$match); - const newMatch = JSON.stringify(newPipeline[0]?.$match); - if (currentMatch !== newMatch) { + const sameBuckets = this._liveBuckets.size === this._buckets.size && + this._liveBuckets.isSubsetOf(this._buckets); + if (!sameBuckets) { this._logger.info('Skipping pipeline update: match stage differs', { method: 'Connector.updatePipeline', connector: this._name, @@ -348,6 +347,7 @@ class Connector extends EventEmitter { this._state.isUpdating = true; this.emit(constants.connectorUpdatedEvent, this); await this._kafkaConnect.updateConnectorConfig(this._name, this._config); + this._liveBuckets = new Set(this._buckets); this._updateConnectorState(false, timeBeforeUpdate); this._state.isUpdating = false; return true; diff --git a/tests/unit/oplogPopulator/Connector.js b/tests/unit/oplogPopulator/Connector.js index b6ddf2ce2..25544df43 100644 --- a/tests/unit/oplogPopulator/Connector.js +++ b/tests/unit/oplogPopulator/Connector.js @@ -62,6 +62,12 @@ describe('Connector', () => { await connector.spawn(); assert.notStrictEqual(partitionName, connector.config['offset.partition.name']); }); + it('should set liveBuckets to current buckets on spawn', async () => { + sinon.stub(connector._kafkaConnect, 'createConnector').resolves(); + connector._buckets = new Set(['bucket1', 'bucket2']); + await connector.spawn(); + assert.deepStrictEqual(connector._liveBuckets, new Set(['bucket1', 'bucket2'])); + }); it('should not try spawning a new connector when on is already existent', async () => { const createStub = sinon.stub(connector._kafkaConnect, 'createConnector') .resolves(); @@ -81,6 +87,13 @@ describe('Connector', () => { assert(deleteStub.calledOnceWith('example-connector')); assert.strictEqual(connector.isRunning, false); }); + it('should clear liveBuckets on destroy', async () => { + sinon.stub(connector._kafkaConnect, 'deleteConnector').resolves(); + connector._isRunning = true; + connector._liveBuckets = new Set(['bucket1']); + await connector.destroy(); + assert.deepStrictEqual(connector._liveBuckets, new Set()); + }); it('should not try destroying a new connector when connector is already destroyed', async () => { const deleteStub = sinon.stub(connector._kafkaConnect, 'deleteConnector') .resolves(); @@ -258,16 +271,13 @@ describe('Connector', () => { assert(updateStub.notCalled); }); - it('should update when updateSupported is false and match stages are the same', async () => { - const matchStage = { $match: { 'ns.coll': { $in: ['bucket1'] } } }; - const newPipeline = JSON.stringify([matchStage, { $set: { 'fullDocument.value.location': {} } }]); + it('should update when updateSupported is false and live buckets match', async () => { + connector._buckets = new Set(['bucket1']); + connector._liveBuckets = new Set(['bucket1']); connector._state.bucketsGotModified = true; connector._state.isUpdating = false; connector._isRunning = true; - sinon.stub(connector, '_getPipeline').returns(newPipeline); - sinon.stub(connector._kafkaConnect, 'getConnectorConfig').resolves({ - pipeline: JSON.stringify([matchStage]), - }); + sinon.stub(connector, '_getPipeline').returns('example-pipeline'); const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig') .resolves(); const didUpdate = await connector.updatePipeline(true, false); @@ -275,25 +285,44 @@ describe('Connector', () => { assert(updateStub.calledOnce); }); - it('should skip update when updateSupported is false and match stages differ', async () => { - const newPipeline = JSON.stringify([ - { $match: { 'ns.coll': { $in: ['bucket1'] } } }, - ]); + it('should skip update when updateSupported is false and live buckets differ', async () => { + connector._buckets = new Set(['bucket1']); + connector._liveBuckets = new Set(['different-bucket']); connector._state.bucketsGotModified = true; connector._state.isUpdating = false; connector._isRunning = true; - sinon.stub(connector, '_getPipeline').returns(newPipeline); - sinon.stub(connector._kafkaConnect, 'getConnectorConfig').resolves({ - pipeline: JSON.stringify([ - { $match: { 'ns.coll': { $in: ['different-bucket'] } } }, - ]), - }); + sinon.stub(connector, '_getPipeline').returns('example-pipeline'); const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig') .resolves(); const didUpdate = await connector.updatePipeline(true, false); assert.strictEqual(didUpdate, false); assert(updateStub.notCalled); }); + + it('should sync liveBuckets after successful update', async () => { + connector._buckets = new Set(['bucket1', 'bucket2']); + connector._liveBuckets = new Set(); + connector._state.bucketsGotModified = true; + connector._state.isUpdating = false; + connector._isRunning = true; + sinon.stub(connector, '_getPipeline').returns('example-pipeline'); + sinon.stub(connector._kafkaConnect, 'updateConnectorConfig').resolves(); + await connector.updatePipeline(true); + assert.deepStrictEqual(connector._liveBuckets, new Set(['bucket1', 'bucket2'])); + }); + + it('should not sync liveBuckets after failed update', async () => { + connector._buckets = new Set(['bucket1']); + connector._liveBuckets = new Set(); + connector._state.bucketsGotModified = true; + connector._state.isUpdating = false; + connector._isRunning = true; + sinon.stub(connector, '_getPipeline').returns('example-pipeline'); + sinon.stub(connector._kafkaConnect, 'updateConnectorConfig') + .rejects(errors.InternalError); + await assert.rejects(() => connector.updatePipeline(true)); + assert.deepStrictEqual(connector._liveBuckets, new Set()); + }); }); describe('getConfigSizeInBytes', () => { From db7640bf720972b8200ee597f6ab3b4d9e0ce940 Mon Sep 17 00:00:00 2001 From: Thomas Flament Date: Tue, 24 Feb 2026 19:32:04 +0100 Subject: [PATCH 5/5] Use sourceCheckIfSizeGreaterThanMB for stripping locations Issue: BB-491 --- conf/config.json | 1 - extensions/oplogPopulator/OplogPopulator.js | 6 +-- .../OplogPopulatorConfigValidator.js | 2 +- .../oplogPopulator/OplogPopulatorTask.js | 6 +++ .../MultipleBucketsPipelineFactory.js | 6 +-- .../pipeline/PipelineFactory.js | 44 ++++++++++--------- .../pipeline/WildcardPipelineFactory.js | 6 +-- .../unit/oplogPopulator/ConnectorsManager.js | 12 ++--- .../OplogPopulatorConfigValidator.js | 6 +-- .../MultipleBucketsPipelineFactory.js | 20 +++++++-- .../pipeline/WildcardPipelineFactory.js | 22 +++++++--- 11 files changed, 80 insertions(+), 51 deletions(-) diff --git a/conf/config.json b/conf/config.json index 96cf56857..46c590d60 100644 --- a/conf/config.json +++ b/conf/config.json @@ -304,7 +304,6 @@ "kafkaConnectHost": "127.0.0.1", "kafkaConnectPort": 8083, "numberOfConnectors": 1, - "locationStrippingThreshold": 500, "probeServer": { "bindAddress": "0.0.0.0", "port": 8556 diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js index b9a16b97f..b7b4d0305 100644 --- a/extensions/oplogPopulator/OplogPopulator.js +++ b/extensions/oplogPopulator/OplogPopulator.js @@ -335,7 +335,7 @@ class OplogPopulator { if (this._config.numberOfConnectors === 0) { // If the number of connector is set to 0, then we // use a single connector to listen to the whole DB. - pipelineFactory = new WildcardPipelineFactory(this._config.locationStrippingThreshold); + pipelineFactory = new WildcardPipelineFactory(this._config.locationStrippingBytesThreshold); strategy = new UniqueConnector({ logger: this._logger, }); @@ -345,7 +345,7 @@ class OplogPopulator { // words, we cannot alter an existing pipeline. In this // case, the strategy is to allow a maximum of one // bucket per kafka connector. - pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold); + pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingBytesThreshold); strategy = new ImmutableConnector({ logger: this._logger, }); @@ -354,7 +354,7 @@ class OplogPopulator { // kafka connector. However, we want to proactively // ensure that the pipeline will be accepted by // mongodb. - pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold); + pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingBytesThreshold); strategy = new LeastFullConnector({ logger: this._logger, }); diff --git a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js index 9f53d52fe..c65d5dbee 100644 --- a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js +++ b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js @@ -6,7 +6,7 @@ const joiSchema = joi.object({ kafkaConnectHost: joi.string().required(), kafkaConnectPort: joi.number().required(), numberOfConnectors: joi.number().required().min(0), - locationStrippingThreshold: joi.number().default(5), + locationStrippingBytesThreshold: joi.number().min(0).default(0), prefix: joi.string().optional(), probeServer: probeServerJoi.default(), connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'), diff --git a/extensions/oplogPopulator/OplogPopulatorTask.js b/extensions/oplogPopulator/OplogPopulatorTask.js index cd6168769..42fa00d0c 100644 --- a/extensions/oplogPopulator/OplogPopulatorTask.js +++ b/extensions/oplogPopulator/OplogPopulatorTask.js @@ -20,6 +20,12 @@ werelogs.configure({ level: config.log.logLevel, const mongoConfig = config.queuePopulator.mongo; const oplogPopulatorConfig = config.extensions.oplogPopulator; +const replicationQPConfig = config.extensions.replication && + config.extensions.replication.queueProcessor; +if (replicationQPConfig && replicationQPConfig.sourceCheckIfSizeGreaterThanMB) { + oplogPopulatorConfig.locationStrippingBytesThreshold = + replicationQPConfig.sourceCheckIfSizeGreaterThanMB * 1024 * 1024; +} const activeExtensions = [ 'notification', diff --git a/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js b/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js index d0f25da40..43f4e404d 100644 --- a/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js @@ -10,10 +10,10 @@ const PipelineFactory = require('./PipelineFactory'); class MultipleBucketsPipelineFactory extends PipelineFactory { /** * @constructor - * @param {number} locationStrippingThreshold threshold for stripping location data + * @param {number} locationStrippingBytesThreshold threshold for stripping location data */ - constructor(locationStrippingThreshold) { - super(locationStrippingThreshold); + constructor(locationStrippingBytesThreshold) { + super(locationStrippingBytesThreshold); // getPipeline is used standalone later, make sure its `this` reference binds to us. this.getPipeline = this.getPipeline.bind(this); } diff --git a/extensions/oplogPopulator/pipeline/PipelineFactory.js b/extensions/oplogPopulator/pipeline/PipelineFactory.js index e435fdf49..0dd2480d9 100644 --- a/extensions/oplogPopulator/pipeline/PipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/PipelineFactory.js @@ -9,10 +9,10 @@ const { wildCardForAllBuckets } = require('../constants'); class PipelineFactory { /** * @constructor - * @param {number} locationStrippingThreshold threshold for stripping location data + * @param {number} locationStrippingBytesThreshold threshold for stripping location data */ - constructor(locationStrippingThreshold) { - this._locationStrippingThreshold = locationStrippingThreshold; + constructor(locationStrippingBytesThreshold) { + this._locationStrippingBytesThreshold = locationStrippingBytesThreshold; } /** @@ -71,31 +71,35 @@ class PipelineFactory { const pipeline = [ stage, ]; - if (this._locationStrippingThreshold >= 0) { + if (this._locationStrippingBytesThreshold > 0) { pipeline.push({ $set: { - 'fullDocument.value.location': - this._locationStrippingExpression('fullDocument.value.location'), - 'updateDescription.updatedFields.value.location': - this._locationStrippingExpression('updateDescription.updatedFields.value.location'), + 'fullDocument.value.location': { + $cond: { + if: { $gte: [ + '$fullDocument.value.content-length', + this._locationStrippingBytesThreshold, + ] }, + then: '$$REMOVE', + else: '$fullDocument.value.location', + }, + }, + 'updateDescription.updatedFields.value.location': { + $cond: { + if: { $gte: [ + '$updateDescription.updatedFields.value.content-length', + this._locationStrippingBytesThreshold, + ] }, + then: '$$REMOVE', + else: '$updateDescription.updatedFields.value.location', + }, + }, } }); } return JSON.stringify(pipeline); } - _locationStrippingExpression(field) { - return { - $switch: { - branches: [ - { case: { $not: [{ $isArray: `$${field}` }] }, then: '$$REMOVE' }, - { case: { $gte: [{ $size: `$${field}` }, this._locationStrippingThreshold] }, then: '$$REMOVE' }, - ], - default: `$${field}`, - } - }; - } - /** * Makes connector pipeline stage, to then be used by getPipeline. * @param {string[] | undefined} buckets buckets assigned to this connector diff --git a/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js b/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js index 4e8e35994..fd90b9ea6 100644 --- a/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js @@ -11,10 +11,10 @@ const PipelineFactory = require('./PipelineFactory'); class WildcardPipelineFactory extends PipelineFactory { /** * @constructor - * @param {number} locationStrippingThreshold threshold for stripping location data + * @param {number} locationStrippingBytesThreshold threshold for stripping location data */ - constructor(locationStrippingThreshold) { - super(locationStrippingThreshold); + constructor(locationStrippingBytesThreshold) { + super(locationStrippingBytesThreshold); // getPipeline is used standalone later, make sure its `this` reference binds to us. this.getPipeline = this.getPipeline.bind(this); } diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js index 732a34652..6c81e8666 100644 --- a/tests/unit/oplogPopulator/ConnectorsManager.js +++ b/tests/unit/oplogPopulator/ConnectorsManager.js @@ -285,15 +285,13 @@ describe('ConnectorsManager', () => { assert(connectorDeleteStub.notCalled); }); - it('should update non-match stages when canUpdate is false and match stages are the same', async () => { + it('should update non-match stages when canUpdate is false and live buckets match', async () => { connector1._isRunning = true; connector1._state.bucketsGotModified = true; connector1._buckets = new Set(['bucket1']); + connector1._liveBuckets = new Set(['bucket1']); sinon.stub(connectorsManager._allocationStrategy, 'canUpdate') .returns(false); - sinon.stub(connector1._kafkaConnect, 'getConnectorConfig').resolves({ - pipeline: JSON.stringify([{ $match: { 'ns.coll': { $in: ['bucket1'] } } }]), - }); const updated = await connectorsManager._spawnOrDestroyConnector(connector1); assert.strictEqual(updated, true); assert(connectorUpdateStub.calledOnce); @@ -301,15 +299,13 @@ describe('ConnectorsManager', () => { assert(connectorDeleteStub.notCalled); }); - it('should skip update when canUpdate is false and match stages differ', async () => { + it('should skip update when canUpdate is false and live buckets differ', async () => { connector1._isRunning = true; connector1._state.bucketsGotModified = true; connector1._buckets = new Set(['bucket1']); + connector1._liveBuckets = new Set(['different-bucket']); sinon.stub(connectorsManager._allocationStrategy, 'canUpdate') .returns(false); - sinon.stub(connector1._kafkaConnect, 'getConnectorConfig').resolves({ - pipeline: JSON.stringify([{ $match: { 'ns.coll': { $in: ['different-bucket'] } } }]), - }); const updated = await connectorsManager._spawnOrDestroyConnector(connector1); assert.strictEqual(updated, false); assert(connectorUpdateStub.notCalled); diff --git a/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js b/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js index 991d25809..d7030b456 100644 --- a/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js +++ b/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js @@ -13,15 +13,15 @@ const defaultConfig = { }; describe('OplogPopulatorConfigValidator', () => { - describe('locationStrippingThreshold validation', () => { + describe('locationStrippingBytesThreshold validation', () => { it('should accept valid threshold', () => { const config = { ...defaultConfig, - locationStrippingThreshold: 50, + locationStrippingBytesThreshold: 100 * 1024 * 1024, }; const result = OplogPopulatorConfigJoiSchema.validate(config); assert.ifError(result.error); - assert.strictEqual(result.value.locationStrippingThreshold, 50); + assert.strictEqual(result.value.locationStrippingBytesThreshold, 100 * 1024 * 1024); }); }); }); diff --git a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js index 50084c251..798356bd4 100644 --- a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js @@ -4,7 +4,8 @@ const MultipleBucketsPipelineFactory = const { constants } = require('arsenal'); describe('MultipleBucketsPipelineFactory', () => { - const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(200); + const thresholdBytes = 100 * 1024 * 1024; + const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(thresholdBytes); describe('isValid', () => { it('should detect a valid list of buckets', () => { @@ -52,9 +53,20 @@ describe('MultipleBucketsPipelineFactory', () => { assert.strictEqual(pipeline.length, 2); assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1','bucket2']}}}); - assert(pipeline[1].$set['fullDocument.value.location']); - assert(pipeline[1].$set['updateDescription.updatedFields.value.location']); - assert(result.includes('200')); + assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], { + $cond: { + if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] }, + then: '$$REMOVE', + else: '$fullDocument.value.location', + }, + }); + assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], { + $cond: { + if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] }, + then: '$$REMOVE', + else: '$updateDescription.updatedFields.value.location', + }, + }); }); }); diff --git a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js index d906840fe..af13b034f 100644 --- a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js @@ -3,7 +3,8 @@ const WildcardPipelineFactory = require('../../../../extensions/oplogPopulator/p const { constants } = require('arsenal'); describe('WildcardPipelineFactory', () => { - const wildcardPipelineFactory = new WildcardPipelineFactory(200); + const thresholdBytes = 100 * 1024 * 1024; + const wildcardPipelineFactory = new WildcardPipelineFactory(thresholdBytes); describe('isValid', () => { it('should detect a wildcard', () => { @@ -39,13 +40,24 @@ describe('WildcardPipelineFactory', () => { assert.strictEqual(pipeline.length, 2); assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$not:{$regex:'^(mpuShadowBucket|__).*'}}}}); - assert(pipeline[1].$set['fullDocument.value.location']); - assert(pipeline[1].$set['updateDescription.updatedFields.value.location']); - assert(result.includes('200')); + assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], { + $cond: { + if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] }, + then: '$$REMOVE', + else: '$fullDocument.value.location', + }, + }); + assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], { + $cond: { + if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] }, + then: '$$REMOVE', + else: '$updateDescription.updatedFields.value.location', + }, + }); }); it('should return the pipeline with buckets and no location stripping if disabled', () => { - const wildcardPipelineFactoryNoStripping = new WildcardPipelineFactory(-1); + const wildcardPipelineFactoryNoStripping = new WildcardPipelineFactory(0); const buckets = ['bucket1', 'bucket2']; const result = wildcardPipelineFactoryNoStripping.getPipeline(buckets);