// blob: a0ce8be3af7f6461f3f1e6cf3aa474c77c52eefa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.flink;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** SQL ITCase for continuous file store. */
public class ContinuousFileStoreITCase extends CatalogITCaseBase {
/**
 * Returns the DDL statements for the two tables the tests in this class read and write: T1 (no
 * primary key) and T2 (primary key on column {@code a}).
 *
 * <p>NOTE(review): presumably the base class executes these before each test - confirm in
 * CatalogITCaseBase.
 */
@Override
protected List<String> ddl() {
    // T1: no primary key, a single bucket, bucketed by column 'a'.
    String keylessTableDdl =
            "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'a')";
    // T2: primary key on 'a', a single bucket, 'changelog-producer' set to 'input'.
    String keyedTableDdl =
            "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
                    + " WITH ('changelog-producer'='input', 'bucket' = '1')";
    return Arrays.asList(keylessTableDdl, keyedTableDdl);
}
/**
 * Checks that, with the 'scan.push-down' hint set to 'false', the compiled plan of two inserts
 * reading from T1 contains "Reused" in three situations: projection only, projection plus
 * filter, and projection plus filter plus limit.
 */
@Test
public void testSourceReuseWithoutScanPushDown() {
    sEnv.executeSql("CREATE TEMPORARY TABLE print1 (a STRING) WITH ('connector'='print')");
    sEnv.executeSql("CREATE TEMPORARY TABLE print2 (b STRING) WITH ('connector'='print')");

    // Case 1: the two inserts differ only in the projected column.
    StatementSet projectionOnly = sEnv.createStatementSet();
    projectionOnly.addInsertSql(
            "INSERT INTO print1 SELECT a FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */");
    projectionOnly.addInsertSql(
            "INSERT INTO print2 SELECT b FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */");
    String projectionPlan = projectionOnly.compilePlan().explain();
    assertThat(projectionPlan).contains("Reused");

    // Case 2: each insert additionally carries its own filter.
    StatementSet withFilter = sEnv.createStatementSet();
    withFilter.addInsertSql(
            "INSERT INTO print1 SELECT a FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE b = 'Apache'");
    withFilter.addInsertSql(
            "INSERT INTO print2 SELECT b FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE a = 'Paimon'");
    String filterPlan = withFilter.compilePlan().explain();
    assertThat(filterPlan).contains("Reused");

    // Case 3: each insert carries its own filter and its own limit.
    StatementSet withFilterAndLimit = sEnv.createStatementSet();
    withFilterAndLimit.addInsertSql(
            "INSERT INTO print1 SELECT a FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE b = 'Apache' LIMIT 5");
    withFilterAndLimit.addInsertSql(
            "INSERT INTO print2 SELECT b FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE a = 'Paimon' LIMIT 10");
    String limitPlan = withFilterAndLimit.compilePlan().explain();
    assertThat(limitPlan).contains("Reused");
}
/**
 * Checks the compiled plan of two inserts reading from T1 without any 'scan.push-down' hint:
 * "Reused" must appear when the inserts differ only in projection, and must not appear when one
 * of them carries a filter or a limit.
 */
@Test
public void testSourceReuseWithScanPushDown() {
    // source can be reused with projection applied
    sEnv.executeSql("CREATE TEMPORARY TABLE print1 (a STRING) WITH ('connector'='print')");
    sEnv.executeSql("CREATE TEMPORARY TABLE print2 (b STRING) WITH ('connector'='print')");

    StatementSet projectionOnly = sEnv.createStatementSet();
    projectionOnly.addInsertSql("INSERT INTO print1 SELECT a FROM T1");
    projectionOnly.addInsertSql("INSERT INTO print2 SELECT b FROM T1");
    String projectionPlan = projectionOnly.compilePlan().explain();
    assertThat(projectionPlan).contains("Reused");

    // source cannot be reused with filter or limit applied
    sEnv.executeSql(
            "CREATE TEMPORARY TABLE new_print1 (a STRING, b STRING, c STRING) WITH ('connector'='print')");
    sEnv.executeSql(
            "CREATE TEMPORARY TABLE new_print2 (a STRING, b STRING, c STRING) WITH ('connector'='print')");

    // One side filtered, the other side unfiltered.
    StatementSet oneSideFiltered = sEnv.createStatementSet();
    oneSideFiltered.addInsertSql("INSERT INTO new_print1 SELECT * FROM T1 WHERE a = 'Apache'");
    oneSideFiltered.addInsertSql("INSERT INTO new_print2 SELECT * FROM T1");
    String filterPlan = oneSideFiltered.compilePlan().explain();
    assertThat(filterPlan).doesNotContain("Reused");

    // One side limited, the other side unlimited.
    StatementSet oneSideLimited = sEnv.createStatementSet();
    oneSideLimited.addInsertSql("INSERT INTO new_print1 SELECT * FROM T1 LIMIT 5");
    oneSideLimited.addInsertSql("INSERT INTO new_print2 SELECT * FROM T1");
    String limitPlan = oneSideLimited.compilePlan().explain();
    assertThat(limitPlan).doesNotContain("Reused");
}
/** Runs the basic continuous-read scenario of {@code testSimple} against T1 (no primary key). */
@Test
public void testWithoutPrimaryKey() throws Exception {
    final String keylessTable = "T1";
    testSimple(keylessTable);
}
/** Runs the basic continuous-read scenario of {@code testSimple} against T2 (primary key on a). */
@Test
public void testWithPrimaryKey() throws Exception {
    final String keyedTable = "T2";
    testSimple(keyedTable);
}
/** Runs the projected continuous-read scenario of {@code testProjection} against T1. */
@Test
public void testProjectionWithoutPrimaryKey() throws Exception {
    final String keylessTable = "T1";
    testProjection(keylessTable);
}
/** Runs the projected continuous-read scenario of {@code testProjection} against T2. */
@Test
public void testProjectionWithPrimaryKey() throws Exception {
    final String keyedTable = "T2";
    testProjection(keyedTable);
}
/**
 * Reads T2 twice in streaming mode under the same 'consumer-id'. The second read is expected to
 * deliver only the row inserted after the first read was closed, not the two rows the first
 * read already returned.
 */
@Test
public void testConsumerId() throws Exception {
    String target = "T2";
    String consumerQuery =
            "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */";

    // First read: started on the empty table, then two rows are committed.
    BlockingIterator<Row, Row> firstRead =
            BlockingIterator.of(streamSqlIter(consumerQuery, target));
    batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", target);
    assertThat(firstRead.collect(2))
            .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));

    // NOTE(review): presumably waits for the consumer progress to be recorded before the job
    // is stopped - confirm.
    Thread.sleep(1000);
    firstRead.close();

    // Second read with the same consumer id: only the newly committed row is expected.
    BlockingIterator<Row, Row> secondRead =
            BlockingIterator.of(streamSqlIter(consumerQuery, target));
    batchSql("INSERT INTO %s VALUES ('7', '8', '9')", target);
    assertThat(secondRead.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
    secondRead.close();
}
/**
 * Consumes T2 once in streaming mode under a 'consumer-id', then issues the same hinted query
 * through {@code sql(...)} and expects every row of the table back, i.e. the consumer id has no
 * effect on that second read.
 */
