Merged
38 changes: 20 additions & 18 deletions docs/docs/flink.md
@@ -77,11 +77,11 @@ Start the Flink SQL client. There is a separate `flink-runtime` module in the Ic
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# Below works for 1.15 or less
./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-1.15-{{ icebergVersion }}.jar shell
# Below works for Flink 1.15 or earlier
./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-{{ flinkVersionMajor }}-{{ icebergVersion }}.jar shell

# 1.16 or above has a regression in loading external jar via -j option. See FLINK-30035 for details.
put iceberg-flink-runtime-1.16-{{ icebergVersion }}.jar in flink/lib dir
# Flink 1.16+ has a regression in loading external jars via -j. See FLINK-30035 for details.
# put iceberg-flink-runtime-{{ flinkVersionMajor }}-{{ icebergVersion }}.jar in flink/lib dir
./bin/sql-client.sh embedded shell
```
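The same SQL can also be run from a Java application through a `TableEnvironment` instead of the SQL client. A minimal sketch, assuming the `iceberg-flink-runtime` jar is on the application classpath:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Create a batch TableEnvironment. The iceberg-flink-runtime jar must be on
// the application classpath so the Iceberg catalog factory can be discovered.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inBatchMode()
    .build();
TableEnvironment tEnv = TableEnvironment.create(settings);
```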

@@ -111,11 +111,11 @@ wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_V
## Flink's Python API

!!! info
PyFlink 1.6.1 [does not work on OSX with a M1 cpu](https://issues.apache.org/jira/browse/FLINK-28786)
PyFlink 1.6.1 has a known issue on macOS with Apple Silicon. See [FLINK-28786](https://issues.apache.org/jira/browse/FLINK-28786).

Install the Apache Flink dependency using `pip`:

```python
```bash
pip install apache-flink=={{ flinkVersion }}
```

@@ -175,20 +175,20 @@ Run a query:

For more details, please refer to the [Python Table API](https://ci.apache.org/projects/flink/flink-docs-release-{{ flinkVersionMajor }}/docs/dev/python/table/intro_to_table_api/).

## Adding catalogs.
## Adding catalogs

Flink support to create catalogs by using Flink SQL.
Flink supports creating catalogs using Flink SQL.

### Catalog Configuration

A catalog is created and named by executing the following query (replace `<catalog_name>` with your catalog name and
`<config_key>`=`<config_value>` with catalog implementation config):
`'<config_key>' = '<config_value>'` with catalog implementation config):

```sql
CREATE CATALOG <catalog_name> WITH (
'type'='iceberg',
`<config_key>`=`<config_value>`
);
'<config_key>' = '<config_value>'
);
```
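The same catalog DDL can be issued from the Java API via `TableEnvironment.executeSql`. A sketch with hypothetical Hive catalog values (`my_catalog`, the thrift URI, and the warehouse path are placeholders, not values from this document):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inBatchMode().build());

// Hypothetical values shown for illustration; a Hive-backed catalog is one
// common choice of implementation config.
tEnv.executeSql(
    "CREATE CATALOG my_catalog WITH ("
        + " 'type'='iceberg',"
        + " 'catalog-type'='hive',"
        + " 'uri'='thrift://localhost:9083',"
        + " 'warehouse'='hdfs://nn:8020/warehouse/path'"
        + ")");
```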

The following properties can be set globally and are not limited to a specific catalog implementation:
@@ -245,15 +245,15 @@ INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from other_kafka_table;
```

To replace data in the table with the result of a query, use `INSERT OVERWRITE` in batch job (flink streaming job does not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
To replace data in the table with the result of a query, use `INSERT OVERWRITE` in a batch job (Flink streaming jobs do not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables.

Partitions that have rows produced by the SELECT query will be replaced, for example:

```sql
INSERT OVERWRITE `hive_catalog`.`default`.`sample` VALUES (1, 'a');
```

Iceberg also support overwriting given partitions by the `select` values:
Iceberg also supports overwriting given partitions by the `SELECT` values:

```sql
INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
@@ -276,8 +276,10 @@ env.execute("Test Iceberg DataStream");
```

### Branch Writes
Writing to branches in Iceberg tables is also supported via the `toBranch` API in `FlinkSink`
Writing to branches in Iceberg tables is also supported via the `toBranch` API in `FlinkSink`.

For more information on branches, refer to [branches](branching.md).

```java
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
@@ -287,15 +289,15 @@

## Reading

Submit a Flink __batch__ job using the following sentences:
Submit a Flink __batch__ job using the following statements:

```sql
-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
SELECT * FROM `hive_catalog`.`default`.`sample`;
```
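A batch query can likewise be submitted and consumed from Java; a sketch assuming a `TableEnvironment` in batch mode and the catalog from the earlier `CREATE CATALOG` step already registered:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inBatchMode().build());

// collect() streams the result rows back to the client; close the iterator
// to release the job's resources.
try (CloseableIterator<Row> rows =
        tEnv.executeSql("SELECT * FROM `hive_catalog`.`default`.`sample`").collect()) {
  while (rows.hasNext()) {
    System.out.println(rows.next());
  }
}
```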

Iceberg supports processing incremental data in flink __streaming__ jobs which starts from a historical snapshot-id:
Iceberg supports processing incremental data in Flink __streaming__ jobs that start from a historical snapshot ID:

```sql
-- Submit the flink job in streaming mode for current session.
@@ -314,10 +316,10 @@ SELECT * FROM `hive_catalog`.`default`.`sample` /*+ OPTIONS('streaming'='true',
SQL is also the recommended way to inspect tables. To view all of the snapshots in a table, use the snapshots metadata table:

```sql
SELECT * FROM `hive_catalog`.`default`.`sample`.`snapshots`
SELECT * FROM `hive_catalog`.`default`.`sample$snapshots`;
```

Iceberg support streaming or batch read in Java API:
Iceberg supports streaming or batch reads in the Java API:

```
DataStream<RowData> batch = FlinkSource.forRowData()