From 1ed6e5ac50e2a70ba3f9ba7b13638255e3c6afe2 Mon Sep 17 00:00:00 2001 From: Aleksandr Savonin Date: Sat, 4 Apr 2026 13:02:26 +0200 Subject: [PATCH 1/2] [FLINK-39400] Fix typos/bugs in both English and Chinese docs --- .../docs/connectors/datastream/mongodb.md | 2 +- .../docs/connectors/table/mongodb.md | 13 ++++++------- .../docs/connectors/datastream/mongodb.md | 4 ++-- docs/content/docs/connectors/table/mongodb.md | 18 +++++++++--------- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/docs/content.zh/docs/connectors/datastream/mongodb.md b/docs/content.zh/docs/connectors/datastream/mongodb.md index afa11b2a..44ee5f56 100644 --- a/docs/content.zh/docs/connectors/datastream/mongodb.md +++ b/docs/content.zh/docs/connectors/datastream/mongodb.md @@ -165,7 +165,7 @@ stream.sinkTo(sink); {{< /tabs >}} ### 配置项 -Flink 的 MongoDB Source 可以通过 `MongoSink.builder()` 构造器构建。 +Flink 的 MongoDB Sink 可以通过 `MongoSink.builder()` 构造器构建。 1. __setUri(String uri)__ * 必须。 diff --git a/docs/content.zh/docs/connectors/table/mongodb.md b/docs/content.zh/docs/connectors/table/mongodb.md index ec739ce6..2aba158f 100644 --- a/docs/content.zh/docs/connectors/table/mongodb.md +++ b/docs/content.zh/docs/connectors/table/mongodb.md @@ -38,8 +38,7 @@ MongoDB 连接器提供了从 MongoDB 中读取和写入数据的能力。 依赖 ------------ -In order to use the MongoDB connector the following dependencies are required for both projects -using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. +为了使用 MongoDB 连接器,以下依赖项对于使用构建自动化工具(如 Maven 或 SBT)的项目和使用 SQL JAR 包的 SQL Client 都是必需的。 {{< sql_connector_download_table "mongodb" >}} @@ -70,7 +69,7 @@ INSERT INTO MyUserTable SELECT _id, name, age, status FROM T; -- 从 MongoDB 表中读取数据 -SELECT id, name, age, status FROM MyUserTable; +SELECT _id, name, age, status FROM MyUserTable; -- 将 MongoDB 表作为维度表关联 SELECT * FROM myTopic @@ -288,7 +287,7 @@ ON myTopic.key = MyUserTable._id;
sink.delivery-guarantee
可选 否 - at-lease-once + at-least-once

Enum

