blob: 758a5ccf212dfe22ccd53788f20631fb68ec3def [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.hbase1;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink;
import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
import org.apache.commons.collections.IteratorUtils;
import org.apache.hadoop.hbase.HConstants;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.connector.hbase.util.HBaseConfigurationUtil.getHBaseConfiguration;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.BOOLEAN;
import static org.apache.flink.table.api.DataTypes.DATE;
import static org.apache.flink.table.api.DataTypes.DECIMAL;
import static org.apache.flink.table.api.DataTypes.DOUBLE;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.DataTypes.TIME;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Unit test for {@link HBase1DynamicTableFactory}. */
class HBaseDynamicTableFactoryTest {

    // Column-family names used by the test schemas.
    private static final String FAMILY1 = "f1";
    private static final String FAMILY2 = "f2";
    private static final String FAMILY3 = "f3";
    private static final String FAMILY4 = "f4";
    // Qualifier (column) names used inside the families.
    private static final String COL1 = "c1";
    private static final String COL2 = "c2";
    private static final String COL3 = "c3";
    private static final String COL4 = "c4";
    // Name of the physical column that maps to the HBase row key.
    private static final String ROWKEY = "rowkey";

    /**
     * Verifies that the factory creates an {@link HBaseDynamicTableSource} whose lookup function
     * and derived {@link HBaseTableSchema} (row-key index/type, family names, qualifier names and
     * data types) match the declared table schema.
     */
    @Test
    void testTableSourceFactory() {
        ResolvedSchema schema =
                ResolvedSchema.of(
                        Column.physical(FAMILY1, ROW(FIELD(COL1, INT()))),
                        Column.physical(FAMILY2, ROW(FIELD(COL1, INT()), FIELD(COL2, BIGINT()))),
                        Column.physical(ROWKEY, BIGINT()),
                        Column.physical(
                                FAMILY3,
                                ROW(
                                        FIELD(COL1, DOUBLE()),
                                        FIELD(COL2, BOOLEAN()),
                                        FIELD(COL3, STRING()))),
                        Column.physical(
                                FAMILY4,
                                ROW(
                                        FIELD(COL1, DECIMAL(10, 3)),
                                        FIELD(COL2, TIMESTAMP(3)),
                                        FIELD(COL3, DATE()),
                                        FIELD(COL4, TIME()))));

        DynamicTableSource source = createTableSource(schema, getAllOptions());
        assertThat(source).isInstanceOf(HBaseDynamicTableSource.class);
        HBaseDynamicTableSource hbaseSource = (HBaseDynamicTableSource) source;

        // Look up on the row-key column, which is at index 2 in the schema above.
        int[][] lookupKey = {{2}};
        LookupTableSource.LookupRuntimeProvider lookupProvider =
                hbaseSource.getLookupRuntimeProvider(new LookupRuntimeProviderContext(lookupKey));
        assertThat(lookupProvider).isInstanceOf(LookupFunctionProvider.class);

        LookupFunction tableFunction =
                ((LookupFunctionProvider) lookupProvider).createLookupFunction();
        assertThat(tableFunction).isInstanceOf(HBaseRowDataLookupFunction.class);
        assertThat(((HBaseRowDataLookupFunction) tableFunction).getHTableName())
                .isEqualTo("testHBastTable");

        HBaseTableSchema hbaseSchema = hbaseSource.getHBaseTableSchema();
        assertThat(hbaseSchema.getRowKeyIndex()).isEqualTo(2);
        assertThat(hbaseSchema.getRowKeyTypeInfo()).contains(Types.LONG);

        assertThat(hbaseSchema.getFamilyNames()).containsExactly("f1", "f2", "f3", "f4");
        assertThat(hbaseSchema.getQualifierNames("f1")).containsExactly("c1");
        assertThat(hbaseSchema.getQualifierNames("f2")).containsExactly("c1", "c2");
        assertThat(hbaseSchema.getQualifierNames("f3")).containsExactly("c1", "c2", "c3");
        assertThat(hbaseSchema.getQualifierNames("f4")).containsExactly("c1", "c2", "c3", "c4");

        assertThat(hbaseSchema.getQualifierDataTypes("f1")).containsExactly(INT());
        assertThat(hbaseSchema.getQualifierDataTypes("f2")).containsExactly(INT(), BIGINT());
        assertThat(hbaseSchema.getQualifierDataTypes("f3"))
                .containsExactly(DOUBLE(), BOOLEAN(), STRING());
        assertThat(hbaseSchema.getQualifierDataTypes("f4"))
                .containsExactly(DECIMAL(10, 3), TIMESTAMP(3), DATE(), TIME());
    }

    /**
     * Verifies that the {@code lookup.*} connector options are propagated into the source's retry
     * count and its {@link DefaultLookupCache} configuration.
     */
    @Test
    void testLookupOptions() {
        ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING()));
        Map<String, String> options = getAllOptions();
        options.put("lookup.cache", "PARTIAL");
        options.put("lookup.partial-cache.expire-after-access", "15213s");
        options.put("lookup.partial-cache.expire-after-write", "18213s");
        options.put("lookup.partial-cache.max-rows", "10000");
        options.put("lookup.partial-cache.cache-missing-key", "false");
        options.put("lookup.max-retries", "15513");

        DynamicTableSource source = createTableSource(schema, options);
        HBaseDynamicTableSource hbaseSource = (HBaseDynamicTableSource) source;
        assertThat(hbaseSource.getMaxRetryTimes()).isEqualTo(15513);
        assertThat(hbaseSource.getCache()).isInstanceOf(DefaultLookupCache.class);
        DefaultLookupCache cache = (DefaultLookupCache) hbaseSource.getCache();
        // The cache must carry exactly the durations/limits configured above.
        assertThat(cache)
                .isEqualTo(
                        DefaultLookupCache.newBuilder()
                                .expireAfterAccess(Duration.ofSeconds(15213))
                                .expireAfterWrite(Duration.ofSeconds(18213))
                                .maximumSize(10000)
                                .cacheMissingKey(false)
                                .build());
    }

    /**
     * Verifies that the factory creates an {@link HBaseDynamicTableSink} with the expected
     * {@link HBaseTableSchema}, Hadoop configuration (ZooKeeper quorum/znode and
     * {@code properties.*} pass-through), table name, and default write (buffer-flush) options.
     */
    @Test
    void testTableSinkFactory() {
        ResolvedSchema schema =
                ResolvedSchema.of(
                        Column.physical(ROWKEY, STRING()),
                        Column.physical(FAMILY1, ROW(FIELD(COL1, DOUBLE()), FIELD(COL2, INT()))),
                        Column.physical(FAMILY2, ROW(FIELD(COL1, INT()), FIELD(COL3, BIGINT()))),
                        Column.physical(
                                FAMILY3, ROW(FIELD(COL2, BOOLEAN()), FIELD(COL3, STRING()))),
                        Column.physical(
                                FAMILY4,
                                ROW(
                                        FIELD(COL1, DECIMAL(10, 3)),
                                        FIELD(COL2, TIMESTAMP(3)),
                                        FIELD(COL3, DATE()),
                                        FIELD(COL4, TIME()))));

        DynamicTableSink sink = createTableSink(schema, getAllOptions());
        assertThat(sink).isInstanceOf(HBaseDynamicTableSink.class);
        HBaseDynamicTableSink hbaseSink = (HBaseDynamicTableSink) sink;

        HBaseTableSchema hbaseSchema = hbaseSink.getHBaseTableSchema();
        // The row key is the first physical column in this schema.
        assertThat(hbaseSchema.getRowKeyIndex()).isZero();
        assertThat(hbaseSchema.getRowKeyDataType()).contains(STRING());

        assertThat(hbaseSchema.getFamilyNames()).containsExactly("f1", "f2", "f3", "f4");
        assertThat(hbaseSchema.getQualifierNames("f1")).containsExactly("c1", "c2");
        assertThat(hbaseSchema.getQualifierNames("f2")).containsExactly("c1", "c3");
        assertThat(hbaseSchema.getQualifierNames("f3")).containsExactly("c2", "c3");
        assertThat(hbaseSchema.getQualifierNames("f4")).containsExactly("c1", "c2", "c3", "c4");

        assertThat(hbaseSchema.getQualifierDataTypes("f1")).containsExactly(DOUBLE(), INT());
        assertThat(hbaseSchema.getQualifierDataTypes("f2")).containsExactly(INT(), BIGINT());
        assertThat(hbaseSchema.getQualifierDataTypes("f3")).containsExactly(BOOLEAN(), STRING());
        assertThat(hbaseSchema.getQualifierDataTypes("f4"))
                .containsExactly(DECIMAL(10, 3), TIMESTAMP(3), DATE(), TIME());

        // verify hadoop Configuration
        org.apache.hadoop.conf.Configuration expectedConfiguration = getHBaseConfiguration();
        expectedConfiguration.set(HConstants.ZOOKEEPER_QUORUM, "localhost:2181");
        expectedConfiguration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/flink");
        expectedConfiguration.set("hbase.security.authentication", "kerberos");

        org.apache.hadoop.conf.Configuration actualConfiguration = hbaseSink.getConfiguration();
        // Configuration has no usable equals(); compare the full key/value entry lists instead.
        assertThat(IteratorUtils.toList(actualConfiguration.iterator()))
                .isEqualTo(IteratorUtils.toList(expectedConfiguration.iterator()));

        // verify tableName
        assertThat(hbaseSink.getTableName()).isEqualTo("testHBastTable");

        // Defaults: 1000 rows / 1s interval / 2MB buffer when no sink.buffer-flush.* is set.
        HBaseWriteOptions expectedWriteOptions =
                HBaseWriteOptions.builder()
                        .setBufferFlushMaxRows(1000)
                        .setBufferFlushIntervalMillis(1000)
                        .setBufferFlushMaxSizeInBytes(2 * 1024 * 1024)
                        .build();
        HBaseWriteOptions actualWriteOptions = hbaseSink.getWriteOptions();
        assertThat(actualWriteOptions).isEqualTo(expectedWriteOptions);
    }

    /** Verifies that explicit {@code sink.buffer-flush.*} options override the defaults. */
    @Test
    void testBufferFlushOptions() {
        Map<String, String> options = getAllOptions();
        options.put("sink.buffer-flush.max-size", "10mb");
        options.put("sink.buffer-flush.max-rows", "100");
        options.put("sink.buffer-flush.interval", "10s");

        ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING()));

        DynamicTableSink sink = createTableSink(schema, options);
        HBaseWriteOptions expected =
                HBaseWriteOptions.builder()
                        .setBufferFlushMaxRows(100)
                        .setBufferFlushIntervalMillis(10 * 1000)
                        .setBufferFlushMaxSizeInBytes(10 * 1024 * 1024)
                        .build();
        HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions();
        assertThat(actual).isEqualTo(expected);
    }

    /** Verifies that {@code sink.ignore-null-value} is propagated into the write options. */
    @Test
    void testSinkIgnoreNullValueOptions() {
        Map<String, String> options = getAllOptions();
        options.put("sink.ignore-null-value", "true");

        ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING()));

        DynamicTableSink sink = createTableSink(schema, options);
        HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions();
        assertThat(actual.isIgnoreNullValue()).isTrue();
    }

    /** Verifies that {@code sink.parallelism} is exposed via the sink's runtime provider. */
    @Test
    void testParallelismOptions() {
        Map<String, String> options = getAllOptions();
        options.put("sink.parallelism", "2");

        ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING()));

        DynamicTableSink sink = createTableSink(schema, options);
        assertThat(sink).isInstanceOf(HBaseDynamicTableSink.class);
        HBaseDynamicTableSink hbaseSink = (HBaseDynamicTableSink) sink;
        SinkFunctionProvider provider =
                (SinkFunctionProvider)
                        hbaseSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
        assertThat(provider.getParallelism()).contains(2);
    }

    /** Verifies that setting all {@code sink.buffer-flush.*} options to 0 disables buffering. */
    @Test
    void testDisabledBufferFlushOptions() {
        Map<String, String> options = getAllOptions();
        options.put("sink.buffer-flush.max-size", "0");
        options.put("sink.buffer-flush.max-rows", "0");
        options.put("sink.buffer-flush.interval", "0");

        ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING()));

        DynamicTableSink sink = createTableSink(schema, options);
        HBaseWriteOptions expected =
                HBaseWriteOptions.builder()
                        .setBufferFlushMaxRows(0)
                        .setBufferFlushIntervalMillis(0)
                        .setBufferFlushMaxSizeInBytes(0)
                        .build();
        HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions();
        assertThat(actual).isEqualTo(expected);
    }

    /** Verifies that an unrecognized option fails both source and sink creation. */
    @Test
    void testUnknownOption() {
        Map<String, String> options = getAllOptions();
        options.put("sink.unknown.key", "unknown-value");
        ResolvedSchema schema =
                ResolvedSchema.of(
                        Column.physical(ROWKEY, STRING()),
                        Column.physical(FAMILY1, ROW(FIELD(COL1, DOUBLE()), FIELD(COL2, INT()))));

        assertThatThrownBy(() -> createTableSource(schema, options))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                "Unsupported options:\n\nsink.unknown.key"));

        assertThatThrownBy(() -> createTableSink(schema, options))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                "Unsupported options:\n\nsink.unknown.key"));
    }

    /**
     * Verifies that TIMESTAMP/TIME precisions outside the connector-supported range [0, 3] are
     * rejected for both source and sink.
     */
    @Test
    void testTypeWithUnsupportedPrecision() {
        Map<String, String> options = getAllOptions();
        // test unsupported timestamp precision
        ResolvedSchema wrongTs =
                ResolvedSchema.of(
                        Column.physical(ROWKEY, STRING()),
                        Column.physical(
                                FAMILY1, ROW(FIELD(COL1, TIMESTAMP(6)), FIELD(COL2, INT()))));

        assertThatThrownBy(() -> createTableSource(wrongTs, options))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                "The precision 6 of TIMESTAMP type is out of the range [0, 3] supported by HBase connector"));

        assertThatThrownBy(() -> createTableSink(wrongTs, options))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                "The precision 6 of TIMESTAMP type is out of the range [0, 3] supported by HBase connector"));

        // test unsupported time precision
        ResolvedSchema wrongTimeSchema =
                ResolvedSchema.of(
                        Column.physical(ROWKEY, STRING()),
                        Column.physical(FAMILY1, ROW(FIELD(COL1, TIME(6)), FIELD(COL2, INT()))));

        assertThatThrownBy(() -> createTableSource(wrongTimeSchema, options))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                "The precision 6 of TIME type is out of the range [0, 3] supported by HBase connector"));

        assertThatThrownBy(() -> createTableSink(wrongTimeSchema, options))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                "The precision 6 of TIME type is out of the range [0, 3] supported by HBase connector"));
    }

    /**
     * Returns the minimal set of options every test starts from: the hbase-1.4 connector, a table
     * name, ZooKeeper connection settings, and one {@code properties.*} pass-through entry.
     */
    private Map<String, String> getAllOptions() {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "hbase-1.4");
        options.put("table-name", "testHBastTable");
        options.put("zookeeper.quorum", "localhost:2181");
        options.put("zookeeper.znode.parent", "/flink");
        options.put("properties.hbase.security.authentication", "kerberos");
        return options;
    }
}