@Test
public void testConsumerIdInBatch() throws Exception {
    String target = "T2";
    String consumerQuery =
            "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */";

    batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", target);

    // Streaming read under the consumer id picks up the two existing rows.
    BlockingIterator<Row, Row> streamingRead =
            BlockingIterator.of(streamSqlIter(consumerQuery, target));
    assertThat(streamingRead.collect(2))
            .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));

    // NOTE(review): presumably waits for the consumer progress to be recorded before the job
    // is stopped - confirm.
    Thread.sleep(1000);
    streamingRead.close();

    batchSql("INSERT INTO %s VALUES ('7', '8', '9')", target);

    // ignore the consumer id in batch mode
    assertThat(sql(consumerQuery, target))
            .containsExactlyInAnyOrder(
                    Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9"));
}
/**
 * Pipes generated rows with a watermark into T2, copies T2 into a second table WT, and polls
 * the {@code WT$snapshots} system table once per second until it reports more than one distinct
 * {@code watermark} value.
 *
 * <p>The polling loop has no exit condition of its own for the failure case; the 120 second
 * {@link Timeout} bounds the test. Both streaming inserts are closed on every exit path,
 * including timeout and failure.
 *
 * @throws Exception if a statement fails or the test is interrupted while polling
 */
@Test
@Timeout(120)
public void testSnapshotWatermark() throws Exception {
    // The DDL yields no rows worth reading; close the returned iterator right away instead of
    // leaving it dangling.
    streamSqlIter(
                    "CREATE TEMPORARY TABLE gen (a STRING, b STRING, c STRING,"
                            + " dt AS NOW(), WATERMARK FOR dt AS dt) WITH ('connector'='datagen')")
            .close();

    try (CloseableIterator<Row> insert1 =
            streamSqlIter("INSERT INTO T2 SELECT a, b, c FROM gen")) {
        sql("CREATE TABLE WT (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)");

        try (CloseableIterator<Row> insert2 =
                streamSqlIter(
                        "INSERT INTO WT SELECT * FROM T2 /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */")) {
            while (true) {
                Set<Long> watermarks =
                        sql("SELECT `watermark` FROM WT$snapshots").stream()
                                .map(r -> (Long) r.getField(0))
                                .collect(Collectors.toSet());
                // More than one distinct value means the watermark recorded in WT's snapshots
                // has changed at least once.
                if (watermarks.size() > 1) {
                    return;
                }
                Thread.sleep(1000);
            }
        }
    }
}
/**
 * Starts a streaming {@code SELECT *} on the given table, commits two batches, and checks that
 * the rows are delivered batch by batch in commit order.
 *
 * @param table name of the table to read from and insert into
 * @throws Exception if a statement fails or collecting rows fails
 */
