blob: f32e63d3fcbdc7204cebdde8f9d5d5b54c6ee86e [file] [log] [blame] [view]
---
title: "Flink API"
weight: 2
type: docs
aliases:
- /api/flink-api.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Flink API
{{< hint info >}}
If possible, recommend using Flink SQL or Spark SQL, or simply use SQL APIs in programs.
{{< /hint >}}
## Dependency
Maven dependency:
```xml
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-{{< param FlinkVersion >}}</artifactId>
<version>{{< version >}}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>{{< param FlinkVersion >}}.0</version>
<scope>provided</scope>
</dependency>
```
Or download the jar file:
{{< stable >}}[Paimon Flink](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-{{< param FlinkVersion >}}/{{< version >}}/paimon-flink-{{< param FlinkVersion >}}-{{< version >}}.jar).{{< /stable >}}
{{< unstable >}}[Paimon Flink](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-{{< param FlinkVersion >}}/{{< version >}}/).{{< /unstable >}}
Please choose your Flink version.
Paimon relies on Hadoop environment, you should add hadoop classpath or bundled jar.
Not only DataStream API, you can also read or write to Paimon tables by the conversion between DataStream and Table in Flink.
See [DataStream API Integration](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/data_stream_api/).
## Write to Table
```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
public class WriteToTable {
public static void writeTo() throws Exception {
// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// for CONTINUOUS_UNBOUNDED source, set checkpoint interval
// env.enableCheckpointing(60_000);
// create a changelog DataStream
DataStream<Row> input =
env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
.returns(
Types.ROW_NAMED(
new String[] {"name", "age"}, Types.STRING, Types.INT));
// get table from catalog
Options catalogOptions = new Options();
catalogOptions.set("warehouse", "/path/to/warehouse");
Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
Table table = catalog.getTable(Identifier.create("my_db", "T"));
DataType inputType =
DataTypes.ROW(
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()));
FlinkSinkBuilder builder = new FlinkSinkBuilder(table).forRow(input, inputType);
// set sink parallelism
// builder.parallelism(_your_parallelism)
// set overwrite mode
// builder.overwrite(...)
builder.build();
env.execute();
}
}
```
## Read from Table
```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
public class ReadFromTable {
public static void readFrom() throws Exception {
// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get table from catalog
Options catalogOptions = new Options();
catalogOptions.set("warehouse", "/path/to/warehouse");
Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
Table table = catalog.getTable(Identifier.create("my_db", "T"));
// table = table.copy(Collections.singletonMap("scan.file-creation-time-millis", "..."));
FlinkSourceBuilder builder = new FlinkSourceBuilder(table).env(env);
// builder.sourceBounded(true);
// builder.projection(...);
// builder.predicate(...);
// builder.limit(...);
// builder.sourceParallelism(...);
DataStream<Row> dataStream = builder.buildForRow();
// use this datastream
dataStream.executeAndCollect().forEachRemaining(System.out::println);
// prints:
// +I[Bob, 12]
// +I[Alice, 12]
// -U[Alice, 12]
// +U[Alice, 14]
}
}
```
## Cdc ingestion Table
Paimon supports ingest data into Paimon tables with schema evolution.
- You can use Java API to write cdc records into Paimon Tables.
- You can write records to Paimon's partial-update table with adding columns dynamically.
Here is an example to use `RichCdcSinkBuilder` API:
```java
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.cdc.RichCdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcSinkBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataTypes;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import static org.apache.paimon.types.RowKind.INSERT;
public class WriteCdcToTable {
public static void writeTo() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// for CONTINUOUS_UNBOUNDED source, set checkpoint interval
// env.enableCheckpointing(60_000);
DataStream<RichCdcRecord> dataStream =
env.fromElements(
RichCdcRecord.builder(INSERT)
.field("order_id", DataTypes.BIGINT(), "123")
.field("price", DataTypes.DOUBLE(), "62.2")
.build(),
// dt field will be added with schema evolution
RichCdcRecord.builder(INSERT)
.field("order_id", DataTypes.BIGINT(), "245")
.field("price", DataTypes.DOUBLE(), "82.1")
.field("dt", DataTypes.TIMESTAMP(), "2023-06-12 20:21:12")
.build());
Identifier identifier = Identifier.create("my_db", "T");
Options catalogOptions = new Options();
catalogOptions.set("warehouse", "/path/to/warehouse");
CatalogLoader catalogLoader =
() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
Table table = catalogLoader.load().getTable(identifier);
new RichCdcSinkBuilder(table)
.forRichCdcRecord(dataStream)
.identifier(identifier)
.catalogLoader(catalogLoader)
.build();
env.execute();
}
}
```