diff --git a/docs/docs/flink.md b/docs/docs/flink.md
index 50bdc2c482cf..5a6788621140 100644
--- a/docs/docs/flink.md
+++ b/docs/docs/flink.md
@@ -77,11 +77,11 @@ Start the Flink SQL client. There is a separate `flink-runtime` module in the Ic
 # HADOOP_HOME is your hadoop root directory after unpack the binary package.
 export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
 
-# Below works for 1.15 or less
-./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-1.15-{{ icebergVersion }}.jar shell
+# Below works for Flink 1.15 or earlier
+./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-{{ flinkVersionMajor }}-{{ icebergVersion }}.jar shell
 
-# 1.16 or above has a regression in loading external jar via -j option. See FLINK-30035 for details.
-put iceberg-flink-runtime-1.16-{{ icebergVersion }}.jar in flink/lib dir
+# Flink 1.16+ has a regression in loading external jars via -j. See FLINK-30035 for details.
+# put iceberg-flink-runtime-{{ flinkVersionMajor }}-{{ icebergVersion }}.jar in flink/lib dir
 ./bin/sql-client.sh embedded shell
 ```
@@ -111,11 +111,11 @@ wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_V
 ## Flink's Python API
 
 !!! info
-    PyFlink 1.6.1 [does not work on OSX with a M1 cpu](https://issues.apache.org/jira/browse/FLINK-28786)
+    PyFlink 1.6.1 has a known issue on macOS with Apple Silicon. See [FLINK-28786](https://issues.apache.org/jira/browse/FLINK-28786).
 
 Install the Apache Flink dependency using `pip`:
 
-```python
+```bash
 pip install apache-flink=={{ flinkVersion }}
 ```
@@ -175,20 +175,20 @@ Run a query:
 
 For more details, please refer to the [Python Table API](https://ci.apache.org/projects/flink/flink-docs-release-{{ flinkVersionMajor }}/docs/dev/python/table/intro_to_table_api/).
 
-## Adding catalogs.
+## Adding catalogs
 
-Flink support to create catalogs by using Flink SQL.
+Flink supports creating catalogs using Flink SQL.
 ### Catalog Configuration
 
 A catalog is created and named by executing the following query (replace `<catalog_name>` with your catalog name and
-`<config_key>`=`<config_value>` with catalog implementation config):
+`'<config_key>' = '<config_value>'` with catalog implementation config):
 
 ```sql
 CREATE CATALOG <catalog_name> WITH (
   'type'='iceberg',
-  `<config_key>`=`<config_value>`
-);
+  '<config_key>' = '<config_value>'
+);
 ```
 
 The following properties can be set globally and are not limited to a specific catalog implementation:
@@ -245,7 +245,7 @@ INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
 INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from other_kafka_table;
 ```
 
-To replace data in the table with the result of a query, use `INSERT OVERWRITE` in batch job (flink streaming job does not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
+To replace data in the table with the result of a query, use `INSERT OVERWRITE` in a batch job (Flink streaming jobs do not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
 
 Partitions that have rows produced by the SELECT query will be replaced, for example:
 
@@ -253,7 +253,7 @@ Partitions that have rows produced by the SELECT query will be replaced, for exa
 INSERT OVERWRITE `hive_catalog`.`default`.`sample` VALUES (1, 'a');
 ```
 
-Iceberg also support overwriting given partitions by the `select` values:
+Iceberg also supports overwriting given partitions by the `SELECT` values:
 
 ```sql
 INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
@@ -276,8 +276,10 @@ env.execute("Test Iceberg DataStream");
 ```
 
 ### Branch Writes
-Writing to branches in Iceberg tables is also supported via the `toBranch` API in `FlinkSink`
+Writing to branches in Iceberg tables is also supported via the `toBranch` API in `FlinkSink`.
+
 For more information on branches please refer to [branches](branching.md).
+
 ```java
 FlinkSink.forRowData(input)
     .tableLoader(tableLoader)
@@ -287,7 +289,7 @@ FlinkSink.forRowData(input)
 
 ## Reading
 
-Submit a Flink __batch__ job using the following sentences:
+Submit a Flink __batch__ job using the following statements:
 
 ```sql
 -- Execute the flink job in batch mode for current session context
@@ -295,7 +297,7 @@ SET execution.runtime-mode = batch;
 SELECT * FROM `hive_catalog`.`default`.`sample`;
 ```
 
-Iceberg supports processing incremental data in flink __streaming__ jobs which starts from a historical snapshot-id:
+Iceberg supports processing incremental data in Flink __streaming__ jobs that start from a historical snapshot ID:
 
 ```sql
 -- Submit the flink job in streaming mode for current session.
@@ -314,10 +316,10 @@ SELECT * FROM `hive_catalog`.`default`.`sample` /*+ OPTIONS('streaming'='true',
 
 SQL is also the recommended way to inspect tables. To view all of the snapshots in a table, use the snapshots metadata table:
 
 ```sql
-SELECT * FROM `hive_catalog`.`default`.`sample`.`snapshots`
+SELECT * FROM `hive_catalog`.`default`.`sample$snapshots`;
 ```
 
-Iceberg support streaming or batch read in Java API:
+Iceberg supports streaming or batch reads in the Java API:
 
 ```
 DataStream<RowData> batch = FlinkSource.forRowData()