private void testSimple(String table) throws Exception {
    BlockingIterator<Row, Row> iterator =
            BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
    try {
        // Both batches are committed before anything is collected; the assertions below
        // require the first two rows to come from the first commit and the third row from
        // the second commit.
        batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
        batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);

        assertThat(iterator.collect(2))
                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
    } finally {
        // Close the streaming read even when an assertion fails.
        iterator.close();
    }
}
/**
 * Starts a streaming {@code SELECT b, c} on the given table and checks, after each of two
 * inserts, that only the projected columns of the newly inserted rows are delivered.
 *
 * @param table name of the table to read from and insert into
 * @throws Exception if a statement fails or collecting rows fails
 */
private void testProjection(String table) throws Exception {
    BlockingIterator<Row, Row> projected =
            BlockingIterator.of(streamSqlIter("SELECT b, c FROM %s", table));

    // First commit: two rows; column 'a' must be absent from the result.
    batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
    List<Row> firstBatch = projected.collect(2);
    assertThat(firstBatch).containsExactlyInAnyOrder(Row.of("2", "3"), Row.of("5", "6"));

    // Second commit: one more row.
    batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
    List<Row> secondBatch = projected.collect(1);
    assertThat(secondBatch).containsExactlyInAnyOrder(Row.of("8", "9"));

    projected.close();
}
/**
 * With the hint 'log.scan'='latest', a streaming read started after a first commit is expected
 * to deliver only the rows of the commit made afterwards.
 */
