/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.table;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
import org.apache.flink.connector.jdbc.testutils.TableManaged;
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.TestData;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.assertj.core.api.Assertions.assertThat;

/** The ITCase for {@link JdbcDynamicTableSink}; concrete subclasses bind it to a specific database via {@link DatabaseTest}. */
public abstract class JdbcDynamicTableSinkITCase extends AbstractTestBase implements DatabaseTest {
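
    // Output tables used by the tests; all are registered in getManagedTables()
    // so the database test harness can manage their lifecycle.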
private final TableRow upsertOutputTable = createUpsertOutputTable();
private final TableRow appendOutputTable = createAppendOutputTable();
private final TableRow batchOutputTable = createBatchOutputTable();
private final TableRow realOutputTable = createRealOutputTable();
private final TableRow checkpointOutputTable = createCheckpointOutputTable();
private final TableRow userOutputTable = createUserOutputTable();
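
    // The factory methods below are protected, allowing dialect-specific
    // subclasses to override the column types (note the dbType(...) overrides).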
protected TableRow createUpsertOutputTable() {
return tableRow(
"dynamicSinkForUpsert",
pkField("cnt", DataTypes.BIGINT().notNull()),
field("lencnt", DataTypes.BIGINT().notNull()),
pkField("cTag", DataTypes.INT().notNull()),
field("ts", dbType("TIMESTAMP"), DataTypes.TIMESTAMP()));
    }

protected TableRow createAppendOutputTable() {
return tableRow(
"dynamicSinkForAppend",
field("id", DataTypes.INT().notNull()),
field("num", DataTypes.BIGINT().notNull()),
field("ts", dbType("TIMESTAMP"), DataTypes.TIMESTAMP()));
    }

protected TableRow createBatchOutputTable() {
return tableRow(
"dynamicSinkForBatch",
field("NAME", DataTypes.VARCHAR(20).notNull()),
field("SCORE", DataTypes.BIGINT().notNull()));
    }

protected TableRow createRealOutputTable() {
return tableRow("REAL_TABLE", field("real_data", dbType("REAL"), DataTypes.FLOAT()));
    }

protected TableRow createCheckpointOutputTable() {
return tableRow("checkpointTable", field("id", DataTypes.BIGINT().notNull()));
    }

protected TableRow createUserOutputTable() {
return tableRow(
"USER_TABLE",
pkField("user_id", DataTypes.VARCHAR(20).notNull()),
field("user_name", DataTypes.VARCHAR(20).notNull()),
field("email", DataTypes.VARCHAR(255)),
field("balance", DataTypes.DECIMAL(18, 2)),
field("balance2", DataTypes.DECIMAL(18, 2)));
    }

@Override
public List<TableManaged> getManagedTables() {
return Arrays.asList(
upsertOutputTable,
appendOutputTable,
batchOutputTable,
realOutputTable,
checkpointOutputTable,
userOutputTable);
}
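
    /** Clears all data registered with the test values connector between tests. */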
@AfterEach
void afterEach() {
TestValuesTableFactory.clearAllData();
}
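
    /**
     * Expected final rows of the user table after the changelog in
     * {@link #testReadingFromChangelogSource()} has been applied.
     */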
protected List<Row> testUserData() {
return Arrays.asList(
Row.of(
"user1",
"Tom",
"tom123@gmail.com",
new BigDecimal("8.10"),
new BigDecimal("16.20")),
Row.of(
"user3",
"Bailey",
"bailey@qq.com",
new BigDecimal("9.99"),
new BigDecimal("19.98")),
Row.of(
"user4",
"Tina",
"tina@gmail.com",
new BigDecimal("11.30"),
new BigDecimal("22.60")));
}
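
    /** Input rows (id, num, text, ts) shared by the streaming tests. */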
protected List<Row> testData() {
return Arrays.asList(
Row.of(1, 1L, "Hi", Timestamp.valueOf("1970-01-01 00:00:00.001")),
Row.of(2, 2L, "Hello", Timestamp.valueOf("1970-01-01 00:00:00.002")),
Row.of(3, 2L, "Hello world", Timestamp.valueOf("1970-01-01 00:00:00.003")),
Row.of(
4,
3L,
"Hello world, how are you?",
Timestamp.valueOf("1970-01-01 00:00:00.004")),
Row.of(5, 3L, "I am fine.", Timestamp.valueOf("1970-01-01 00:00:00.005")),
Row.of(6, 3L, "Luke Skywalker", Timestamp.valueOf("1970-01-01 00:00:00.006")),
Row.of(7, 4L, "Comment#1", Timestamp.valueOf("1970-01-01 00:00:00.007")),
Row.of(8, 4L, "Comment#2", Timestamp.valueOf("1970-01-01 00:00:00.008")),
Row.of(9, 4L, "Comment#3", Timestamp.valueOf("1970-01-01 00:00:00.009")),
Row.of(10, 4L, "Comment#4", Timestamp.valueOf("1970-01-01 00:00:00.010")),
Row.of(11, 5L, "Comment#5", Timestamp.valueOf("1970-01-01 00:00:00.011")),
Row.of(12, 5L, "Comment#6", Timestamp.valueOf("1970-01-01 00:00:00.012")),
Row.of(13, 5L, "Comment#7", Timestamp.valueOf("1970-01-01 00:00:00.013")),
Row.of(14, 5L, "Comment#8", Timestamp.valueOf("1970-01-01 00:00:00.014")),
Row.of(15, 5L, "Comment#9", Timestamp.valueOf("1970-01-01 00:00:00.015")),
Row.of(16, 6L, "Comment#10", Timestamp.valueOf("1970-01-01 00:00:00.016")),
Row.of(17, 6L, "Comment#11", Timestamp.valueOf("1970-01-01 00:00:00.017")),
Row.of(18, 6L, "Comment#12", Timestamp.valueOf("1970-01-01 00:00:00.018")),
Row.of(19, 6L, "Comment#13", Timestamp.valueOf("1970-01-01 00:00:00.019")),
Row.of(20, 6L, "Comment#14", Timestamp.valueOf("1970-01-01 00:00:00.020")),
Row.of(21, 6L, "Comment#15", Timestamp.valueOf("1970-01-01 00:00:00.021")));
}
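
    /** Indexes {@link #testData()} by its integer id for lookups in assertions. */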
protected Map<Integer, Row> testDataMap() {
return testData().stream()
.collect(Collectors.toMap(r -> r.getFieldAs(0), Function.identity()));
}
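
    /** Registers {@link #testData()} as a temporary view with columns (id, num, text, ts). */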
private void createTestDataTempView(StreamTableEnvironment tEnv, String viewName) {
Table table = tEnv.fromValues(testData()).as("id", "num", "text", "ts");
tEnv.createTemporaryView(viewName, table);
}
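
    /** Writes a FLOAT into a column backed by the database REAL type and verifies the round trip. */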
@Test
void testReal() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
String tableName = "realSink";
tEnv.executeSql(realOutputTable.getCreateQueryForFlink(getMetadata(), tableName));
tEnv.executeSql(String.format("INSERT INTO %s SELECT CAST(1.0 as FLOAT)", tableName))
.await();
assertThat(realOutputTable.selectAllTable(getMetadata())).containsExactly(Row.of(1.0f));
}
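
    /**
     * Runs a two-level aggregation whose updates exercise the upsert path of the sink. The flush
     * buffer is capped at two rows so flushes happen while the aggregate results are still
     * changing.
     */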
@Test
void testUpsert() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String viewName = "testData";
createTestDataTempView(tEnv, viewName);
String tableName = "upsertSink";
tEnv.executeSql(
upsertOutputTable.getCreateQueryForFlink(
getMetadata(),
tableName,
Arrays.asList(
"'sink.buffer-flush.max-rows' = '2'",
"'sink.buffer-flush.interval' = '0'",
"'sink.max-retries' = '0'")));
tEnv.executeSql(
String.format(
"INSERT INTO %s "
+ " SELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS ts "
+ " FROM ( "
+ " SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS ts "
+ " FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM %s) "
+ " GROUP BY len, cTag "
+ " ) "
+ " GROUP BY cnt, cTag",
tableName, viewName))
.await();
Map<Integer, Row> mapTestData = testDataMap();
assertThat(upsertOutputTable.selectAllTable(getMetadata()))
.containsExactlyInAnyOrder(
Row.of(1L, 5L, 1, mapTestData.get(6).getField(3)),
Row.of(7L, 1L, 1, mapTestData.get(21).getField(3)),
Row.of(9L, 1L, 1, mapTestData.get(15).getField(3)));
}
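
    /** Inserts a filtered subset of the test data in append-only mode and expects exactly those rows. */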
@Test
void testAppend() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.getConfig().setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String viewName = "testData";
createTestDataTempView(tEnv, viewName);
String tableName = "appendSink";
tEnv.executeSql(appendOutputTable.getCreateQueryForFlink(getMetadata(), tableName));
Set<Integer> searchIds = new HashSet<>(Arrays.asList(2, 10, 20));
tEnv.executeSql(
String.format(
"INSERT INTO %s SELECT id, num, ts FROM %s WHERE id IN (%s)",
tableName,
viewName,
searchIds.stream()
.map(Object::toString)
.collect(Collectors.joining(","))))
.await();
List<Row> tableRows = appendOutputTable.selectAllTable(getMetadata());
        assertThat(tableRows).hasSize(3);
Map<Integer, Row> mapTestData = testDataMap();
assertThat(tableRows)
.containsExactlyInAnyOrderElementsOf(
searchIds.stream()
.map(mapTestData::get)
.map(d -> Row.of(d.getField(0), d.getField(1), d.getField(3)))
.collect(Collectors.toList()));
}
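
    /** Exercises the sink in batch execution mode with buffering and retries configured. */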
@Test
void testBatchSink() throws Exception {
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
String tableName = "batchSink";
tEnv.executeSql(
batchOutputTable.getCreateQueryForFlink(
getMetadata(),
tableName,
Arrays.asList(
"'sink.buffer-flush.max-rows' = '2'",
"'sink.buffer-flush.interval' = '300ms'",
"'sink.max-retries' = '4'")));
TableResult tableResult =
tEnv.executeSql(
String.format(
"INSERT INTO %s "
+ " SELECT user_name, score "
+ " FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob')) "
+ " AS UserCountTable(score, user_name) ",
tableName));
tableResult.await();
assertThat(batchOutputTable.selectAllTable(getMetadata()))
.containsExactlyInAnyOrder(
Row.of("Bob", 1L),
Row.of("Tom", 22L),
Row.of("Kim", 42L),
Row.of("Kim", 42L),
Row.of("Bob", 1L));
}
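
    /**
     * Feeds an insert/update/delete changelog ('I,UA,UB,D') from the values connector into the
     * JDBC sink and checks that the materialized table matches {@link #testUserData()}.
     */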
@Test
void testReadingFromChangelogSource() throws Exception {
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
String dataId = TestValuesTableFactory.registerData(TestData.userChangelog());
String userTableLogs = "user_logs";
tEnv.executeSql(
String.format(
"CREATE TABLE %s ( "
+ " user_id STRING, "
+ " user_name STRING, "
+ " email STRING, "
+ " balance DECIMAL(18,2), "
+ " balance2 AS balance * 2 "
+ ") WITH ( "
+ " 'connector' = 'values', "
+ " 'data-id' = '%s', "
+ " 'changelog-mode' = 'I,UA,UB,D' "
+ ")",
userTableLogs, dataId));
String userTableSink = "user_sink";
tEnv.executeSql(
userOutputTable.getCreateQueryForFlink(
getMetadata(),
userTableSink,
Arrays.asList(
"'sink.buffer-flush.max-rows' = '2'",
"'sink.buffer-flush.interval' = '0'")));
tEnv.executeSql(
String.format(
"INSERT INTO %s SELECT * FROM %s", userTableSink, userTableLogs))
.await();
assertThat(userOutputTable.selectAllTable(getMetadata()))
.containsExactlyInAnyOrderElementsOf(testUserData());
}
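
    /**
     * With 'sink.buffer-flush.interval' = '0' the sink only flushes on checkpoints. This test
     * drives the generated SinkFunction directly and verifies that buffered rows become visible
     * only after snapshotState() is called.
     */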
@Test
void testFlushBufferWhenCheckpoint() throws Exception {
Map<String, String> options = new HashMap<>();
options.put("connector", "jdbc");
options.put("url", getMetadata().getJdbcUrl());
options.put("username", getMetadata().getUsername());
options.put("password", getMetadata().getPassword());
options.put("table-name", checkpointOutputTable.getTableName());
options.put("sink.buffer-flush.interval", "0");
ResolvedSchema schema = checkpointOutputTable.getTableResolvedSchema();
DynamicTableSink tableSink = createTableSink(schema, options);
SinkRuntimeProviderContext context = new SinkRuntimeProviderContext(false);
SinkFunctionProvider sinkProvider =
(SinkFunctionProvider) tableSink.getSinkRuntimeProvider(context);
GenericJdbcSinkFunction<RowData> sinkFunction =
(GenericJdbcSinkFunction<RowData>) sinkProvider.createSinkFunction();
sinkFunction.setRuntimeContext(new MockStreamingRuntimeContext(true, 1, 0));
sinkFunction.open(new Configuration());
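        // With the periodic flush disabled, these rows stay in the write buffer.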
sinkFunction.invoke(GenericRowData.of(1L), SinkContextUtil.forTimestamp(1));
sinkFunction.invoke(GenericRowData.of(2L), SinkContextUtil.forTimestamp(1));
assertThat(checkpointOutputTable.selectAllTable(getMetadata())).isEmpty();
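        // A checkpoint snapshot forces the buffered rows to be flushed to the database.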
sinkFunction.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));
assertThat(checkpointOutputTable.selectAllTable(getMetadata()))
.containsExactlyInAnyOrder(Row.of(1L), Row.of(2L));
sinkFunction.close();
}
}