可选值: none, at-least-once 设置投递保证。仅一次(exactly-once)的投递保证暂不支持。 @@ -347,8 +346,8 @@ lookup cache 的主要目的是用于提高时态表关联 MongoDB 连接器的 默认情况下,flink 会缓存主键的空查询结果,你可以通过将 `lookup.partial-cache.cache-missing-key` 设置为 false 来切换行为。 ### 幂等写入 -如果在 DDL 中定义了主键,MongoDB connector 将使用 UPSERT 模式 `db.connection.update(, , { upsert: true })` 写入 MongoDB -而不是 INSERT 模式 `db.connection.insert()`。 我们将 DDL 中声明的主键进行组合作为 MongoDB 保留主键 _id,使用 UPSERT 模式进行写入,来保证写入的幂等性。 +如果在 DDL 中定义了主键,MongoDB connector 将使用 UPSERT 模式 `db.collection.update(, , { upsert: true })` 写入 MongoDB +而不是 INSERT 模式 `db.collection.insert()`。 我们将 DDL 中声明的主键进行组合作为 MongoDB 保留主键 _id,使用 UPSERT 模式进行写入,来保证写入的幂等性。 当使用 `INSERT OVERWRITE` 写入 MongoDB Table 时,会强制使用 UPSERT 模式写入 MongoDB。 因此,当DDL中没有定义 MongoDB Table 的主键时,会拒绝写入。 @@ -405,7 +404,7 @@ INSERT INTO MySinkTable SELECT _id, shardKey0, shardKey1, status FROM T; INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0', shardKey1 = 'value1') SELECT 1, 'INIT'; -- 指定固定分片键值 (shardKey0) 和动态分片键值 (shardKey1) -INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0') SELECT 1, 'value1' 'INIT'; +INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0') SELECT 1, 'value1', 'INIT'; ``` {{< hint warning >}} 限制:尽管 MongoDB 4.2 及之后版本中分片键的值不再是不可变的, diff --git a/docs/content/docs/connectors/datastream/mongodb.md b/docs/content/docs/connectors/datastream/mongodb.md index 3e195abc..4e32ba7c 100644 --- a/docs/content/docs/connectors/datastream/mongodb.md +++ b/docs/content/docs/connectors/datastream/mongodb.md @@ -207,10 +207,10 @@ Flink's MongoDB sink is created by using the static builder `MongoSink. serializationSchema)__ +10. __setSerializationSchema(MongoSerializationSchema serializationSchema)__ * Required. * A `MongoSerializationSchema` is required for parsing input record to MongoDB [WriteModel](https://www.mongodb.com/docs/drivers/java/sync/current/usage-examples/bulkWrite/). diff --git a/docs/content/docs/connectors/table/mongodb.md b/docs/content/docs/connectors/table/mongodb.md index 0d4f0658..4d5973c1 100644 --- a/docs/content/docs/connectors/table/mongodb.md +++ b/docs/content/docs/connectors/table/mongodb.md @@ -74,7 +74,7 @@ INSERT INTO MyUserTable SELECT _id, name, age, status FROM T; -- scan data from the MongoDB table -SELECT id, name, age, status FROM MyUserTable; +SELECT _id, name, age, status FROM MyUserTable; -- temporal join the MongoDB table as a dimension table SELECT * FROM myTopic @@ -199,7 +199,7 @@ Connector Options no (none) Duration - The max time to live for each rows in lookup cache after writing into the cache. + The max time to live for each row in lookup cache after writing into the cache. "lookup.cache" must be set to "PARTIAL" to use this option. See the following Lookup Cache section for more details. @@ -208,7 +208,7 @@ Connector Options no (none) Duration - The max time to live for each rows in lookup cache after accessing the entry in the cache. + The max time to live for each row in lookup cache after accessing the entry in the cache. "lookup.cache" must be set to "PARTIAL" to use this option. See the following Lookup Cache section for more details. @@ -241,7 +241,7 @@ Connector Options optional no always - Enum Possible values: always, never +

Enum