@Test
public void testContinuousLatest() throws Exception {
    // Rows that exist before the streaming read starts; they must not show up below.
    batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");

    BlockingIterator<Row, Row> latestOnly =
            BlockingIterator.of(
                    streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */"));

    batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
    List<Row> delivered = latestOnly.collect(2);
    assertThat(delivered)
            .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", "11", "12"));

    latestOnly.close();
}
/**
 * Starts a streaming read with 'scan.mode'='latest' on the still-empty T1 and checks that the
 * rows of three subsequent single-row inserts are all delivered.
 *
 * @throws Exception if a statement fails or collecting rows fails
 */
@Test
public void testContinuousLatestStartingFromEmpty() throws Exception {
    BlockingIterator<Row, Row> iterator =
            BlockingIterator.of(
                    streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest') */"));
    try {
        sql("INSERT INTO T1 VALUES ('1', 'Hello', 'World')");
        sql("INSERT INTO T1 VALUES ('2', 'Apache', 'Paimon')");
        sql("INSERT INTO T1 VALUES ('3', 'C', 'c')");

        assertThat(iterator.collect(3))
                .containsExactlyInAnyOrder(
                        Row.of("1", "Hello", "World"),
                        Row.of("2", "Apache", "Paimon"),
                        Row.of("3", "C", "c"));
    } finally {
        // Close the streaming read on every path, as the sibling tests do.
        iterator.close();
    }
}
/**
 * Exercises 'log.scan'='from-timestamp' with several start timestamps chosen relative to the
 * commit times of T1's snapshots (just before, exactly at, and just after a snapshot) and
 * checks which rows each streaming read delivers.
 */
@Test
public void testContinuousFromTimestamp() throws Exception {
    String queryTemplate =
            "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp', 'log.scan.timestamp-millis'='%s') */";

    // empty table
    BlockingIterator<Row, Row> rows = BlockingIterator.of(streamSqlIter(queryTemplate, 0));
    batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
    batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
    assertThat(rows.collect(2))
            .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
    rows.close();

    // Look up the two snapshots written so far, ordered by their commit time.
    SnapshotManager manager =
            new SnapshotManager(LocalFileIO.create(), getTableDirectory("T1"));
    List<Snapshot> byCommitTime = new ArrayList<>(ImmutableList.copyOf(manager.snapshots()));
    byCommitTime.sort(Comparator.comparingLong(Snapshot::timeMillis));
    Snapshot first = byCommitTime.get(0);
    Snapshot second = byCommitTime.get(1);

    // before second snapshot
    rows = BlockingIterator.of(streamSqlIter(queryTemplate, second.timeMillis() - 1));
    batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
    assertThat(rows.collect(3))
            .containsExactlyInAnyOrder(
                    Row.of("7", "8", "9"), Row.of("10", "11", "12"), Row.of("13", "14", "15"));
    rows.close();

    // from second snapshot
    rows = BlockingIterator.of(streamSqlIter(queryTemplate, second.timeMillis()));
    assertThat(rows.collect(3))
            .containsExactlyInAnyOrder(
                    Row.of("7", "8", "9"), Row.of("10", "11", "12"), Row.of("13", "14", "15"));
    rows.close();

    // after second snapshot
    rows = BlockingIterator.of(streamSqlIter(queryTemplate, second.timeMillis() + 1));
    assertThat(rows.collect(1)).containsExactlyInAnyOrder(Row.of("13", "14", "15"));
    rows.close();

    // from start
    rows = BlockingIterator.of(streamSqlIter(queryTemplate, first.timeMillis() - 1));
    assertThat(rows.collect(5))
            .containsExactlyInAnyOrder(
                    Row.of("1", "2", "3"),
                    Row.of("4", "5", "6"),
                    Row.of("7", "8", "9"),
                    Row.of("10", "11", "12"),
                    Row.of("13", "14", "15"));
    rows.close();

    // from end: one millisecond past the latest snapshot, so only a new commit is delivered
    long justAfterLatest = manager.snapshot(manager.latestSnapshotId()).timeMillis() + 1;
    rows = BlockingIterator.of(streamSqlIter(queryTemplate, justAfterLatest));
    batchSql("INSERT INTO T1 VALUES ('16', '17', '18')");
    assertThat(rows.collect(1)).containsExactlyInAnyOrder(Row.of("16", "17", "18"));
    rows.close();
}
/**
 * Requesting 'log.scan'='from-timestamp' without supplying a timestamp option must fail with an
 * {@link IllegalArgumentException} cause carrying the expected root-cause message.
 */
@Test
public void testLackStartupTimestamp() {
    String queryWithoutTimestamp =
            "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */";
    String expectedMessage =
            "must set only one key in [scan.timestamp-millis,scan.timestamp] when you use from-timestamp for scan.mode";

    assertThatThrownBy(() -> streamSqlIter(queryWithoutTimestamp))
            .hasCauseInstanceOf(IllegalArgumentException.class)
            .hasRootCauseMessage(expectedMessage);
}
/**
 * Checks two ways of supplying 'log.scan.timestamp-millis': on its own (the read works and
 * delivers the first commit's rows) and combined with 'log.scan'='latest' (rejected with an
 * {@link IllegalArgumentException} cause).
 */
@Test
public void testConfigureStartupTimestamp() throws Exception {
    // Configure 'log.scan.timestamp-millis' without 'log.scan'.
    String timestampOnlyQuery =
            "SELECT * FROM T1 /*+ OPTIONS('log.scan.timestamp-millis'='%s') */";
    BlockingIterator<Row, Row> rows =
            BlockingIterator.of(streamSqlIter(timestampOnlyQuery, 0));
    batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
    batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
    assertThat(rows.collect(2))
            .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
    rows.close();

    // Configure 'log.scan.timestamp-millis' with 'log.scan=latest'.
    String conflictingQuery =
            "SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest', 'log.scan.timestamp-millis'='%s') */";
    String expectedMessage =
            "scan.timestamp-millis must be null when you use latest for scan.mode";
    assertThatThrownBy(() -> streamSqlIter(conflictingQuery, 0))
            .hasCauseInstanceOf(IllegalArgumentException.class)
            .hasRootCauseMessage(expectedMessage);
}
/**
 * Reads T1 in streaming mode starting from 'scan.snapshot-id' 1, 0, 1 and 2 (without setting
 * 'scan.mode') and checks the first rows delivered in each case; then checks that combining
 * 'scan.snapshot-id' with 'scan.mode'='latest' is rejected.
 */
@Test
public void testConfigureStartupSnapshot() throws Exception {
    // Configure 'scan.snapshot-id' without 'scan.mode'.
    String fromSnapshotQuery = "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */";

    batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
    batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");

    BlockingIterator<Row, Row> rows = BlockingIterator.of(streamSqlIter(fromSnapshotQuery, 1));
    assertThat(rows.collect(2))
            .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
    rows.close();

    // Start from earliest snapshot
    rows = BlockingIterator.of(streamSqlIter(fromSnapshotQuery, 0));
    assertThat(rows.collect(2))
            .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
    rows.close();

    // Snapshot id 1 again: same first two rows as above.
    rows = BlockingIterator.of(streamSqlIter(fromSnapshotQuery, 1));
    assertThat(rows.collect(2))
            .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
    rows.close();

    // Snapshot id 2: the rows of the second commit come first.
    rows = BlockingIterator.of(streamSqlIter(fromSnapshotQuery, 2));
    assertThat(rows.collect(2))
            .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", "11", "12"));
    rows.close();

    // Configure 'scan.snapshot-id' with 'scan.mode=latest'.
    String conflictingQuery =
            "SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */";
    String expectedMessage = "scan.snapshot-id must be null when you use latest for scan.mode";
    assertThatThrownBy(() -> streamSqlIter(conflictingQuery, 0))
            .hasCauseInstanceOf(IllegalArgumentException.class)
            .hasRootCauseMessage(expectedMessage);
}
/**
 * Reads T1 in streaming mode with 'scan.mode'='from-snapshot-full' and checks that the read
 * first delivers every row present up to the given snapshot id, followed by rows committed
 * later.
 */
@Test
public void testConfigureStartupSnapshotFull() throws Exception {
    // Configure 'scan.snapshot-id' with 'scan.mode'='from-snapshot-full'.
    String fullFromSnapshotQuery =
            "SELECT * FROM T1 /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */";

    batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
    batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");

    // Start from earliest snapshot
    BlockingIterator<Row, Row> rows =
            BlockingIterator.of(streamSqlIter(fullFromSnapshotQuery, 1));
    assertThat(rows.collect(2))
            .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
    rows.close();

    // A third commit, made before the remaining reads start.
    batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");

    // From snapshot 2: the first four rows delivered are those of the first two commits.
    rows = BlockingIterator.of(streamSqlIter(fullFromSnapshotQuery, 2));
    assertThat(rows.collect(4))
            .containsExactlyInAnyOrder(
                    Row.of("1", "2", "3"),
                    Row.of("4", "5", "6"),
                    Row.of("7", "8", "9"),
                    Row.of("10", "11", "12"));
    rows.close();

    // Same start point, collecting one more row: the third commit follows.
    rows = BlockingIterator.of(streamSqlIter(fullFromSnapshotQuery, 2));
    assertThat(rows.collect(5))
            .containsExactlyInAnyOrder(
                    Row.of("1", "2", "3"),
                    Row.of("4", "5", "6"),
                    Row.of("7", "8", "9"),
                    Row.of("10", "11", "12"),
                    Row.of("13", "14", "15"));
    rows.close();
}
/**
 * A running streaming read on T1 must not deliver the row written by an {@code INSERT
 * OVERWRITE}; the next row it delivers is the one from the regular insert that follows.
 */
@Test
public void testIgnoreOverwrite() throws Exception {
    BlockingIterator<Row, Row> rows = BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));

    batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
    List<Row> initialRows = rows.collect(2);
    assertThat(initialRows)
            .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));

    // should ignore this overwrite
    batchSql("INSERT OVERWRITE T1 VALUES ('7', '8', '9')");

    batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
    List<Row> afterOverwrite = rows.collect(1);
    assertThat(afterOverwrite).containsExactlyInAnyOrder(Row.of("9", "10", "11"));

    rows.close();
}
/**
 * A streaming read with 'log.changelog-mode'='upsert' must fail with a {@link
 * ValidationException} cause carrying the expected root-cause message.
 */
