Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ class OplogPopulator {
if (this._config.numberOfConnectors === 0) {
// If the number of connectors is set to 0, then we
// use a single connector to listen to the whole DB.
pipelineFactory = new WildcardPipelineFactory();
pipelineFactory = new WildcardPipelineFactory(this._config.locationStrippingBytesThreshold);
strategy = new UniqueConnector({
logger: this._logger,
});
Expand All @@ -345,7 +345,7 @@ class OplogPopulator {
// words, we cannot alter an existing pipeline. In this
// case, the strategy is to allow a maximum of one
// bucket per kafka connector.
pipelineFactory = new MultipleBucketsPipelineFactory();
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingBytesThreshold);
strategy = new ImmutableConnector({
logger: this._logger,
});
Expand All @@ -354,7 +354,7 @@ class OplogPopulator {
// kafka connector. However, we want to proactively
// ensure that the pipeline will be accepted by
// mongodb.
pipelineFactory = new MultipleBucketsPipelineFactory();
pipelineFactory = new MultipleBucketsPipelineFactory(this._config.locationStrippingBytesThreshold);
strategy = new LeastFullConnector({
logger: this._logger,
});
Expand Down
1 change: 1 addition & 0 deletions extensions/oplogPopulator/OplogPopulatorConfigValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const joiSchema = joi.object({
kafkaConnectHost: joi.string().required(),
kafkaConnectPort: joi.number().required(),
numberOfConnectors: joi.number().required().min(0),
locationStrippingBytesThreshold: joi.number().min(0).default(0),
prefix: joi.string().optional(),
probeServer: probeServerJoi.default(),
connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'),
Expand Down
6 changes: 6 additions & 0 deletions extensions/oplogPopulator/OplogPopulatorTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ werelogs.configure({ level: config.log.logLevel,

const mongoConfig = config.queuePopulator.mongo;
const oplogPopulatorConfig = config.extensions.oplogPopulator;
// Mirror the replication queue processor's size-check threshold (MB) into the
// oplog populator's location-stripping threshold (bytes), when configured.
const replicationExt = config.extensions.replication;
const sizeCheckMB = replicationExt
    && replicationExt.queueProcessor
    && replicationExt.queueProcessor.sourceCheckIfSizeGreaterThanMB;
if (sizeCheckMB) {
    oplogPopulatorConfig.locationStrippingBytesThreshold = sizeCheckMB * 1024 * 1024;
}

const activeExtensions = [
'notification',
Expand Down
21 changes: 20 additions & 1 deletion extensions/oplogPopulator/modules/Connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Connector extends EventEmitter {
this._name = params.name;
this._config = params.config;
this._buckets = new Set(params.buckets);
this._liveBuckets = new Set(params.buckets);
this._isRunning = params.isRunning;
this._getPipeline = params.getPipeline;
this._state = {
Expand Down Expand Up @@ -166,6 +167,7 @@ class Connector extends EventEmitter {
config: this._config,
});
this._isRunning = true;
this._liveBuckets = new Set(this._buckets);
} catch (err) {
this._logger.error('Error while spawning connector', {
method: 'Connector.spawn',
Expand Down Expand Up @@ -193,6 +195,7 @@ class Connector extends EventEmitter {
this.emit(constants.connectorUpdatedEvent, this);
await this._kafkaConnect.deleteConnector(this._name);
this._isRunning = false;
this._liveBuckets = new Set();
// resetting the resume point to set a new one on creation of the connector
delete this._config['startup.mode.timestamp.start.at.operation.time'];
} catch (err) {
Expand Down Expand Up @@ -314,21 +317,37 @@ class Connector extends EventEmitter {
* That is why we use updateConnectorConfig() instead of
* updateConnectorPipeline()
* @param {boolean} [doUpdate=false] updates connector if true
* @param {boolean} [updateSupported=true] whether full pipeline updates
* (including match stage changes) are supported. When false, the update
* is only applied if the match stage has not changed, to protect resume
* tokens on MongoDB >= 6.0.
* @returns {Promise|boolean} connector did update
* @throws {InternalError}
*/
async updatePipeline(doUpdate = false) {
async updatePipeline(doUpdate = false, updateSupported = true) {
// Only update when buckets changed and when not already updating
if (!this._state.bucketsGotModified || this._state.isUpdating) {
return false;
}
this._config.pipeline = this._getPipeline([...this._buckets]);
try {
if (doUpdate && this._isRunning) {
if (!updateSupported) {
const sameBuckets = this._liveBuckets.size === this._buckets.size &&
this._liveBuckets.isSubsetOf(this._buckets);
if (!sameBuckets) {
this._logger.info('Skipping pipeline update: match stage differs', {
method: 'Connector.updatePipeline',
connector: this._name,
});
return false;
}
}
const timeBeforeUpdate = Date.now();
this._state.isUpdating = true;
this.emit(constants.connectorUpdatedEvent, this);
await this._kafkaConnect.updateConnectorConfig(this._name, this._config);
this._liveBuckets = new Set(this._buckets);
this._updateConnectorState(false, timeBeforeUpdate);
this._state.isUpdating = false;
return true;
Expand Down
4 changes: 2 additions & 2 deletions extensions/oplogPopulator/modules/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ class ConnectorsManager extends EventEmitter {
connector: connector.name,
});
return true;
} else if (connector.isRunning && this._allocationStrategy.canUpdate()) {
return connector.updatePipeline(true);
} else if (connector.isRunning) {
return connector.updatePipeline(true, this._allocationStrategy.canUpdate());
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ const PipelineFactory = require('./PipelineFactory');
* given a list of buckets.
*/
class MultipleBucketsPipelineFactory extends PipelineFactory {
/**
 * @constructor
 * @param {number} locationStrippingBytesThreshold size threshold in bytes above
 * which object location data is stripped from change stream events (forwarded
 * to the base PipelineFactory, which stores it)
 */
constructor(locationStrippingBytesThreshold) {
super(locationStrippingBytesThreshold);
// getPipeline is passed around as a standalone callback later on, so bind it
// here to keep `this` pointing at this factory instance when detached.
this.getPipeline = this.getPipeline.bind(this);
}

/**
* Checks if an existing pipeline is valid against the current
* factory.
Expand All @@ -22,24 +32,21 @@ class MultipleBucketsPipelineFactory extends PipelineFactory {
}

/**
* Makes new connector pipeline that includes
* buckets assigned to this connector.
* Makes connector pipeline stage, that includes buckets assigned to this connector.
* @param {string[] | undefined} buckets buckets assigned to this connector
* @returns {string} new connector pipeline
* @returns {object} connector pipeline stage
*/
getPipeline(buckets) {
getPipelineStage(buckets) {
if (!buckets || !buckets.length) {
return JSON.stringify([]);
return null;
}
return JSON.stringify([
{
$match: {
'ns.coll': {
$in: buckets,
}
return {
$match: {
'ns.coll': {
$in: buckets,
}
}
]);
};
}
}

Expand Down
52 changes: 51 additions & 1 deletion extensions/oplogPopulator/pipeline/PipelineFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ const { wildCardForAllBuckets } = require('../constants');
* @classdesc base class for the pipeline factories
*/
class PipelineFactory {
/**
* @constructor
* @param {number} locationStrippingBytesThreshold threshold for stripping location data
*/
constructor(locationStrippingBytesThreshold) {
this._locationStrippingBytesThreshold = locationStrippingBytesThreshold;
}

/**
* Checks if an existing pipeline is valid against the current
* factory.
Expand Down Expand Up @@ -55,7 +63,49 @@ class PipelineFactory {
* @param {string[] | undefined} buckets buckets assigned to this connector
* @returns {string} new connector pipeline
*/
getPipeline(buckets) { // eslint-disable-line no-unused-vars
getPipeline(buckets) {
const stage = this.getPipelineStage(buckets);
if (!stage) {
return JSON.stringify([]);
}
const pipeline = [
stage,
];
if (this._locationStrippingBytesThreshold > 0) {
pipeline.push({
$set: {
'fullDocument.value.location': {
$cond: {
if: { $gte: [
'$fullDocument.value.content-length',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does $gte behave if there is no $fullDocument.value.content-length field : does it go into the then or else case? (i.e. how does it compare undefined vs a number)

may be good to add a test for this specific case (not sure it really happens, but maybe in some corner case like pushing an empty object...), to ensure we are not adding any risk

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In MongoDB's BSON comparison order, null/missing values are less than any number (MinKey < Null < Numbers < ...). So when content-length is missing, $gte: [null, threshold] evaluates to false, and the else branch is taken, preserving the location field as-is.

This is the correct behavior — if we don't know the object size, we keep the location.

Working on a test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we do not have functional tests using Mongo + Kafka + Kafka Connect here? So not really testable as is here. Maybe in Zenko?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking more of a "unit" test, i.e. just trying to use the pipeline returned by getPipeline with mongo : create object in mongo, then make a query with the pipeline and check the result (and repeat with different "kinds" of objects)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if later you can add functional tests in zenko i think it would be nice

this._locationStrippingBytesThreshold,
] },
then: '$$REMOVE',
else: '$fullDocument.value.location',
},
},
'updateDescription.updatedFields.value.location': {
$cond: {
if: { $gte: [
'$updateDescription.updatedFields.value.content-length',
this._locationStrippingBytesThreshold,
] },
then: '$$REMOVE',
else: '$updateDescription.updatedFields.value.location',
},
},
}
});
}
return JSON.stringify(pipeline);
}

/**
 * Makes connector pipeline stage, to then be used by getPipeline.
 * Abstract: subclasses must override this method to return their
 * own $match stage.
 * @param {string[] | undefined} buckets buckets assigned to this connector
 * @returns {object} connector pipeline stage
 * @throws {Error} NotImplemented when called on the base class
 */
getPipelineStage(buckets) { // eslint-disable-line no-unused-vars
throw errors.NotImplemented;
}
}
Expand Down
33 changes: 20 additions & 13 deletions extensions/oplogPopulator/pipeline/WildcardPipelineFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ const PipelineFactory = require('./PipelineFactory');
* that listens to all buckets.
*/
class WildcardPipelineFactory extends PipelineFactory {
/**
 * @constructor
 * @param {number} locationStrippingBytesThreshold size threshold in bytes above
 * which object location data is stripped from change stream events (forwarded
 * to the base PipelineFactory, which stores it)
 */
constructor(locationStrippingBytesThreshold) {
super(locationStrippingBytesThreshold);
// getPipeline is passed around as a standalone callback later on, so bind it
// here to keep `this` pointing at this factory instance when detached.
this.getPipeline = this.getPipeline.bind(this);
}

/**
* Checks if an existing pipeline is valid against the current
* factory.
Expand All @@ -23,23 +33,20 @@ class WildcardPipelineFactory extends PipelineFactory {
}

/**
* Create a pipeline for the connector, to listen to all
* non-special collections.
* Makes connector pipeline stage, to listen to all non-special collections.
* @param {string[] | undefined} buckets buckets assigned to this connector
* @returns {string} new connector pipeline
* @returns {object} connector pipeline stage
*/
getPipeline(buckets) { // eslint-disable-line no-unused-vars
return JSON.stringify([
{
$match: {
'ns.coll': {
$not: {
$regex: `^(${constants.mpuBucketPrefix}|__).*`,
},
}
getPipelineStage(buckets) { // eslint-disable-line no-unused-vars
return {
$match: {
'ns.coll': {
$not: {
$regex: `^(${constants.mpuBucketPrefix}|__).*`,
},
}
}
]);
};
}
}

Expand Down
44 changes: 25 additions & 19 deletions extensions/replication/tasks/ReplicateObject.js
Original file line number Diff line number Diff line change
Expand Up @@ -398,19 +398,6 @@ class ReplicateObject extends BackbeatTask {
});
}

_checkSourceReplication(sourceEntry, log, cb) {
this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => {
if (err) {
return cb(err);
}
const status = refreshedEntry.getReplicationSiteStatus(this.site);
if (status === 'COMPLETED') {
return cb(errorAlreadyCompleted);
}
return cb();
});
}

_getAndPutData(sourceEntry, destEntry, log, cb) {
log.debug('replicating data', { entry: sourceEntry.getLogInfo() });
if (sourceEntry.getLocation().some(part => {
Expand Down Expand Up @@ -825,7 +812,8 @@ class ReplicateObject extends BackbeatTask {
});
}

processQueueEntry(sourceEntry, kafkaEntry, done) {
processQueueEntry(_sourceEntry, kafkaEntry, done) {
let sourceEntry = _sourceEntry;
const log = this.logger.newRequestLogger();
const destEntry = sourceEntry.toReplicaEntry(this.site);

Expand Down Expand Up @@ -866,12 +854,30 @@ class ReplicateObject extends BackbeatTask {
this._setTargetAccountMd(destEntry, targetRole, log, next);
},
next => {
if (!mdOnly &&
sourceEntry.getContentLength() / 1000000 >=
this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB) {
return this._checkSourceReplication(sourceEntry, log, next);
if (mdOnly) {
return next();
}
return next();
const isLargeObject = sourceEntry.getContentLength() / 1000000 >=
this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB;
const isLocationStripped = sourceEntry.getContentLength() > 0 && sourceEntry.getLocation().length === 0;
if (!isLargeObject && !isLocationStripped) {
return next();
}
return this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => {
if (err) {
return next(err);
}
const status = refreshedEntry.getReplicationSiteStatus(this.site);
if (status === 'COMPLETED') {
log.info('replication already completed, skipping', {
entry: sourceEntry.getLogInfo(),
});
return next(errorAlreadyCompleted);
}
// Reassign sourceEntry to use fresh metadata
sourceEntry = refreshedEntry;
return next();
});
},
// Get data from source bucket and put it on the target bucket
next => {
Expand Down
Loading