/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.examples.quickstart;
import static org.apache.hudi.examples.quickstart.utils.QuickstartConfigurations.sql;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.types.Row;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.examples.quickstart.factory.CollectSinkTableFactory;
import org.apache.hudi.examples.quickstart.utils.QuickstartConfigurations;
import org.jetbrains.annotations.NotNull;
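/**
 * Self-contained quickstart that demonstrates the Flink SQL integration of Hudi: it
 * registers a filesystem source table, creates a Hudi table, inserts data, queries it
 * in streaming mode and then writes an update batch (see {@link #main(String[])}).
 *
 * <p>A minimal programmatic setup, with an illustrative path, table name and table type:
 * <pre>{@code
 * HoodieFlinkQuickstart quickstart = HoodieFlinkQuickstart.instance();
 * quickstart.initEnv();
 * quickstart.createFileSource();
 * quickstart.createHudiTable("file:///tmp/hudi_quickstart", "t1", HoodieTableType.COPY_ON_WRITE);
 * }</pre>
 */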
public final class HoodieFlinkQuickstart {
private EnvironmentSettings settings = null;
private TableEnvironment streamTableEnv = null;
private String tableName;
private HoodieFlinkQuickstart() {
}
public static HoodieFlinkQuickstart instance() {
return new HoodieFlinkQuickstart();
}
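/**
 * Runs the quickstart end to end. Expects three arguments: the base path of the Hudi
 * table, the table name, and the table type (a {@link HoodieTableType} name such as
 * {@code COPY_ON_WRITE} or {@code MERGE_ON_READ}).
 */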
public static void main(String[] args) throws TableNotExistException, InterruptedException {
if (args.length < 3) {
System.err.println("Usage: HoodieWriteClientExample <tablePath> <tableName> <tableType>");
System.exit(1);
}
String tablePath = args[0];
String tableName = args[1];
String tableType = args[2];
HoodieFlinkQuickstart flinkQuickstart = instance();
flinkQuickstart.initEnv();
// create filesystem table named source
flinkQuickstart.createFileSource();
// create hudi table
flinkQuickstart.createHudiTable(tablePath, tableName, HoodieTableType.valueOf(tableType));
// insert data
flinkQuickstart.insertData();
// query data
flinkQuickstart.queryData();
// update data
flinkQuickstart.updateData();
}
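/**
 * Lazily creates the streaming {@link TableEnvironment} used by the example: default
 * parallelism 1, a 2s checkpointing interval and a restart strategy that does not retry
 * after a failure.
 */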
public void initEnv() {
if (this.streamTableEnv == null) {
settings = EnvironmentSettings.newInstance().build();
TableEnvironment streamTableEnv = TableEnvironmentImpl.create(settings);
streamTableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
Configuration execConf = streamTableEnv.getConfig().getConfiguration();
execConf.setString("execution.checkpointing.interval", "2s");
// configure not to retry after failure
execConf.setString("restart-strategy", "fixed-delay");
execConf.setString("restart-strategy.fixed-delay.attempts", "0");
this.streamTableEnv = streamTableEnv;
}
}
public TableEnvironment getStreamTableEnv() {
return streamTableEnv;
}
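/**
 * Creates a {@link TableEnvironment} in batch execution mode with default parallelism 1.
 * See the inline notes below on why sorted inputs and the batch state backend are disabled.
 */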
public TableEnvironment getBatchTableEnv() {
Configuration conf = new Configuration();
// For batch upsert use cases the current suggestion is to disable these two options.
// Since Flink 1.14, the runtime execution mode for batch jobs has switched from streaming
// to batch (before that, both streaming and batch jobs used the streaming execution mode).
// The batch execution mode currently has these limitations:
//
// 1. a keyed stream always sorts its inputs by key;
// 2. the batch state backend requires its inputs to be sorted by the state key.
//
// The Hudi batch upsert pipeline relies on the consumption order of index records and data
// records: the index records must be loaded before the data records so that BucketAssignFunction
// keeps the upsert semantics correct. We therefore disable these two options and fall back to
// the streaming state backend in batch execution mode, keeping the pre-1.14 strategy.
conf.setBoolean("execution.sorted-inputs.enabled", false);
conf.setBoolean("execution.batch-state-backend.enabled", false);
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(conf);
settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment batchTableEnv = StreamTableEnvironment.create(execEnv, settings);
batchTableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
return batchTableEnv;
}
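/**
 * Registers a Hudi table of the given type under {@code tableName} at {@code tablePath},
 * with streaming read enabled so the query steps can follow the latest commits.
 */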
public void createHudiTable(String tablePath, String tableName,
HoodieTableType tableType) {
this.tableName = tableName;
// create hudi table
String hoodieTableDDL = sql(tableName)
.option(FlinkOptions.PATH, tablePath)
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.TABLE_TYPE, tableType)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
}
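/**
 * Registers the filesystem table named {@code source} that feeds the insert and update steps.
 */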
public void createFileSource() {
// create filesystem table named source
String createSource = QuickstartConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource);
}
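/**
 * Inserts the source data into the Hudi table and returns the rows read back from it.
 */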
@NotNull List<Row> insertData() throws InterruptedException, TableNotExistException {
// insert data
String insertInto = String.format("insert into %s select * from source", tableName);
execInsertSql(streamTableEnv, insertInto);
return queryData();
}
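/**
 * Queries the Hudi table in streaming mode and returns the rows collected within the timeout.
 */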
List<Row> queryData() throws InterruptedException, TableNotExistException {
// query data
// read from the latest commit instant.
return execSelectSql(streamTableEnv, String.format("select * from %s", tableName), 10);
}
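/**
 * Re-inserts the source data, which upserts the existing records, and returns the rows
 * read back from the table.
 */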
@NotNull List<Row> updateData() throws InterruptedException, TableNotExistException {
// update data
String insertInto = String.format("insert into %s select * from source", tableName);
execInsertSql(getStreamTableEnv(), insertInto);
return queryData();
}
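/**
 * Executes the given INSERT statement and blocks until the Flink job finishes.
 */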
public static void execInsertSql(TableEnvironment tEnv, String insert) {
TableResult tableResult = tEnv.executeSql(insert);
// block until the insert job finishes
try {
tableResult.getJobClient().get().getJobExecutionResult().get();
} catch (InterruptedException | ExecutionException ex) {
// ignored: failures of the example job are not propagated
}
}
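/**
 * Executes the given SELECT statement against a default collect sink and returns the rows
 * gathered within {@code timeout} seconds.
 */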
public static List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout)
throws InterruptedException, TableNotExistException {
return execSelectSql(tEnv, select, timeout, null);
}
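/**
 * Executes the given SELECT statement, deriving the collect sink schema from
 * {@code sourceTable} when one is specified, and returns the rows gathered within
 * {@code timeout} seconds.
 */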
public static List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable)
throws InterruptedException, TableNotExistException {
final String sinkDDL;
if (sourceTable != null) {
// if a source table was specified, use its schema as the sink schema
ObjectPath objectPath = new ObjectPath(tEnv.getCurrentDatabase(), sourceTable);
String currentCatalog = tEnv.getCurrentCatalog();
Catalog catalog = tEnv.getCatalog(currentCatalog).get();
ResolvedCatalogTable table = (ResolvedCatalogTable) catalog.getTable(objectPath);
ResolvedSchema schema = table.getResolvedSchema();
sinkDDL = QuickstartConfigurations.getCollectSinkDDL("sink", schema);
} else {
sinkDDL = QuickstartConfigurations.getCollectSinkDDL("sink");
}
return execSelectSql(tEnv, select, sinkDDL, timeout);
}
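/**
 * Creates the collect sink from the given DDL, runs the SELECT statement into it for
 * {@code timeout} seconds, cancels the job and returns the collected rows.
 */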
public static List<Row> execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout)
throws InterruptedException {
tEnv.executeSql("DROP TABLE IF EXISTS sink");
tEnv.executeSql(sinkDDL);
TableResult tableResult = tEnv.executeSql("insert into sink " + select);
// wait for the timeout, then cancel the job
TimeUnit.SECONDS.sleep(timeout);
tableResult.getJobClient().ifPresent(JobClient::cancel);
tEnv.executeSql("DROP TABLE IF EXISTS sink");
return CollectSinkTableFactory.RESULT.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
}