@Test
public void testUnsupportedUpsert() {
    String upsertQuery = "SELECT * FROM T1 /*+ OPTIONS('log.changelog-mode'='upsert') */";
    String expectedMessage =
            "File store continuous reading does not support upsert changelog mode.";

    assertThatThrownBy(() -> streamSqlIter(upsertQuery))
            .hasCauseInstanceOf(ValidationException.class)
            .hasRootCauseMessage(expectedMessage);
}
/**
 * A streaming read with 'log.consistency'='eventual' must fail with a {@link
 * ValidationException} cause carrying the expected root-cause message.
 */
@Test
public void testUnsupportedEventual() {
    String eventualQuery = "SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */";
    String expectedMessage =
            "File store continuous reading does not support eventual consistency mode.";

    assertThatThrownBy(() -> streamSqlIter(eventualQuery))
            .hasCauseInstanceOf(ValidationException.class)
            .hasRootCauseMessage(expectedMessage);
}
/**
 * Checks that the managed-memory sink options are honoured: a writer buffer of '0M' makes the
 * insert fail with an {@link IllegalArgumentException} cause, while '1M' lets the insert
 * succeed and leaves two rows in T1.
 */
@Test
public void testFlinkMemoryPool() {
    // Check if the configuration is effective
    String insertWithZeroBuffer =
            "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='0M') */ "
                    + "VALUES ('1', '2', '3'), ('4', '5', '6')";
    assertThatThrownBy(() -> batchSql(insertWithZeroBuffer, "T1"))
            .hasCauseInstanceOf(IllegalArgumentException.class)
            .hasRootCauseMessage(
                    "Weights for operator scope use cases must be greater than 0.");

    // The same insert with a non-zero buffer must go through.
    String insertWithOneMegabyteBuffer =
            "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='1M') */ "
                    + "VALUES ('1', '2', '3'), ('4', '5', '6')";
    batchSql(insertWithOneMegabyteBuffer, "T1");

    int storedRows = batchSql("SELECT * FROM T1").size();
    assertThat(storedRows).isEqualTo(2);
}
/**
 * Joins a partitioned fact table with a dimension table and checks that (1) the explained plan
 * does not contain "DynamicFilteringDataCollector" and (2) the streaming join returns the six
 * rows of partitions 1 and 2.
 */