Possible values: always, never Fine-grained configuration to control filter push down. Supported policies are:
    @@ -294,7 +294,7 @@ Connector Options
    sink.delivery-guarantee
    optional no - at-lease-once + at-least-once

    Enum

    Possible values: none, at-least-once Optional delivery guarantee when committing. The exactly-once guarantee is not supported yet. @@ -345,7 +345,7 @@ fields in the order defined in the DDL. value as _id of the corresponding document. - When there's multiple fields in the specified primary key, we convert and composite these fields into a bson document as the _id of the corresponding document. - For example, if have a primary key statement `PRIMARY KEY (f1, f2) NOT ENFORCED`, + For example, if you have a primary key statement `PRIMARY KEY (f1, f2) NOT ENFORCED`, the extracted _id will be the form like `_id: {f1: v1, f2: v2}`. Notice that it will be ambiguous if the _id field exists in DDL, but the primary key is not declared as _id. @@ -393,8 +393,8 @@ by setting `lookup.partial-cache.cache-missing-key` to false. ### Idempotent Writes -MongoDB connector use upsert writing mode `db.connection.update(, , { upsert: true })` -rather than insert writing mode `db.connection.insert()` if primary key is defined in DDL. +MongoDB connector use upsert writing mode `db.collection.update(, , { upsert: true })` +rather than insert writing mode `db.collection.insert()` if primary key is defined in DDL. We composite the primary key fields as the document _id which is the reserved primary key of MongoDB. Use upsert mode to write rows into MongoDB, which provides idempotence. @@ -456,7 +456,7 @@ INSERT INTO MySinkTable SELECT _id, shardKey0, shardKey1, status FROM T; INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0', shardKey1 = 'value1') SELECT 1, 'INIT'; -- Insert with static(shardKey0) and dynamic(shardKey1) partition -INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0') SELECT 1, 'value1' 'INIT'; +INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0') SELECT 1, 'value1', 'INIT'; ``` {{< hint warning >}} LIMITATION: Although the shard key value is no longer immutable in MongoDB 4.2 and later, From 968e1ca7a40d005c1e2f129841af12cdac9c04fd Mon Sep 17 00:00:00 2001 From: Aleksandr Savonin Date: Sat, 4 Apr 2026 13:09:59 +0200 Subject: [PATCH 2/2] [FLINK-39400] Add missing PAGINATION strategy and sync Chinese docs with English --- .../docs/connectors/datastream/mongodb.md | 11 ++++++-- .../docs/connectors/table/mongodb.md | 27 ++++++++++++++++++- .../docs/connectors/datastream/mongodb.md | 4 ++- docs/content/docs/connectors/table/mongodb.md | 13 ++++++++- 4 files changed, 50 insertions(+), 5 deletions(-) diff --git a/docs/content.zh/docs/connectors/datastream/mongodb.md b/docs/content.zh/docs/connectors/datastream/mongodb.md index 44ee5f56..c5d2cc04 100644 --- a/docs/content.zh/docs/connectors/datastream/mongodb.md +++ b/docs/content.zh/docs/connectors/datastream/mongodb.md @@ -101,7 +101,7 @@ Flink 的 MongoDB Source 可以通过 `MongoSource.builder()` 构造 或者 `maxTimeMS()` 大于 30分钟。 6. _setPartitionStrategy(PartitionStrategy partitionStrategy) _ * 可选。 默认值: `PartitionStrategy.DEFAULT`。 - * 设置分区策略。 可选的分区策略有 `SINGLE`,`SAMPLE`,`SPLIT_VECTOR`,`SHARDED` 和 `DEFAULT`。 + * 设置分区策略。 可选的分区策略有 `SINGLE`,`SAMPLE`,`SPLIT_VECTOR`,`SHARDED`,`PAGINATION` 和 `DEFAULT`。 查看 分区策略 章节获取详细介绍。 7. _setPartitionSize(MemorySize partitionSize)_ * 可选。默认值:`64mb`。 @@ -130,6 +130,7 @@ Flink 的 MongoDB Source 可以通过 `MongoSource.builder()` 构造 仅适用于未分片集合,需要 splitVector 权限。 - `SHARDED`:从 `config.chunks` 集合中直接读取分片集合的分片边界作为分区,不需要额外计算,快速且均匀。 仅适用于已经分片的集合,需要 config 数据库的读取权限。 +- `PAGINATION`:按记录数均匀分割。每个分区将拥有完全相同的记录数量。可以通过 `setPartitionRecordSize()` 配置。 - `DEFAULT`:对分片集合使用 `SHARDED` 策略,对未分片集合使用 `SPLIT_VECTOR` 策略。 ## MongoDB Sink @@ -188,7 +189,13 @@ Flink 的 MongoDB Sink 可以通过 `MongoSink.builder()` 构造器 7. _setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)_ * 可选。默认值:`DeliveryGuarantee.AT_LEAST_ONCE`. * 设置投递保证。 仅一次(EXACTLY_ONCE)的投递保证暂不支持。 -8. __setSerializationSchema(MongoSerializationSchema serializationSchema)__ +8. _setOrderedWrites(boolean ordered)_ + * 可选。默认值:`true`。 + * 设置 MongoDB 驱动程序选项以执行有序写入。 +9. _setBypassDocumentValidation(boolean bypassDocumentValidation)_ + * 可选。默认值:`false`。 + * 设置 MongoDB 驱动程序选项以绕过文档验证。 +10. __setSerializationSchema(MongoSerializationSchema serializationSchema)__ * 必须。 * 设置 `MongoSerializationSchema` 将输入类型转换为 MongoDB [WriteModel](https://www.mongodb.com/docs/drivers/java/sync/current/usage-examples/bulkWrite/)。 diff --git a/docs/content.zh/docs/connectors/table/mongodb.md b/docs/content.zh/docs/connectors/table/mongodb.md index 2aba158f..7b339fd8 100644 --- a/docs/content.zh/docs/connectors/table/mongodb.md +++ b/docs/content.zh/docs/connectors/table/mongodb.md @@ -149,7 +149,7 @@ ON myTopic.key = MyUserTable._id; 否 default String - 设置分区策略。可以选择的分区策略有`single`,`sample`,`split-vector`,`sharded` 和 `default`。 + 设置分区策略。可以选择的分区策略有`single`,`sample`,`split-vector`,`sharded`,`pagination` 和 `default`。 请参阅分区扫描部分了解更多详情。 @@ -169,6 +169,14 @@ ON myTopic.key = MyUserTable._id; 仅用于 `SAMPLE` 抽样分区策略,设置每个分区的样本数量。抽样分区器根据分区键对集合进行随机采样的方式计算分区边界。 `总的样本数量 = 每个分区的样本数量 * (文档总数 / 每个分区的文档数量)`。 + +
    scan.partition.record-size
    + 可选 + 否 + (none) + Integer + 设置每个分区的记录数量。仅在分区策略为 `pagination` 时生效。如果未设置,将根据 `scan.partition.size` 自动推断。 +
    lookup.cache
    可选 @@ -290,6 +298,22 @@ ON myTopic.key = MyUserTable._id; at-least-once

    Enum

    可选值: none, at-least-once 设置投递保证。仅一次(exactly-once)的投递保证暂不支持。 + + +
    sink.ordered-writes
    + 可选 + 否 + true + Boolean + 设置 MongoDB 驱动程序选项以执行有序写入。默认值为 true,表示有序写入。 + + +
    sink.bypass-document-validation
    + 可选 + 否 + false + Boolean + 设置 MongoDB 驱动程序选项以绕过文档验证。默认值为 false,表示进行文档验证。 @@ -330,6 +354,7 @@ MongoDB 连接器通过将 DDL 声明的主键进行组合,来生成文档的 仅适用于未分片集合,需要 splitVector 权限。 - `sharded`: 从 `config.chunks` 集合中直接读取分片集合的分片边界作为分区,不需要额外计算,快速且均匀。 仅适用于已经分片的集合,需要 config 数据库的读取权限。 +- `pagination`: 按记录数均匀分割。每个分区将拥有完全相同的记录数量。可以通过 `scan.partition.record-size` 选项配置。 - `default`: 对分片集合使用 `sharded` 策略,对未分片集合使用 `split-vector` 策略。 ### Lookup Cache diff --git a/docs/content/docs/connectors/datastream/mongodb.md b/docs/content/docs/connectors/datastream/mongodb.md index 4e32ba7c..53ff3b56 100644 --- a/docs/content/docs/connectors/datastream/mongodb.md +++ b/docs/content/docs/connectors/datastream/mongodb.md @@ -106,7 +106,7 @@ Flink's MongoDB source is created by using the static builder `MongoSource.Partition Strategies section for detail. + `SHARDED`, `PAGINATION` and `DEFAULT`. You can see Partition Strategies section for detail. 7. _setPartitionSize(MemorySize partitionSize)_ * Optional. Default: `64mb`. * Sets the partition memory size of MongoDB split. Split a MongoDB collection into multiple @@ -144,6 +144,8 @@ The following partition strategies are provided: range of the chunks are stored within the collection) as the partitions directly. The sharded strategy only used for sharded collection which is fast and even. Read permission of config database is required. +- `PAGINATION`: splits records evenly by count. Each chunk will have exactly the same number + of records. This can be configured by `setPartitionRecordSize()`. - `DEFAULT`: uses sharded strategy for sharded collections otherwise using split vector strategy. diff --git a/docs/content/docs/connectors/table/mongodb.md b/docs/content/docs/connectors/table/mongodb.md index 4d5973c1..252dc59f 100644 --- a/docs/content/docs/connectors/table/mongodb.md +++ b/docs/content/docs/connectors/table/mongodb.md @@ -154,7 +154,7 @@ Connector Options no default String - Specifies the partition strategy. Available strategies are `single`, `sample`, `split-vector`, `sharded` and `default`. + Specifies the partition strategy. Available strategies are `single`, `sample`, `split-vector`, `sharded`, `pagination` and `default`. See the following Partitioned Scan section for more details. @@ -176,6 +176,15 @@ Connector Options Then uses every `scan.partition.samples` as the value to use to calculate the partition boundaries. The total number of samples taken is calculated as: `samples per partition * (count of documents / number of documents per partition)`. + +
    scan.partition.record-size
    + optional + no + (none) + Integer + Specifies the number of records in each chunk. Only takes effect when the partition strategy is `pagination`. + This option will be automatically inferred from `scan.partition.size` if absent. +
    lookup.cache
    optional @@ -366,6 +375,8 @@ feature for MongoDB collection. The following partition strategies are provided: range of the chunks are stored within the collection) as the partitions directly. The sharded strategy only used for sharded collection which is fast and even. Read permission of config database is required. +- `pagination`: splits records evenly by count. Each chunk will have exactly the same number + of records. This can be configured by `scan.partition.record-size` option. - `default`: uses sharded strategy for sharded collections otherwise using split vector strategy.