---
title: "Flink API"
weight: 2
type: docs
aliases:
---
{{< hint info >}} If possible, we recommend using Flink SQL or Spark SQL, or simply using the SQL APIs in programs. {{< /hint >}}
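For reference, here is a minimal sketch of what "SQL APIs in programs" can look like with Flink's `TableEnvironment`. The warehouse path, catalog name, and table name below are placeholders, not values prescribed by this guide:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlApiExample {

    public static void main(String[] args) {
        // create a pure Table API / SQL environment in streaming mode
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // register a Paimon catalog; the warehouse path is a placeholder
        tEnv.executeSql(
                "CREATE CATALOG paimon_catalog WITH ("
                        + "'type' = 'paimon',"
                        + "'warehouse' = '/path/to/warehouse')");
        tEnv.executeSql("USE CATALOG paimon_catalog");

        // create and write to a Paimon table purely through SQL
        tEnv.executeSql("CREATE TABLE IF NOT EXISTS T (name STRING, age INT)");
        tEnv.executeSql("INSERT INTO T VALUES ('Alice', 12)");
    }
}
```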
Maven dependency:
```xml
<dependency>
  <groupId>org.apache.paimon</groupId>
  <artifactId>paimon-flink-{{< param FlinkVersion >}}</artifactId>
  <version>{{< version >}}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge</artifactId>
  <version>{{< param FlinkVersion >}}.0</version>
  <scope>provided</scope>
</dependency>
```
Or download the jar file: {{< stable >}}[Paimon Flink](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-{{< param FlinkVersion >}}/{{< version >}}/paimon-flink-{{< param FlinkVersion >}}-{{< version >}}.jar).{{< /stable >}} {{< unstable >}}[Paimon Flink](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-{{< param FlinkVersion >}}/{{< version >}}/).{{< /unstable >}}
Please choose your Flink version.
Paimon relies on a Hadoop environment; you should add the hadoop classpath or a bundled Hadoop jar.
Besides the DataStream API, you can also read from and write to Paimon tables by converting between DataStream and Table in Flink; a rough sketch of this conversion is shown below. See DataStream API Integration for details.
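The following sketch illustrates that conversion only; it assumes a Paimon catalog registered as `paimon_catalog` and an existing table `my_db.T` with columns `name STRING, age INT`. All of these names are placeholders:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class DataStreamTableConversion {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // register the Paimon catalog; the warehouse path is a placeholder
        tEnv.executeSql(
                "CREATE CATALOG paimon_catalog WITH ("
                        + "'type' = 'paimon', 'warehouse' = '/path/to/warehouse')");

        // DataStream -> Table: turn a changelog stream into a Table and insert it
        DataStream<Row> input =
                env.fromElements(Row.of("Alice", 12), Row.of("Bob", 5))
                        .returns(
                                Types.ROW_NAMED(
                                        new String[] {"name", "age"}, Types.STRING, Types.INT));
        Table inputTable = tEnv.fromChangelogStream(input);
        inputTable.executeInsert("paimon_catalog.my_db.T");

        // Table -> DataStream: query the Paimon table and convert back to a changelog stream
        Table result = tEnv.sqlQuery("SELECT * FROM paimon_catalog.my_db.T");
        DataStream<Row> output = tEnv.toChangelogStream(result);
        output.print();

        env.execute();
    }
}
```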
```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class WriteToTable {

    public static void writeTo() throws Exception {
        // create environments of both APIs
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // for CONTINUOUS_UNBOUNDED source, set checkpoint interval
        // env.enableCheckpointing(60_000);

        // create a changelog DataStream
        DataStream<Row> input =
                env.fromElements(
                                Row.ofKind(RowKind.INSERT, "Alice", 12),
                                Row.ofKind(RowKind.INSERT, "Bob", 5),
                                Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
                                Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
                        .returns(
                                Types.ROW_NAMED(
                                        new String[] {"name", "age"}, Types.STRING, Types.INT));

        // get table from catalog
        Options catalogOptions = new Options();
        catalogOptions.set("warehouse", "/path/to/warehouse");
        Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
        Table table = catalog.getTable(Identifier.create("my_db", "T"));

        DataType inputType =
                DataTypes.ROW(
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("age", DataTypes.INT()));
        FlinkSinkBuilder builder = new FlinkSinkBuilder(table).forRow(input, inputType);

        // set sink parallelism
        // builder.parallelism(_your_parallelism)

        // set overwrite mode
        // builder.overwrite(...)

        builder.build();
        env.execute();
    }
}
```
```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class ReadFromTable {

    public static void readFrom() throws Exception {
        // create environments of both APIs
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get table from catalog
        Options catalogOptions = new Options();
        catalogOptions.set("warehouse", "/path/to/warehouse");
        Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
        Table table = catalog.getTable(Identifier.create("my_db", "T"));
        // table = table.copy(Collections.singletonMap("scan.file-creation-time-millis", "..."));

        FlinkSourceBuilder builder = new FlinkSourceBuilder(table).env(env);

        // builder.sourceBounded(true);
        // builder.projection(...);
        // builder.predicate(...);
        // builder.limit(...);
        // builder.sourceParallelism(...);

        DataStream<Row> dataStream = builder.buildForRow();

        // use this datastream
        dataStream.executeAndCollect().forEachRemaining(System.out::println);

        // prints:
        // +I[Bob, 12]
        // +I[Alice, 12]
        // -U[Alice, 12]
        // +U[Alice, 14]
    }
}
```
Paimon supports ingesting data into Paimon tables with schema evolution.
Here is an example of using the RichCdcSinkBuilder API:
```java
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.cdc.RichCdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcSinkBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataTypes;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import static org.apache.paimon.types.RowKind.INSERT;

public class WriteCdcToTable {

    public static void writeTo() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // for CONTINUOUS_UNBOUNDED source, set checkpoint interval
        // env.enableCheckpointing(60_000);

        DataStream<RichCdcRecord> dataStream =
                env.fromElements(
                        RichCdcRecord.builder(INSERT)
                                .field("order_id", DataTypes.BIGINT(), "123")
                                .field("price", DataTypes.DOUBLE(), "62.2")
                                .build(),
                        // dt field will be added with schema evolution
                        RichCdcRecord.builder(INSERT)
                                .field("order_id", DataTypes.BIGINT(), "245")
                                .field("price", DataTypes.DOUBLE(), "82.1")
                                .field("dt", DataTypes.TIMESTAMP(), "2023-06-12 20:21:12")
                                .build());

        Identifier identifier = Identifier.create("my_db", "T");
        Options catalogOptions = new Options();
        catalogOptions.set("warehouse", "/path/to/warehouse");
        CatalogLoader catalogLoader =
                () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
        Table table = catalogLoader.load().getTable(identifier);

        new RichCdcSinkBuilder(table)
                .forRichCdcRecord(dataStream)
                .identifier(identifier)
                .catalogLoader(catalogLoader)
                .build();

        env.execute();
    }
}
```