@Test
public void testDynamicPartitionPruningNotWork() throws Exception {
    // dim table
    sql("CREATE TABLE dim (x INT PRIMARY KEY NOT ENFORCED, y STRING, z INT)");
    sql("INSERT INTO dim VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2)");

    // partitioned fact table
    sql(
            "CREATE TABLE fact (a INT, b BIGINT, c STRING, p INT, `proctime` AS PROCTIME(), PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p)\n");
    sql(
            "INSERT INTO fact PARTITION (p = 1) VALUES (10, 100, 'aaa'), (11, 101, 'bbb'), (12, 102, 'ccc')");
    sql(
            "INSERT INTO fact PARTITION (p = 2) VALUES (20, 200, 'aaa'), (21, 201, 'bbb'), (22, 202, 'ccc')");
    sql(
            "INSERT INTO fact PARTITION (p = 3) VALUES (30, 300, 'aaa'), (31, 301, 'bbb'), (32, 302, 'ccc')");

    String joinQuery = "SELECT a, b, c, p, x, y FROM fact INNER JOIN dim ON x = p and z = 1";

    // check dynamic partition pruning isn't working
    String explainedPlan = sEnv.explainSql(joinQuery);
    assertThat(explainedPlan).doesNotContain("DynamicFilteringDataCollector");

    // check results: only dim rows with z = 1 (x = 1 and x = 2) take part in the join
    BlockingIterator<Row, Row> joined = BlockingIterator.of(streamSqlIter(joinQuery));
    List<Row> joinedRows = joined.collect(6);
    assertThat(joinedRows)
            .containsExactlyInAnyOrder(
                    Row.of(10, 100L, "aaa", 1, 1, "a"),
                    Row.of(11, 101L, "bbb", 1, 1, "a"),
                    Row.of(12, 102L, "ccc", 1, 1, "a"),
                    Row.of(20, 200L, "aaa", 2, 2, "b"),
                    Row.of(21, 201L, "bbb", 2, 2, "b"),
                    Row.of(22, 202L, "ccc", 2, 2, "b"));
    joined.close();
}
/**
 * On a table created with 'ignore-delete' = 'true', a delete followed by a re-insert of the
 * same key must reach a streaming reader as an update pair, with no DELETE row in between.
 */
