/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table;

import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.DynamicTableFactory;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
* Test cases for {@link HoodieTableFactory}.
*/
public class TestHoodieTableFactory {
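/** Path of the Avro schema file bundled with the test resources. */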
private static final String AVRO_SCHEMA_FILE_PATH = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_read_schema.avsc")).toString();
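
/** The Avro schema expected to be inferred from {@code TestConfigurations.TABLE_SCHEMA}. */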
private static final String INFERRED_SCHEMA = "{\"type\":\"record\","
+ "\"name\":\"record\","
+ "\"fields\":["
+ "{\"name\":\"uuid\",\"type\":[\"null\",\"string\"],\"default\":null},"
+ "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},"
+ "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null},"
+ "{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},"
+ "{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
private Configuration conf;
@TempDir
File tempFile;
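
/**
* Initializes a fresh Hudi table under the temp dir before each test case.
*/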
@BeforeEach
void beforeEach() throws IOException {
this.conf = new Configuration();
this.conf.setString(FlinkOptions.PATH, tempFile.getAbsolutePath());
this.conf.setString(FlinkOptions.TABLE_NAME, "t1");
StreamerUtil.initTableIfNotExists(this.conf);
}
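
/**
* Creating a source or sink should fail with a {@link ValidationException} when the
* primary key or the pre-combine field (default 'ts') is missing from the table
* definition, and succeed once both are present.
*/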
@Test
void testRequiredOptionsForSource() {
// missing both the pk and the pre-combine key should throw an exception
TableSchema schema1 = TableSchema.builder()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.build();
final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext1));
assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext1));
// given the pk but missing the pre-combine key should throw an exception
TableSchema schema2 = TableSchema.builder()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.primaryKey("f0")
.build();
final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2");
assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
// given both the pk and the pre-combine key should be ok
TableSchema schema3 = TableSchema.builder()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.primaryKey("f0")
.build();
final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema3, "f2");
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext3));
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext3));
}
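
/**
* The source Avro schema should be inferred from the table schema when it is not set
* explicitly, and left unset when an explicit schema file path is supplied instead.
*/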
@Test
void testInferAvroSchemaForSource() {
// infer the schema if not specified
final HoodieTableSource tableSource1 =
(HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf));
final Configuration conf1 = tableSource1.getConf();
assertThat(conf1.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(INFERRED_SCHEMA));
// set up the explicit schema using the file path
this.conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
HoodieTableSource tableSource2 =
(HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf));
Configuration conf2 = tableSource2.getConf();
assertNull(conf2.get(FlinkOptions.SOURCE_AVRO_SCHEMA), "expect the schema string to be null");
}
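
/**
* The record key field and the key generator class should be patched from the table
* definition: a user-specified key generator class is kept as is, complex primary keys
* switch to {@link ComplexAvroKeyGenerator}, and an empty partition path switches to
* {@link NonpartitionedAvroKeyGenerator}.
*/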
@Test
void testSetupHoodieKeyOptionsForSource() {
this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass");
// definition with simple primary key and partition path
TableSchema schema1 = TableSchema.builder()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.primaryKey("f0")
.build();
final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1);
final Configuration conf1 = tableSource1.getConf();
assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS), is("dummyKeyGenClass"));
// definition with complex primary keys and partition paths
this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue());
TableSchema schema2 = TableSchema.builder()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20).notNull())
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.primaryKey("f0", "f1")
.build();
final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2");
final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2);
final Configuration conf2 = tableSource2.getConf();
assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName()));
// definition with complex primary keys and empty partition paths
this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue());
final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema2, "");
final HoodieTableSource tableSource3 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3);
final Configuration conf3 = tableSource3.getConf();
assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName()));
}
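
/**
* The hive sync partition extractor should default to {@link MultiPartKeysValueExtractor}
* and switch to {@link SlashEncodedDayPartitionValueExtractor} when hive style
* partitioning is enabled.
*/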
@Test
void testSetupHiveOptionsForSource() {
// definition with simple primary key and partition path
TableSchema schema1 = TableSchema.builder()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.primaryKey("f0")
.build();
final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1);
final Configuration conf1 = tableSource1.getConf();
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), is(MultiPartKeysValueExtractor.class.getName()));
// set hive style partitioning to true
this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2);
final Configuration conf2 = tableSource2.getConf();
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), is(SlashEncodedDayPartitionValueExtractor.class.getName()));
}
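
/**
* The archive min/max commits should keep their defaults when the cleaner retains fewer
* commits than the min archive commits, and shift both upwards otherwise.
*/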
@Test
void testSetupCleaningOptionsForSource() {
// definition with simple primary key and partition path
TableSchema schema1 = TableSchema.builder()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.primaryKey("f0")
.build();
// set the retained commits to be less than the min archive commits
this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "11");
final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1);
final Configuration conf1 = tableSource1.getConf();
assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(20));
assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(30));
// set the retained commits to be greater than the min archive commits
this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "25");
final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2);
final Configuration conf2 = tableSource2.getConf();
assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(35));
assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45));
}
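
/**
* Mirror of {@link #testInferAvroSchemaForSource} for the sink path.
*/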
@Test
void testInferAvroSchemaForSink() {
// infer the schema if not specified
final HoodieTableSink tableSink1 =
(HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
final Configuration conf1 = tableSink1.getConf();
assertThat(conf1.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(INFERRED_SCHEMA));
// set up the explicit schema using the file path
this.conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
HoodieTableSink tableSink2 =
(HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
Configuration conf2 = tableSink2.getConf();
assertNull(conf2.get(FlinkOptions.SOURCE_AVRO_SCHEMA), "expect the schema string to be null");
}
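
/**
* Mirror of {@link #testSetupHoodieKeyOptionsForSource} for the sink path.
*/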
@Test
void testSetupHoodieKeyOptionsForSink() {
this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass");
// definition with simple primary key and partition path
TableSchema schema1 = TableSchema.builder()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.primaryKey("f0")
.build();
final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext1);
final Configuration conf1 = tableSink1.getConf();
assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS), is("dummyKeyGenClass"));
// definition with complex primary keys and partition paths
this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue());
TableSchema schema2 = TableSchema.builder()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20).notNull())
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.primaryKey("f0", "f1")
.build();
final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema2, "f2");
final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext2);
final Configuration conf2 = tableSink2.getConf();
assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName()));
// definition with complex primary keys and empty partition paths
this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue());
final MockContext sinkContext3 = MockContext.getInstance(this.conf, schema2, "");
final HoodieTableSink tableSink3 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext3);
final Configuration conf3 = tableSink3.getConf();
assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName()));
}
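
/**
* Mirror of {@link #testSetupHiveOptionsForSource} for the sink path.
*/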
@Test
void testSetupHiveOptionsForSink() {
// definition with simple primary key and partition path
TableSchema schema1 = TableSchema.builder()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.primaryKey("f0")
.build();
final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext1);
final Configuration conf1 = tableSink1.getConf();
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), is(MultiPartKeysValueExtractor.class.getName()));
// set hive style partitioning to true
this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext2);
final Configuration conf2 = tableSink2.getConf();
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), is(SlashEncodedDayPartitionValueExtractor.class.getName()));
}
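
/**
* Mirror of {@link #testSetupCleaningOptionsForSource} for the sink path.
*/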
@Test
void testSetupCleaningOptionsForSink() {
// definition with simple primary key and partition path
TableSchema schema1 = TableSchema.builder()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.primaryKey("f0")
.build();
// set the retained commits to be less than the min archive commits
this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "11");
final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext1);
final Configuration conf1 = tableSink1.getConf();
assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(20));
assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(30));
// set the retained commits to be greater than the min archive commits
this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "25");
final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext2);
final Configuration conf2 = tableSink2.getConf();
assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(35));
assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45));
}
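
/**
* Creating a sink with the 'insert' operation should be rejected for a MERGE_ON_READ
* table, since duplicate-allowing insert is only supported for COPY_ON_WRITE tables.
*/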
@Test
void testMorTableInsertAllowDuplication() {
TableSchema schema = TableSchema.builder()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.primaryKey("f0")
.build();
// override the operation to 'insert' for a MERGE_ON_READ table
this.conf.setString(FlinkOptions.OPERATION.key(), "insert");
this.conf.setString(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
final MockContext sinkContext = MockContext.getInstance(this.conf, schema, "f2");
assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sinkContext),
"Option 'write.insert.allow_dup' is only allowed for COPY_ON_WRITE table.");
}

// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
/**
* Mock dynamic table factory context.
*/
private static class MockContext implements DynamicTableFactory.Context {
private final Configuration conf;
private final TableSchema schema;
private final List<String> partitions;

private MockContext(Configuration conf, TableSchema schema, List<String> partitions) {
this.conf = conf;
this.schema = schema;
this.partitions = partitions;
}

static MockContext getInstance(Configuration conf) {
return getInstance(conf, TestConfigurations.TABLE_SCHEMA, Collections.singletonList("partition"));
}

static MockContext getInstance(Configuration conf, TableSchema schema, String partition) {
return getInstance(conf, schema, Collections.singletonList(partition));
}

static MockContext getInstance(Configuration conf, TableSchema schema, List<String> partitions) {
return new MockContext(conf, schema, partitions);
}

@Override
public ObjectIdentifier getObjectIdentifier() {
return ObjectIdentifier.of("hudi", "default", "t1");
}

@Override
public CatalogTable getCatalogTable() {
return new CatalogTableImpl(schema, partitions, conf.toMap(), "mock source table");
}

@Override
public ReadableConfig getConfiguration() {
return conf;
}

@Override
public ClassLoader getClassLoader() {
return null;
}

@Override
public boolean isTemporary() {
return false;
}
}
}