Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conf/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@
"kafkaConnectHost": "127.0.0.1",
"kafkaConnectPort": 8083,
"numberOfConnectors": 1,
"locationStrippingThreshold": 100,
"probeServer": {
"bindAddress": "0.0.0.0",
"port": 8556
Expand Down
6 changes: 3 additions & 3 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ class OplogPopulator {
if (this._config.numberOfConnectors === 0) {
// If the number of connector is set to 0, then we
// use a single connector to listen to the whole DB.
pipelineFactory = new WildcardPipelineFactory();
pipelineFactory = new WildcardPipelineFactory(this._config.locationStrippingThreshold);
strategy = new UniqueConnector({
logger: this._logger,
});
Expand All @@ -345,7 +345,7 @@ class OplogPopulator {
// words, we cannot alter an existing pipeline. In this
// case, the strategy is to allow a maximum of one
// bucket per kafka connector.
pipelineFactory = new MultipleBucketsPipelineFactory();
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold);
strategy = new ImmutableConnector({
logger: this._logger,
});
Expand All @@ -354,7 +354,7 @@ class OplogPopulator {
// kafka connector. However, we want to proactively
// ensure that the pipeline will be accepted by
// mongodb.
pipelineFactory = new MultipleBucketsPipelineFactory();
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingThreshold);
strategy = new LeastFullConnector({
logger: this._logger,
});
Expand Down
1 change: 1 addition & 0 deletions extensions/oplogPopulator/OplogPopulatorConfigValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const joiSchema = joi.object({
kafkaConnectHost: joi.string().required(),
kafkaConnectPort: joi.number().required(),
numberOfConnectors: joi.number().required().min(0),
locationStrippingThreshold: joi.number().min(0).default(100),
prefix: joi.string().optional(),
probeServer: probeServerJoi.default(),
connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ const PipelineFactory = require('./PipelineFactory');
* given a list of buckets.
*/
class MultipleBucketsPipelineFactory extends PipelineFactory {
/**
 * Pipeline factory that builds one pipeline per list of buckets.
 *
 * @constructor
 * @param {number} locationStrippingThreshold - number of `location[]` items at
 *   or above which the location field is stripped from change stream events
 *   (forwarded to the base PipelineFactory; defaults to 100)
 */
constructor(locationStrippingThreshold = 100) {
    super(locationStrippingThreshold);
    // getPipeline is passed around as a standalone callback later, so bind
    // it here to keep `this` pointing at this factory instance.
    this.getPipeline = this.getPipeline.bind(this);
}

/**
* Checks if an existing pipeline is valid against the current
* factory.
Expand Down Expand Up @@ -38,7 +48,8 @@ class MultipleBucketsPipelineFactory extends PipelineFactory {
$in: buckets,
}
}
}
},
this._getLocationStrippingStage(),
]);
}
}
Expand Down
33 changes: 33 additions & 0 deletions extensions/oplogPopulator/pipeline/PipelineFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ const { wildCardForAllBuckets } = require('../constants');
* @classdesc base class for the pipeline factories
*/
class PipelineFactory {
/**
 * @constructor
 * @param {number} locationStrippingThreshold - number of `location[]` items at
 *   or above which the location field is removed from events (defaults to
 *   100); consumed by `_getLocationStrippingStage()`
 */
constructor(locationStrippingThreshold = 100) {
    // Stored for use as the $gte comparand in _getLocationStrippingStage().
    this._locationStrippingThreshold = locationStrippingThreshold;
}

/**
* Checks if an existing pipeline is valid against the current
* factory.
Expand Down Expand Up @@ -58,6 +66,31 @@ class PipelineFactory {
getPipeline(buckets) { // eslint-disable-line no-unused-vars
throw errors.NotImplemented;
}

/**
* Builds a conditional location stripping stage.
* Excludes location[] from Kafka if there are too many items (to save space).
* Keeps location[] for small objects (to avoid extra S3 fetches).
* @returns {Object} MongoDB $set aggregation stage
*/
_getLocationStrippingStage() {
return {
$set: {
'fullDocument.value.location': {
$cond: {
if: {
$gte: [
{ $size: { $ifNull: ['$fullDocument.value.location', []] } },
this._locationStrippingThreshold
]
},
then: '$$REMOVE',
else: '$fullDocument.value.location'
}
}
}
};
}
}

module.exports = PipelineFactory;
13 changes: 12 additions & 1 deletion extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ const PipelineFactory = require('./PipelineFactory');
* that listens to all buckets.
*/
class WildcardPipelineFactory extends PipelineFactory {
/**
 * Pipeline factory that builds a single wildcard pipeline listening to
 * all buckets.
 *
 * @constructor
 * @param {number} locationStrippingThreshold - number of `location[]` items at
 *   or above which the location field is stripped from change stream events
 *   (forwarded to the base PipelineFactory; defaults to 100)
 */
constructor(locationStrippingThreshold = 100) {
    super(locationStrippingThreshold);
    // getPipeline is passed around as a standalone callback later, so bind
    // it here to keep `this` pointing at this factory instance.
    this.getPipeline = this.getPipeline.bind(this);
}

/**
* Checks if an existing pipeline is valid against the current
* factory.
Expand Down Expand Up @@ -38,7 +48,8 @@ class WildcardPipelineFactory extends PipelineFactory {
},
}
}
}
},
this._getLocationStrippingStage(),
]);
}
}
Expand Down
35 changes: 29 additions & 6 deletions extensions/replication/tasks/ReplicateObject.js
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,8 @@ class ReplicateObject extends BackbeatTask {
});
}

processQueueEntry(sourceEntry, kafkaEntry, done) {
processQueueEntry(_sourceEntry, kafkaEntry, done) {
let sourceEntry = _sourceEntry;
const log = this.logger.newRequestLogger();
const destEntry = sourceEntry.toReplicaEntry(this.site);

Expand Down Expand Up @@ -864,12 +865,34 @@ class ReplicateObject extends BackbeatTask {
this._setTargetAccountMd(destEntry, targetRole, log, next);
},
next => {
if (!mdOnly &&
sourceEntry.getContentLength() / 1000000 >=
this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB) {
return this._checkSourceReplication(sourceEntry, log, next);
if (mdOnly) {
return next();
}
const isLargeObject = sourceEntry.getContentLength() / 1000000 >=
this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB;
const isLocationStripped = sourceEntry.getLocation().length === 0;
if (!isLargeObject && !isLocationStripped) {
return next();
}
return next();
return this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => {
if (err) {
return next(err);
}
if (isLargeObject) {
const status = refreshedEntry.getReplicationSiteStatus(this.site);
if (status === 'COMPLETED') {
log.info('replication already completed, skipping', {
entry: sourceEntry.getLogInfo(),
});
return next(errorAlreadyCompleted);
}
}
if (isLocationStripped) {
// Reassign sourceEntry to use fresh metadata
sourceEntry = refreshedEntry;
}
return next();
});
},
// Get data from source bucket and put it on the target bucket
next => {
Expand Down
39 changes: 38 additions & 1 deletion tests/functional/replication/queueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,44 @@ describe('queue processor functional tests with mocking', () => {
});
});

it('should not check object MD if size is smaller than sourceCheckIfSizeGreaterThanMB', done => {
it('should fetch object MD from S3 for location data when missing', done => {
    // Create a Kafka entry with its location[] field stripped, simulating
    // an event that went through the oplog populator's stripping stage.
    const originalKafkaEntry = s3mock.getParam('kafkaEntry');
    const kafkaValue = JSON.parse(originalKafkaEntry.value);
    delete kafkaValue.location;
    const kafkaEntry = {
        ...originalKafkaEntry,
        value: JSON.stringify(kafkaValue),
    };

    // contentLength is small here, so the MD fetch must be triggered by the
    // missing location[] alone, not by the large-object size check.
    s3mock.setParam('contentLength', 1000);
    let checkMdCalled = false;
    // Intercept the first source getMetadata call to record that it
    // happened; the handler resets itself so later calls use the default.
    s3mock.setParam('routes.source.s3.getMetadata.handler',
        (req, url, query, res) => {
            checkMdCalled = true;
            s3mock.resetParam('routes.source.s3.getMetadata.handler');
            s3mock._getMetadataSource(req, url, query, res);
        }, { _static: true });

    // NOTE: the `done` parameters inside the parallel tasks shadow the outer
    // mocha `done`; only the async.parallel final callback completes the test.
    async.parallel([
        done => {
            s3mock.onPutSourceMd = done;
        },
        done => queueProcessorSF.processReplicationEntry(
            kafkaEntry, err => {
                assert.ifError(err);
                assert.strictEqual(s3mock.hasPutTargetData, true);
                assert(s3mock.hasPutTargetMd);
                assert(checkMdCalled);
                done();
            }),
    ], () => {
        // Restore the mock so later tests see the default content length.
        s3mock.resetParam('contentLength');
        done();
    });
});

it('should not check object MD for small objects with location', done => {
s3mock.setParam('contentLength', 1);
let checkMdCalled = false;
s3mock.setParam('routes.source.s3.getMetadata.handler',
Expand Down
34 changes: 34 additions & 0 deletions tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Unit tests for the oplogPopulator joi config schema, focused on the
// `locationStrippingThreshold` option (joi.number().min(0).default(100)).
const assert = require('assert');
const { OplogPopulatorConfigJoiSchema } = require('../../../extensions/oplogPopulator/OplogPopulatorConfigValidator');

// Minimal config satisfying all required fields of the schema.
const defaultConfig = {
    topic: 'backbeat-oplog',
    kafkaConnectHost: '127.0.0.1',
    kafkaConnectPort: 8083,
    numberOfConnectors: 1,
    probeServer: {
        bindAddress: '0.0.0.0',
        port: 8556,
    },
};

describe('OplogPopulatorConfigValidator', () => {
    describe('locationStrippingThreshold validation', () => {
        it('should accept valid threshold', () => {
            const config = {
                ...defaultConfig,
                locationStrippingThreshold: 50,
            };
            const result = OplogPopulatorConfigJoiSchema.validate(config);
            assert.ifError(result.error);
            assert.strictEqual(result.value.locationStrippingThreshold, 50);
        });

        it('should use default of 100 when not specified', () => {
            const config = { ...defaultConfig };
            const result = OplogPopulatorConfigJoiSchema.validate(config);
            assert.ifError(result.error);
            assert.strictEqual(result.value.locationStrippingThreshold, 100);
        });

        it('should reject a negative threshold', () => {
            const config = {
                ...defaultConfig,
                locationStrippingThreshold: -1,
            };
            const result = OplogPopulatorConfigJoiSchema.validate(config);
            // Schema declares min(0), so negative values must error out.
            assert(result.error);
        });
    });
});
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const MultipleBucketsPipelineFactory =
const { constants } = require('arsenal');

describe('MultipleBucketsPipelineFactory', () => {
const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory();
const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(200);

describe('isValid', () => {
it('should detect a valid list of buckets', () => {
Expand Down Expand Up @@ -45,10 +45,15 @@ describe('MultipleBucketsPipelineFactory', () => {
assert.strictEqual(result, '[]');
});

it('should return the pipeline with buckets', () => {
it('should return the pipeline with buckets and location stripping', () => {
    const buckets = ['bucket1', 'bucket2'];
    const result = multipleBucketsPipelineFactory.getPipeline(buckets);
    const pipeline = JSON.parse(result);

    assert.strictEqual(pipeline.length, 2);
    assert.deepStrictEqual(pipeline[0], { $match: { 'ns.coll': { $in: ['bucket1', 'bucket2'] } } });
    // Assert the stripping stage structurally instead of a fragile
    // `result.includes('200')` substring search (which would also match
    // an unrelated "200" anywhere in the serialized pipeline): the
    // configured threshold must be the $gte comparand inside the $cond.
    const stripStage = pipeline[1].$set['fullDocument.value.location'];
    assert(stripStage);
    assert.strictEqual(stripStage.$cond.if.$gte[1], 200);
});
});

Expand Down
11 changes: 8 additions & 3 deletions tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const WildcardPipelineFactory = require('../../../../extensions/oplogPopulator/p
const { constants } = require('arsenal');

describe('WildcardPipelineFactory', () => {
const wildcardPipelineFactory = new WildcardPipelineFactory();
const wildcardPipelineFactory = new WildcardPipelineFactory(200);

describe('isValid', () => {
it('should detect a wildcard', () => {
Expand Down Expand Up @@ -32,10 +32,15 @@ describe('WildcardPipelineFactory', () => {
});

describe('getPipeline', () => {
it('should return the pipeline with wildcard', () => {
it('should return the pipeline with buckets and location stripping', () => {
    const buckets = ['bucket1', 'bucket2'];
    const result = wildcardPipelineFactory.getPipeline(buckets);
    const pipeline = JSON.parse(result);

    assert.strictEqual(pipeline.length, 2);
    assert.deepStrictEqual(pipeline[0], { $match: { 'ns.coll': { $not: { $regex: '^(mpuShadowBucket|__).*' } } } });
    // Assert the stripping stage structurally instead of a fragile
    // `result.includes('200')` substring search: the threshold passed to
    // the factory must drive the $gte comparison in the $cond.
    const stripStage = pipeline[1].$set['fullDocument.value.location'];
    assert(stripStage);
    assert.strictEqual(stripStage.$cond.if.$gte[1], 200);
});
});

Expand Down
Loading