diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js index dd733374f2..b7b4d0305d 100644 --- a/extensions/oplogPopulator/OplogPopulator.js +++ b/extensions/oplogPopulator/OplogPopulator.js @@ -335,7 +335,7 @@ class OplogPopulator { if (this._config.numberOfConnectors === 0) { // If the number of connector is set to 0, then we // use a single connector to listen to the whole DB. - pipelineFactory = new WildcardPipelineFactory(); + pipelineFactory = new WildcardPipelineFactory(this._config.locationStrippingBytesThreshold); strategy = new UniqueConnector({ logger: this._logger, }); @@ -345,7 +345,7 @@ class OplogPopulator { // words, we cannot alter an existing pipeline. In this // case, the strategy is to allow a maximum of one // bucket per kafka connector. - pipelineFactory = new MultipleBucketsPipelineFactory(); + pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingBytesThreshold); strategy = new ImmutableConnector({ logger: this._logger, }); @@ -354,7 +354,7 @@ class OplogPopulator { // kafka connector. However, we want to proactively // ensure that the pipeline will be accepted by // mongodb. - pipelineFactory = new MultipleBucketsPipelineFactory(); + pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingBytesThreshold); strategy = new LeastFullConnector({ logger: this._logger, }); diff --git a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js index cb2d1b00ba..c65d5dbeed 100644 --- a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js +++ b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js @@ -6,6 +6,7 @@ const joiSchema = joi.object({ kafkaConnectHost: joi.string().required(), kafkaConnectPort: joi.number().required(), numberOfConnectors: joi.number().required().min(0), + locationStrippingBytesThreshold: joi.number().min(0).default(0), prefix: joi.string().optional(), probeServer: probeServerJoi.default(), connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'), diff --git a/extensions/oplogPopulator/OplogPopulatorTask.js b/extensions/oplogPopulator/OplogPopulatorTask.js index cd6168769e..42fa00d0c1 100644 --- a/extensions/oplogPopulator/OplogPopulatorTask.js +++ b/extensions/oplogPopulator/OplogPopulatorTask.js @@ -20,6 +20,12 @@ werelogs.configure({ level: config.log.logLevel, const mongoConfig = config.queuePopulator.mongo; const oplogPopulatorConfig = config.extensions.oplogPopulator; +const replicationQPConfig = config.extensions.replication && + config.extensions.replication.queueProcessor; +if (replicationQPConfig && replicationQPConfig.sourceCheckIfSizeGreaterThanMB) { + oplogPopulatorConfig.locationStrippingBytesThreshold = + replicationQPConfig.sourceCheckIfSizeGreaterThanMB * 1024 * 1024; +} const activeExtensions = [ 'notification', diff --git a/extensions/oplogPopulator/modules/Connector.js b/extensions/oplogPopulator/modules/Connector.js index 9e0977c713..c3f09577bd 100644 --- a/extensions/oplogPopulator/modules/Connector.js +++ b/extensions/oplogPopulator/modules/Connector.js @@ -45,6 +45,7 @@ class Connector extends EventEmitter { this._name = params.name; this._config = params.config; this._buckets = new Set(params.buckets); + this._liveBuckets = new Set(params.buckets); this._isRunning = params.isRunning; this._getPipeline = params.getPipeline; this._state = { @@ -166,6 +167,7 @@ class Connector extends EventEmitter { config: this._config, }); this._isRunning = true; + this._liveBuckets = new Set(this._buckets); } catch (err) { this._logger.error('Error while spawning connector', { method: 'Connector.spawn', @@ -193,6 +195,7 @@ class Connector extends EventEmitter { this.emit(constants.connectorUpdatedEvent, this); await this._kafkaConnect.deleteConnector(this._name); this._isRunning = false; + this._liveBuckets = new Set(); // resetting the resume point to set a new one on creation of the connector delete this._config['startup.mode.timestamp.start.at.operation.time']; } catch (err) { @@ -314,10 +317,14 @@ class Connector extends EventEmitter { * That is why we use updateConnectorConfig() instead of * updateConnectorPipeline() * @param {boolean} [doUpdate=false] updates connector if true + * @param {boolean} [updateSupported=true] whether full pipeline updates + * (including match stage changes) are supported. When false, the update + * is only applied if the match stage has not changed, to protect resume + * tokens on MongoDB >= 6.0. * @returns {Promise|boolean} connector did update * @throws {InternalError} */ - async updatePipeline(doUpdate = false) { + async updatePipeline(doUpdate = false, updateSupported = true) { // Only update when buckets changed and when not already updating if (!this._state.bucketsGotModified || this._state.isUpdating) { return false; @@ -325,10 +332,22 @@ class Connector extends EventEmitter { this._config.pipeline = this._getPipeline([...this._buckets]); try { if (doUpdate && this._isRunning) { + if (!updateSupported) { + const sameBuckets = this._liveBuckets.size === this._buckets.size && + this._liveBuckets.isSubsetOf(this._buckets); + if (!sameBuckets) { + this._logger.info('Skipping pipeline update: match stage differs', { + method: 'Connector.updatePipeline', + connector: this._name, + }); + return false; + } + } const timeBeforeUpdate = Date.now(); this._state.isUpdating = true; this.emit(constants.connectorUpdatedEvent, this); await this._kafkaConnect.updateConnectorConfig(this._name, this._config); + this._liveBuckets = new Set(this._buckets); this._updateConnectorState(false, timeBeforeUpdate); this._state.isUpdating = false; return true; diff --git a/extensions/oplogPopulator/modules/ConnectorsManager.js b/extensions/oplogPopulator/modules/ConnectorsManager.js index c15ca13550..8c50ef3311 100644 --- a/extensions/oplogPopulator/modules/ConnectorsManager.js +++ b/extensions/oplogPopulator/modules/ConnectorsManager.js @@ -267,8 +267,8 @@ class ConnectorsManager extends EventEmitter { connector: connector.name, }); return true; - } else if (connector.isRunning && this._allocationStrategy.canUpdate()) { - return connector.updatePipeline(true); + } else if (connector.isRunning) { + return connector.updatePipeline(true, this._allocationStrategy.canUpdate()); } return false; diff --git a/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js b/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js index 5202d83fa6..43f4e404df 100644 --- a/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js @@ -8,6 +8,16 @@ const PipelineFactory = require('./PipelineFactory'); * given a list of buckets. */ class MultipleBucketsPipelineFactory extends PipelineFactory { + /** + * @constructor + * @param {number} locationStrippingBytesThreshold threshold for stripping location data + */ + constructor(locationStrippingBytesThreshold) { + super(locationStrippingBytesThreshold); + // getPipeline is used standalone later, make sure its `this` reference binds to us. + this.getPipeline = this.getPipeline.bind(this); + } + /** * Checks if an existing pipeline is valid against the current * factory. @@ -22,24 +32,21 @@ class MultipleBucketsPipelineFactory extends PipelineFactory { } /** - * Makes new connector pipeline that includes - * buckets assigned to this connector. + * Makes connector pipeline stage, that includes buckets assigned to this connector. * @param {string[] | undefined} buckets buckets assigned to this connector - * @returns {string} new connector pipeline + * @returns {object} connector pipeline stage */ - getPipeline(buckets) { + getPipelineStage(buckets) { if (!buckets || !buckets.length) { - return JSON.stringify([]); + return null; } - return JSON.stringify([ - { - $match: { - 'ns.coll': { - $in: buckets, - } + return { + $match: { + 'ns.coll': { + $in: buckets, } } - ]); + }; } } diff --git a/extensions/oplogPopulator/pipeline/PipelineFactory.js b/extensions/oplogPopulator/pipeline/PipelineFactory.js index 8306a0207a..0dd2480d9c 100644 --- a/extensions/oplogPopulator/pipeline/PipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/PipelineFactory.js @@ -7,6 +7,14 @@ const { wildCardForAllBuckets } = require('../constants'); * @classdesc base class for the pipeline factories */ class PipelineFactory { + /** + * @constructor + * @param {number} locationStrippingBytesThreshold threshold for stripping location data + */ + constructor(locationStrippingBytesThreshold) { + this._locationStrippingBytesThreshold = locationStrippingBytesThreshold; + } + /** * Checks if an existing pipeline is valid against the current * factory. @@ -55,7 +63,49 @@ class PipelineFactory { * @param {string[] | undefined} buckets buckets assigned to this connector * @returns {string} new connector pipeline */ - getPipeline(buckets) { // eslint-disable-line no-unused-vars + getPipeline(buckets) { + const stage = this.getPipelineStage(buckets); + if (!stage) { + return JSON.stringify([]); + } + const pipeline = [ + stage, + ]; + if (this._locationStrippingBytesThreshold > 0) { + pipeline.push({ + $set: { + 'fullDocument.value.location': { + $cond: { + if: { $gte: [ + '$fullDocument.value.content-length', + this._locationStrippingBytesThreshold, + ] }, + then: '$$REMOVE', + else: '$fullDocument.value.location', + }, + }, + 'updateDescription.updatedFields.value.location': { + $cond: { + if: { $gte: [ + '$updateDescription.updatedFields.value.content-length', + this._locationStrippingBytesThreshold, + ] }, + then: '$$REMOVE', + else: '$updateDescription.updatedFields.value.location', + }, + }, + } + }); + } + return JSON.stringify(pipeline); + } + + /** + * Makes connector pipeline stage, to then be used by getPipeline. + * @param {string[] | undefined} buckets buckets assigned to this connector + * @returns {object} connector pipeline stage + */ + getPipelineStage(buckets) { // eslint-disable-line no-unused-vars throw errors.NotImplemented; } } diff --git a/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js b/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js index 14eec35429..fd90b9ea63 100644 --- a/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js @@ -9,6 +9,16 @@ const PipelineFactory = require('./PipelineFactory'); * that listens to all buckets. */ class WildcardPipelineFactory extends PipelineFactory { + /** + * @constructor + * @param {number} locationStrippingBytesThreshold threshold for stripping location data + */ + constructor(locationStrippingBytesThreshold) { + super(locationStrippingBytesThreshold); + // getPipeline is used standalone later, make sure its `this` reference binds to us. + this.getPipeline = this.getPipeline.bind(this); + } + /** * Checks if an existing pipeline is valid against the current * factory. @@ -23,23 +33,20 @@ class WildcardPipelineFactory extends PipelineFactory { } /** - * Create a pipeline for the connector, to listen to all - * non-special collections. + * Makes connector pipeline stage, to listen to all non-special collections. * @param {string[] | undefined} buckets buckets assigned to this connector - * @returns {string} new connector pipeline + * @returns {object} connector pipeline stage */ - getPipeline(buckets) { // eslint-disable-line no-unused-vars - return JSON.stringify([ - { - $match: { - 'ns.coll': { - $not: { - $regex: `^(${constants.mpuBucketPrefix}|__).*`, - }, - } + getPipelineStage(buckets) { // eslint-disable-line no-unused-vars + return { + $match: { + 'ns.coll': { + $not: { + $regex: `^(${constants.mpuBucketPrefix}|__).*`, + }, } } - ]); + }; } } diff --git a/extensions/replication/tasks/ReplicateObject.js b/extensions/replication/tasks/ReplicateObject.js index 27cc68cf9e..34a1d2496e 100644 --- a/extensions/replication/tasks/ReplicateObject.js +++ b/extensions/replication/tasks/ReplicateObject.js @@ -398,19 +398,6 @@ class ReplicateObject extends BackbeatTask { }); } - _checkSourceReplication(sourceEntry, log, cb) { - this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => { - if (err) { - return cb(err); - } - const status = refreshedEntry.getReplicationSiteStatus(this.site); - if (status === 'COMPLETED') { - return cb(errorAlreadyCompleted); - } - return cb(); - }); - } - _getAndPutData(sourceEntry, destEntry, log, cb) { log.debug('replicating data', { entry: sourceEntry.getLogInfo() }); if (sourceEntry.getLocation().some(part => { @@ -825,7 +812,8 @@ class ReplicateObject extends BackbeatTask { }); } - processQueueEntry(sourceEntry, kafkaEntry, done) { + processQueueEntry(_sourceEntry, kafkaEntry, done) { + let sourceEntry = _sourceEntry; const log = this.logger.newRequestLogger(); const destEntry = sourceEntry.toReplicaEntry(this.site); @@ -866,12 +854,30 @@ class ReplicateObject extends BackbeatTask { this._setTargetAccountMd(destEntry, targetRole, log, next); }, next => { - if (!mdOnly && - sourceEntry.getContentLength() / 1000000 >= - this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB) { - return this._checkSourceReplication(sourceEntry, log, next); + if (mdOnly) { + return next(); } - return next(); + const isLargeObject = sourceEntry.getContentLength() / 1000000 >= + this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB; + const isLocationStripped = sourceEntry.getContentLength() > 0 && sourceEntry.getLocation().length === 0; + if (!isLargeObject && !isLocationStripped) { + return next(); + } + return this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => { + if (err) { + return next(err); + } + const status = refreshedEntry.getReplicationSiteStatus(this.site); + if (status === 'COMPLETED') { + log.info('replication already completed, skipping', { + entry: sourceEntry.getLogInfo(), + }); + return next(errorAlreadyCompleted); + } + // Reassign sourceEntry to use fresh metadata + sourceEntry = refreshedEntry; + return next(); + }); }, // Get data from source bucket and put it on the target bucket next => { diff --git a/tests/functional/replication/queueProcessor.js b/tests/functional/replication/queueProcessor.js index 5a0f36a84a..39938de9a3 100644 --- a/tests/functional/replication/queueProcessor.js +++ b/tests/functional/replication/queueProcessor.js @@ -1031,7 +1031,46 @@ describe('queue processor functional tests with mocking', () => { }); }); - it('should not check object MD if size is smaller than sourceCheckIfSizeGreaterThanMB', done => { + it('should fetch object MD from S3 for location data when missing', done => { + // Create a Kafka entry with its location[] field stripped + const originalKafkaEntry = s3mock.getParam('kafkaEntry'); + const kafkaValue = JSON.parse(originalKafkaEntry.value); + const kafkaData = JSON.parse(kafkaValue.value); + delete kafkaData.location; + kafkaValue.value = JSON.stringify(kafkaData); + const kafkaEntry = { + ...originalKafkaEntry, + value: JSON.stringify(kafkaValue), + }; + + s3mock.setParam('contentLength', 1000); + let checkMdCalled = false; + s3mock.setParam('routes.source.s3.getMetadata.handler', + (req, url, query, res) => { + checkMdCalled = true; + s3mock.resetParam('routes.source.s3.getMetadata.handler'); + s3mock._getMetadataSource(req, url, query, res); + }, { _static: true }); + + async.parallel([ + done => { + s3mock.onPutSourceMd = done; + }, + done => queueProcessorSF.processReplicationEntry( + kafkaEntry, err => { + assert.ifError(err); + assert.strictEqual(s3mock.hasPutTargetData, true); + assert(s3mock.hasPutTargetMd); + assert(checkMdCalled); + done(); + }), + ], () => { + s3mock.resetParam('contentLength'); + done(); + }); + }); + + it('should not check object MD for small objects with location', done => { s3mock.setParam('contentLength', 1); let checkMdCalled = false; s3mock.setParam('routes.source.s3.getMetadata.handler', diff --git a/tests/unit/oplogPopulator/Connector.js b/tests/unit/oplogPopulator/Connector.js index 7e9ef8f18f..25544df43c 100644 --- a/tests/unit/oplogPopulator/Connector.js +++ b/tests/unit/oplogPopulator/Connector.js @@ -62,6 +62,12 @@ describe('Connector', () => { await connector.spawn(); assert.notStrictEqual(partitionName, connector.config['offset.partition.name']); }); + it('should set liveBuckets to current buckets on spawn', async () => { + sinon.stub(connector._kafkaConnect, 'createConnector').resolves(); + connector._buckets = new Set(['bucket1', 'bucket2']); + await connector.spawn(); + assert.deepStrictEqual(connector._liveBuckets, new Set(['bucket1', 'bucket2'])); + }); it('should not try spawning a new connector when on is already existent', async () => { const createStub = sinon.stub(connector._kafkaConnect, 'createConnector') .resolves(); @@ -81,6 +87,13 @@ describe('Connector', () => { assert(deleteStub.calledOnceWith('example-connector')); assert.strictEqual(connector.isRunning, false); }); + it('should clear liveBuckets on destroy', async () => { + sinon.stub(connector._kafkaConnect, 'deleteConnector').resolves(); + connector._isRunning = true; + connector._liveBuckets = new Set(['bucket1']); + await connector.destroy(); + assert.deepStrictEqual(connector._liveBuckets, new Set()); + }); it('should not try destroying a new connector when connector is already destroyed', async () => { const deleteStub = sinon.stub(connector._kafkaConnect, 'deleteConnector') .resolves(); @@ -257,6 +270,59 @@ describe('Connector', () => { assert(pipelineStub.calledOnceWith([])); assert(updateStub.notCalled); }); + + it('should update when updateSupported is false and live buckets match', async () => { + connector._buckets = new Set(['bucket1']); + connector._liveBuckets = new Set(['bucket1']); + connector._state.bucketsGotModified = true; + connector._state.isUpdating = false; + connector._isRunning = true; + sinon.stub(connector, '_getPipeline').returns('example-pipeline'); + const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig') + .resolves(); + const didUpdate = await connector.updatePipeline(true, false); + assert.strictEqual(didUpdate, true); + assert(updateStub.calledOnce); + }); + + it('should skip update when updateSupported is false and live buckets differ', async () => { + connector._buckets = new Set(['bucket1']); + connector._liveBuckets = new Set(['different-bucket']); + connector._state.bucketsGotModified = true; + connector._state.isUpdating = false; + connector._isRunning = true; + sinon.stub(connector, '_getPipeline').returns('example-pipeline'); + const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig') + .resolves(); + const didUpdate = await connector.updatePipeline(true, false); + assert.strictEqual(didUpdate, false); + assert(updateStub.notCalled); + }); + + it('should sync liveBuckets after successful update', async () => { + connector._buckets = new Set(['bucket1', 'bucket2']); + connector._liveBuckets = new Set(); + connector._state.bucketsGotModified = true; + connector._state.isUpdating = false; + connector._isRunning = true; + sinon.stub(connector, '_getPipeline').returns('example-pipeline'); + sinon.stub(connector._kafkaConnect, 'updateConnectorConfig').resolves(); + await connector.updatePipeline(true); + assert.deepStrictEqual(connector._liveBuckets, new Set(['bucket1', 'bucket2'])); + }); + + it('should not sync liveBuckets after failed update', async () => { + connector._buckets = new Set(['bucket1']); + connector._liveBuckets = new Set(); + connector._state.bucketsGotModified = true; + connector._state.isUpdating = false; + connector._isRunning = true; + sinon.stub(connector, '_getPipeline').returns('example-pipeline'); + sinon.stub(connector._kafkaConnect, 'updateConnectorConfig') + .rejects(errors.InternalError); + await assert.rejects(() => connector.updatePipeline(true)); + assert.deepStrictEqual(connector._liveBuckets, new Set()); + }); }); describe('getConfigSizeInBytes', () => { diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js index 45e587147d..6c81e86665 100644 --- a/tests/unit/oplogPopulator/ConnectorsManager.js +++ b/tests/unit/oplogPopulator/ConnectorsManager.js @@ -285,14 +285,30 @@ describe('ConnectorsManager', () => { assert(connectorDeleteStub.notCalled); }); - it('should do nothing if the strategy does not allow to update', async () => { + it('should update non-match stages when canUpdate is false and live buckets match', async () => { connector1._isRunning = true; - connector1._state.bucketsGotModified = false; + connector1._state.bucketsGotModified = true; connector1._buckets = new Set(['bucket1']); + connector1._liveBuckets = new Set(['bucket1']); sinon.stub(connectorsManager._allocationStrategy, 'canUpdate') - .resolves(false); + .returns(false); + const updated = await connectorsManager._spawnOrDestroyConnector(connector1); + assert.strictEqual(updated, true); + assert(connectorUpdateStub.calledOnce); + assert(connectorCreateStub.notCalled); + assert(connectorDeleteStub.notCalled); + }); + + it('should skip update when canUpdate is false and live buckets differ', async () => { + connector1._isRunning = true; + connector1._state.bucketsGotModified = true; + connector1._buckets = new Set(['bucket1']); + connector1._liveBuckets = new Set(['different-bucket']); + sinon.stub(connectorsManager._allocationStrategy, 'canUpdate') + .returns(false); const updated = await connectorsManager._spawnOrDestroyConnector(connector1); assert.strictEqual(updated, false); + assert(connectorUpdateStub.notCalled); assert(connectorCreateStub.notCalled); assert(connectorDeleteStub.notCalled); }); diff --git a/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js b/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js new file mode 100644 index 0000000000..d7030b456e --- /dev/null +++ b/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js @@ -0,0 +1,27 @@ +const assert = require('assert'); +const { OplogPopulatorConfigJoiSchema} = require('../../../extensions/oplogPopulator/OplogPopulatorConfigValidator'); + +const defaultConfig = { + topic: 'backbeat-oplog', + kafkaConnectHost: '127.0.0.1', + kafkaConnectPort: 8083, + numberOfConnectors: 1, + probeServer: { + bindAddress: '0.0.0.0', + port: 8556, + }, +}; + +describe('OplogPopulatorConfigValidator', () => { + describe('locationStrippingBytesThreshold validation', () => { + it('should accept valid threshold', () => { + const config = { + ...defaultConfig, + locationStrippingBytesThreshold: 100 * 1024 * 1024, + }; + const result = OplogPopulatorConfigJoiSchema.validate(config); + assert.ifError(result.error); + assert.strictEqual(result.value.locationStrippingBytesThreshold, 100 * 1024 * 1024); + }); + }); +}); diff --git a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js index a9b68c77a8..798356bd48 100644 --- a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js @@ -4,7 +4,8 @@ const MultipleBucketsPipelineFactory = const { constants } = require('arsenal'); describe('MultipleBucketsPipelineFactory', () => { - const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(); + const thresholdBytes = 100 * 1024 * 1024; + const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(thresholdBytes); describe('isValid', () => { it('should detect a valid list of buckets', () => { @@ -45,10 +46,27 @@ describe('MultipleBucketsPipelineFactory', () => { assert.strictEqual(result, '[]'); }); - it('should return the pipeline with buckets', () => { + it('should return the pipeline with buckets and location stripping', () => { const buckets = ['bucket1', 'bucket2']; const result = multipleBucketsPipelineFactory.getPipeline(buckets); - assert.strictEqual(result, '[{"$match":{"ns.coll":{"$in":["bucket1","bucket2"]}}}]'); + const pipeline = JSON.parse(result); + + assert.strictEqual(pipeline.length, 2); + assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1','bucket2']}}}); + assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], { + $cond: { + if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] }, + then: '$$REMOVE', + else: '$fullDocument.value.location', + }, + }); + assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], { + $cond: { + if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] }, + then: '$$REMOVE', + else: '$updateDescription.updatedFields.value.location', + }, + }); }); }); diff --git a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js index 01a191f66d..af13b034f7 100644 --- a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js @@ -3,7 +3,8 @@ const WildcardPipelineFactory = require('../../../../extensions/oplogPopulator/p const { constants } = require('arsenal'); describe('WildcardPipelineFactory', () => { - const wildcardPipelineFactory = new WildcardPipelineFactory(); + const thresholdBytes = 100 * 1024 * 1024; + const wildcardPipelineFactory = new WildcardPipelineFactory(thresholdBytes); describe('isValid', () => { it('should detect a wildcard', () => { @@ -32,10 +33,37 @@ describe('WildcardPipelineFactory', () => { }); describe('getPipeline', () => { - it('should return the pipeline with wildcard', () => { + it('should return the pipeline with buckets and location stripping', () => { const buckets = ['bucket1', 'bucket2']; const result = wildcardPipelineFactory.getPipeline(buckets); - assert.strictEqual(result, '[{"$match":{"ns.coll":{"$not":{"$regex":"^(mpuShadowBucket|__).*"}}}}]'); + const pipeline = JSON.parse(result); + + assert.strictEqual(pipeline.length, 2); + assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$not:{$regex:'^(mpuShadowBucket|__).*'}}}}); + assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], { + $cond: { + if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] }, + then: '$$REMOVE', + else: '$fullDocument.value.location', + }, + }); + assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], { + $cond: { + if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] }, + then: '$$REMOVE', + else: '$updateDescription.updatedFields.value.location', + }, + }); + }); + + it('should return the pipeline with buckets and no location stripping if disabled', () => { + const wildcardPipelineFactoryNoStripping = new WildcardPipelineFactory(0); + + const buckets = ['bucket1', 'bucket2']; + const result = wildcardPipelineFactoryNoStripping.getPipeline(buckets); + const pipeline = JSON.parse(result); + + assert.strictEqual(pipeline.length, 1); }); });