blob: 90b0746e98a94e07897a314fbeaaf3e71ae666b1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.kudu.connector;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowConvertor;
import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowDataConvertor;
import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.Row;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.shaded.com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
import org.testcontainers.shaded.com.google.common.io.Closer;
import org.testcontainers.shaded.com.google.common.net.HostAndPort;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class KuduTestBase {
private static final String DOCKER_IMAGE = "apache/kudu:1.13.0";
private static final Integer KUDU_MASTER_PORT = 7051;
private static final Integer KUDU_TSERVER_PORT = 7050;
private static final Integer NUMBER_OF_REPLICA = 3;
private static final Object[][] booksTableData = {
{1001, "Java for dummies", "Tan Ah Teck", 11.11, 11},
{1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22},
{1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33},
{1004, "A Cup of Java", "Kumar", 44.44, 44},
{1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55}};
public static String[] columns = new String[]{"id", "title", "author", "price", "quantity"};
private static GenericContainer<?> master;
private static List<GenericContainer<?>> tServers;
private static String masterAddress;
private static KuduClient kuduClient;
@BeforeAll
public static void beforeClass() throws Exception {
Network network = Network.newNetwork();
ImmutableList.Builder<GenericContainer<?>> tServersBuilder = ImmutableList.builder();
master = new GenericContainer<>(DOCKER_IMAGE)
.withExposedPorts(KUDU_MASTER_PORT, 8051)
.withCommand("master")
.withNetwork(network)
.withNetworkAliases("kudu-master");
master.start();
masterAddress = HostAndPort.fromParts(master.getHost(), master.getMappedPort(KUDU_MASTER_PORT)).toString();
for (int instance = 1; instance <= NUMBER_OF_REPLICA; instance++) {
String instanceName = "kudu-tserver-" + instance;
GenericContainer<?> tableServer = new GenericContainer<>(DOCKER_IMAGE)
.withExposedPorts(KUDU_TSERVER_PORT)
.withCommand("tserver")
.withEnv("KUDU_MASTERS", "kudu-master:" + KUDU_MASTER_PORT)
.withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --use_hybrid_clock=false " +
"--rpc_advertised_addresses=" + instanceName)
.withNetwork(network)
.withNetworkAliases(instanceName)
.dependsOn(master);
tableServer.start();
tServersBuilder.add(tableServer);
}
tServers = tServersBuilder.build();
System.out.println(HostAndPort.fromParts(master.getHost(), master.getMappedPort(8051)).toString());
kuduClient = new KuduClient.KuduClientBuilder(masterAddress).build();
}
@AfterAll
public static void afterClass() throws Exception {
kuduClient.close();
try (Closer closer = Closer.create()) {
closer.register(master::stop);
tServers.forEach(tabletServer -> closer.register(tabletServer::stop));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static KuduTableInfo booksTableInfo(String tableName, boolean createIfNotExist) {
KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);
if (createIfNotExist) {
ColumnSchemasFactory schemasFactory = () -> {
List<ColumnSchema> schemas = new ArrayList<>();
schemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
schemas.add(new ColumnSchema.ColumnSchemaBuilder("title", Type.STRING).build());
schemas.add(new ColumnSchema.ColumnSchemaBuilder("author", Type.STRING).build());
schemas.add(new ColumnSchema.ColumnSchemaBuilder("price", Type.DOUBLE).nullable(true).build());
schemas.add(new ColumnSchema.ColumnSchemaBuilder("quantity", Type.INT32).nullable(true).build());
return schemas;
};
tableInfo.createTableIfNotExists(
schemasFactory,
() -> new CreateTableOptions()
.setNumReplicas(1)
.addHashPartitions(Lists.newArrayList("id"), 2));
}
return tableInfo;
}
public static List<Tuple5<Integer, String, String, Double, Integer>> booksDataTuple() {
return Arrays.stream(booksTableData)
.map(row -> {
Integer rowId = (Integer) row[0];
if (rowId % 2 == 1) {
Tuple5<Integer, String, String, Double, Integer> values =
Tuple5.of((Integer) row[0],
(String) row[1],
(String) row[2],
(Double) row[3],
(Integer) row[4]);
return values;
} else {
Tuple5<Integer, String, String, Double, Integer> values =
Tuple5.of((Integer) row[0],
(String) row[1],
(String) row[2],
null, null);
return values;
}
})
.collect(Collectors.toList());
}
public static TableSchema booksTableSchema(){
return TableSchema.builder()
.field("id", DataTypes.INT())
.field("title", DataTypes.STRING())
.field( "author", DataTypes.STRING())
.field("price", DataTypes.DOUBLE())
.field("quantity", DataTypes.INT())
.build();
}
public static List<RowData> booksRowData() {
return Arrays.stream(booksTableData)
.map(row -> {
Integer rowId = (Integer) row[0];
if (rowId % 2 == 1) {
GenericRowData values = new GenericRowData(5);
values.setField(0, row[0]);
values.setField(1, StringData.fromString(row[1].toString()));
values.setField(2, StringData.fromString(row[2].toString()));
values.setField(3, row[3]);
values.setField(4, row[4]);
return values;
} else {
GenericRowData values = new GenericRowData(5);
values.setField(0, row[0]);
values.setField(1, StringData.fromString(row[1].toString()));
values.setField(2, StringData.fromString(row[2].toString()));
return values;
}
})
.collect(Collectors.toList());
}
public static List<Row> booksDataRow() {
return Arrays.stream(booksTableData)
.map(row -> {
Integer rowId = (Integer) row[0];
if (rowId % 2 == 1) {
Row values = new Row(5);
values.setField(0, row[0]);
values.setField(1, row[1]);
values.setField(2, row[2]);
values.setField(3, row[3]);
values.setField(4, row[4]);
return values;
} else {
Row values = new Row(5);
values.setField(0, row[0]);
values.setField(1, row[1]);
values.setField(2, row[2]);
return values;
}
})
.collect(Collectors.toList());
}
public static List<BookInfo> booksDataPojo() {
return Arrays.stream(booksTableData).map(row -> new BookInfo(
(int) row[0],
(String) row[1],
(String) row[2],
(Double) row[3],
(int) row[4]))
.collect(Collectors.toList());
}
public String getMasterAddress() {
return masterAddress;
}
public KuduClient getClient() {
return kuduClient;
}
protected void setUpDatabase(KuduTableInfo tableInfo) {
try {
String masterAddresses = getMasterAddress();
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns,
AbstractSingleOperationMapper.KuduOperation.INSERT));
booksDataRow().forEach(row -> {
try {
kuduWriter.write(row);
} catch (Exception e) {
e.printStackTrace();
}
});
kuduWriter.close();
} catch (Exception e) {
Assertions.fail();
}
}
protected void cleanDatabase(KuduTableInfo tableInfo) {
try {
String masterAddresses = getMasterAddress();
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns,
AbstractSingleOperationMapper.KuduOperation.INSERT));
kuduWriter.deleteTable();
kuduWriter.close();
} catch (Exception e) {
Assertions.fail();
}
}
protected List<Row> readRows(KuduTableInfo tableInfo) throws Exception {
String masterAddresses = getMasterAddress();
KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
KuduReader<Row> reader = new KuduReader<>(tableInfo, readerConfig, new RowResultRowConvertor());
KuduInputSplit[] splits = reader.createInputSplits(1);
List<Row> rows = new ArrayList<>();
for (KuduInputSplit split : splits) {
KuduReaderIterator<Row> resultIterator = reader.scanner(split.getScanToken());
while (resultIterator.hasNext()) {
Row row = resultIterator.next();
if (row != null) {
rows.add(row);
}
}
}
reader.close();
return rows;
}
protected List<RowData> readRowDatas(KuduTableInfo tableInfo) throws Exception {
String masterAddresses = getMasterAddress();
KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
KuduReader<RowData> reader = new KuduReader<>(tableInfo, readerConfig, new RowResultRowDataConvertor());
KuduInputSplit[] splits = reader.createInputSplits(1);
List<RowData> rows = new ArrayList<>();
for (KuduInputSplit split : splits) {
KuduReaderIterator<RowData> resultIterator = reader.scanner(split.getScanToken());
while (resultIterator.hasNext()) {
RowData row = resultIterator.next();
if (row != null) {
rows.add(row);
}
}
}
reader.close();
return rows;
}
protected void kuduRowsTest(List<Row> rows) {
for (Row row : rows) {
Integer rowId = (Integer) row.getField(0);
if (rowId % 2 == 1) {
Assertions.assertNotEquals(null, row.getField(3));
Assertions.assertNotEquals(null, row.getField(4));
} else {
Assertions.assertNull(row.getField(3));
Assertions.assertNull(row.getField(4));
}
}
}
protected void validateSingleKey(String tableName) throws Exception {
KuduTable kuduTable = getClient().openTable(tableName);
Schema schema = kuduTable.getSchema();
assertEquals(1, schema.getPrimaryKeyColumnCount());
assertEquals(2, schema.getColumnCount());
assertTrue(schema.getColumn("first").isKey());
assertFalse(schema.getColumn("second").isKey());
KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build();
List<RowResult> rows = new ArrayList<>();
scanner.forEach(rows::add);
assertEquals(1, rows.size());
assertEquals("f", rows.get(0).getString("first"));
assertEquals("s", rows.get(0).getString("second"));
}
public static class BookInfo {
public int id, quantity;
public String title, author;
public Double price;
public BookInfo() {
}
public BookInfo(int id, String title, String author, Double price, int quantity) {
this.id = id;
this.title = title;
this.author = author;
this.price = price;
this.quantity = quantity;
}
}
}