@Test
public void testIgnoreDelete() throws Exception {
    sql(
            "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) "
                    + "WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true')");

    BlockingIterator<Row, Row> changes =
            streamSqlBlockIter(
                    "SELECT * FROM ignore_delete /*+ OPTIONS('continuous.discovery-interval' = '1s', 'scan.snapshot-id' = '1') */");

    // Insert two keys, delete key 1, then write key 1 again with a new value.
    sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')");
    sql("DELETE FROM ignore_delete WHERE pk = 1");
    sql("INSERT INTO ignore_delete VALUES (1, 'B')");

    // no -D[1, 'A'] but exist -U[1, 'A']
    List<Row> observed = changes.collect(4);
    assertThat(observed)
            .containsExactly(
                    Row.ofKind(RowKind.INSERT, 1, "A"),
                    Row.ofKind(RowKind.INSERT, 2, "B"),
                    Row.ofKind(RowKind.UPDATE_BEFORE, 1, "A"),
                    Row.ofKind(RowKind.UPDATE_AFTER, 1, "B"));
    changes.close();
}
/**
 * Adds a column to a table after a first commit and reads the row committed afterwards in two
 * ways: via the 'scan.timestamp-millis' hint (the row is returned with the new column) and via
 * {@code FOR SYSTEM_TIME AS OF} (the row is returned without it).
 *
 * @throws Exception if a statement fails or collecting rows fails
 */
