13 changes: 10 additions & 3 deletions docs/content.zh/docs/connectors/datastream/mongodb.md
@@ -101,7 +101,7 @@ Flink's MongoDB Source can be built via the `MongoSource.<OutputType>builder()` builder
or set `maxTimeMS()` to more than 30 minutes.
6. _setPartitionStrategy(PartitionStrategy partitionStrategy)_
* Optional. Default: `PartitionStrategy.DEFAULT`.
* Sets the partition strategy. Available partition strategies are `SINGLE`, `SAMPLE`, `SPLIT_VECTOR`, `SHARDED` and `DEFAULT`.
* Sets the partition strategy. Available partition strategies are `SINGLE`, `SAMPLE`, `SPLIT_VECTOR`, `SHARDED`, `PAGINATION` and `DEFAULT`.
See the <a href="#分区策略">Partition Strategies</a> section for details.
7. _setPartitionSize(MemorySize partitionSize)_
* Optional. Default: `64mb`.
@@ -130,6 +130,7 @@ Flink's MongoDB Source can be built via the `MongoSource.<OutputType>builder()` builder
Only applies to unsharded collections; requires the splitVector permission.
- `SHARDED`: reads the chunk boundaries of a sharded collection directly from the `config.chunks` collection and uses them as partitions; no extra computation is needed, which makes it fast and even.
Only applies to already sharded collections; requires read permission on the config database.
- `PAGINATION`: splits the collection evenly by record count, so every partition holds exactly the same number of records. Configure it via `setPartitionRecordSize()`; see the sketch after this list.
- `DEFAULT`: uses the `SHARDED` strategy for sharded collections and the `SPLIT_VECTOR` strategy for unsharded collections.
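
A minimal sketch of building a source with the `PAGINATION` strategy (the URI, database, collection, and the exact `setPartitionRecordSize` argument are illustrative assumptions):

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.bson.BsonDocument;

MongoSource<String> source =
        MongoSource.<String>builder()
                .setUri("mongodb://user:password@127.0.0.1:27017") // assumed endpoint
                .setDatabase("my_db")                              // assumed database
                .setCollection("users")                            // assumed collection
                // Split by record count rather than by byte size.
                .setPartitionStrategy(PartitionStrategy.PAGINATION)
                .setPartitionRecordSize(10_000)                    // assumed: records per partition
                .setDeserializationSchema(
                        new MongoDeserializationSchema<String>() {
                            @Override
                            public String deserialize(BsonDocument document) {
                                return document.toJson();
                            }

                            @Override
                            public TypeInformation<String> getProducedType() {
                                return BasicTypeInfo.STRING_TYPE_INFO;
                            }
                        })
                .build();
```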

## MongoDB Sink
Expand Down Expand Up @@ -165,7 +166,7 @@ stream.sinkTo(sink);
{{< /tabs >}}

### Configurations
Flink's MongoDB Source can be built via the `MongoSink.<InputType>builder()` builder.
Flink's MongoDB Sink can be built via the `MongoSink.<InputType>builder()` builder.

1. __setUri(String uri)__
* Required.
@@ -188,7 +189,13 @@ Flink's MongoDB Source can be built via the `MongoSink.<InputType>builder()` builder
7. _setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)_
* Optional. Default: `DeliveryGuarantee.AT_LEAST_ONCE`.
* Sets the delivery guarantee. Exactly-once (EXACTLY_ONCE) delivery is not yet supported.
8. __setSerializationSchema(MongoSerializationSchema<InputType> serializationSchema)__
8. _setOrderedWrites(boolean ordered)_
* Optional. Default: `true`.
* Sets the MongoDB driver option to perform ordered writes.
9. _setBypassDocumentValidation(boolean bypassDocumentValidation)_
* Optional. Default: `false`.
* Sets the MongoDB driver option to bypass document validation.
10. __setSerializationSchema(MongoSerializationSchema<InputType> serializationSchema)__
* Required.
* Sets the `MongoSerializationSchema` that converts the input type into a MongoDB
[WriteModel](https://www.mongodb.com/docs/drivers/java/sync/current/usage-examples/bulkWrite/), as sketched below.
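
A minimal sketch wiring the two new driver options into the sink builder (the endpoint and the insert-only serialization schema are illustrative assumptions):

```java
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;

import com.mongodb.client.model.InsertOneModel;
import org.bson.BsonDocument;

MongoSink<String> sink =
        MongoSink.<String>builder()
                .setUri("mongodb://user:password@127.0.0.1:27017") // assumed endpoint
                .setDatabase("my_db")
                .setCollection("users")
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setOrderedWrites(true)             // stop a bulk write at the first failure
                .setBypassDocumentValidation(false) // keep server-side validation enabled
                .setSerializationSchema(
                        (input, context) -> new InsertOneModel<>(BsonDocument.parse(input)))
                .build();
```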
40 changes: 32 additions & 8 deletions docs/content.zh/docs/connectors/table/mongodb.md
@@ -38,8 +38,7 @@ The MongoDB connector provides the ability to read data from and write data into MongoDB.

Dependencies
------------
In order to use the MongoDB connector the following dependencies are required for both projects
using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
In order to use the MongoDB connector, the following dependencies are required both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

{{< sql_connector_download_table "mongodb" >}}

@@ -70,7 +69,7 @@ INSERT INTO MyUserTable
SELECT _id, name, age, status FROM T;

-- scan data from the MongoDB table
SELECT id, name, age, status FROM MyUserTable;
SELECT _id, name, age, status FROM MyUserTable;

-- temporal join the MongoDB table as a dimension table
SELECT * FROM myTopic
@@ -150,7 +149,7 @@ ON myTopic.key = MyUserTable._id;
<td>no</td>
<td style="word-wrap: break-word;">default</td>
<td>String</td>
<td>Specifies the partition strategy. Available strategies are `single`, `sample`, `split-vector`, `sharded` and `default`.
<td>Specifies the partition strategy. Available strategies are `single`, `sample`, `split-vector`, `sharded`, `pagination` and `default`.
See the <a href="#分区扫描">Partitioned Scan</a> section for more details.</td>
</tr>
<tr>
@@ -170,6 +169,14 @@ ON myTopic.key = MyUserTable._id;
<td>Only used for the `SAMPLE` partition strategy; sets the number of samples per partition. The sample partitioner computes partition boundaries by randomly sampling the collection on the partition key.
`total samples = samples per partition * (total documents / documents per partition)`.</td>
</tr>
<tr>
<td><h5>scan.partition.record-size</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Sets the number of records in each partition. Only takes effect when the partition strategy is `pagination`. If not set, it is inferred automatically from `scan.partition.size`.</td>
</tr>
<tr>
<td><h5>lookup.cache</h5></td>
<td>optional</td>
@@ -288,9 +295,25 @@ ON myTopic.key = MyUserTable._id;
<td><h5>sink.delivery-guarantee</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">at-lease-once</td>
<td style="word-wrap: break-word;">at-least-once</td>
<td><p>Enum</p>Possible values: none, at-least-once</td>
<td>Sets the delivery guarantee. The exactly-once guarantee is not yet supported.</td>
</tr>
<tr>
<td><h5>sink.ordered-writes</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Sets the MongoDB driver option to perform ordered writes. The default value is true, meaning writes are ordered.</td>
</tr>
<tr>
<td><h5>sink.bypass-document-validation</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Sets the MongoDB driver option to bypass document validation. The default value is false, meaning documents are validated.</td>
</tr>
</tbody>
</table>
@@ -331,6 +354,7 @@ The MongoDB connector composites the primary key declared in the DDL to generate the document's
Only applies to unsharded collections; requires the splitVector permission.
- `sharded`: reads the chunk boundaries of a sharded collection directly from the `config.chunks` collection and uses them as partitions; no extra computation is needed, which makes it fast and even.
Only applies to already sharded collections; requires read permission on the config database.
- `pagination`: splits the collection evenly by record count, so every partition holds exactly the same number of records. Configure it via the `scan.partition.record-size` option; see the sketch after this list.
- `default`: uses the `sharded` strategy for sharded collections and the `split-vector` strategy for unsharded collections.
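
A minimal sketch of enabling the pagination scan from the Table API (the table schema, endpoint, and record size are illustrative assumptions):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Hypothetical table: each partition scans exactly 10000 documents.
tEnv.executeSql(
        "CREATE TABLE MyUserTable ("
                + " _id STRING,"
                + " name STRING,"
                + " PRIMARY KEY (_id) NOT ENFORCED"
                + ") WITH ("
                + " 'connector' = 'mongodb',"
                + " 'uri' = 'mongodb://user:password@127.0.0.1:27017',"
                + " 'database' = 'my_db',"
                + " 'collection' = 'users',"
                + " 'scan.partition.strategy' = 'pagination',"
                + " 'scan.partition.record-size' = '10000'"
                + ")");
```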

### Lookup Cache
@@ -347,8 +371,8 @@ The main purpose of the lookup cache is to improve the performance of temporal joins with the MongoDB connector
By default, Flink caches empty query results for primary keys; you can change this behavior by setting `lookup.partial-cache.cache-missing-key` to false. A sketch of a cached dimension table follows.
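
A minimal sketch of declaring a partial lookup cache on a dimension table (the table name, endpoint, and cache settings are illustrative assumptions):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Hypothetical dimension table: cache at most 10000 rows, each for up to 10 minutes.
tEnv.executeSql(
        "CREATE TABLE Customers ("
                + " _id STRING,"
                + " name STRING,"
                + " PRIMARY KEY (_id) NOT ENFORCED"
                + ") WITH ("
                + " 'connector' = 'mongodb',"
                + " 'uri' = 'mongodb://user:password@127.0.0.1:27017',"
                + " 'database' = 'my_db',"
                + " 'collection' = 'customers',"
                + " 'lookup.cache' = 'PARTIAL',"
                + " 'lookup.partial-cache.max-rows' = '10000',"
                + " 'lookup.partial-cache.expire-after-write' = '10min',"
                + " 'lookup.partial-cache.cache-missing-key' = 'false'"
                + ")");
```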

### Idempotent Writes
If a primary key is defined in the DDL, the MongoDB connector writes to MongoDB in UPSERT mode `db.connection.update(<query>, <update>, { upsert: true })`
rather than in INSERT mode `db.connection.insert()`. We composite the primary key declared in the DDL as MongoDB's reserved primary key _id and write in UPSERT mode, which guarantees idempotent writes.
If a primary key is defined in the DDL, the MongoDB connector writes to MongoDB in UPSERT mode `db.collection.update(<query>, <update>, { upsert: true })`
rather than in INSERT mode `db.collection.insert()`. We composite the primary key declared in the DDL as MongoDB's reserved primary key _id and write in UPSERT mode, which guarantees idempotent writes.
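
For intuition, each upserted row corresponds roughly to the following driver call (a hedged illustration, not the connector's actual internals; the endpoint and document are assumptions):

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import org.bson.Document;

try (MongoClient client = MongoClients.create("mongodb://user:password@127.0.0.1:27017")) {
    MongoCollection<Document> users = client.getDatabase("my_db").getCollection("users");
    Document row = new Document("_id", "u42").append("status", "INIT");
    // Replaying the same row replaces the existing document instead of inserting a duplicate.
    users.replaceOne(Filters.eq("_id", row.get("_id")), row, new ReplaceOptions().upsert(true));
}
```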

When writing to a MongoDB table with `INSERT OVERWRITE`, UPSERT mode is forced.
Therefore, when no primary key is defined for the MongoDB table in the DDL, writes are rejected.
@@ -405,7 +429,7 @@ INSERT INTO MySinkTable SELECT _id, shardKey0, shardKey1, status FROM T;
Expand Down Expand Up @@ -405,7 +429,7 @@ INSERT INTO MySinkTable SELECT _id, shardKey0, shardKey1, status FROM T;
INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0', shardKey1 = 'value1') SELECT 1, 'INIT';

-- Specify a static shard key value (shardKey0) and a dynamic shard key value (shardKey1)
INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0') SELECT 1, 'value1' 'INIT';
INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0') SELECT 1, 'value1', 'INIT';
```
{{< hint warning >}}
LIMITATION: Although the shard key value is no longer immutable in MongoDB 4.2 and later,
8 changes: 5 additions & 3 deletions docs/content/docs/connectors/datastream/mongodb.md
@@ -106,7 +106,7 @@ Flink's MongoDB source is created by using the static builder `MongoSource.<Outp
6. _setPartitionStrategy(PartitionStrategy partitionStrategy)_
* Optional. Default: `PartitionStrategy.DEFAULT`.
* Sets the partition strategy. Available partition strategies are `SINGLE`, `SAMPLE`, `SPLIT_VECTOR`,
`SHARDED` and `DEFAULT`. You can see the <a href="#partition-strategies">Partition Strategies section</a> for details.
`SHARDED`, `PAGINATION` and `DEFAULT`. You can see the <a href="#partition-strategies">Partition Strategies section</a> for details.
7. _setPartitionSize(MemorySize partitionSize)_
* Optional. Default: `64mb`.
* Sets the partition memory size of MongoDB split. Split a MongoDB collection into multiple
@@ -144,6 +144,8 @@ The following partition strategies are provided:
range of the chunks are stored within the collection) as the partitions directly. The
sharded strategy is only used for sharded collections, which is fast and even. Read permission
of the config database is required.
- `PAGINATION`: splits records evenly by count. Each chunk will have exactly the same number
of records. This can be configured by `setPartitionRecordSize()`.
- `DEFAULT`: uses the sharded strategy for sharded collections and the split vector
strategy otherwise.

@@ -207,10 +209,10 @@ Flink's MongoDB sink is created by using the static builder `MongoSink.<InputTyp
8. _setOrderedWrites(boolean ordered)_
* Optional. Default: `true`
* Defines MongoDB driver option to perform ordered writes.
8. _setBypassDocumentValidation(boolean bypassDocumentValidation)_
9. _setBypassDocumentValidation(boolean bypassDocumentValidation)_
* Optional. Default: `false`
* Defines MongoDB driver option to bypass document validation.
9. __setSerializationSchema(MongoSerializationSchema<InputType> serializationSchema)__
10. __setSerializationSchema(MongoSerializationSchema<InputType> serializationSchema)__
* Required.
* A `MongoSerializationSchema` is required for parsing the input record into a MongoDB
[WriteModel](https://www.mongodb.com/docs/drivers/java/sync/current/usage-examples/bulkWrite/), as sketched below.
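
Beyond plain inserts, a custom schema can emit any write model. A sketch assuming a hypothetical POJO `MyEvent` with an id and a JSON payload, producing idempotent upserts:

```java
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;

import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDocument;

// MyEvent, getId() and toJson() are hypothetical; substitute your own input type.
public class UpsertSerializationSchema implements MongoSerializationSchema<MyEvent> {
    @Override
    public WriteModel<BsonDocument> serialize(MyEvent event, MongoSinkContext context) {
        BsonDocument doc = BsonDocument.parse(event.toJson());
        // A keyed replace with upsert keeps retried batches idempotent.
        return new ReplaceOneModel<>(
                Filters.eq("_id", event.getId()),
                doc,
                new ReplaceOptions().upsert(true));
    }
}
```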
31 changes: 21 additions & 10 deletions docs/content/docs/connectors/table/mongodb.md
@@ -74,7 +74,7 @@ INSERT INTO MyUserTable
SELECT _id, name, age, status FROM T;

-- scan data from the MongoDB table
SELECT id, name, age, status FROM MyUserTable;
SELECT _id, name, age, status FROM MyUserTable;

-- temporal join the MongoDB table as a dimension table
SELECT * FROM myTopic
@@ -154,7 +154,7 @@ Connector Options
<td>no</td>
<td style="word-wrap: break-word;">default</td>
<td>String</td>
<td>Specifies the partition strategy. Available strategies are `single`, `sample`, `split-vector`, `sharded` and `default`.
<td>Specifies the partition strategy. Available strategies are `single`, `sample`, `split-vector`, `sharded`, `pagination` and `default`.
See the following <a href="#partitioned-scan">Partitioned Scan</a> section for more details.</td>
</tr>
<tr>
@@ -176,6 +176,15 @@ Connector Options
Then uses `scan.partition.samples` as the value to calculate the partition boundaries.
The total number of samples taken is calculated as: `samples per partition * (count of documents / number of documents per partition)`.</td>
</tr>
<tr>
<td><h5>scan.partition.record-size</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Specifies the number of records in each chunk. Only takes effect when the partition strategy is `pagination`.
This option will be automatically inferred from `scan.partition.size` if absent.</td>
</tr>
<tr>
<td><h5>lookup.cache</h5></td>
<td>optional</td>
@@ -199,7 +208,7 @@ Connector Options
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The max time to live for each rows in lookup cache after writing into the cache.
<td>The max time to live for each row in lookup cache after writing into the cache.
"lookup.cache" must be set to "PARTIAL" to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. </td>
</tr>
<tr>
@@ -208,7 +217,7 @@ Connector Options
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The max time to live for each rows in lookup cache after accessing the entry in the cache.
<td>The max time to live for each row in lookup cache after accessing the entry in the cache.
"lookup.cache" must be set to "PARTIAL" to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. </td>
</tr>
<tr>
@@ -241,7 +250,7 @@ Connector Options
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">always</td>
<td>Enum Possible values: always, never</td>
<td><p>Enum</p>Possible values: always, never</td>
<td>Fine-grained configuration to control filter push down.
Supported policies are:
<ul>
@@ -294,7 +303,7 @@ Connector Options
<td><h5>sink.delivery-guarantee</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">at-lease-once</td>
<td style="word-wrap: break-word;">at-least-once</td>
<td><p>Enum</p>Possible values: none, at-least-once</td>
<td>Optional delivery guarantee when committing. The exactly-once guarantee is not supported yet.</td>
</tr>
@@ -345,7 +354,7 @@ fields in the order defined in the DDL.
value as _id of the corresponding document.
- When there are multiple fields in the specified primary key, we convert and composite these fields
into a bson document as the _id of the corresponding document.
For example, if have a primary key statement `PRIMARY KEY (f1, f2) NOT ENFORCED`,
For example, if you have a primary key statement `PRIMARY KEY (f1, f2) NOT ENFORCED`,
the extracted _id will be the form like `_id: {f1: v1, f2: v2}`.

Notice that it will be ambiguous if the _id field exists in DDL, but the primary key is not declared as _id.
@@ -366,6 +375,8 @@ feature for MongoDB collection. The following partition strategies are provided:
range of the chunks are stored within the collection) as the partitions directly. The
sharded strategy is only used for sharded collections, which is fast and even. Read permission
of the config database is required.
- `pagination`: splits records evenly by count. Each chunk will have exactly the same number
of records. This can be configured via the `scan.partition.record-size` option.
- `default`: uses the sharded strategy for sharded collections and the split vector
strategy otherwise.

Expand Down Expand Up @@ -393,8 +404,8 @@ by setting `lookup.partial-cache.cache-missing-key` to false.

### Idempotent Writes

MongoDB connector uses upsert writing mode `db.connection.update(<query>, <update>, { upsert: true })`
rather than insert writing mode `db.connection.insert()` if a primary key is defined in the DDL.
MongoDB connector uses upsert writing mode `db.collection.update(<query>, <update>, { upsert: true })`
rather than insert writing mode `db.collection.insert()` if a primary key is defined in the DDL.
We composite the primary key fields as the document _id, which is the reserved primary key of
MongoDB, and use upsert mode to write rows into MongoDB, which provides idempotence.

@@ -456,7 +467,7 @@ INSERT INTO MySinkTable SELECT _id, shardKey0, shardKey1, status FROM T;
INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0', shardKey1 = 'value1') SELECT 1, 'INIT';

-- Insert with static(shardKey0) and dynamic(shardKey1) partition
INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0') SELECT 1, 'value1' 'INIT';
INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0') SELECT 1, 'value1', 'INIT';
```
{{< hint warning >}}
LIMITATION: Although the shard key value is no longer immutable in MongoDB 4.2 and later,