blob: 75e03fe97f5874b27b779ce8048317599475097b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.hbase2.source;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase2.util.HBaseTestBase;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.DOUBLE;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.assertj.core.api.Assertions.assertThat;
/** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {
@Test
void testEval() throws Exception {
HBaseRowDataAsyncLookupFunction lookupFunction = buildRowDataAsyncLookupFunction();
lookupFunction.open(null);
final List<String> result = new ArrayList<>();
int[] rowkeys = {1, 2, 1, 12, 3, 12, 4, 3};
CountDownLatch latch = new CountDownLatch(rowkeys.length);
for (int rowkey : rowkeys) {
CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
lookupFunction.eval(future, rowkey);
future.whenComplete(
(rs, t) -> {
synchronized (result) {
if (rs.isEmpty()) {
result.add(rowkey + ": null");
} else {
rs.forEach(row -> result.add(rowkey + ": " + row.toString()));
}
}
latch.countDown();
});
}
// this verifies lookup calls are async
assertThat(result.size()).isLessThan(rowkeys.length);
latch.await();
lookupFunction.close();
Collections.sort(result);
assertThat(result)
.containsExactly(
"12: null",
"12: null",
"1: +I(1,+I(10),+I(Hello-1,100),+I(1.01,false,Welt-1))",
"1: +I(1,+I(10),+I(Hello-1,100),+I(1.01,false,Welt-1))",
"2: +I(2,+I(20),+I(Hello-2,200),+I(2.02,true,Welt-2))",
"3: +I(3,+I(30),+I(Hello-3,300),+I(3.03,false,Welt-3))",
"3: +I(3,+I(30),+I(Hello-3,300),+I(3.03,false,Welt-3))",
"4: +I(4,+I(40),+I(null,400),+I(4.04,true,Welt-4))");
}
private HBaseRowDataAsyncLookupFunction buildRowDataAsyncLookupFunction() {
DataType dataType =
ROW(
FIELD(ROW_KEY, INT()),
FIELD(FAMILY1, ROW(FIELD(F1COL1, INT()))),
FIELD(FAMILY2, ROW(FIELD(F2COL1, STRING()), FIELD(F2COL2, BIGINT()))),
FIELD(
FAMILY3,
ROW(
FIELD(F3COL1, DOUBLE()),
FIELD(F3COL2, DataTypes.BOOLEAN()),
FIELD(F3COL3, STRING()))));
HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType);
return new HBaseRowDataAsyncLookupFunction(
getConf(),
TEST_TABLE_1,
hbaseSchema,
"null",
LookupOptions.MAX_RETRIES.defaultValue());
}
}