@Test
public void testScanFromOldSchema() throws Exception {
    sql("CREATE TABLE select_old (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING)");
    sql("INSERT INTO select_old VALUES (1, 'a'), (2, 'b')");

    // Put a clear gap between the first commit and the captured timestamp.
    Thread.sleep(1_000);
    long timestamp = System.currentTimeMillis();

    sql("ALTER TABLE select_old ADD f2 STRING");
    sql("INSERT INTO select_old VALUES (3, 'c', 'C')");

    // this way will initialize source with the latest schema
    BlockingIterator<Row, Row> iterator =
            BlockingIterator.of(
                    streamSqlIter(
                            "SELECT * FROM select_old /*+ OPTIONS('scan.timestamp-millis'='%s') */",
                            timestamp));
    try {
        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3, "c", "C"));
    } finally {
        iterator.close();
    }

    // this way will initialize source with time-travelled schema
    iterator =
            BlockingIterator.of(
                    streamSqlIter(
                            "SELECT * FROM select_old FOR SYSTEM_TIME AS OF TIMESTAMP '%s'",
                            DateTimeUtils.formatTimestamp(
                                    DateTimeUtils.toInternal(timestamp, 0), 0)));
    try {
        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3, "c"));
    } finally {
        // The original left this second streaming read open.
        iterator.close();
    }
}
/**
 * Creates T3 with tight snapshot and changelog retention settings and checks which rows a
 * streaming read delivers when started from 'scan.snapshot-id' 0 and 4, before and after a
 * fourth commit.
 *
 * @param changelogProducer value substituted for the table's 'changelog-producer' option
 */
@ParameterizedTest(name = "changelog-producer = {0}")
@ValueSource(strings = {"none", "input"})
public void testScanFromChangelog(String changelogProducer) throws Exception {
    batchSql(
            "CREATE TABLE IF NOT EXISTS T3 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)\n"
                    + " WITH ('changelog-producer'='%s', 'bucket' = '1', \n"
                    + " 'snapshot.num-retained.max' = '2',\n"
                    + " 'snapshot.num-retained.min' = '1',\n"
                    + " 'changelog.num-retained.max' = '3',\n"
                    + " 'changelog.num-retained.min' = '1'\n"
                    + ")",
            changelogProducer);

    String fromSnapshotQuery = "SELECT * FROM T3 /*+ OPTIONS('scan.snapshot-id'='%s') */";

    // Three single-row commits.
    batchSql("INSERT INTO T3 VALUES ('1', '2', '3')");
    batchSql("INSERT INTO T3 VALUES ('4', '5', '6')");
    batchSql("INSERT INTO T3 VALUES ('7', '8', '9')");

    // From snapshot id 0 all three rows are still delivered.
    BlockingIterator<Row, Row> rows = BlockingIterator.of(streamSqlIter(fromSnapshotQuery, 0));
    assertThat(rows.collect(3))
            .containsExactlyInAnyOrder(
                    Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9"));
    rows.close();

    // After a fourth commit, the same start point no longer yields the first row.
    // NOTE(review): presumably because the oldest changelog has expired under
    // 'changelog.num-retained.max' = '3' - confirm.
    batchSql("INSERT INTO T3 VALUES ('10', '11', '12')");
    rows = BlockingIterator.of(streamSqlIter(fromSnapshotQuery, 0));
    assertThat(rows.collect(3))
            .containsExactlyInAnyOrder(
                    Row.of("4", "5", "6"), Row.of("7", "8", "9"), Row.of("10", "11", "12"));
    rows.close();

    // Starting at snapshot id 4 delivers only the fourth commit's row.
    rows = BlockingIterator.of(streamSqlIter(fromSnapshotQuery, 4));
    assertThat(rows.collect(1)).containsExactlyInAnyOrder(Row.of("10", "11", "12"));
    rows.close();
}
}