diff --git a/conf/config.json b/conf/config.json index 46c590d60b..65052c6337 100644 --- a/conf/config.json +++ b/conf/config.json @@ -304,6 +304,7 @@ "kafkaConnectHost": "127.0.0.1", "kafkaConnectPort": 8083, "numberOfConnectors": 1, + "locationStrippingThreshold": 100, "probeServer": { "bindAddress": "0.0.0.0", "port": 8556 diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js index dd733374f2..b9a16b97f3 100644 --- a/extensions/oplogPopulator/OplogPopulator.js +++ b/extensions/oplogPopulator/OplogPopulator.js @@ -335,7 +335,7 @@ class OplogPopulator { if (this._config.numberOfConnectors === 0) { // If the number of connector is set to 0, then we // use a single connector to listen to the whole DB. - pipelineFactory = new WildcardPipelineFactory(); + pipelineFactory = new WildcardPipelineFactory(this._config.locationStrippingThreshold); strategy = new UniqueConnector({ logger: this._logger, }); @@ -345,7 +345,7 @@ class OplogPopulator { // words, we cannot alter an existing pipeline. In this // case, the strategy is to allow a maximum of one // bucket per kafka connector. - pipelineFactory = new MultipleBucketsPipelineFactory(); + pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold); strategy = new ImmutableConnector({ logger: this._logger, }); @@ -354,7 +354,7 @@ class OplogPopulator { // kafka connector. However, we want to proactively // ensure that the pipeline will be accepted by // mongodb. 
- pipelineFactory = new MultipleBucketsPipelineFactory(); + pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold); strategy = new LeastFullConnector({ logger: this._logger, }); diff --git a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js index cb2d1b00ba..3420259a33 100644 --- a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js +++ b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js @@ -6,6 +6,7 @@ const joiSchema = joi.object({ kafkaConnectHost: joi.string().required(), kafkaConnectPort: joi.number().required(), numberOfConnectors: joi.number().required().min(0), + locationStrippingThreshold: joi.number().min(0).default(100), prefix: joi.string().optional(), probeServer: probeServerJoi.default(), connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'), diff --git a/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js b/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js index 5202d83fa6..160e58000e 100644 --- a/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js @@ -8,6 +8,16 @@ const PipelineFactory = require('./PipelineFactory'); * given a list of buckets. */ class MultipleBucketsPipelineFactory extends PipelineFactory { + /** + * @constructor + * @param {number} locationStrippingThreshold threshold for stripping location data + */ + constructor(locationStrippingThreshold = 100) { + super(locationStrippingThreshold); + // getPipeline is used standalone later; ensure its `this` stays bound to this instance. + this.getPipeline = this.getPipeline.bind(this); + } + /** * Checks if an existing pipeline is valid against the current * factory. 
@@ -38,7 +48,8 @@ class MultipleBucketsPipelineFactory extends PipelineFactory { $in: buckets, } } - } + }, + this._getLocationStrippingStage(), ]); } } diff --git a/extensions/oplogPopulator/pipeline/PipelineFactory.js b/extensions/oplogPopulator/pipeline/PipelineFactory.js index 8306a0207a..4212e7da2b 100644 --- a/extensions/oplogPopulator/pipeline/PipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/PipelineFactory.js @@ -7,6 +7,14 @@ const { wildCardForAllBuckets } = require('../constants'); * @classdesc base class for the pipeline factories */ class PipelineFactory { + /** + * @constructor + * @param {number} locationStrippingThreshold threshold for stripping location data + */ + constructor(locationStrippingThreshold = 100) { + this._locationStrippingThreshold = locationStrippingThreshold; + } + /** * Checks if an existing pipeline is valid against the current * factory. @@ -58,6 +66,31 @@ class PipelineFactory { getPipeline(buckets) { // eslint-disable-line no-unused-vars throw errors.NotImplemented; } + + /** + * Builds a conditional location stripping stage. + * Excludes location[] from Kafka if there are too many items (to save space). + * Keeps location[] for small objects (to avoid extra S3 fetches). 
+ * @returns {Object} MongoDB $set aggregation stage + */ + _getLocationStrippingStage() { + return { + $set: { + 'fullDocument.value.location': { + $cond: { + if: { + $gte: [ + { $size: { $ifNull: ['$fullDocument.value.location', []] } }, + this._locationStrippingThreshold + ] + }, + then: '$$REMOVE', + else: '$fullDocument.value.location' + } + } + } + }; + } } module.exports = PipelineFactory; diff --git a/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js b/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js index 14eec35429..58a3f83d9c 100644 --- a/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js @@ -9,6 +9,16 @@ const PipelineFactory = require('./PipelineFactory'); * that listens to all buckets. */ class WildcardPipelineFactory extends PipelineFactory { + /** + * @constructor + * @param {number} locationStrippingThreshold threshold for stripping location data + */ + constructor(locationStrippingThreshold = 100) { + super(locationStrippingThreshold); + // getPipeline is used standalone later; ensure its `this` stays bound to this instance. + this.getPipeline = this.getPipeline.bind(this); + } + /** * Checks if an existing pipeline is valid against the current * factory. 
@@ -38,7 +48,8 @@ class WildcardPipelineFactory extends PipelineFactory { }, } } - } + }, + this._getLocationStrippingStage(), ]); } } diff --git a/extensions/replication/tasks/ReplicateObject.js b/extensions/replication/tasks/ReplicateObject.js index 4e31169c7e..9dc4267bba 100644 --- a/extensions/replication/tasks/ReplicateObject.js +++ b/extensions/replication/tasks/ReplicateObject.js @@ -823,7 +823,8 @@ class ReplicateObject extends BackbeatTask { }); } - processQueueEntry(sourceEntry, kafkaEntry, done) { + processQueueEntry(_sourceEntry, kafkaEntry, done) { + let sourceEntry = _sourceEntry; const log = this.logger.newRequestLogger(); const destEntry = sourceEntry.toReplicaEntry(this.site); @@ -864,12 +865,34 @@ class ReplicateObject extends BackbeatTask { this._setTargetAccountMd(destEntry, targetRole, log, next); }, next => { - if (!mdOnly && - sourceEntry.getContentLength() / 1000000 >= - this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB) { - return this._checkSourceReplication(sourceEntry, log, next); + if (mdOnly) { + return next(); + } + const isLargeObject = sourceEntry.getContentLength() / 1000000 >= + this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB; + const isLocationStripped = sourceEntry.getLocation().length === 0; + if (!isLargeObject && !isLocationStripped) { + return next(); } - return next(); + return this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => { + if (err) { + return next(err); + } + if (isLargeObject) { + const status = refreshedEntry.getReplicationSiteStatus(this.site); + if (status === 'COMPLETED') { + log.info('replication already completed, skipping', { + entry: sourceEntry.getLogInfo(), + }); + return next(errorAlreadyCompleted); + } + } + if (isLocationStripped) { + // Reassign sourceEntry to use fresh metadata + sourceEntry = refreshedEntry; + } + return next(); + }); }, // Get data from source bucket and put it on the target bucket next => { diff --git 
a/tests/functional/replication/queueProcessor.js b/tests/functional/replication/queueProcessor.js index 5a0f36a84a..e05ff6449f 100644 --- a/tests/functional/replication/queueProcessor.js +++ b/tests/functional/replication/queueProcessor.js @@ -1031,7 +1031,44 @@ describe('queue processor functional tests with mocking', () => { }); }); - it('should not check object MD if size is smaller than sourceCheckIfSizeGreaterThanMB', done => { + it('should fetch object MD from S3 for location data when missing', done => { + // Create a Kafka entry with its location[] field stripped + const originalKafkaEntry = s3mock.getParam('kafkaEntry'); + const kafkaValue = JSON.parse(originalKafkaEntry.value); + delete kafkaValue.location; + const kafkaEntry = { + ...originalKafkaEntry, + value: JSON.stringify(kafkaValue), + }; + + s3mock.setParam('contentLength', 1000); + let checkMdCalled = false; + s3mock.setParam('routes.source.s3.getMetadata.handler', + (req, url, query, res) => { + checkMdCalled = true; + s3mock.resetParam('routes.source.s3.getMetadata.handler'); + s3mock._getMetadataSource(req, url, query, res); + }, { _static: true }); + + async.parallel([ + done => { + s3mock.onPutSourceMd = done; + }, + done => queueProcessorSF.processReplicationEntry( + kafkaEntry, err => { + assert.ifError(err); + assert.strictEqual(s3mock.hasPutTargetData, true); + assert(s3mock.hasPutTargetMd); + assert(checkMdCalled); + done(); + }), + ], () => { + s3mock.resetParam('contentLength'); + done(); + }); + }); + + it('should not check object MD for small objects with location', done => { s3mock.setParam('contentLength', 1); let checkMdCalled = false; s3mock.setParam('routes.source.s3.getMetadata.handler', diff --git a/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js b/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js new file mode 100644 index 0000000000..f2c8982a97 --- /dev/null +++ b/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js @@ -0,0 +1,34 @@ +const assert = 
require('assert'); +const { OplogPopulatorConfigJoiSchema} = require('../../../extensions/oplogPopulator/OplogPopulatorConfigValidator'); + +const defaultConfig = { + topic: 'backbeat-oplog', + kafkaConnectHost: '127.0.0.1', + kafkaConnectPort: 8083, + numberOfConnectors: 1, + probeServer: { + bindAddress: '0.0.0.0', + port: 8556, + }, +}; + +describe('OplogPopulatorConfigValidator', () => { + describe('locationStrippingThreshold validation', () => { + it('should accept valid threshold', () => { + const config = { + ...defaultConfig, + locationStrippingThreshold: 50, + }; + const result = OplogPopulatorConfigJoiSchema.validate(config); + assert.ifError(result.error); + assert.strictEqual(result.value.locationStrippingThreshold, 50); + }); + + it('should use default of 100 when not specified', () => { + const config = { ...defaultConfig }; + const result = OplogPopulatorConfigJoiSchema.validate(config); + assert.ifError(result.error); + assert.strictEqual(result.value.locationStrippingThreshold, 100); + }); + }); +}); diff --git a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js index a9b68c77a8..48196ad6a6 100644 --- a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js @@ -4,7 +4,7 @@ const MultipleBucketsPipelineFactory = const { constants } = require('arsenal'); describe('MultipleBucketsPipelineFactory', () => { - const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(); + const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(200); describe('isValid', () => { it('should detect a valid list of buckets', () => { @@ -45,10 +45,15 @@ describe('MultipleBucketsPipelineFactory', () => { assert.strictEqual(result, '[]'); }); - it('should return the pipeline with buckets', () => { + it('should return the pipeline with buckets and location stripping', () 
=> { const buckets = ['bucket1', 'bucket2']; const result = multipleBucketsPipelineFactory.getPipeline(buckets); - assert.strictEqual(result, '[{"$match":{"ns.coll":{"$in":["bucket1","bucket2"]}}}]'); + const pipeline = JSON.parse(result); + + assert.strictEqual(pipeline.length, 2); + assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1','bucket2']}}}); + assert(pipeline[1].$set['fullDocument.value.location']); + assert(result.includes('200')); }); }); diff --git a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js index 01a191f66d..0b852dfe15 100644 --- a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js @@ -3,7 +3,7 @@ const WildcardPipelineFactory = require('../../../../extensions/oplogPopulator/p const { constants } = require('arsenal'); describe('WildcardPipelineFactory', () => { - const wildcardPipelineFactory = new WildcardPipelineFactory(); + const wildcardPipelineFactory = new WildcardPipelineFactory(200); describe('isValid', () => { it('should detect a wildcard', () => { @@ -32,10 +32,15 @@ describe('WildcardPipelineFactory', () => { }); describe('getPipeline', () => { - it('should return the pipeline with wildcard', () => { + it('should return the pipeline with buckets and location stripping', () => { const buckets = ['bucket1', 'bucket2']; const result = wildcardPipelineFactory.getPipeline(buckets); - assert.strictEqual(result, '[{"$match":{"ns.coll":{"$not":{"$regex":"^(mpuShadowBucket|__).*"}}}}]'); + const pipeline = JSON.parse(result); + + assert.strictEqual(pipeline.length, 2); + assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$not:{$regex:'^(mpuShadowBucket|__).*'}}}}); + assert(pipeline[1].$set['fullDocument.value.location']); + assert(result.includes('200')); }); });