
Commit 0e02af2

manuzhang and codex committed
Docs: Fix Flink docs examples
Update the Flink docs to use the correct Iceberg metadata-table syntax, clean up the generic CREATE CATALOG example, and replace hardcoded runtime jar examples with version-templated names. Also fix the PyFlink install snippet and several wording issues in the writing and reading sections.

Co-authored-by: Codex <codex@openai.com>
1 parent 027f088 commit 0e02af2

1 file changed: 20 additions & 18 deletions

File tree

docs/docs/flink.md
@@ -77,11 +77,11 @@ Start the Flink SQL client. There is a separate `flink-runtime` module in the Ic
 # HADOOP_HOME is your hadoop root directory after unpack the binary package.
 export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
 
-# Below works for 1.15 or less
-./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-1.15-{{ icebergVersion }}.jar shell
+# Below works for Flink 1.15 or earlier
+./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-{{ flinkVersionMajor }}-{{ icebergVersion }}.jar shell
 
-# 1.16 or above has a regression in loading external jar via -j option. See FLINK-30035 for details.
-put iceberg-flink-runtime-1.16-{{ icebergVersion }}.jar in flink/lib dir
+# Flink 1.16+ has a regression in loading external jars via -j. See FLINK-30035 for details.
+# put iceberg-flink-runtime-{{ flinkVersionMajor }}-{{ icebergVersion }}.jar in flink/lib dir
 ./bin/sql-client.sh embedded shell
 ```
 
@@ -111,11 +111,11 @@ wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_V
 ## Flink's Python API
 
 !!! info
-    PyFlink 1.6.1 [does not work on OSX with a M1 cpu](https://issues.apache.org/jira/browse/FLINK-28786)
+    PyFlink 1.6.1 has a known issue on macOS with Apple Silicon. See [FLINK-28786](https://issues.apache.org/jira/browse/FLINK-28786).
 
 Install the Apache Flink dependency using `pip`:
 
-```python
+```bash
 pip install apache-flink=={{ flinkVersion }}
 ```
 
@@ -175,20 +175,20 @@ Run a query:
 
 For more details, please refer to the [Python Table API](https://ci.apache.org/projects/flink/flink-docs-release-{{ flinkVersionMajor }}/docs/dev/python/table/intro_to_table_api/).
 
-## Adding catalogs.
+## Adding catalogs
 
-Flink support to create catalogs by using Flink SQL.
+Flink supports creating catalogs using Flink SQL.
 
 ### Catalog Configuration
 
 A catalog is created and named by executing the following query (replace `<catalog_name>` with your catalog name and
-`<config_key>`=`<config_value>` with catalog implementation config):
+`'<config_key>'='<config_value>'` with catalog implementation config):
 
 ```sql
 CREATE CATALOG <catalog_name> WITH (
   'type'='iceberg',
-  `<config_key>`=`<config_value>`
-);
+  '<config_key>'='<config_value>'
+);
 ```
 
 The following properties can be set globally and are not limited to a specific catalog implementation:
@@ -245,15 +245,15 @@ INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
 INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from other_kafka_table;
 ```
 
-To replace data in the table with the result of a query, use `INSERT OVERWRITE` in batch job (flink streaming job does not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
+To replace data in the table with the result of a query, use `INSERT OVERWRITE` in a batch job (Flink streaming jobs do not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
 
 Partitions that have rows produced by the SELECT query will be replaced, for example:
 
 ```sql
 INSERT OVERWRITE `hive_catalog`.`default`.`sample` VALUES (1, 'a');
 ```
 
-Iceberg also support overwriting given partitions by the `select` values:
+Iceberg also supports overwriting given partitions by the `SELECT` values:
 
 ```sql
 INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
@@ -276,8 +276,10 @@ env.execute("Test Iceberg DataStream");
 ```
 
 ### Branch Writes
-Writing to branches in Iceberg tables is also supported via the `toBranch` API in `FlinkSink`
+Writing to branches in Iceberg tables is also supported via the `toBranch` API in `FlinkSink`.
+
 For more information on branches please refer to [branches](branching.md).
+
 ```java
 FlinkSink.forRowData(input)
     .tableLoader(tableLoader)
@@ -287,15 +289,15 @@ FlinkSink.forRowData(input)
 
 ## Reading
 
-Submit a Flink __batch__ job using the following sentences:
+Submit a Flink __batch__ job using the following statements:
 
 ```sql
 -- Execute the flink job in batch mode for current session context
 SET execution.runtime-mode = batch;
 SELECT * FROM `hive_catalog`.`default`.`sample`;
 ```
 
-Iceberg supports processing incremental data in flink __streaming__ jobs which starts from a historical snapshot-id:
+Iceberg supports processing incremental data in Flink __streaming__ jobs that start from a historical snapshot ID:
 
 ```sql
 -- Submit the flink job in streaming mode for current session.
@@ -314,10 +316,10 @@ SELECT * FROM `hive_catalog`.`default`.`sample` /*+ OPTIONS('streaming'='true',
 SQL is also the recommended way to inspect tables. To view all of the snapshots in a table, use the snapshots metadata table:
 
 ```sql
-SELECT * FROM `hive_catalog`.`default`.`sample`.`snapshots`
+SELECT * FROM `hive_catalog`.`default`.`sample$snapshots`;
 ```
 
-Iceberg support streaming or batch read in Java API:
+Iceberg supports streaming or batch reads in the Java API:
 
 ```
 DataStream<RowData> batch = FlinkSource.forRowData()
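The version-templated jar names introduced by this commit are expanded at docs-build time by plain string substitution of the template variables. A minimal sketch of that expansion, with assumed example values standing in for `{{ flinkVersionMajor }}` and `{{ icebergVersion }}` (the real values come from the docs build configuration):

```python
# Hypothetical illustration of how the templated runtime jar name resolves.
# The two version strings below are assumptions, not pinned releases.
flink_version_major = "1.17"  # stands in for {{ flinkVersionMajor }}
iceberg_version = "1.4.0"     # stands in for {{ icebergVersion }}

jar_name = f"iceberg-flink-runtime-{flink_version_major}-{iceberg_version}.jar"
print(jar_name)  # iceberg-flink-runtime-1.17-1.4.0.jar
```

Templating both components is what lets one docs source serve every supported Flink line, instead of hardcoding `1.15` or `1.16` in each example.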

0 commit comments

Comments
 (0)