-
Notifications
You must be signed in to change notification settings - Fork 23
Optimize oplog size by stripping locations of large objects #2715
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: development/9.3
Are you sure you want to change the base?
Changes from all commits
eeaabb2
2dc2bbd
636f27a
bf4ac4c
db7640b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,14 @@ const { wildCardForAllBuckets } = require('../constants'); | |
| * @classdesc base class for the pipeline factories | ||
| */ | ||
| class PipelineFactory { | ||
| /** | ||
| * @constructor | ||
| * @param {number} locationStrippingBytesThreshold threshold for stripping location data | ||
| */ | ||
| constructor(locationStrippingBytesThreshold) { | ||
| this._locationStrippingBytesThreshold = locationStrippingBytesThreshold; | ||
| } | ||
|
|
||
| /** | ||
| * Checks if an existing pipeline is valid against the current | ||
| * factory. | ||
|
|
@@ -55,7 +63,49 @@ class PipelineFactory { | |
| * @param {string[] | undefined} buckets buckets assigned to this connector | ||
| * @returns {string} new connector pipeline | ||
| */ | ||
| getPipeline(buckets) { // eslint-disable-line no-unused-vars | ||
| getPipeline(buckets) { | ||
| const stage = this.getPipelineStage(buckets); | ||
| if (!stage) { | ||
| return JSON.stringify([]); | ||
| } | ||
| const pipeline = [ | ||
| stage, | ||
| ]; | ||
| if (this._locationStrippingBytesThreshold > 0) { | ||
| pipeline.push({ | ||
| $set: { | ||
| 'fullDocument.value.location': { | ||
| $cond: { | ||
| if: { $gte: [ | ||
| '$fullDocument.value.content-length', | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how does may be good to add a test for this specific case (not sure it really happen, but maybe in some corner case like pushing empty objet...), to ensure we are not adding any risk
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In MongoDB's BSON comparison order, null/missing values are less than any number (MinKey < Null < Numbers < ...). So when content-length is missing, $gte: [null, threshold] evaluates to false, and the else branch is taken, preserving the location field as-is. This is the correct behavior — if we don't know the object size, we keep the location. Working on a test.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually we do not have functional tests using Mongo + Kafka + Kafka Connect here? So not really testable as is here. Maybe in Zenko?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking more of a "unit" test, i.e. just trying to use the pipeline returned by
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if later you can add functional tests in zenko i think it would be nice |
||
| this._locationStrippingBytesThreshold, | ||
| ] }, | ||
| then: '$$REMOVE', | ||
| else: '$fullDocument.value.location', | ||
| }, | ||
| }, | ||
| 'updateDescription.updatedFields.value.location': { | ||
| $cond: { | ||
| if: { $gte: [ | ||
| '$updateDescription.updatedFields.value.content-length', | ||
| this._locationStrippingBytesThreshold, | ||
| ] }, | ||
| then: '$$REMOVE', | ||
| else: '$updateDescription.updatedFields.value.location', | ||
| }, | ||
| }, | ||
| } | ||
| }); | ||
| } | ||
| return JSON.stringify(pipeline); | ||
| } | ||
|
|
||
| /** | ||
| * Makes connector pipeline stage, to then be used by getPipeline. | ||
| * @param {string[] | undefined} buckets buckets assigned to this connector | ||
| * @returns {object} connector pipeline stage | ||
| */ | ||
| getPipelineStage(buckets) { // eslint-disable-line no-unused-vars | ||
| throw errors.NotImplemented; | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.