blob: 8dd1a79c0fd99810484692f8d94a85ab7fbf266a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.kudu.table.dynamic;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.KuduTestBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
* Unit Tests for {@link KuduDynamicTableSink}.
*/
public class KuduDynamicSinkTest extends KuduTestBase {
public static final String INPUT_TABLE = "books";
public static StreamExecutionEnvironment env;
public static TableEnvironment tEnv;
@BeforeEach
public void init() {
KuduTableInfo tableInfo = booksTableInfo(INPUT_TABLE, true);
setUpDatabase(tableInfo);
env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
}
@AfterEach
public void clean() {
KuduTableInfo tableInfo = booksTableInfo(INPUT_TABLE, true);
cleanDatabase(tableInfo);
}
@Test
public void testKuduSink() throws Exception {
String createSql = "CREATE TABLE "
+ INPUT_TABLE
+ "("
+ "id int,"
+ "title string,"
+ "author string,"
+ "price double,"
+ "quantity int"
+ ") WITH ("
+ " 'connector'='kudu',"
+ " 'kudu.masters'='"
+ 123245
+ "',"
+ " 'kudu.table'='"
+ INPUT_TABLE
+ "','kudu.primary-key-columns'='id"
+ "','kudu.max-buffer-size'='1024"
+ "','kudu.flush-interval'='1000"
+ "','kudu.operation-timeout'='500"
+ "','kudu.ignore-not-found'='true"
+ "','kudu.ignore-not-found'='true'"
+ ")";
tEnv.executeSql(createSql);
tEnv.executeSql("insert into " + INPUT_TABLE + " values(1006,'test title','test author',10.1,10)");
CloseableIterator<Row> collected =
tEnv.executeSql("select * from " + INPUT_TABLE + " where id =1006").collect();
assertNotNull(collected);
}
}