Kudu Connector rework (#78)
Kudu connector rework including the addition of a
connector to the Table API for it.
Co-authored-by: Gyula Fora <gyula@cloudera.com>
Co-authored-by: Balazs Varga <bvarga@cloudera.com>
diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
index 8692ca5..14c13eb 100644
--- a/flink-connector-kudu/README.md
+++ b/flink-connector-kudu/README.md
@@ -1,7 +1,11 @@
# Flink Kudu Connector
-This connector provides a source (```KuduInputFormat```) and a sink/output (```KuduSink``` and ```KuduOutputFormat```, respectively) that can read and write to [Kudu](https://kudu.apache.org/). To use this connector, add the
-following dependency to your project:
+This connector provides a source (```KuduInputFormat```), a sink/output
+(```KuduSink``` and ```KuduOutputFormat```, respectively),
+ as well a table source (`KuduTableSource`), an upsert table sink (`KuduTableSink`), and a catalog (`KuduCatalog`),
+ to allow reading and writing to [Kudu](https://kudu.apache.org/).
+
+To use this connector, add the following dependency to your project:
<dependency>
<groupId>org.apache.bahir</groupId>
@@ -9,104 +13,283 @@
<version>1.1-SNAPSHOT</version>
</dependency>
-*Version Compatibility*: This module is compatible with Apache Kudu *1.9.0* (last stable version).
+ *Version Compatibility*: This module is compatible with Apache Kudu *1.11.1* (last stable version) and Apache Flink 1.10.+.
Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
-See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-stable/start/dependencies.html).
+See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/dependencies.html).
## Installing Kudu
Follow the instructions from the [Kudu Installation Guide](https://kudu.apache.org/docs/installation.html).
-Optionally, you can use the docker images provided in dockers folder.
+Optionally, you can use the docker images provided in dockers folder.
-## KuduInputFormat
+## SQL and Table API
+
+The Kudu connector is fully integrated with the Flink Table and SQL APIs. Once we configure the Kudu catalog (see next section)
+we can start querying or inserting into existing Kudu tables using the Flink SQL or Table API.
+
+For more information about the possible queries please check the [official documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/)
+
+### Kudu Catalog
+
+The connector comes with a catalog implementation to handle metadata about your Kudu setup and perform table management.
+By using the Kudu catalog, you can access all the tables already created in Kudu from Flink SQL queries. The Kudu catalog only
+allows users to create or access existing Kudu tables. Tables using other data sources must be defined in other catalogs such as
+in-memory catalog or Hive catalog.
+
+When using the SQL CLI you can easily add the Kudu catalog to your environment yaml file:
```
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-env.setParallelism(PARALLELISM);
-
-// create a table info object
-KuduTableInfo tableInfo = KuduTableInfo.Builder
- .create("books")
- .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
- .addColumn(KuduColumnInfo.Builder.createString("title").build())
- .addColumn(KuduColumnInfo.Builder.createString("author").build())
- .addColumn(KuduColumnInfo.Builder.createDouble("price").build())
- .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build())
- .build();
-// create a reader configuration
-KuduReaderConfig readerConfig = KuduReaderConfig.Builder
- .setMasters("172.25.0.6")
- .setRowLimit(1000)
- .build();
-// Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips
-env.createInput(new KuduInputFormat<>(readerConfig, tableInfo, new DefaultSerDe()))
- .count();
-
-env.execute();
+catalogs:
+ - name: kudu
+ type: kudu
+ kudu.masters: <host>:7051
```
-## KuduOutputFormat
+Once the SQL CLI is started you can simply switch to the Kudu catalog by calling `USE CATALOG kudu;`
-```
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+You can also create and use the KuduCatalog directly in the Table environment:
-env.setParallelism(PARALLELISM);
-
-// create a table info object
-KuduTableInfo tableInfo = KuduTableInfo.Builder
- .create("books")
- .createIfNotExist(true)
- .replicas(3)
- .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
- .addColumn(KuduColumnInfo.Builder.createString("title").build())
- .addColumn(KuduColumnInfo.Builder.createString("author").build())
- .addColumn(KuduColumnInfo.Builder.createDouble("price").build())
- .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build())
- .build();
-// create a writer configuration
-KuduWriterConfig writerConfig = KuduWriterConfig.Builder
- .setMasters("172.25.0.6")
- .setUpsertWrite()
- .setStrongConsistency()
- .build();
-...
-
-env.fromCollection(books)
- .output(new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe()));
-
-env.execute();
+```java
+String KUDU_MASTERS="host1:port1,host2:port2"
+KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS);
+tableEnv.registerCatalog("kudu", catalog);
+tableEnv.useCatalog("kudu");
```
-## KuduSink
+### DDL operations using SQL
+
+It is possible to manipulate Kudu tables using SQL DDL.
+
+When not using the Kudu catalog, the following additional properties must be specified in the `WITH` clause:
+* `'connector.type'='kudu'`
+* `'kudu.masters'='host1:port1,host2:port2,...'`: comma-delimitered list of Kudu masters
+* `'kudu.table'='...'`: The table's name within the Kudu database.
+
+If you have registered and are using the Kudu catalog, these properties are handled automatically.
+
+To create a table, the additional properties `kudu.primary-key-columns` and `kudu.hash-columns` must be specified
+as comma-delimited lists. Optionally, you can set the `kudu.replicas` property (defaults to 1).
+Other properties, such as range partitioning, cannot be configured here - for more flexibility, please use
+`catalog.createTable` as described in [this](#Creating-a-KuduTable-directly-with-KuduCatalog) section or create the table directly in Kudu.
+
+The `NOT NULL` constraint can be added to any of the column definitions.
+By setting a column as a primary key, it will automatically by created with the `NOT NULL` constraint.
+Hash columns must be a subset of primary key columns.
+
+Kudu Catalog
```
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-env.setParallelism(PARALLELISM);
-
-// create a table info object
-KuduTableInfo tableInfo = KuduTableInfo.Builder
- .create("books")
- .createIfNotExist(true)
- .replicas(3)
- .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
- .addColumn(KuduColumnInfo.Builder.createString("title").build())
- .addColumn(KuduColumnInfo.Builder.createString("author").build())
- .addColumn(KuduColumnInfo.Builder.createDouble("price").build())
- .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build())
- .build();
-// create a writer configuration
-KuduWriterConfig writerConfig = KuduWriterConfig.Builder
- .setMasters("172.25.0.6")
- .setUpsertWrite()
- .setStrongConsistency()
- .build();
-...
-
-env.fromCollection(books)
- .addSink(new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe()));
-
-env.execute();
+CREATE TABLE TestTable (
+ first STRING,
+ second STRING,
+ third INT NOT NULL
+) WITH (
+ 'kudu.hash-columns' = 'first',
+ 'kudu.primary-key-columns' = 'first,second'
+)
```
+
+Other catalogs
+```
+CREATE TABLE TestTable (
+ first STRING,
+ second STRING,
+ third INT NOT NULL
+) WITH (
+ 'connector.type' = 'kudu',
+ 'kudu.masters' = '...',
+ 'kudu.table' = 'TestTable',
+ 'kudu.hash-columns' = 'first',
+ 'kudu.primary-key-columns' = 'first,second'
+)
+```
+
+Renaming a table:
+```
+ALTER TABLE TestTable RENAME TO TestTableRen
+```
+
+Dropping a table:
+```sql
+DROP TABLE TestTableRen
+```
+
+#### Creating a KuduTable directly with KuduCatalog
+
+The KuduCatalog also exposes a simple `createTable` method that required only the where table configuration,
+including schema, partitioning, replication, etc. can be specified using a `KuduTableInfo` object.
+
+Use the `createTableIfNotExists` method, that takes a `ColumnSchemasFactory` and
+a `CreateTableOptionsFactory` parameter, that implement respectively `getColumnSchemas()`
+returning a list of Kudu [ColumnSchema](https://kudu.apache.org/apidocs/org/apache/kudu/ColumnSchema.html) objects;
+ and `getCreateTableOptions()` returning a
+[CreateTableOptions](https://kudu.apache.org/apidocs/org/apache/kudu/client/CreateTableOptions.html) object.
+
+This example shows the creation of a table called `ExampleTable` with two columns,
+`first` being a primary key; and configuration of replicas and hash partitioning.
+
+```java
+KuduTableInfo tableInfo = KuduTableInfo
+ .forTable("ExampleTable")
+ .createTableIfNotExists(
+ () ->
+ Lists.newArrayList(
+ new ColumnSchema
+ .ColumnSchemaBuilder("first", Type.INT32)
+ .key(true)
+ .build(),
+ new ColumnSchema
+ .ColumnSchemaBuilder("second", Type.STRING)
+ .build()
+ ),
+ () -> new CreateTableOptions()
+ .setNumReplicas(1)
+ .addHashPartitions(Lists.newArrayList("first"), 2));
+
+catalog.createTable(tableInfo, false);
+```
+The example uses lambda expressions to implement the functional interfaces.
+
+Read more about Kudu schema design in the [Kudu docs](https://kudu.apache.org/docs/schema_design.html).
+
+### Supported data types
+| Flink/SQL | Kudu |
+| ------------- |:-------------:|
+| STRING | STRING |
+| BOOLEAN | BOOL |
+| TINYINT | INT8 |
+| SMALLINT | INT16 |
+| INT | INT32 |
+| BIGINT | INT64 |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| BYTES | BINARY |
+| TIMESTAMP(3) | UNIXTIME_MICROS |
+
+Note:
+* `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java conversion class is `java.sql.Timestamp`
+* `BINARY` and `VARBINARY` are not yet supported - use `BYTES`, which is a `VARBINARY(2147483647)`
+* `CHAR` and `VARCHAR` are not yet supported - use `STRING`, which is a `VARCHAR(2147483647)`
+* `DECIMAL` types are not yet supported
+
+### Known limitations
+* Data type limitations (see above).
+* SQL Create table: primary keys can only be set by the `kudu.primary-key-columns` property, using the
+`PRIMARY KEY` constraint is not yet possible.
+* SQL Create table: range partitioning is not supported.
+* When getting a table through the Catalog, NOT NULL and PRIMARY KEY constraints are ignored. All columns
+are described as being nullable, and not being primary keys.
+* Kudu tables cannot be altered through the catalog other than simple renaming
+
+## DataStream API
+
+It is also possible to use the the Kudu connector directly from the DataStream API however we
+encourage all users to explore the Table API as it provides a lot of useful tooling when working
+with Kudu data.
+
+### Reading tables into a DataStreams
+
+There are 2 main ways of reading a Kudu Table into a DataStream
+ 1. Using the `KuduCatalog` and the Table API
+ 2. Using the `KuduRowInputFormat` directly
+
+Using the `KuduCatalog` and Table API is the recommended way of reading tables as it automatically
+guarantees type safety and takes care of configuration of our readers.
+
+This is how it works in practice:
+```java
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableSettings);
+
+tableEnv.registerCatalog("kudu", new KuduCatalog("master:port"));
+tableEnv.useCatalog("kudu");
+
+Table table = tableEnv.sqlQuery("SELECT * FROM MyKuduTable");
+DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);
+```
+
+The second way of achieving the same thing is by using the `KuduRowInputFormat` directly.
+In this case we have to manually provide all information about our table:
+
+```java
+KuduTableInfo tableInfo = ...
+KuduReaderConfig readerConfig = ...
+KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo);
+
+DataStream<Row> rowStream = env.createInput(inputFormat, rowTypeInfo);
+```
+
+At the end of the day the `KuduTableSource` is just a convenient wrapper around the `KuduRowInputFormat`.
+
+### Kudu Sink
+The connector provides a `KuduSink` class that can be used to consume DataStreams
+and write the results into a Kudu table.
+
+The constructor takes 3 or 4 arguments.
+ * `KuduWriterConfig` is used to specify the Kudu masters and the flush mode.
+ * `KuduTableInfo` identifies the table to be written
+ * `KuduOperationMapper` maps the records coming from the DataStream to a list of Kudu operations.
+ * `KuduFailureHandler` (optional): If you want to provide your own logic for handling writing failures.
+
+The example below shows the creation of a sink for Row type records of 3 fields. It Upserts each record.
+It is assumed that a Kudu table with columns `col1, col2, col3` called `AlreadyExistingTable` exists. Note that if this were not the case,
+we could pass a `KuduTableInfo` as described in the [Catalog - Creating a table](#creating-a-table) section,
+and the sink would create the table with the provided configuration.
+
+```java
+KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(KUDU_MASTERS).build();
+
+KuduSink<Row> sink = new KuduSink<>(
+ writerConfig,
+ KuduTableInfo.forTable("AlreadyExistingTable"),
+ new RowOperationMapper<>(
+ new String[]{"col1", "col2", "col3"},
+ AbstractSingleOperationMapper.KuduOperation.UPSERT)
+)
+```
+
+#### KuduOperationMapper
+
+This section describes the Operation mapping logic in more detail.
+
+The connector supports insert, upsert, update, and delete operations.
+The operation to be performed can vary dynamically based on the record.
+To allow for more flexibility, it is also possible for one record to trigger
+0, 1, or more operations.
+For the highest level of control, implement the `KuduOperationMapper` interface.
+
+If one record from the DataStream corresponds to one table operation,
+extend the `AbstractSingleOperationMapper` class. An array of column
+names must be provided. This must match the Kudu table's schema.
+
+The `getField` method must be overridden, which extracts the value for the table column whose name is
+at the `i`th place in the `columnNames` array.
+If the operation is one of (`CREATE, UPSERT, UPDATE, DELETE`)
+and doesn't depend on the input record (constant during the life of the sink), it can be set in the constructor
+of `AbstractSingleOperationMapper`.
+It is also possible to implement your own logic by overriding the
+`createBaseOperation` method that returns a Kudu [Operation](https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html).
+
+There are pre-defined operation mappers for Pojo, Flink Row, and Flink Tuple types for constant operation, 1-to-1 sinks.
+* `PojoOperationMapper`: Each table column must correspond to a POJO field
+with the same name. The `columnNames` array should contain those fields of the POJO that
+are present as table columns (the POJO fields can be a superset of table columns).
+* `RowOperationMapper` and `TupleOperationMapper`: the mapping is based on position. The
+`i`th field of the Row/Tuple corresponds to the column of the table at the `i`th
+position in the `columnNames` array.
+
+## Building the connector
+
+The connector can be easily built by using maven:
+
+```
+cd bahir-flink
+mvn clean install
+```
+
+### Running the tests
+
+The integration tests rely on the Kudu test harness which requires the current user to be able to ssh to localhost.
+
+This might not work out of the box on some operating systems (such as Mac OS X).
+To solve this problem go to *System Preferences/Sharing* and enable Remote login for your user.
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index bd42c55..fe7887c 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -30,8 +30,7 @@
<packaging>jar</packaging>
<properties>
- <kudu.version>1.10.0</kudu.version>
-
+ <kudu.version>1.11.1</kudu.version>
<mockito.version>1.10.19</mockito.version>
<junit.groups>!DockerTest</junit.groups>
</properties>
@@ -39,17 +38,32 @@
<dependencies>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>${kudu.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!--test dependencies-->
<dependency>
<groupId>org.apache.kudu</groupId>
@@ -65,6 +79,7 @@
<classifier>${os.detected.classifier}</classifier>
<scope>test</scope>
</dependency>
+
<!-- this is added because test cluster use @Rule from junit4 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
@@ -72,13 +87,28 @@
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
- </dependencies>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+</dependencies>
<build>
<extensions>
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java
index 9d7d017..4bf81fe 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java
@@ -16,18 +16,19 @@
*/
package org.apache.flink.connectors.kudu.batch;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.writer.KuduOperationMapper;
import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
-import org.apache.flink.connectors.kudu.connector.serde.KuduSerialization;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +36,11 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
+/**
+ * Output format for writing data into a Kudu table (defined by the provided {@link KuduTableInfo}) in both batch
+ * and stream programs.
+ */
+@PublicEvolving
public class KuduOutputFormat<IN> extends RichOutputFormat<IN> implements CheckpointedFunction {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -42,19 +48,19 @@
private final KuduTableInfo tableInfo;
private final KuduWriterConfig writerConfig;
private final KuduFailureHandler failureHandler;
- private final KuduSerialization<IN> serializer;
+ private final KuduOperationMapper<IN> opsMapper;
private transient KuduWriter kuduWriter;
- public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<IN> serializer) {
- this(writerConfig, tableInfo, serializer, new DefaultKuduFailureHandler());
+ public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduOperationMapper<IN> opsMapper) {
+ this(writerConfig, tableInfo, opsMapper, new DefaultKuduFailureHandler());
}
- public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<IN> serializer, KuduFailureHandler failureHandler) {
- this.tableInfo = checkNotNull(tableInfo,"tableInfo could not be null");
- this.writerConfig = checkNotNull(writerConfig,"config could not be null");
- this.serializer = checkNotNull(serializer,"serializer could not be null");
- this.failureHandler = checkNotNull(failureHandler,"failureHandler could not be null");
+ public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduOperationMapper<IN> opsMapper, KuduFailureHandler failureHandler) {
+ this.tableInfo = checkNotNull(tableInfo, "tableInfo could not be null");
+ this.writerConfig = checkNotNull(writerConfig, "config could not be null");
+ this.opsMapper = checkNotNull(opsMapper, "opsMapper could not be null");
+ this.failureHandler = checkNotNull(failureHandler, "failureHandler could not be null");
}
@Override
@@ -63,15 +69,12 @@
@Override
public void open(int taskNumber, int numTasks) throws IOException {
- kuduWriter = new KuduWriter(tableInfo, writerConfig, failureHandler);
-
- serializer.withSchema(kuduWriter.getSchema());
+ kuduWriter = new KuduWriter(tableInfo, writerConfig, opsMapper, failureHandler);
}
@Override
public void writeRecord(IN row) throws IOException {
- final KuduRow kuduRow = serializer.serialize(row);
- kuduWriter.write(kuduRow);
+ kuduWriter.write(row);
}
@Override
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduRowInputFormat.java
similarity index 70%
rename from flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduRowInputFormat.java
index 98877d8..dc17ed4 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduRowInputFormat.java
@@ -16,19 +16,20 @@
*/
package org.apache.flink.connectors.kudu.batch;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
-import org.apache.flink.connectors.kudu.connector.serde.KuduDeserialization;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+
import org.apache.kudu.client.KuduException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,13 +40,21 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
-public class KuduInputFormat<OUT> extends RichInputFormat<OUT, KuduInputSplit> {
+/**
+ * Input format for reading the contents of a Kudu table (defined by the provided {@link KuduTableInfo}) in both batch
+ * and stream programs. Rows of the Kudu table are mapped to {@link Row} instances that can converted to other data
+ * types by the user later if necessary.
+ *
+ * <p> For programmatic access to the schema of the input rows users can use the {@link org.apache.flink.connectors.kudu.table.KuduCatalog}
+ * or overwrite the column order manually by providing a list of projected column names.
+ */
+@PublicEvolving
+public class KuduRowInputFormat extends RichInputFormat<Row, KuduInputSplit> {
private final Logger log = LoggerFactory.getLogger(getClass());
private final KuduReaderConfig readerConfig;
private final KuduTableInfo tableInfo;
- private final KuduDeserialization<OUT> deserializer;
private List<KuduFilterInfo> tableFilters;
private List<String> tableProjections;
@@ -55,16 +64,20 @@
private transient KuduReader kuduReader;
private transient KuduReaderIterator resultIterator;
- public KuduInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, KuduDeserialization<OUT> deserializer) {
- this(readerConfig, tableInfo, deserializer, new ArrayList<>(), new ArrayList<>());
+ public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo) {
+ this(readerConfig, tableInfo, new ArrayList<>(), null);
}
- public KuduInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, KuduDeserialization<OUT> deserializer, List<KuduFilterInfo> tableFilters, List<String> tableProjections) {
- this.readerConfig = checkNotNull(readerConfig,"readerConfig could not be null");
- this.tableInfo = checkNotNull(tableInfo,"tableInfo could not be null");
- this.deserializer = checkNotNull(deserializer,"deserializer could not be null");
- this.tableFilters = checkNotNull(tableFilters,"tableFilters could not be null");
- this.tableProjections = checkNotNull(tableProjections,"tableProjections could not be null");
+ public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, List<String> tableProjections) {
+ this(readerConfig, tableInfo, new ArrayList<>(), tableProjections);
+ }
+
+ public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, List<KuduFilterInfo> tableFilters, List<String> tableProjections) {
+
+ this.readerConfig = checkNotNull(readerConfig, "readerConfig could not be null");
+ this.tableInfo = checkNotNull(tableInfo, "tableInfo could not be null");
+ this.tableFilters = checkNotNull(tableFilters, "tableFilters could not be null");
+ this.tableProjections = tableProjections;
this.endReached = false;
}
@@ -124,11 +137,10 @@
}
@Override
- public OUT nextRecord(OUT reuse) throws IOException {
+ public Row nextRecord(Row reuse) throws IOException {
// check that current iterator has next rows
if (this.resultIterator.hasNext()) {
- KuduRow row = resultIterator.next();
- return deserializer.deserialize(row);
+ return resultIterator.next();
} else {
endReached = true;
return null;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
new file mode 100644
index 0000000..b178308
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.connector;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kudu.ColumnSchema;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Factory for creating {@link ColumnSchema}s to be used when creating a table that
+ * does not currently exist in Kudu. Usable through {@link KuduTableInfo#createTableIfNotExists}.
+ *
+ * <p> This factory implementation must be Serializable as it will be used directly in the Flink sources
+ * and sinks.
+ */
+@PublicEvolving
+public interface ColumnSchemasFactory extends Serializable {
+
+ /**
+ * Creates the columns of the Kudu table that will be used during the createTable operation.
+ *
+ * @return List of columns.
+ */
+ List<ColumnSchema> getColumnSchemas();
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
new file mode 100644
index 0000000..4a475e9
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.connector;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kudu.client.CreateTableOptions;
+
+import java.io.Serializable;
+
+/**
+ * Factory for creating {@link CreateTableOptions} to be used when creating a table that
+ * does not currently exist in Kudu. Usable through {@link KuduTableInfo#createTableIfNotExists}.
+ *
+ * <p> This factory implementation must be Serializable as it will be used directly in the Flink sources
+ * and sinks.
+ */
+@PublicEvolving
+public interface CreateTableOptionsFactory extends Serializable {
+
+ /**
+ * Creates the {@link CreateTableOptions} that will be used during the createTable operation.
+ *
+ * @return CreateTableOptions for creating the table.
+ */
+ CreateTableOptions getCreateTableOptions();
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java
deleted file mode 100644
index ff8a601..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Type;
-
-import java.io.Serializable;
-
-@PublicEvolving
-public class KuduColumnInfo implements Serializable {
-
- private String name;
- private Type type;
- private boolean key;
- private boolean rangeKey;
- private boolean hashKey;
- private boolean nullable;
- private Object defaultValue;
- private int blockSize;
- private Encoding encoding;
- private Compression compression;
-
- private KuduColumnInfo(String name, Type type) {
- this.name = name;
- this.type = type;
- this.blockSize = 0;
- this.key = false;
- this.rangeKey = false;
- this.hashKey = false;
- this.nullable = false;
- this.defaultValue = null;
- this.encoding = Encoding.AUTO;
- this.compression = Compression.DEFAULT;
- }
-
- protected String name() {
- return name;
- }
-
- protected boolean isRangeKey() {
- return rangeKey;
- }
-
- protected boolean isHashKey() {
- return hashKey;
- }
-
- protected ColumnSchema columnSchema() {
- return new ColumnSchema.ColumnSchemaBuilder(name, type)
- .key(key)
- .nullable(nullable)
- .defaultValue(defaultValue)
- .desiredBlockSize(blockSize)
- .encoding(encoding.encode)
- .compressionAlgorithm(compression.algorithm)
- .build();
- }
-
- public static class Builder {
- private KuduColumnInfo column;
-
- private Builder(String name, Type type) {
- this.column = new KuduColumnInfo(name, type);
- }
-
- public static Builder create(String name, Type type) {
- return new Builder(name, type);
- }
- public static Builder createByte(String name) {
- return create(name, Type.INT8);
- }
- public static Builder createShort(String name) {
- return create(name, Type.INT16);
- }
- public static Builder createInteger(String name) {
- return create(name, Type.INT32);
- }
- public static Builder createLong(String name) {
- return create(name, Type.INT64);
- }
- public static Builder createDouble(String name) {
- return create(name, Type.DOUBLE);
- }
- public static Builder createFloat(String name) {
- return create(name, Type.FLOAT);
- }
- public static Builder createString(String name) {
- return create(name, Type.STRING);
- }
- public static Builder createBool(String name) {
- return create(name, Type.BOOL);
- }
- public static Builder createByteArray(String name) {
- return create(name, Type.BINARY);
- }
- public static Builder createUnixTime(String name) {
- return create(name, Type.UNIXTIME_MICROS);
- }
-
- public Builder asKey() {
- return key(true);
- }
-
- public Builder key(boolean key) {
- this.column.key = key;
- return this;
- }
-
- public Builder asRangeKey() {
- return rangeKey(true);
- }
-
- public Builder rangeKey(boolean rangeKey) {
- this.column.rangeKey = rangeKey;
- return this;
- }
-
- public Builder asHashKey() {
- return hashKey(true);
- }
-
- public Builder hashKey(boolean hashKey) {
- this.column.hashKey = hashKey;
- return this;
- }
-
- public Builder asNullable() {
- return nullable(true);
- }
-
- public Builder asNotNullable() {
- return nullable(false);
- }
-
- public Builder nullable(boolean nullable) {
- this.column.nullable = nullable;
- return this;
- }
-
- public Builder defaultValue(Object defaultValue) {
- this.column.defaultValue = defaultValue;
- return this;
- }
-
- public Builder desiredBlockSize(int blockSize) {
- this.column.blockSize = blockSize;
- return this;
- }
-
- public Builder encoding(Encoding encoding) {
- this.column.encoding = encoding;
- return this;
- }
-
- public Builder compressionAlgorithm(Compression compression) {
- this.column.compression = compression;
- return this;
- }
-
- public KuduColumnInfo build() {
- return column;
- }
- }
-
- public enum Compression {
- UNKNOWN(ColumnSchema.CompressionAlgorithm.UNKNOWN),
- DEFAULT(ColumnSchema.CompressionAlgorithm.DEFAULT_COMPRESSION),
- WITHOUT(ColumnSchema.CompressionAlgorithm.NO_COMPRESSION),
- SNAPPY(ColumnSchema.CompressionAlgorithm.SNAPPY),
- LZ4(ColumnSchema.CompressionAlgorithm.LZ4),
- ZLIB(ColumnSchema.CompressionAlgorithm.ZLIB);
-
- final ColumnSchema.CompressionAlgorithm algorithm;
-
- Compression(ColumnSchema.CompressionAlgorithm algorithm) {
- this.algorithm = algorithm;
- }
- }
-
- public enum Encoding {
- UNKNOWN(ColumnSchema.Encoding.UNKNOWN),
- AUTO(ColumnSchema.Encoding.AUTO_ENCODING),
- PLAIN(ColumnSchema.Encoding.PLAIN_ENCODING),
- PREFIX(ColumnSchema.Encoding.PREFIX_ENCODING),
- GROUP_VARINT(ColumnSchema.Encoding.GROUP_VARINT),
- RLE(ColumnSchema.Encoding.RLE),
- DICT(ColumnSchema.Encoding.DICT_ENCODING),
- BIT_SHUFFLE(ColumnSchema.Encoding.BIT_SHUFFLE);
-
- final ColumnSchema.Encoding encode;
-
- Encoding(ColumnSchema.Encoding encode) {
- this.encode = encode;
- }
- }
-
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
index c7ae4a4..0a89cad 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
@@ -17,6 +17,7 @@
package org.apache.flink.connectors.kudu.connector;
import org.apache.flink.annotation.PublicEvolving;
+
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduPredicate;
@@ -36,6 +37,7 @@
public KuduPredicate toPredicate(Schema schema) {
return toPredicate(schema.getColumn(this.column));
}
+
public KuduPredicate toPredicate(ColumnSchema column) {
KuduPredicate predicate;
switch (this.type) {
@@ -63,35 +65,35 @@
switch (column.getType()) {
case STRING:
- predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String)this.value);
+ predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String) this.value);
break;
case FLOAT:
- predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Float)this.value);
+ predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
break;
case INT8:
- predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Byte)this.value);
+ predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
break;
case INT16:
- predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Short)this.value);
+ predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
break;
case INT32:
- predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Integer)this.value);
+ predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
break;
case INT64:
- predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Long)this.value);
+ predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
break;
case DOUBLE:
- predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Double)this.value);
+ predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
break;
case BOOL:
- predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Boolean)this.value);
+ predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
break;
case UNIXTIME_MICROS:
- Long time = (Long)this.value;
- predicate = KuduPredicate.newComparisonPredicate(column, comparison, time*1000);
+ Long time = (Long) this.value;
+ predicate = KuduPredicate.newComparisonPredicate(column, comparison, time * 1000);
break;
case BINARY:
- predicate = KuduPredicate.newComparisonPredicate(column, comparison, (byte[])this.value);
+ predicate = KuduPredicate.newComparisonPredicate(column, comparison, (byte[]) this.value);
break;
default:
throw new IllegalArgumentException("Illegal var type: " + column.getType());
@@ -99,6 +101,24 @@
return predicate;
}
+ public enum FilterType {
+ GREATER(KuduPredicate.ComparisonOp.GREATER),
+ GREATER_EQUAL(KuduPredicate.ComparisonOp.GREATER_EQUAL),
+ EQUAL(KuduPredicate.ComparisonOp.EQUAL),
+ LESS(KuduPredicate.ComparisonOp.LESS),
+ LESS_EQUAL(KuduPredicate.ComparisonOp.LESS_EQUAL),
+ IS_NOT_NULL(null),
+ IS_NULL(null),
+ IS_IN(null);
+
+ final KuduPredicate.ComparisonOp comparator;
+
+ FilterType(KuduPredicate.ComparisonOp comparator) {
+ this.comparator = comparator;
+ }
+
+ }
+
public static class Builder {
private KuduFilterInfo filter;
@@ -154,22 +174,4 @@
}
}
- public enum FilterType {
- GREATER(KuduPredicate.ComparisonOp.GREATER),
- GREATER_EQUAL(KuduPredicate.ComparisonOp.GREATER_EQUAL),
- EQUAL(KuduPredicate.ComparisonOp.EQUAL),
- LESS(KuduPredicate.ComparisonOp.LESS),
- LESS_EQUAL(KuduPredicate.ComparisonOp.LESS_EQUAL),
- IS_NOT_NULL(null),
- IS_NULL(null),
- IS_IN(null);
-
- final KuduPredicate.ComparisonOp comparator;
-
- FilterType(KuduPredicate.ComparisonOp comparator) {
- this.comparator = comparator;
- }
-
- }
-
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
deleted file mode 100644
index 78e6e6e..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.types.Row;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.stream.Stream;
-
-@PublicEvolving
-public class KuduRow extends Row {
-
- private Map<String, Integer> rowNames;
-
- public KuduRow(Integer arity) {
- super(arity);
- rowNames = new LinkedHashMap<>();
- }
-
- public Object getField(String name) {
- return super.getField(rowNames.get(name));
- }
-
- public boolean hasField(String name) {
- return rowNames.get(name) != null;
- }
-
- public void setField(int pos, String name, Object value) {
- super.setField(pos, value);
- this.rowNames.put(name, pos);
- }
-
- public boolean isNull(String name) {
- return isNull(rowNames.get(name));
- }
-
- public boolean isNull(int pos) {
- return getField(pos) == null;
- }
-
- private static int validFields(Object object) {
- Long validField = 0L;
- for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) {
- validField += basicValidation(c.getDeclaredFields()).count();
- }
- return validField.intValue();
- }
-
- private static Stream<Field> basicValidation(Field[] fields) {
- return Arrays.stream(fields)
- .filter(cField -> !Modifier.isStatic(cField.getModifiers()))
- .filter(cField -> !Modifier.isTransient(cField.getModifiers()));
- }
-
- public Map<String,Object> blindMap() {
- Map<String,Object> toRet = new LinkedHashMap<>();
- rowNames.entrySet().stream()
- .sorted(Comparator.comparing(Map.Entry::getValue))
- .forEach(entry -> toRet.put(entry.getKey(), super.getField(entry.getValue())));
- return toRet;
- }
-
- @Override
- public String toString() {
- return blindMap().toString();
- }
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
index eb63b3f..83c7dde 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
@@ -17,119 +17,90 @@
package org.apache.flink.connectors.kudu.connector;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.kudu.ColumnSchema;
+
+import org.apache.commons.lang3.Validate;
import org.apache.kudu.Schema;
import org.apache.kudu.client.CreateTableOptions;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
+/**
+ * Describes which table should be used in sources and sinks along with specifications
+ * on how to create it if it does not exist.
+ *
+ * <p> For sources and sinks reading from already existing tables, simply use @{@link KuduTableInfo#forTable(String)}
+ * and if you want the system to create the table if it does not exist you need to specify the column and options
+ * factories through {@link KuduTableInfo#createTableIfNotExists}
+ */
@PublicEvolving
public class KuduTableInfo implements Serializable {
- private static final Integer DEFAULT_REPLICAS = 1;
- private static final boolean DEFAULT_CREATE_IF_NOT_EXIST = false;
-
- private Integer replicas;
private String name;
- private boolean createIfNotExist;
- private List<KuduColumnInfo> columns;
+ private CreateTableOptionsFactory createTableOptionsFactory = null;
+ private ColumnSchemasFactory schemasFactory = null;
- private KuduTableInfo(String name){
- this.name = name;
- this.replicas = DEFAULT_REPLICAS;
- this.createIfNotExist = DEFAULT_CREATE_IF_NOT_EXIST;
- this.columns = new ArrayList<>();
+ private KuduTableInfo(String name) {
+ this.name = Validate.notNull(name);
}
+ /**
+ * Creates a new {@link KuduTableInfo} that is sufficient for reading/writing to existing Kudu Tables.
+ * For creating new tables call {@link #createTableIfNotExists} afterwards.
+ *
+ * @param name Table name in Kudu
+ * @return KuduTableInfo for the given table name
+ */
+ public static KuduTableInfo forTable(String name) {
+ return new KuduTableInfo(name);
+ }
+
+ /**
+ * Defines table parameters to be used when creating the Kudu table if it does not exist (read or write)
+ *
+ * @param schemasFactory factory for defining columns
+ * @param createTableOptionsFactory factory for defining create table options
+ * @return KuduTableInfo that will create tables that does not exist with the given settings.
+ */
+ public KuduTableInfo createTableIfNotExists(ColumnSchemasFactory schemasFactory, CreateTableOptionsFactory createTableOptionsFactory) {
+ this.createTableOptionsFactory = Validate.notNull(createTableOptionsFactory);
+ this.schemasFactory = Validate.notNull(schemasFactory);
+ return this;
+ }
+
+ /**
+ * Returns the {@link Schema} of the table. Only works if {@link #createTableIfNotExists} was specified otherwise throws an error.
+ *
+ * @return Schema of the target table.
+ */
+ public Schema getSchema() {
+ if (!getCreateTableIfNotExists()) {
+ throw new RuntimeException("Cannot access schema for KuduTableInfo. Use createTableIfNotExists to specify the columns.");
+ }
+
+ return new Schema(schemasFactory.getColumnSchemas());
+ }
+
+ /**
+ * @return Name of the table.
+ */
public String getName() {
return name;
}
- public Schema getSchema() {
- if(hasNotColumns()) return null;
- List<ColumnSchema> schemaColumns = new ArrayList<>();
- for(KuduColumnInfo column : columns){
- schemaColumns.add(column.columnSchema());
- }
- return new Schema(schemaColumns);
+ /**
+ * @return True if table creation is enabled if target table does not exist.
+ */
+ public boolean getCreateTableIfNotExists() {
+ return createTableOptionsFactory != null;
}
- public boolean createIfNotExist() {
- return createIfNotExist;
- }
-
+ /**
+ * @return CreateTableOptions if {@link #createTableIfNotExists} was specified.
+ */
public CreateTableOptions getCreateTableOptions() {
- CreateTableOptions options = new CreateTableOptions();
- if(replicas!=null){
- options.setNumReplicas(replicas);
+ if (!getCreateTableIfNotExists()) {
+ throw new RuntimeException("Cannot access CreateTableOptions for KuduTableInfo. Use createTableIfNotExists to specify.");
}
- if(hasColummns()) {
- List<String> rangeKeys = new ArrayList<>();
- List<String> hashKeys = new ArrayList<>();
- for(KuduColumnInfo column : columns){
- if(column.isRangeKey()){
- rangeKeys.add(column.name());
- }
- if(column.isHashKey()){
- hashKeys.add(column.name());
- }
- }
- options.setRangePartitionColumns(rangeKeys);
- options.addHashPartitions(hashKeys, replicas*2);
- }
-
- return options;
- }
-
- public boolean hasNotColumns(){
- return !hasColummns();
- }
- public boolean hasColummns(){
- return (columns!=null && columns.size()>0);
- }
-
- public static class Builder {
- KuduTableInfo table;
-
- private Builder(String name) {
- table = new KuduTableInfo(name);
- }
-
- public static Builder create(String name) {
- return new Builder(name);
- }
-
- public static Builder open(String name) {
- return new Builder(name);
- }
-
- public Builder createIfNotExist(boolean createIfNotExist) {
- this.table.createIfNotExist = createIfNotExist;
- return this;
- }
-
- public Builder replicas(int replicas) {
- if (replicas == 0) return this;
- this.table.replicas = replicas;
- return this;
- }
-
- public Builder columns(List<KuduColumnInfo> columns) {
- if(columns==null) return this;
- this.table.columns.addAll(columns);
- return this;
- }
-
- public Builder addColumn(KuduColumnInfo column) {
- if(column==null) return this;
- this.table.columns.add(column);
- return this;
- }
-
- public KuduTableInfo build() {
- return table;
- }
+ return createTableOptionsFactory.getCreateTableOptions();
}
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java
index 7548033..5f45137 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java
@@ -22,12 +22,18 @@
import java.util.List;
import java.util.stream.Collectors;
+/**
+ * Default failure handling logic that doesn't do any handling but throws
+ * an error.
+ */
public class DefaultKuduFailureHandler implements KuduFailureHandler {
+
@Override
public void onFailure(List<RowError> failure) throws IOException {
String errors = failure.stream()
.map(error -> error.toString() + System.lineSeparator())
.collect(Collectors.joining());
+
throw new IOException("Error while sending value. \n " + errors);
}
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
index 42de4f7..3c8954f 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
@@ -17,12 +17,17 @@
package org.apache.flink.connectors.kudu.connector.failure;
import org.apache.flink.annotation.PublicEvolving;
+
import org.apache.kudu.client.RowError;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+/**
+ * Custom handling logic for errors resulting from trying to execute Kudu operations in the
+ * {@link org.apache.flink.connectors.kudu.connector.writer.KuduWriter}
+ */
@PublicEvolving
public interface KuduFailureHandler extends Serializable {
@@ -34,4 +39,13 @@
*/
void onFailure(List<RowError> failure) throws IOException;
+ /**
+ * Handle a ClassCastException. Default implementation rethrows the exception.
+ *
+ * @param e the cause of failure
+ * @throws IOException if the casting failed
+ */
+ default void onTypeMismatch(ClassCastException e) throws IOException {
+ throw new IOException("Class casting failed \n", e);
+ }
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java
index a809106..5d85d6c 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java
@@ -16,16 +16,19 @@
*/
package org.apache.flink.connectors.kudu.connector.reader;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.LocatableInputSplit;
+@Internal
public class KuduInputSplit extends LocatableInputSplit {
private byte[] scanToken;
/**
* Creates a new KuduInputSplit
+ *
* @param splitNumber the number of the input split
- * @param hostnames The names of the hosts storing the data this input split refers to.
+ * @param hostnames The names of the hosts storing the data this input split refers to.
*/
public KuduInputSplit(byte[] scanToken, final int splitNumber, final String[] hostnames) {
super(splitNumber, hostnames);
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
index 9c6e790..51ab748 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
@@ -16,11 +16,16 @@
*/
package org.apache.flink.connectors.kudu.connector.reader;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-import org.apache.kudu.client.*;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.LocatedTablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,13 +47,15 @@
private transient KuduSession session;
private transient KuduTable table;
- public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig) throws IOException {
- this(tableInfo, readerConfig, new ArrayList<>(), new ArrayList<>());
+ public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig) throws IOException {
+ this(tableInfo, readerConfig, new ArrayList<>(), null);
}
- public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters) throws IOException {
- this(tableInfo, readerConfig, tableFilters, new ArrayList<>());
+
+ public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters) throws IOException {
+ this(tableInfo, readerConfig, tableFilters, null);
}
- public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters, List<String> tableProjections) throws IOException {
+
+ public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters, List<String> tableProjections) throws IOException {
this.tableInfo = tableInfo;
this.readerConfig = readerConfig;
this.tableFilters = tableFilters;
@@ -72,7 +79,7 @@
if (client.tableExists(tableName)) {
return client.openTable(tableName);
}
- if (tableInfo.createIfNotExist()) {
+ if (tableInfo.getCreateTableIfNotExists()) {
return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
}
throw new UnsupportedOperationException("table not exists and is marketed to not be created");
@@ -85,7 +92,7 @@
public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Integer rowLimit) {
KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
- if (CollectionUtils.isNotEmpty(tableProjections)) {
+ if (tableProjections != null) {
tokenBuilder.setProjectedColumnNames(tableProjections);
}
@@ -95,7 +102,7 @@
.forEach(tokenBuilder::addPredicate);
}
- if (rowLimit !=null && rowLimit > 0) {
+ if (rowLimit != null && rowLimit > 0) {
tokenBuilder.limit(rowLimit);
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
index 6f5f079..96fde1b 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
@@ -16,32 +16,36 @@
*/
package org.apache.flink.connectors.kudu.connector.reader;
-import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkNotNull;
+/**
+ * Configuration used by {@link org.apache.flink.connectors.kudu.batch.KuduRowInputFormat}. Specifies connection and other necessary properties.
+ */
@PublicEvolving
public class KuduReaderConfig implements Serializable {
private final String masters;
- private final Integer rowLimit;
+ private final int rowLimit;
private KuduReaderConfig(
String masters,
- Integer rowLimit) {
+ int rowLimit) {
this.masters = checkNotNull(masters, "Kudu masters cannot be null");
- this.rowLimit = checkNotNull(rowLimit, "Kudu rowLimit cannot be null");;
+ this.rowLimit = checkNotNull(rowLimit, "Kudu rowLimit cannot be null");
}
public String getMasters() {
return masters;
}
- public Integer getRowLimit() {
+ public int getRowLimit() {
return rowLimit;
}
@@ -57,20 +61,26 @@
* Builder for the {@link KuduReaderConfig}.
*/
public static class Builder {
- private String masters;
- private Integer rowLimit = 0;
+ private static final int DEFAULT_ROW_LIMIT = 0;
+
+ private final String masters;
+ private final int rowLimit;
private Builder(String masters) {
+ this(masters, DEFAULT_ROW_LIMIT);
+ }
+
+ private Builder(String masters, Integer rowLimit) {
this.masters = masters;
+ this.rowLimit = rowLimit;
}
public static Builder setMasters(String masters) {
return new Builder(masters);
}
- public Builder setRowLimit(Integer rowLimit) {
- this.rowLimit = rowLimit;
- return this;
+ public Builder setRowLimit(int rowLimit) {
+ return new Builder(masters, rowLimit);
}
public KuduReaderConfig build() {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
index 4a8e69c..1ea4690 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
@@ -17,9 +17,9 @@
package org.apache.flink.connectors.kudu.connector.reader;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.types.Row;
+
import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.RowResult;
@@ -51,61 +51,23 @@
}
}
- public KuduRow next() {
+ public Row next() {
RowResult row = this.rowIterator.next();
- return toKuduRow(row);
+ return toFlinkRow(row);
}
private void nextRows() throws KuduException {
this.rowIterator = scanner.nextRows();
}
- private KuduRow toKuduRow(RowResult row) {
+ private Row toFlinkRow(RowResult row) {
Schema schema = row.getColumnProjection();
- KuduRow values = new KuduRow(schema.getColumnCount());
+ Row values = new Row(schema.getColumnCount());
schema.getColumns().forEach(column -> {
String name = column.getName();
int pos = schema.getColumnIndex(name);
- if(row.isNull(name)) {
- values.setField(pos, name, null);
- } else {
- Type type = column.getType();
- switch (type) {
- case BINARY:
- values.setField(pos, name, row.getBinary(name));
- break;
- case STRING:
- values.setField(pos, name, row.getString(name));
- break;
- case BOOL:
- values.setField(pos, name, row.getBoolean(name));
- break;
- case DOUBLE:
- values.setField(pos, name, row.getDouble(name));
- break;
- case FLOAT:
- values.setField(pos, name, row.getFloat(name));
- break;
- case INT8:
- values.setField(pos, name, row.getByte(name));
- break;
- case INT16:
- values.setField(pos, name, row.getShort(name));
- break;
- case INT32:
- values.setField(pos, name, row.getInt(name));
- break;
- case INT64:
- values.setField(pos, name, row.getLong(name));
- break;
- case UNIXTIME_MICROS:
- values.setField(pos, name, row.getLong(name) / 1000);
- break;
- default:
- throw new IllegalArgumentException("Illegal var type: " + type);
- }
- }
+ values.setField(pos, row.getObject(name));
});
return values;
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java
deleted file mode 100644
index 36584b5..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed serialize the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file serialize You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed serialize in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector.serde;
-
-import org.apache.flink.connectors.kudu.connector.KuduRow;
-import org.apache.kudu.Schema;
-
-public class DefaultSerDe implements KuduSerialization<KuduRow>, KuduDeserialization<KuduRow> {
-
- @Override
- public KuduRow deserialize(KuduRow row) {
- return row;
- }
-
- @Override
- public KuduRow serialize(KuduRow value) {
- return value;
- }
-
- @Override
- public DefaultSerDe withSchema(Schema schema) {
- return this;
- }
-
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java
deleted file mode 100644
index 190c4c7..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed serialize the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file serialize You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed serialize in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector.serde;
-
-import org.apache.flink.connectors.kudu.connector.KuduRow;
-
-import java.io.Serializable;
-
-public interface KuduDeserialization<T> extends Serializable {
- T deserialize(KuduRow row);
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java
deleted file mode 100644
index b13c59b..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed serialize the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file serialize You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed serialize in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector.serde;
-
-import org.apache.flink.connectors.kudu.connector.KuduRow;
-import org.apache.kudu.Schema;
-
-import java.io.Serializable;
-
-public interface KuduSerialization<T> extends Serializable {
- KuduRow serialize(T value);
-
- KuduSerialization<T> withSchema(Schema schema);
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java
deleted file mode 100644
index bc57174..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed serialize the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file serialize You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed serialize in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector.serde;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
-import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-import org.apache.kudu.Schema;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.stream.Stream;
-
-public class PojoSerDe<P> implements KuduSerialization<P>, KuduDeserialization<P> {
-
-
- private Class<P> clazz;
-
- public transient KuduTableInfo tableInfo;
- public transient Schema schema;
-
-
- public PojoSerDe(Class<P> clazz) {
- this.clazz = clazz;
- }
-
- @Override
- public PojoSerDe<P> withSchema(Schema schema) {
- this.schema = schema;
- return this;
- }
-
- @Override
- public KuduRow serialize(P object) {
- return mapTo(object);
- }
-
- private KuduRow mapTo(P object) {
- if (schema == null) throw new IllegalArgumentException("schema must be set to serialize");
-
- KuduRow row = new KuduRow(schema.getRowSize());
-
- for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) {
- basicValidation(c.getDeclaredFields())
- .forEach(cField -> {
- try {
- cField.setAccessible(true);
- row.setField(schema.getColumnIndex(cField.getName()), cField.getName(), cField.get(object));
- } catch (IllegalAccessException e) {
- String error = String.format("Cannot get value for %s", cField.getName());
- throw new IllegalArgumentException(error, e);
- }
- });
- }
-
- return row;
- }
-
- private Stream<Field> basicValidation(Field[] fields) {
- return Arrays.stream(fields)
- .filter(field -> schemaHasColumn(field.getName()))
- .filter(field -> !Modifier.isStatic(field.getModifiers()))
- .filter(field -> !Modifier.isTransient(field.getModifiers()));
- }
-
- private boolean schemaHasColumn(String field) {
- return schema.getColumns().stream().anyMatch(col -> StringUtils.equalsIgnoreCase(col.getName(),field));
- }
-
- @Override
- public P deserialize(KuduRow row) {
- return mapFrom(row);
- }
-
- private P mapFrom(KuduRow row) {
- P o = createInstance(clazz);
-
- for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
- Field[] fields = c.getDeclaredFields();
-
- basicValidation(fields)
- .forEach(cField -> {
- try {
- cField.setAccessible(true);
- Object value = row.getField(cField.getName());
- if (value != null) {
- if (cField.getType() == value.getClass()) {
- cField.set(o, value);
- } else if (cField.getType() == Long.class && value.getClass() == Date.class) {
- cField.set(o, ((Date) value).getTime());
- } else {
- cField.set(o, value);
- }
- }
- } catch (IllegalAccessException e) {
- String error = String.format("Cannot get value for %s", cField.getName());
- throw new IllegalArgumentException(error, e);
- }
- });
- }
-
- return o;
-
- }
-
- private P createInstance(Class<P> clazz) {
- try {
- Constructor<P> constructor = clazz.getDeclaredConstructor();
- constructor.setAccessible(true);
- return constructor.newInstance();
- } catch (ReflectiveOperationException e) {
- String error = String.format("Cannot create instance for %s", clazz.getSimpleName());
- throw new IllegalArgumentException(error, e);
- }
- }
-
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
new file mode 100644
index 0000000..d9f8219
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Base implementation for {@link KuduOperationMapper}s that have one-to-one input to
+ * Kudu operation mapping. It requires a fixed table schema to be provided at construction
+ * time and only requires users to implement a getter for a specific column index (relative
+ * to the ones provided in the constructor).
+ * <br>
+ * Supports both fixed operation type per record by specifying the {@link KuduOperation} or a
+ * custom implementation for creating the base {@link Operation} throwugh the
+ * {@link #createBaseOperation(Object, KuduTable)} method.
+ *
+ * @param <T> Input type
+ */
+@PublicEvolving
+public abstract class AbstractSingleOperationMapper<T> implements KuduOperationMapper<T> {
+
+ protected final String[] columnNames;
+ private final KuduOperation operation;
+
+ protected AbstractSingleOperationMapper(String[] columnNames) {
+ this(columnNames, null);
+ }
+
+ public AbstractSingleOperationMapper(String[] columnNames, KuduOperation operation) {
+ this.columnNames = columnNames;
+ this.operation = operation;
+ }
+
+ /**
+ * Returns the object corresponding to the given column index.
+ *
+ * @param input Input element
+ * @param i Column index
+ * @return Column value
+ */
+ public abstract Object getField(T input, int i);
+
+ public Optional<Operation> createBaseOperation(T input, KuduTable table) {
+ if (operation == null) {
+ throw new UnsupportedOperationException("createBaseOperation must be overridden if no operation specified in constructor");
+ }
+ switch (operation) {
+ case INSERT:
+ return Optional.of(table.newInsert());
+ case UPDATE:
+ return Optional.of(table.newUpdate());
+ case UPSERT:
+ return Optional.of(table.newUpsert());
+ case DELETE:
+ return Optional.of(table.newDelete());
+ default:
+ throw new RuntimeException("Unknown operation " + operation);
+ }
+ }
+
+ @Override
+ public List<Operation> createOperations(T input, KuduTable table) {
+ Optional<Operation> operationOpt = createBaseOperation(input, table);
+ if (!operationOpt.isPresent()) {
+ return Collections.emptyList();
+ }
+
+ Operation operation = operationOpt.get();
+ PartialRow partialRow = operation.getRow();
+
+ for (int i = 0; i < columnNames.length; i++) {
+ partialRow.addObject(columnNames[i], getField(input, i));
+ }
+
+ return Collections.singletonList(operation);
+ }
+
+ public enum KuduOperation {
+ INSERT,
+ UPDATE,
+ UPSERT,
+ DELETE
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
new file mode 100644
index 0000000..4878ab3
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Encapsulates the logic of mapping input records (of a DataStream) to operations
+ * executed in Kudu. By allowing to return a list of operations we give flexibility
+ * to the implementers to provide more sophisticated logic.
+ *
+ * @param <T> Type of the input data
+ */
+@PublicEvolving
+public interface KuduOperationMapper<T> extends Serializable {
+
+ /**
+ * Create a list of operations to be executed by the {@link KuduWriter} for the
+ * current input
+ *
+ * @param input input element
+ * @param table table for which the operations should be created
+ * @return List of operations to be executed on the table
+ */
+ List<Operation> createOperations(T input, KuduTable table);
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index 57c0741..7233478 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -18,13 +18,17 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.*;
+
+import org.apache.kudu.client.DeleteTableResponse;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.RowError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,23 +37,24 @@
import java.util.List;
@Internal
-public class KuduWriter implements AutoCloseable {
+public class KuduWriter<T> implements AutoCloseable {
private final Logger log = LoggerFactory.getLogger(getClass());
private final KuduTableInfo tableInfo;
private final KuduWriterConfig writerConfig;
private final KuduFailureHandler failureHandler;
+ private final KuduOperationMapper<T> operationMapper;
private transient KuduClient client;
private transient KuduSession session;
private transient KuduTable table;
-
- public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig) throws IOException {
- this (tableInfo, writerConfig, new DefaultKuduFailureHandler());
+ public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig, KuduOperationMapper<T> operationMapper) throws IOException {
+ this(tableInfo, writerConfig, operationMapper, new DefaultKuduFailureHandler());
}
- public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig, KuduFailureHandler failureHandler) throws IOException {
+
+ public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig, KuduOperationMapper<T> operationMapper, KuduFailureHandler failureHandler) throws IOException {
this.tableInfo = tableInfo;
this.writerConfig = writerConfig;
this.failureHandler = failureHandler;
@@ -57,6 +62,7 @@
this.client = obtainClient();
this.session = obtainSession();
this.table = obtainTable();
+ this.operationMapper = operationMapper;
}
private KuduClient obtainClient() {
@@ -74,23 +80,18 @@
if (client.tableExists(tableName)) {
return client.openTable(tableName);
}
- if (tableInfo.createIfNotExist()) {
+ if (tableInfo.getCreateTableIfNotExists()) {
return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
}
throw new UnsupportedOperationException("table not exists and is marketed to not be created");
}
- public Schema getSchema() {
- return table.getSchema();
- }
-
- public void write(KuduRow row) throws IOException {
+ public void write(T input) throws IOException {
checkAsyncErrors();
- final Operation operation = mapToOperation(row);
- final OperationResponse response = session.apply(operation);
-
- checkErrors(response);
+ for (Operation operation : operationMapper.createOperations(input, table)) {
+ checkErrors(session.apply(operation));
+ }
}
public void flushAndCheckErrors() throws IOException {
@@ -140,73 +141,9 @@
}
private void checkAsyncErrors() throws IOException {
- if (session.countPendingErrors() == 0) return;
+ if (session.countPendingErrors() == 0) { return; }
List<RowError> errors = Arrays.asList(session.getPendingErrors().getRowErrors());
failureHandler.onFailure(errors);
}
-
- private Operation mapToOperation(KuduRow row) {
- final Operation operation = obtainOperation();
- final PartialRow partialRow = operation.getRow();
-
- table.getSchema().getColumns().forEach(column -> {
- String columnName = column.getName();
- if (!row.hasField(columnName)) {
- return;
- }
- Object value = row.getField(columnName);
-
- if (value == null) {
- partialRow.setNull(columnName);
- } else {
- Type type = column.getType();
- switch (type) {
- case STRING:
- partialRow.addString(columnName, (String) value);
- break;
- case FLOAT:
- partialRow.addFloat(columnName, (Float) value);
- break;
- case INT8:
- partialRow.addByte(columnName, (Byte) value);
- break;
- case INT16:
- partialRow.addShort(columnName, (Short) value);
- break;
- case INT32:
- partialRow.addInt(columnName, (Integer) value);
- break;
- case INT64:
- partialRow.addLong(columnName, (Long) value);
- break;
- case DOUBLE:
- partialRow.addDouble(columnName, (Double) value);
- break;
- case BOOL:
- partialRow.addBoolean(columnName, (Boolean) value);
- break;
- case UNIXTIME_MICROS:
- //*1000 to correctly create date on kudu
- partialRow.addLong(columnName, ((Long) value) * 1000);
- break;
- case BINARY:
- partialRow.addBinary(columnName, (byte[]) value);
- break;
- default:
- throw new IllegalArgumentException("Illegal var type: " + type);
- }
- }
- });
- return operation;
- }
-
- private Operation obtainOperation() {
- switch (writerConfig.getWriteMode()) {
- case INSERT: return table.newInsert();
- case UPDATE: return table.newUpdate();
- case UPSERT: return table.newUpsert();
- }
- return table.newUpsert();
- }
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
index 13672d5..598f8d0 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
@@ -16,39 +16,37 @@
*/
package org.apache.flink.connectors.kudu.connector.writer;
-import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.kudu.client.SessionConfiguration.FlushMode;
+/**
+ * Configuration used by {@link org.apache.flink.connectors.kudu.streaming.KuduSink} and {@link org.apache.flink.connectors.kudu.batch.KuduOutputFormat}.
+ * Specifies connection and other necessary properties.
+ */
@PublicEvolving
public class KuduWriterConfig implements Serializable {
private final String masters;
private final FlushMode flushMode;
- private final KuduWriterMode writeMode;
private KuduWriterConfig(
String masters,
- FlushMode flushMode,
- KuduWriterMode writeMode) {
+ FlushMode flushMode) {
this.masters = checkNotNull(masters, "Kudu masters cannot be null");
this.flushMode = checkNotNull(flushMode, "Kudu flush mode cannot be null");
- this.writeMode = checkNotNull(writeMode, "Kudu write mode cannot be null");
}
public String getMasters() {
return masters;
}
- public KuduWriterMode getWriteMode() {
- return writeMode;
- }
-
public FlushMode getFlushMode() {
return flushMode;
}
@@ -58,7 +56,6 @@
return new ToStringBuilder(this)
.append("masters", masters)
.append("flushMode", flushMode)
- .append("writeMode", writeMode)
.toString();
}
@@ -67,7 +64,6 @@
*/
public static class Builder {
private String masters;
- private KuduWriterMode writeMode = KuduWriterMode.UPSERT;
private FlushMode flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
private Builder(String masters) {
@@ -78,27 +74,15 @@
return new Builder(masters);
}
- public Builder setWriteMode(KuduWriterMode writeMode) {
- this.writeMode = writeMode;
- return this;
- }
- public Builder setUpsertWrite() {
- return setWriteMode(KuduWriterMode.UPSERT);
- }
- public Builder setInsertWrite() {
- return setWriteMode(KuduWriterMode.INSERT);
- }
- public Builder setUpdateWrite() {
- return setWriteMode(KuduWriterMode.UPDATE);
- }
-
public Builder setConsistency(FlushMode flushMode) {
this.flushMode = flushMode;
return this;
}
+
public Builder setEventualConsistency() {
return setConsistency(FlushMode.AUTO_FLUSH_BACKGROUND);
}
+
public Builder setStrongConsistency() {
return setConsistency(FlushMode.AUTO_FLUSH_SYNC);
}
@@ -106,8 +90,7 @@
public KuduWriterConfig build() {
return new KuduWriterConfig(
masters,
- flushMode,
- writeMode);
+ flushMode);
}
}
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
new file mode 100644
index 0000000..db44eec
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@PublicEvolving
+public class PojoOperationMapper<T> extends AbstractSingleOperationMapper<T> {
+
+ private final Field[] fields;
+
+ protected PojoOperationMapper(Class<T> pojoClass, String[] columnNames) { this(pojoClass, columnNames, null); }
+
+ public PojoOperationMapper(Class<T> pojoClass, String[] columnNames, KuduOperation operation) {
+ super(columnNames, operation);
+ fields = initFields(pojoClass, columnNames);
+ }
+
+ public static List<Field> getAllFields(List<Field> fields, Class<?> type) {
+ fields.addAll(Arrays.asList(type.getDeclaredFields()));
+
+ if (type.getSuperclass() != null) {
+ getAllFields(fields, type.getSuperclass());
+ }
+
+ return fields;
+ }
+
+ private Field[] initFields(Class<T> pojoClass, String[] columnNames) {
+ Map<String, Field> allFields = new HashMap<>();
+ getAllFields(new ArrayList<>(), pojoClass).stream().forEach(f -> {
+ if (!allFields.containsKey(f.getName())) {
+ allFields.put(f.getName(), f);
+ }
+ });
+
+ Field[] fields = new Field[columnNames.length];
+
+ for (int i = 0; i < columnNames.length; i++) {
+ Field f = allFields.get(columnNames[i]);
+ if (f == null) {
+ throw new RuntimeException("Cannot find field " + columnNames[i] + ". List of detected fields: " + allFields.keySet());
+ }
+ f.setAccessible(true);
+ fields[i] = f;
+ }
+
+ return fields;
+ }
+
+ @Override
+ public Object getField(T input, int i) {
+ try {
+ return fields[i].get(input);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("This is a bug");
+ }
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/RowOperationMapper.java
similarity index 63%
rename from flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/RowOperationMapper.java
index 8c9eab0..2e90598 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/RowOperationMapper.java
@@ -16,8 +16,22 @@
*/
package org.apache.flink.connectors.kudu.connector.writer;
-public enum KuduWriterMode {
- INSERT,
- UPDATE,
- UPSERT
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.types.Row;
+
+@PublicEvolving
+public class RowOperationMapper extends AbstractSingleOperationMapper<Row> {
+
+ protected RowOperationMapper(String[] columnNames) {
+ super(columnNames);
+ }
+
+ public RowOperationMapper(String[] columnNames, KuduOperation operation) {
+ super(columnNames, operation);
+ }
+
+ @Override
+ public Object getField(Row input, int i) {
+ return input.getField(i);
+ }
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/TupleOperationMapper.java
similarity index 62%
rename from flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/TupleOperationMapper.java
index 27b2ed3..491168b 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/TupleOperationMapper.java
@@ -16,17 +16,22 @@
*/
package org.apache.flink.connectors.kudu.connector.writer;
-import static org.apache.kudu.client.SessionConfiguration.*;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.tuple.Tuple;
-public enum KuduWriterConsistency {
- EVENTUAL(FlushMode.AUTO_FLUSH_BACKGROUND),
- STRONG(FlushMode.AUTO_FLUSH_SYNC),
- //CHECKPOINT(FlushMode.MANUAL_FLUSH)
- ;
+@PublicEvolving
+public class TupleOperationMapper<T extends Tuple> extends AbstractSingleOperationMapper<T> {
- public final FlushMode flushMode;
+ protected TupleOperationMapper(String[] columnNames) {
+ super(columnNames);
+ }
- KuduWriterConsistency(FlushMode flushMode) {
- this.flushMode = flushMode;
+ public TupleOperationMapper(String[] columnNames, KuduOperation operation) {
+ super(columnNames, operation);
+ }
+
+ @Override
+ public Object getField(T input, int i) {
+ return input.getField(i);
}
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
index d523b67..a671408 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
@@ -18,56 +18,83 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.writer.KuduOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
-import org.apache.flink.connectors.kudu.connector.serde.KuduSerialization;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
-import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.flink.util.Preconditions.checkNotNull;
+/**
+ * Streaming Sink that executes Kudu operations based on the incoming elements.
+ * The target Kudu table is defined in the {@link KuduTableInfo} object together with parameters for table
+ * creation in case the table does not exist.
+ * <p>
+ * Incoming records are mapped to Kudu table operations using the provided {@link KuduOperationMapper} logic. While
+ * failures resulting from the operations are handled by the {@link KuduFailureHandler} instance.
+ *
+ * @param <IN> Type of the input records
+ */
@PublicEvolving
-public class KuduSink<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
+public class KuduSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
private final Logger log = LoggerFactory.getLogger(getClass());
private final KuduTableInfo tableInfo;
private final KuduWriterConfig writerConfig;
private final KuduFailureHandler failureHandler;
- private final KuduSerialization<OUT> serializer;
-
+ private final KuduOperationMapper<IN> opsMapper;
private transient KuduWriter kuduWriter;
- public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) {
- this(writerConfig, tableInfo, serializer, new DefaultKuduFailureHandler());
+ /**
+ * Creates a new {@link KuduSink} that will execute operations against the specified Kudu table (defined in {@link KuduTableInfo})
+ * for the incoming stream elements.
+ *
+ * @param writerConfig Writer configuration
+ * @param tableInfo Table information for the target table
+ * @param opsMapper Mapping logic from inputs to Kudu operations
+ */
+ public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduOperationMapper<IN> opsMapper) {
+ this(writerConfig, tableInfo, opsMapper, new DefaultKuduFailureHandler());
}
- public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer, KuduFailureHandler failureHandler) {
- this.tableInfo = checkNotNull(tableInfo,"tableInfo could not be null");
- this.writerConfig = checkNotNull(writerConfig,"config could not be null");
- this.serializer = checkNotNull(serializer,"serializer could not be null");
- this.failureHandler = checkNotNull(failureHandler,"failureHandler could not be null");
+ /**
+ * Creates a new {@link KuduSink} that will execute operations against the specified Kudu table (defined in {@link KuduTableInfo})
+ * for the incoming stream elements.
+ *
+ * @param writerConfig Writer configuration
+ * @param tableInfo Table information for the target table
+ * @param opsMapper Mapping logic from inputs to Kudu operations
+ * @param failureHandler Custom failure handler instance
+ */
+ public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduOperationMapper<IN> opsMapper, KuduFailureHandler failureHandler) {
+ this.tableInfo = checkNotNull(tableInfo, "tableInfo could not be null");
+ this.writerConfig = checkNotNull(writerConfig, "config could not be null");
+ this.opsMapper = checkNotNull(opsMapper, "opsMapper could not be null");
+ this.failureHandler = checkNotNull(failureHandler, "failureHandler could not be null");
}
@Override
public void open(Configuration parameters) throws Exception {
- kuduWriter = new KuduWriter(tableInfo, writerConfig, failureHandler);
-
- serializer.withSchema(kuduWriter.getSchema());
+ kuduWriter = new KuduWriter(tableInfo, writerConfig, opsMapper, failureHandler);
}
@Override
- public void invoke(OUT value) throws Exception {
- final KuduRow kuduRow = serializer.serialize(value);
- kuduWriter.write(kuduRow);
+ public void invoke(IN value) throws Exception {
+ try {
+ kuduWriter.write(value);
+ } catch (ClassCastException e) {
+ failureHandler.onTypeMismatch(e);
+ }
}
@Override
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
new file mode 100644
index 0000000..2e1c63e
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+
+/**
+ * Read-only catalog.
+ */
+@Internal
+public abstract class AbstractReadOnlyCatalog extends AbstractCatalog {
+
+ private static final CatalogException UNSUPPORTED_ERR = new CatalogException("This action is not supported for read-only catalogs");
+
+ public AbstractReadOnlyCatalog(String name, String defaultDatabase) {
+ super(name, defaultDatabase);
+ }
+
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException {
+ throw UNSUPPORTED_ERR;
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException {
+ throw UNSUPPORTED_ERR;
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
new file mode 100644
index 0000000..7b3c987
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.AlterTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.shaded.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_HASH_COLS;
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_MASTERS;
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_PRIMARY_KEY_COLS;
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_REPLICAS;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Catalog for reading and creating Kudu tables.
+ */
+@PublicEvolving
+public class KuduCatalog extends AbstractReadOnlyCatalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KuduCatalog.class);
+ private final KuduTableFactory tableFactory = new KuduTableFactory();
+ private final String kuduMasters;
+ private KuduClient kuduClient;
+
+ /**
+ * Create a new {@link KuduCatalog} with the specified catalog name and kudu master addresses.
+ *
+ * @param catalogName Name of the catalog (used by the table environment)
+ * @param kuduMasters Connection address to Kudu
+ */
+ public KuduCatalog(String catalogName, String kuduMasters) {
+ super(catalogName, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE);
+ this.kuduMasters = kuduMasters;
+ this.kuduClient = createClient();
+ }
+
+ /**
+ * Create a new {@link KuduCatalog} with the specified kudu master addresses.
+ *
+ * @param kuduMasters Connection address to Kudu
+ */
+ public KuduCatalog(String kuduMasters) {
+ this("kudu", kuduMasters);
+ }
+
+ public Optional<TableFactory> getTableFactory() {
+ return Optional.of(getKuduTableFactory());
+ }
+
+ public KuduTableFactory getKuduTableFactory() {
+ return tableFactory;
+ }
+
+ private KuduClient createClient() {
+ return new KuduClient.KuduClientBuilder(kuduMasters).build();
+ }
+
+ @Override
+ public void open() {}
+
+ @Override
+ public void close() {
+ try {
+ if (kuduClient != null) {
+ kuduClient.close();
+ }
+ } catch (KuduException e) {
+ LOG.error("Error while closing kudu client", e);
+ }
+ }
+
+ public ObjectPath getObjectPath(String tableName) {
+ return new ObjectPath(getDefaultDatabase(), tableName);
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
+
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+
+ try {
+ return kuduClient.getTablesList().getTablesList();
+ } catch (Throwable t) {
+ throw new CatalogException("Could not list tables", t);
+ }
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) {
+ checkNotNull(tablePath);
+ try {
+ return kuduClient.tableExists(tablePath.getObjectName());
+ } catch (KuduException e) {
+ throw new CatalogException(e);
+ }
+ }
+
+ @Override
+ public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException {
+ checkNotNull(tablePath);
+
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+
+ String tableName = tablePath.getObjectName();
+
+ try {
+ KuduTable kuduTable = kuduClient.openTable(tableName);
+
+ CatalogTableImpl table = new CatalogTableImpl(
+ KuduTableUtils.kuduToFlinkSchema(kuduTable.getSchema()),
+ createTableProperties(tableName, kuduTable.getSchema().getPrimaryKeyColumns()),
+ tableName);
+
+ return table;
+ } catch (KuduException e) {
+ throw new CatalogException(e);
+ }
+ }
+
+ protected Map<String, String> createTableProperties(String tableName, List<ColumnSchema> primaryKeyColumns) {
+ Map<String, String> props = new HashMap<>();
+ props.put(KUDU_MASTERS, kuduMasters);
+ String primaryKeyNames = primaryKeyColumns.stream().map(ColumnSchema::getName).collect(Collectors.joining(","));
+ props.put(KUDU_PRIMARY_KEY_COLS, primaryKeyNames);
+ props.put(KuduTableFactory.KUDU_TABLE, tableName);
+ return props;
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException {
+ String tableName = tablePath.getObjectName();
+ try {
+ if (tableExists(tablePath)) {
+ kuduClient.deleteTable(tableName);
+ } else if (!ignoreIfNotExists) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+ } catch (KuduException e) {
+ throw new CatalogException("Could not delete table " + tableName, e);
+ }
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException {
+ String tableName = tablePath.getObjectName();
+ try {
+ if (tableExists(tablePath)) {
+ kuduClient.alterTable(tableName, new AlterTableOptions().renameTable(newTableName));
+ } else if (!ignoreIfNotExists) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+ } catch (KuduException e) {
+ throw new CatalogException("Could not rename table " + tableName, e);
+ }
+ }
+
+ public void createTable(KuduTableInfo tableInfo, boolean ignoreIfExists) throws CatalogException, TableAlreadyExistException {
+ ObjectPath path = getObjectPath(tableInfo.getName());
+ if (tableExists(path)) {
+ if (ignoreIfExists) {
+ return;
+ } else {
+ throw new TableAlreadyExistException(getName(), path);
+ }
+ }
+
+ try {
+ kuduClient.createTable(tableInfo.getName(), tableInfo.getSchema(), tableInfo.getCreateTableOptions());
+ } catch (
+ KuduException e) {
+ throw new CatalogException("Could not create table " + tableInfo.getName(), e);
+ }
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException {
+ Map<String, String> tableProperties = table.getProperties();
+ TableSchema tableSchema = table.getSchema();
+
+ Set<String> optionalProperties = new HashSet<>(Arrays.asList(KUDU_REPLICAS));
+ Set<String> requiredProperties = new HashSet<>(Arrays.asList(KUDU_HASH_COLS));
+
+ if (!tableSchema.getPrimaryKey().isPresent()) {
+ requiredProperties.add(KUDU_PRIMARY_KEY_COLS);
+ }
+
+ if (!tableProperties.keySet().containsAll(requiredProperties)) {
+ throw new CatalogException("Missing required property. The following properties must be provided: " +
+ requiredProperties.toString());
+ }
+
+ Set<String> permittedProperties = Sets.union(requiredProperties, optionalProperties);
+ if (!permittedProperties.containsAll(tableProperties.keySet())) {
+ throw new CatalogException("Unpermitted properties were given. The following properties are allowed:" +
+ permittedProperties.toString());
+ }
+
+ String tableName = tablePath.getObjectName();
+
+ KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, tableSchema, tableProperties);
+
+ createTable(tableInfo, ignoreIfExists);
+ }
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return Lists.newArrayList(getDefaultDatabase());
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+ if (databaseName.equals(getDefaultDatabase())) {
+ return new CatalogDatabaseImpl(new HashMap<>(), null);
+ } else {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException {
+ return listDatabases().contains(databaseName);
+ }
+
+ @Override
+ public List<String> listViews(String databaseName) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ return false;
+ }
+
+ @Override
+ public List<String> listFunctions(String dbName) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+ throw new FunctionNotExistException(getName(), functionPath);
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+ return false;
+ }
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
new file mode 100644
index 0000000..1018cae
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.CatalogFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
+
+/**
+ * Factory for {@link KuduCatalog}.
+ */
+@Internal
+public class KuduCatalogFactory implements CatalogFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KuduCatalogFactory.class);
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = new HashMap<>();
+ context.put(CATALOG_TYPE, KuduTableFactory.KUDU);
+ context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility
+ return context;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ List<String> properties = new ArrayList<>();
+
+ properties.add(KuduTableFactory.KUDU_MASTERS);
+
+ return properties;
+ }
+
+ @Override
+ public Catalog createCatalog(String name, Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+ return new KuduCatalog(name, descriptorProperties.getString(KuduTableFactory.KUDU_MASTERS));
+ }
+
+ private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ descriptorProperties.putProperties(properties);
+ descriptorProperties.validateString(KuduTableFactory.KUDU_MASTERS, false);
+
+ return descriptorProperties;
+ }
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
new file mode 100644
index 0000000..49b09a2
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.types.Row;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Tuple2<Boolean, Row>> {
+
+ public static final String KUDU_TABLE = "kudu.table";
+ public static final String KUDU_MASTERS = "kudu.masters";
+ public static final String KUDU_HASH_COLS = "kudu.hash-columns";
+ public static final String KUDU_PRIMARY_KEY_COLS = "kudu.primary-key-columns";
+ public static final String KUDU_REPLICAS = "kudu.replicas";
+ public static final String KUDU = "kudu";
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = new HashMap<>();
+ context.put(CONNECTOR_TYPE, KUDU);
+ return context;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ List<String> properties = new ArrayList<>();
+ properties.add(KUDU_TABLE);
+ properties.add(KUDU_MASTERS);
+ properties.add(KUDU_HASH_COLS);
+ properties.add(KUDU_PRIMARY_KEY_COLS);
+ // schema
+ properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
+ properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
+ properties.add(SCHEMA + ".#." + SCHEMA_NAME);
+ properties.add(SCHEMA + ".#." + SCHEMA_FROM);
+ // computed column
+ properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+
+ // time attributes
+ properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
+ properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
+ properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
+ properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS);
+ properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED);
+ properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
+ properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS);
+ properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
+ properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);
+
+ // watermark
+ properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
+ properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
+ properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+ return properties;
+ }
+
+ private DescriptorProperties getValidatedProps(Map<String, String> properties) {
+ checkNotNull(properties.get(KUDU_MASTERS), "Missing required property " + KUDU_MASTERS);
+ checkNotNull(properties.get(KUDU_TABLE), "Missing required property " + KUDU_TABLE);
+ final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ descriptorProperties.putProperties(properties);
+ new SchemaValidator(true, false, false).validate(descriptorProperties);
+ return descriptorProperties;
+ }
+
+ @Override
+ public KuduTableSource createTableSource(Map<String, String> properties) {
+ DescriptorProperties descriptorProperties = getValidatedProps(properties);
+ String tableName = descriptorProperties.getString(KUDU_TABLE);
+ TableSchema schema = descriptorProperties.getTableSchema(SCHEMA);
+ return createTableSource(tableName, schema, properties);
+ }
+
+ @Override
+ public KuduTableSource createTableSource(ObjectPath tablePath, CatalogTable table) {
+ String tableName = tablePath.getObjectName();
+ return createTableSource(tableName, table.getSchema(), table.getProperties());
+ }
+
+ private KuduTableSource createTableSource(String tableName, TableSchema schema, Map<String, String> props) {
+ String masterAddresses = props.get(KUDU_MASTERS);
+ TableSchema physicalSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
+ KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, schema, props);
+
+ KuduReaderConfig.Builder configBuilder = KuduReaderConfig.Builder
+ .setMasters(masterAddresses);
+
+ return new KuduTableSource(configBuilder, tableInfo, physicalSchema, null);
+ }
+
+ @Override
+ public KuduTableSink createTableSink(Map<String, String> properties) {
+ DescriptorProperties descriptorProperties = getValidatedProps(properties);
+ String tableName = descriptorProperties.getString(KUDU_TABLE);
+ TableSchema schema = descriptorProperties.getTableSchema(SCHEMA);
+
+ return createTableSink(tableName, schema, properties);
+ }
+
+ @Override
+ public KuduTableSink createTableSink(ObjectPath tablePath, CatalogTable table) {
+ return createTableSink(tablePath.getObjectName(), table.getSchema(), table.getProperties());
+ }
+
+ private KuduTableSink createTableSink(String tableName, TableSchema schema, Map<String, String> props) {
+ String masterAddresses = props.get(KUDU_MASTERS);
+ TableSchema physicalSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
+ KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, schema, props);
+
+ KuduWriterConfig.Builder configBuilder = KuduWriterConfig.Builder
+ .setMasters(masterAddresses);
+
+ return new KuduTableSink(configBuilder, tableInfo, physicalSchema);
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
new file mode 100644
index 0000000..8ff517a
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.streaming.KuduSink;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+public class KuduTableSink implements UpsertStreamTableSink<Row> {
+
+ private final KuduWriterConfig.Builder writerConfigBuilder;
+ private final TableSchema flinkSchema;
+ private final KuduTableInfo tableInfo;
+
+ public KuduTableSink(KuduWriterConfig.Builder configBuilder, KuduTableInfo tableInfo, TableSchema flinkSchema) {
+ this.writerConfigBuilder = configBuilder;
+ this.tableInfo = tableInfo;
+ this.flinkSchema = flinkSchema;
+ }
+
+ @Override
+ public void setKeyFields(String[] keyFields) { /* this has no effect */}
+
+ @Override
+ public void setIsAppendOnly(Boolean isAppendOnly) { /* this has no effect */}
+
+ @Override
+ public TypeInformation<Row> getRecordType() { return flinkSchema.toRowType(); }
+
+ @Override
+ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStreamTuple) { consumeDataStream(dataStreamTuple); }
+
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStreamTuple) {
+ KuduSink upsertKuduSink = new KuduSink(writerConfigBuilder.build(), tableInfo, new UpsertOperationMapper(getTableSchema().getFieldNames()));
+
+ return dataStreamTuple
+ .addSink(upsertKuduSink)
+ .setParallelism(dataStreamTuple.getParallelism())
+ .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getTableSchema().getFieldNames()));
+ }
+
+ @Override
+ public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ return new KuduTableSink(writerConfigBuilder, tableInfo, flinkSchema);
+ }
+
+ @Override
+ public TableSchema getTableSchema() { return flinkSchema; }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
new file mode 100644
index 0000000..2c972fb
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connectors.kudu.batch.KuduRowInputFormat;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.LimitableTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class KuduTableSource implements StreamTableSource<Row>, LimitableTableSource<Row>, ProjectableTableSource<Row> {
+
+ private final KuduReaderConfig.Builder configBuilder;
+ private final KuduTableInfo tableInfo;
+ private final TableSchema flinkSchema;
+ private final String[] projectedFields;
+
+ public KuduTableSource(KuduReaderConfig.Builder configBuilder, KuduTableInfo tableInfo, TableSchema flinkSchema, String[] projectedFields) {
+ this.configBuilder = configBuilder;
+ this.tableInfo = tableInfo;
+ this.flinkSchema = flinkSchema;
+ this.projectedFields = projectedFields;
+ }
+
+ @Override
+ public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
+
+ KuduRowInputFormat inputFormat = new KuduRowInputFormat(configBuilder.build(), tableInfo, projectedFields == null ? null : Lists.newArrayList(projectedFields));
+
+ return env.createInput(inputFormat, (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType())).name(explainSource());
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return flinkSchema;
+ }
+
+ @Override
+ public DataType getProducedDataType() {
+ if (projectedFields == null) {
+ return flinkSchema.toRowDataType();
+ } else {
+ DataTypes.Field[] fields = new DataTypes.Field[projectedFields.length];
+ for (int i = 0; i < fields.length; i++) {
+ String fieldName = projectedFields[i];
+ fields[i] = DataTypes.FIELD(
+ fieldName,
+ flinkSchema
+ .getTableColumn(fieldName)
+ .get()
+ .getType()
+ );
+ }
+ return DataTypes.ROW(fields);
+ }
+ }
+
+ @Override
+ public boolean isBounded() {
+ return true;
+ }
+
+ @Override
+ public boolean isLimitPushedDown() {
+ return true;
+ }
+
+ @Override
+ public TableSource<Row> applyLimit(long l) {
+ return new KuduTableSource(configBuilder.setRowLimit((int) l), tableInfo, flinkSchema, projectedFields);
+ }
+
+ @Override
+ public TableSource<Row> projectFields(int[] ints) {
+ String[] fieldNames = new String[ints.length];
+ RowType producedDataType = (RowType) getProducedDataType().getLogicalType();
+ List<String> prevFieldNames = producedDataType.getFieldNames();
+ for (int i = 0; i < ints.length; i++) {
+ fieldNames[i] = prevFieldNames.get(ints[i]);
+ }
+ return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+ }
+
+ @Override
+ public String explainSource() {
+ return "KuduStreamTableSource[schema=" + Arrays.toString(getTableSchema().getFieldNames())
+ + (projectedFields != null ?", projectFields=" + Arrays.toString(projectedFields) + "]" : "]");
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
new file mode 100644
index 0000000..847dad4
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.types.Row;
+
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+
+import java.util.Optional;
+
+@Internal
+public class UpsertOperationMapper extends AbstractSingleOperationMapper<Tuple2<Boolean, Row>> {
+
+ public UpsertOperationMapper(String[] columnNames) {
+ super(columnNames);
+ }
+
+ @Override
+ public Object getField(Tuple2<Boolean, Row> input, int i) {
+ return input.f1.getField(i);
+ }
+
+ @Override
+ public Optional<Operation> createBaseOperation(Tuple2<Boolean, Row> input, KuduTable table) {
+ return Optional.of(input.f0 ? table.newUpsert() : table.newDelete());
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
new file mode 100644
index 0000000..5fc8cca
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.kudu.connector.ColumnSchemasFactory;
+import org.apache.flink.connectors.kudu.connector.CreateTableOptionsFactory;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.table.KuduTableFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.CreateTableOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_HASH_COLS;
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_PRIMARY_KEY_COLS;
+
+public class KuduTableUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KuduTableUtils.class);
+
+ public static KuduTableInfo createTableInfo(String tableName, TableSchema schema, Map<String, String> props) {
+ // Since KUDU_HASH_COLS is a required property for table creation, we use it to infer whether to create table
+ boolean createIfMissing = props.containsKey(KUDU_HASH_COLS);
+
+ KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);
+
+ if (createIfMissing) {
+
+ List<Tuple2<String, DataType>> columns = getSchemaWithSqlTimestamp(schema)
+ .getTableColumns()
+ .stream()
+ .map(tc -> Tuple2.of(tc.getName(), tc.getType()))
+ .collect(Collectors.toList());
+
+ List<String> keyColumns = getPrimaryKeyColumns(props, schema);
+ ColumnSchemasFactory schemasFactory = () -> toKuduConnectorColumns(columns, keyColumns);
+ List<String> hashColumns = getHashColumns(props);
+ int replicas = Optional.ofNullable(props.get(KuduTableFactory.KUDU_REPLICAS)).map(Integer::parseInt).orElse(1);
+
+ CreateTableOptionsFactory optionsFactory = () -> new CreateTableOptions()
+ .setNumReplicas(replicas)
+ .addHashPartitions(hashColumns, replicas * 2);
+
+ tableInfo.createTableIfNotExists(schemasFactory, optionsFactory);
+ } else {
+ LOG.debug("Property {} is missing, assuming the table is already created.", KUDU_HASH_COLS);
+ }
+
+ return tableInfo;
+ }
+
+ public static List<ColumnSchema> toKuduConnectorColumns(List<Tuple2<String, DataType>> columns, Collection<String> keyColumns) {
+ return columns.stream()
+ .map(t -> {
+ ColumnSchema.ColumnSchemaBuilder builder = new ColumnSchema
+ .ColumnSchemaBuilder(t.f0, KuduTypeUtils.toKuduType(t.f1))
+ .key(keyColumns.contains(t.f0))
+ .nullable(!keyColumns.contains(t.f0) && t.f1.getLogicalType().isNullable());
+ if(t.f1.getLogicalType() instanceof DecimalType) {
+ DecimalType decimalType = ((DecimalType) t.f1.getLogicalType());
+ builder.typeAttributes(new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
+ .precision(decimalType.getPrecision())
+ .scale(decimalType.getScale())
+ .build());
+ }
+ return builder.build();
+ }
+ ).collect(Collectors.toList());
+ }
+
+ public static TableSchema kuduToFlinkSchema(Schema schema) {
+ TableSchema.Builder builder = TableSchema.builder();
+
+ for (ColumnSchema column : schema.getColumns()) {
+ DataType flinkType = KuduTypeUtils.toFlinkType(column.getType(), column.getTypeAttributes()).nullable();
+ builder.field(column.getName(), flinkType);
+ }
+
+ return builder.build();
+ }
+
+ public static List<String> getPrimaryKeyColumns(Map<String, String> tableProperties, TableSchema tableSchema) {
+ return tableProperties.containsKey(KUDU_PRIMARY_KEY_COLS) ? Arrays.asList(tableProperties.get(KUDU_PRIMARY_KEY_COLS).split(",")) : tableSchema.getPrimaryKey().get().getColumns();
+ }
+
+ public static List<String> getHashColumns(Map<String, String> tableProperties) {
+ return Lists.newArrayList(tableProperties.get(KUDU_HASH_COLS).split(","));
+ }
+
+ public static TableSchema getSchemaWithSqlTimestamp(TableSchema schema) {
+ TableSchema.Builder builder = new TableSchema.Builder();
+ TableSchemaUtils.getPhysicalSchema(schema).getTableColumns().forEach(
+ tableColumn -> {
+ if (tableColumn.getType().getLogicalType() instanceof TimestampType) {
+ builder.field(tableColumn.getName(), tableColumn.getType().bridgedTo(Timestamp.class));
+ } else {
+ builder.field(tableColumn.getName(), tableColumn.getType());
+ }
+ });
+ return builder.build();
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
new file mode 100644
index 0000000..c445465
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.Type;
+
+import java.sql.Timestamp;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class KuduTypeUtils {
+
+ public static DataType toFlinkType(Type type, ColumnTypeAttributes typeAttributes) {
+ switch (type) {
+ case STRING:
+ return DataTypes.STRING();
+ case FLOAT:
+ return DataTypes.FLOAT();
+ case INT8:
+ return DataTypes.TINYINT();
+ case INT16:
+ return DataTypes.SMALLINT();
+ case INT32:
+ return DataTypes.INT();
+ case INT64:
+ return DataTypes.BIGINT();
+ case DOUBLE:
+ return DataTypes.DOUBLE();
+ case DECIMAL:
+ return DataTypes.DECIMAL(typeAttributes.getPrecision(), typeAttributes.getScale());
+ case BOOL:
+ return DataTypes.BOOLEAN();
+ case BINARY:
+ return DataTypes.BYTES();
+ case UNIXTIME_MICROS:
+ return new AtomicDataType(new TimestampType(3), Timestamp.class);
+
+ default:
+ throw new IllegalArgumentException("Illegal var type: " + type);
+ }
+ }
+
+ public static Type toKuduType(DataType dataType) {
+ checkNotNull(dataType, "type cannot be null");
+ LogicalType logicalType = dataType.getLogicalType();
+ return logicalType.accept(new KuduTypeLogicalTypeVisitor(dataType));
+ }
+
+ private static class KuduTypeLogicalTypeVisitor extends LogicalTypeDefaultVisitor<Type> {
+
+ private final DataType dataType;
+
+ KuduTypeLogicalTypeVisitor(DataType dataType) {
+ this.dataType = dataType;
+ }
+
+ @Override
+ public Type visit(BooleanType booleanType) {
+ return Type.BOOL;
+ }
+
+ @Override
+ public Type visit(TinyIntType tinyIntType) {
+ return Type.INT8;
+ }
+
+ @Override
+ public Type visit(SmallIntType smallIntType) {
+ return Type.INT16;
+ }
+
+ @Override
+ public Type visit(IntType intType) {
+ return Type.INT32;
+ }
+
+ @Override
+ public Type visit(BigIntType bigIntType) {
+ return Type.INT64;
+ }
+
+ @Override
+ public Type visit(FloatType floatType) {
+ return Type.FLOAT;
+ }
+
+ @Override
+ public Type visit(DoubleType doubleType) {
+ return Type.DOUBLE;
+ }
+
+ @Override
+ public Type visit(DecimalType decimalType) {
+ return Type.DECIMAL;
+ }
+
+ @Override
+ public Type visit(TimestampType timestampType) {
+ return Type.UNIXTIME_MICROS;
+ }
+
+ @Override
+ public Type visit(VarCharType varCharType) {
+ return Type.STRING;
+ }
+
+ @Override
+ public Type visit(VarBinaryType varBinaryType) {
+ return Type.BINARY;
+ }
+
+ @Override
+ protected Type defaultMethod(LogicalType logicalType) {
+ throw new UnsupportedOperationException(
+ String.format("Flink doesn't support converting type %s to Kudu type yet.", dataType.toString()));
+ }
+
+ }
+}
diff --git a/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000..487f2d9
--- /dev/null
+++ b/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connectors.kudu.table.KuduTableFactory
+org.apache.flink.connectors.kudu.table.KuduCatalogFactory
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
index e22f40e..fb7d8e3 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
@@ -16,12 +16,12 @@
*/
package org.apache.flink.connectors.kudu.batch;
-import org.apache.flink.connectors.kudu.connector.KuduDatabase;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
-import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe;
+import org.apache.flink.types.Row;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -29,27 +29,27 @@
import java.util.Arrays;
import java.util.List;
-class KuduInputFormatTest extends KuduDatabase {
+class KuduInputFormatTest extends KuduTestBase {
@Test
void testInvalidKuduMaster() {
- KuduTableInfo tableInfo = booksTableInfo("books",false);
- Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat<>(null, tableInfo, new DefaultSerDe()));
+ KuduTableInfo tableInfo = booksTableInfo("books", false);
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(null, tableInfo));
}
@Test
void testInvalidTableInfo() {
String masterAddresses = harness.getMasterAddressesAsString();
KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
- Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat<>(readerConfig, null, new DefaultSerDe()));
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(readerConfig, null));
}
@Test
void testInputFormat() throws Exception {
- KuduTableInfo tableInfo = booksTableInfo("books",true);
+ KuduTableInfo tableInfo = booksTableInfo("books", true);
setUpDatabase(tableInfo);
- List<KuduRow> rows = readRows(tableInfo);
+ List<Row> rows = readRows(tableInfo);
Assertions.assertEquals(5, rows.size());
cleanDatabase(tableInfo);
@@ -57,32 +57,32 @@
@Test
void testInputFormatWithProjection() throws Exception {
- KuduTableInfo tableInfo = booksTableInfo("books",true);
+ KuduTableInfo tableInfo = booksTableInfo("books", true);
setUpDatabase(tableInfo);
- List<KuduRow> rows = readRows(tableInfo,"title","id");
+ List<Row> rows = readRows(tableInfo, "title", "id");
Assertions.assertEquals(5, rows.size());
- for (KuduRow row: rows) {
+ for (Row row : rows) {
Assertions.assertEquals(2, row.getArity());
}
cleanDatabase(tableInfo);
}
-
- private List<KuduRow> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
+ private List<Row> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
String masterAddresses = harness.getMasterAddressesAsString();
KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
- KuduInputFormat<KuduRow> inputFormat = new KuduInputFormat<>(readerConfig, tableInfo, new DefaultSerDe(), new ArrayList<>(), Arrays.asList(fieldProjection));
+ KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo, new ArrayList<>(),
+ fieldProjection == null ? null : Arrays.asList(fieldProjection));
KuduInputSplit[] splits = inputFormat.createInputSplits(1);
- List<KuduRow> rows = new ArrayList<>();
+ List<Row> rows = new ArrayList<>();
for (KuduInputSplit split : splits) {
inputFormat.open(split);
- while(!inputFormat.reachedEnd()) {
- KuduRow row = inputFormat.nextRecord(new KuduRow(5));
- if(row != null) {
+ while (!inputFormat.reachedEnd()) {
+ Row row = inputFormat.nextRecord(new Row(5));
+ if (row != null) {
rows.add(row);
}
}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
index f14eaa0..abf8a30 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
@@ -16,60 +16,62 @@
*/
package org.apache.flink.connectors.kudu.batch;
-import org.apache.flink.connectors.kudu.connector.KuduDatabase;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
-import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe;
+import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
+import org.apache.flink.types.Row;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.UUID;
-class KuduOuputFormatTest extends KuduDatabase {
+class KuduOuputFormatTest extends KuduTestBase {
@Test
void testInvalidKuduMaster() {
- KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
- Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe()));
+ KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, null));
}
@Test
void testInvalidTableInfo() {
String masterAddresses = harness.getMasterAddressesAsString();
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
- Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(writerConfig, null, new DefaultSerDe()));
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(writerConfig, null, null));
}
@Test
void testNotTableExist() {
String masterAddresses = harness.getMasterAddressesAsString();
- KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
+ KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
- KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe());
- Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0,1));
+ KuduOutputFormat<Row> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
+ Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0, 1));
}
@Test
void testOutputWithStrongConsistency() throws Exception {
String masterAddresses = harness.getMasterAddressesAsString();
- KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
+ KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
KuduWriterConfig writerConfig = KuduWriterConfig.Builder
.setMasters(masterAddresses)
.setStrongConsistency()
.build();
- KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe());
+ KuduOutputFormat<Row> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
- outputFormat.open(0,1);
+ outputFormat.open(0, 1);
- for (KuduRow kuduRow : booksDataRow()) {
+ for (Row kuduRow : booksDataRow()) {
outputFormat.writeRecord(kuduRow);
}
outputFormat.close();
- List<KuduRow> rows = readRows(tableInfo);
+ List<Row> rows = readRows(tableInfo);
Assertions.assertEquals(5, rows.size());
kuduRowsTest(rows);
@@ -80,16 +82,16 @@
void testOutputWithEventualConsistency() throws Exception {
String masterAddresses = harness.getMasterAddressesAsString();
- KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
+ KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
KuduWriterConfig writerConfig = KuduWriterConfig.Builder
.setMasters(masterAddresses)
.setEventualConsistency()
.build();
- KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe());
+ KuduOutputFormat<Row> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
- outputFormat.open(0,1);
+ outputFormat.open(0, 1);
- for (KuduRow kuduRow : booksDataRow()) {
+ for (Row kuduRow : booksDataRow()) {
outputFormat.writeRecord(kuduRow);
}
@@ -98,7 +100,7 @@
outputFormat.close();
- List<KuduRow> rows = readRows(tableInfo);
+ List<Row> rows = readRows(tableInfo);
Assertions.assertEquals(5, rows.size());
kuduRowsTest(rows);
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
deleted file mode 100644
index cda8c21..0000000
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector;
-
-import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
-import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
-import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
-import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
-import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
-import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
-import org.apache.kudu.Type;
-import org.apache.kudu.test.KuduTestHarness;
-import org.junit.Rule;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.migrationsupport.rules.ExternalResourceSupport;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-@ExtendWith(ExternalResourceSupport.class)
-public class KuduDatabase {
-
- @Rule
- public static KuduTestHarness harness = new KuduTestHarness();
-
- private static final Object[][] booksTableData = {
- {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11},
- {1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22},
- {1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33},
- {1004, "A Cup of Java", "Kumar", 44.44, 44},
- {1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55}};
-
-
- protected static KuduTableInfo booksTableInfo(String tableName, boolean createIfNotExist) {
- return KuduTableInfo.Builder
- .create(tableName)
- .createIfNotExist(createIfNotExist)
- .replicas(1)
- .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
- .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).asNullable().build())
- .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).asNullable().build())
- .build();
- }
-
- protected static List<KuduRow> booksDataRow() {
- return Arrays.stream(booksTableData)
- .map(row -> {
- Integer rowId = (Integer)row[0];
- if (rowId % 2 == 1) {
- KuduRow values = new KuduRow(5);
- values.setField(0, "id", row[0]);
- values.setField(1, "title", row[1]);
- values.setField(2, "author", row[2]);
- values.setField(3, "price", row[3]);
- values.setField(4, "quantity", row[4]);
- return values;
- } else {
- KuduRow values = new KuduRow(3);
- values.setField(0, "id", row[0]);
- values.setField(1, "title", row[1]);
- values.setField(2, "author", row[2]);
- return values;
- }
- })
- .collect(Collectors.toList());
- }
-
- protected void setUpDatabase(KuduTableInfo tableInfo) {
- try {
- String masterAddresses = harness.getMasterAddressesAsString();
- KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
- KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig);
- booksDataRow().forEach(row -> {
- try {
- kuduWriter.write(row);
- }catch (Exception e) {
- e.printStackTrace();
- }
- });
- kuduWriter.close();
- } catch (Exception e) {
- Assertions.fail();
- }
- }
-
- protected void cleanDatabase(KuduTableInfo tableInfo) {
- try {
- String masterAddresses = harness.getMasterAddressesAsString();
- KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
- KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig);
- kuduWriter.deleteTable();
- kuduWriter.close();
- } catch (Exception e) {
- Assertions.fail();
- }
- }
-
- protected List<KuduRow> readRows(KuduTableInfo tableInfo) throws Exception {
- String masterAddresses = harness.getMasterAddressesAsString();
- KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
- KuduReader reader = new KuduReader(tableInfo, readerConfig);
-
- KuduInputSplit[] splits = reader.createInputSplits(1);
- List<KuduRow> rows = new ArrayList<>();
- for (KuduInputSplit split : splits) {
- KuduReaderIterator resultIterator = reader.scanner(split.getScanToken());
- while(resultIterator.hasNext()) {
- KuduRow row = resultIterator.next();
- if(row != null) {
- rows.add(row);
- }
- }
- }
- reader.close();
-
- return rows;
- }
-
- protected void kuduRowsTest(List<KuduRow> rows) {
- for (KuduRow row: rows) {
- Integer rowId = (Integer)row.getField("id");
- if (rowId % 2 == 1) {
- Assertions.assertNotEquals(null, row.getField("price"));
- Assertions.assertNotEquals(null, row.getField("quantity"));
- }
- else {
- Assertions.assertNull(row.getField("price"));
- Assertions.assertNull(row.getField("quantity"));
- }
- }
- }
-}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
new file mode 100644
index 0000000..36572c6
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
+import org.apache.flink.types.Row;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.shaded.com.google.common.collect.Lists;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.migrationsupport.rules.ExternalResourceSupport;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(ExternalResourceSupport.class)
+public class KuduTestBase {
+
+ private static final Object[][] booksTableData = {
+ {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11},
+ {1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22},
+ {1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33},
+ {1004, "A Cup of Java", "Kumar", 44.44, 44},
+ {1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55}};
+
+ public static KuduTestHarness harness;
+ public static String[] columns = new String[]{"id", "title", "author", "price", "quantity"};
+
+ @BeforeAll
+ public static void beforeClass() throws Exception {
+ harness = new KuduTestHarness();
+ harness.before();
+ }
+
+ @AfterAll
+ public static void afterClass() throws Exception {
+ harness.after();
+ }
+
+ public static KuduTableInfo booksTableInfo(String tableName, boolean createIfNotExist) {
+
+ KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);
+
+ if (createIfNotExist) {
+ ColumnSchemasFactory schemasFactory = () -> {
+ List<ColumnSchema> schemas = new ArrayList<>();
+ schemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
+ schemas.add(new ColumnSchema.ColumnSchemaBuilder("title", Type.STRING).build());
+ schemas.add(new ColumnSchema.ColumnSchemaBuilder("author", Type.STRING).build());
+ schemas.add(new ColumnSchema.ColumnSchemaBuilder("price", Type.DOUBLE).nullable(true).build());
+ schemas.add(new ColumnSchema.ColumnSchemaBuilder("quantity", Type.INT32).nullable(true).build());
+ return schemas;
+ };
+
+ tableInfo.createTableIfNotExists(
+ schemasFactory,
+ () -> new CreateTableOptions()
+ .setNumReplicas(1)
+ .addHashPartitions(Lists.newArrayList("id"), 2));
+ }
+
+ return tableInfo;
+ }
+
+ public static List<Tuple5<Integer, String, String, Double, Integer>> booksDataTuple() {
+ return Arrays.stream(booksTableData)
+ .map(row -> {
+ Integer rowId = (Integer) row[0];
+ if (rowId % 2 == 1) {
+ Tuple5<Integer, String, String, Double, Integer> values =
+ Tuple5.of((Integer) row[0],
+ (String) row[1],
+ (String) row[2],
+ (Double) row[3],
+ (Integer)row[4]);
+ return values;
+ } else {
+ Tuple5<Integer, String, String, Double, Integer> values =
+ Tuple5.of((Integer) row[0],
+ (String) row[1],
+ (String) row[2],
+ null, null);
+ return values;
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ public static List<Row> booksDataRow() {
+ return Arrays.stream(booksTableData)
+ .map(row -> {
+ Integer rowId = (Integer) row[0];
+ if (rowId % 2 == 1) {
+ Row values = new Row(5);
+ values.setField(0, row[0]);
+ values.setField(1, row[1]);
+ values.setField(2, row[2]);
+ values.setField(3, row[3]);
+ values.setField(4, row[4]);
+ return values;
+ } else {
+ Row values = new Row(5);
+ values.setField(0, row[0]);
+ values.setField(1, row[1]);
+ values.setField(2, row[2]);
+ return values;
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ public static List<BookInfo> booksDataPojo() {
+ return Arrays.stream(booksTableData).map(row -> new BookInfo(
+ (int) row[0],
+ (String) row[1],
+ (String) row[2],
+ (Double) row[3],
+ (int) row[4]))
+ .collect(Collectors.toList());
+ }
+
+ protected void setUpDatabase(KuduTableInfo tableInfo) {
+ try {
+ String masterAddresses = harness.getMasterAddressesAsString();
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+ KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
+ booksDataRow().forEach(row -> {
+ try {
+ kuduWriter.write(row);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ kuduWriter.close();
+ } catch (Exception e) {
+ Assertions.fail();
+ }
+ }
+
+ protected void cleanDatabase(KuduTableInfo tableInfo) {
+ try {
+ String masterAddresses = harness.getMasterAddressesAsString();
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+ KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
+ kuduWriter.deleteTable();
+ kuduWriter.close();
+ } catch (Exception e) {
+ Assertions.fail();
+ }
+ }
+
+ protected List<Row> readRows(KuduTableInfo tableInfo) throws Exception {
+ String masterAddresses = harness.getMasterAddressesAsString();
+ KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
+ KuduReader reader = new KuduReader(tableInfo, readerConfig);
+
+ KuduInputSplit[] splits = reader.createInputSplits(1);
+ List<Row> rows = new ArrayList<>();
+ for (KuduInputSplit split : splits) {
+ KuduReaderIterator resultIterator = reader.scanner(split.getScanToken());
+ while (resultIterator.hasNext()) {
+ Row row = resultIterator.next();
+ if (row != null) {
+ rows.add(row);
+ }
+ }
+ }
+ reader.close();
+
+ return rows;
+ }
+
+ protected void kuduRowsTest(List<Row> rows) {
+ for (Row row : rows) {
+ Integer rowId = (Integer) row.getField(0);
+ if (rowId % 2 == 1) {
+ Assertions.assertNotEquals(null, row.getField(3));
+ Assertions.assertNotEquals(null, row.getField(4));
+ } else {
+ Assertions.assertNull(row.getField(3));
+ Assertions.assertNull(row.getField(4));
+ }
+ }
+ }
+
+ protected void validateSingleKey(String tableName) throws Exception {
+ KuduTable kuduTable = harness.getClient().openTable(tableName);
+ Schema schema = kuduTable.getSchema();
+
+ assertEquals(1, schema.getPrimaryKeyColumnCount());
+ assertEquals(2, schema.getColumnCount());
+
+ assertTrue(schema.getColumn("first").isKey());
+ assertFalse(schema.getColumn("second").isKey());
+
+ KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+ List<RowResult> rows = new ArrayList<>();
+ scanner.forEach(rows::add);
+
+ assertEquals(1, rows.size());
+ assertEquals("f", rows.get(0).getString("first"));
+ assertEquals("s", rows.get(0).getString("second"));
+ }
+
+ public static class BookInfo {
+ public int id, quantity;
+ public String title, author;
+ public Double price;
+
+ public BookInfo() {
+ }
+
+ public BookInfo(int id, String title, String author, Double price, int quantity) {
+ this.id = id;
+ this.title = title;
+ this.author = author;
+ this.price = price;
+ this.quantity = quantity;
+ }
+ }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java
deleted file mode 100644
index 6057113..0000000
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed serialize the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file serialize You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed serialize in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector.serde;
-
-import org.apache.flink.connectors.kudu.connector.KuduColumnInfo;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
-import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-import org.apache.kudu.Type;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class PojoSerDeTest {
-
- public class TestPojo {
- private String field1;
- private String field2;
- private String field3;
-
- public TestPojo() {
- field1 = "field1";
- field2 = "field2";
- field3 = "field3";
- }
- }
-
- @Test
- public void testFieldsNotInSchema() {
-
- TestPojo pojo = new TestPojo();
-
- KuduTableInfo tableInfo = KuduTableInfo.Builder.create("test")
- .addColumn(KuduColumnInfo.Builder.create("field1", Type.STRING).key(true).hashKey(true).build())
- .addColumn(KuduColumnInfo.Builder.create("field2", Type.STRING).build())
- .build();
-
- KuduRow row = new PojoSerDe<>(TestPojo.class).withSchema(tableInfo.getSchema()).serialize(pojo);
-
- Assertions.assertEquals(2, row.blindMap().size());
- Assertions.assertEquals("field1", row.getField("field1"));
- Assertions.assertEquals("field2", row.getField("field2"));
-
- }
-}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
index 38d0115..077306d 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
@@ -17,24 +17,32 @@
package org.apache.flink.connectors.kudu.streaming;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.connectors.kudu.connector.KuduColumnInfo;
-import org.apache.flink.connectors.kudu.connector.KuduDatabase;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.types.Row;
+
+import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.shaded.com.google.common.collect.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.UUID;
-class KuduSinkTest extends KuduDatabase {
+public class KuduSinkTest extends KuduTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTest.class);
+ private static final String[] columns = new String[]{"id", "uuid"};
private static StreamingRuntimeContext context;
@BeforeAll
@@ -45,23 +53,24 @@
@Test
void testInvalidKuduMaster() {
- KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
- Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(null, tableInfo, new DefaultSerDe()));
+ KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(null, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT)));
}
@Test
void testInvalidTableInfo() {
+ harness.getClient();
String masterAddresses = harness.getMasterAddressesAsString();
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
- Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(writerConfig, null, new DefaultSerDe()));
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(writerConfig, null, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT)));
}
@Test
void testNotTableExist() {
String masterAddresses = harness.getMasterAddressesAsString();
- KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
+ KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
- KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+ KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
sink.setRuntimeContext(context);
Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
@@ -71,22 +80,22 @@
void testOutputWithStrongConsistency() throws Exception {
String masterAddresses = harness.getMasterAddressesAsString();
- KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
+ KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
KuduWriterConfig writerConfig = KuduWriterConfig.Builder
.setMasters(masterAddresses)
.setStrongConsistency()
.build();
- KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+ KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
sink.setRuntimeContext(context);
sink.open(new Configuration());
- for (KuduRow kuduRow : booksDataRow()) {
+ for (Row kuduRow : booksDataRow()) {
sink.invoke(kuduRow);
}
sink.close();
- List<KuduRow> rows = readRows(tableInfo);
+ List<Row> rows = readRows(tableInfo);
Assertions.assertEquals(5, rows.size());
kuduRowsTest(rows);
}
@@ -95,17 +104,17 @@
void testOutputWithEventualConsistency() throws Exception {
String masterAddresses = harness.getMasterAddressesAsString();
- KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
+ KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
KuduWriterConfig writerConfig = KuduWriterConfig.Builder
.setMasters(masterAddresses)
.setEventualConsistency()
.build();
- KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+ KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
sink.setRuntimeContext(context);
sink.open(new Configuration());
- for (KuduRow kuduRow : booksDataRow()) {
+ for (Row kuduRow : booksDataRow()) {
sink.invoke(kuduRow);
}
@@ -114,37 +123,46 @@
sink.close();
- List<KuduRow> rows = readRows(tableInfo);
+ List<Row> rows = readRows(tableInfo);
Assertions.assertEquals(5, rows.size());
kuduRowsTest(rows);
}
-
@Test
void testSpeed() throws Exception {
String masterAddresses = harness.getMasterAddressesAsString();
- KuduTableInfo tableInfo = KuduTableInfo.Builder
- .create("test_speed")
- .createIfNotExist(true)
- .replicas(3)
- .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
- .addColumn(KuduColumnInfo.Builder.create("uuid", Type.STRING).build())
- .build();
+ KuduTableInfo tableInfo = KuduTableInfo
+ .forTable("test_speed")
+ .createTableIfNotExists(
+ () ->
+ Lists.newArrayList(
+ new ColumnSchema
+ .ColumnSchemaBuilder("id", Type.INT32)
+ .key(true)
+ .build(),
+ new ColumnSchema
+ .ColumnSchemaBuilder("uuid", Type.STRING)
+ .build()
+ ),
+ () -> new CreateTableOptions()
+ .setNumReplicas(3)
+ .addHashPartitions(Lists.newArrayList("id"), 6));
+
KuduWriterConfig writerConfig = KuduWriterConfig.Builder
.setMasters(masterAddresses)
.setEventualConsistency()
.build();
- KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+ KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
sink.setRuntimeContext(context);
sink.open(new Configuration());
int totalRecords = 100000;
- for (int i=0; i < totalRecords; i++) {
- KuduRow kuduRow = new KuduRow(2);
- kuduRow.setField(0, "id", i);
- kuduRow.setField(1, "uuid", UUID.randomUUID().toString());
+ for (int i = 0; i < totalRecords; i++) {
+ Row kuduRow = new Row(2);
+ kuduRow.setField(0, i);
+ kuduRow.setField(1, UUID.randomUUID().toString());
sink.invoke(kuduRow);
}
@@ -153,7 +171,7 @@
sink.close();
- List<KuduRow> rows = readRows(tableInfo);
+ List<Row> rows = readRows(tableInfo);
Assertions.assertEquals(totalRecords, rows.size());
}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
new file mode 100644
index 0000000..f694108
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.writer.TupleOperationMapper;
+import org.apache.flink.connectors.kudu.streaming.KuduSink;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+
+import org.apache.flink.types.Row;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.shaded.com.google.common.collect.Lists;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.jupiter.api.Assertions.*;
+
+public class KuduCatalogTest extends KuduTestBase {
+
+ private KuduCatalog catalog;
+ private StreamTableEnvironment tableEnv;
+
+ @BeforeEach
+ public void init() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+ tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode(env);
+ tableEnv.registerCatalog("kudu", catalog);
+ tableEnv.useCatalog("kudu");
+ }
+
+ @Test
+ public void testCreateAlterDrop() throws Exception {
+ tableEnv.sqlUpdate("CREATE TABLE TestTable1 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+ tableEnv.sqlUpdate("INSERT INTO TestTable1 VALUES ('f', 's')");
+
+ // Add this once Primary key support has been enabled
+ // tableEnv.sqlUpdate("CREATE TABLE TestTable2 (`first` STRING, `second` String, PRIMARY KEY(`first`)) WITH ('kudu.hash-columns' = 'first')");
+ // tableEnv.sqlUpdate("INSERT INTO TestTable2 VALUES ('f', 's')");
+
+ tableEnv.execute("test");
+
+ validateSingleKey("TestTable1");
+ // validateSingleKey("TestTable2");
+
+ tableEnv.sqlUpdate("ALTER TABLE TestTable1 RENAME TO TestTable1R");
+ validateSingleKey("TestTable1R");
+
+ tableEnv.sqlUpdate("DROP TABLE TestTable1R");
+ assertFalse(harness.getClient().tableExists("TestTable1R"));
+ }
+
+ @Test
+ public void testCreateAndInsertMultiKey() throws Exception {
+ tableEnv.sqlUpdate("CREATE TABLE TestTable3 (`first` STRING, `second` INT, third STRING) WITH ('kudu.hash-columns' = 'first,second', 'kudu.primary-key-columns' = 'first,second')");
+ tableEnv.sqlUpdate("INSERT INTO TestTable3 VALUES ('f', 2, 't')");
+
+ // Add this once Primary key support has been enabled
+ // tableEnv.sqlUpdate("CREATE TABLE TestTable4 (`first` STRING, `second` INT, `third` STRING) PRIMARY KEY (`first`, `second`) WITH ('kudu.hash-columns' = 'first,second')");
+ // tableEnv.sqlUpdate("INSERT INTO TestTable4 VALUES ('f', 2, 't')");
+
+ tableEnv.execute("test");
+
+ validateMultiKey("TestTable3");
+ // validateMultiKey("TestTable4");
+ }
+
+ @Test
+ public void testSourceProjection() throws Exception {
+ tableEnv.sqlUpdate("CREATE TABLE TestTable5 (`second` String, `first` STRING, `third` String) WITH ('kudu.hash-columns' = 'second', 'kudu.primary-key-columns' = 'second')");
+ tableEnv.sqlUpdate("INSERT INTO TestTable5 VALUES ('s', 'f', 't')");
+ tableEnv.execute("test");
+
+ tableEnv.sqlUpdate("CREATE TABLE TestTable6 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+ tableEnv.sqlUpdate("INSERT INTO TestTable6 (SELECT `first`, `second` FROM TestTable5)");
+ tableEnv.execute("test");
+
+ validateSingleKey("TestTable6");
+ }
+
+ @Test
+ public void testEmptyProjection() throws Exception {
+ tableEnv.sqlUpdate("CREATE TABLE TestTableEP (`first` STRING, `second` STRING) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+ tableEnv.sqlUpdate("INSERT INTO TestTableEP VALUES ('f','s')");
+ tableEnv.sqlUpdate("INSERT INTO TestTableEP VALUES ('f2','s2')");
+ tableEnv.execute("test");
+
+ Table result = tableEnv.sqlQuery("SELECT COUNT(*) FROM TestTableEP");
+
+ DataStream<Tuple2<Boolean, Row>> resultDataStream = tableEnv.toRetractStream(result, Types.ROW(Types.LONG));
+
+ CollectionSink.output.clear();
+
+ resultDataStream
+ .map(t -> Tuple2.of(t.f0, t.f1.getField(0)))
+ .returns(Types.TUPLE(Types.BOOLEAN, Types.LONG))
+ .addSink(new CollectionSink<>()).setParallelism(1);
+
+ tableEnv.execute("test");
+
+ List<Tuple2<Boolean, Long>> expected = Lists.newArrayList(Tuple2.of(true, 1L), Tuple2.of(false, 1L), Tuple2.of(true, 2L));
+
+ assertEquals(new HashSet<>(expected), new HashSet<>(CollectionSink.output));
+ CollectionSink.output.clear();
+
+ }
+
+ @Test
+ public void dataStreamEndToEstTest() throws Exception {
+ KuduCatalog catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+ // Creating table through catalog
+ KuduTableFactory tableFactory = catalog.getKuduTableFactory();
+
+ KuduTableInfo tableInfo = KuduTableInfo.forTable("TestTable7").createTableIfNotExists(
+ () ->
+ Lists.newArrayList(
+ new ColumnSchema
+ .ColumnSchemaBuilder("k", Type.INT32)
+ .key(true)
+ .build(),
+ new ColumnSchema
+ .ColumnSchemaBuilder("v", Type.STRING)
+ .build()
+ ),
+ () -> new CreateTableOptions()
+ .setNumReplicas(1)
+ .addHashPartitions(Lists.newArrayList("k"), 2));
+
+ catalog.createTable(tableInfo, false);
+
+ ObjectPath path = catalog.getObjectPath("TestTable7");
+ CatalogTable table = catalog.getTable(path);
+
+ List<Tuple2<Integer, String>> input = Lists.newArrayList(Tuple2.of(1, "one"), Tuple2.of(2, "two"), Tuple2.of(3, "three"));
+
+ // Writing with simple sink
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(harness.getMasterAddressesAsString()).build();
+ env.fromCollection(input).addSink(
+ new KuduSink<>(
+ writerConfig,
+ tableInfo,
+ new TupleOperationMapper<>(
+ new String[]{"k", "v"},
+ AbstractSingleOperationMapper.KuduOperation.INSERT)
+ )
+ );
+ env.execute();
+ // Reading and validating data
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ CollectionSink.output.clear();
+ tableFactory.createTableSource(path, table)
+ .getDataStream(env)
+ .map(row -> Tuple2.of((int) row.getField(0), (String) row.getField(1)))
+ .returns(new TypeHint<Tuple2<Integer, String>>() {
+ })
+ .addSink(new CollectionSink<>()).setParallelism(1);
+ env.execute();
+
+ List<Tuple2<Integer, String>> expected = Lists.newArrayList(Tuple2.of(1, "one"), Tuple2.of(2, "two"), Tuple2.of(3, "three"));
+ assertEquals(new HashSet<>(expected), new HashSet<>(CollectionSink.output));
+ CollectionSink.output.clear();
+ }
+
+ @Test
+ public void testTimestamp() throws Exception {
+ tableEnv.sqlUpdate("CREATE TABLE TestTableTsC (`first` STRING, `second` TIMESTAMP(3)) " +
+ "WITH ('kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
+ tableEnv.sqlUpdate("INSERT INTO TestTableTsC values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')");
+ tableEnv.execute("test");
+
+ KuduTable kuduTable = harness.getClient().openTable("TestTableTsC");
+ assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());
+
+ KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+ List<RowResult> rows = new ArrayList<>();
+ scanner.forEach(rows::add);
+
+ assertEquals(1, rows.size());
+ assertEquals("f", rows.get(0).getString(0));
+ assertEquals(Timestamp.valueOf("2020-01-01 12:12:12.123"), rows.get(0).getTimestamp(1));
+ }
+
+ @Test
+ public void testDatatypes() throws Exception {
+ tableEnv.sqlUpdate("CREATE TABLE TestTable8 (`first` STRING, `second` BOOLEAN, `third` BYTES," +
+ "`fourth` TINYINT, `fifth` SMALLINT, `sixth` INT, `seventh` BIGINT, `eighth` FLOAT, `ninth` DOUBLE, " +
+ "`tenth` TIMESTAMP)" +
+ "WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+
+ tableEnv.sqlUpdate("INSERT INTO TestTable8 values ('f', false, cast('bbbb' as BYTES), cast(12 as TINYINT)," +
+ "cast(34 as SMALLINT), 56, cast(78 as BIGINT), cast(3.14 as FLOAT), cast(1.2345 as DOUBLE)," +
+ "TIMESTAMP '2020-04-15 12:34:56.123') ");
+
+ tableEnv.execute("test");
+ validateManyTypes("TestTable8");
+ }
+
+ @Test
+ public void testMissingPropertiesCatalog() throws Exception {
+ assertThrows(TableException.class,
+ () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9a (`first` STRING, `second` String) " +
+ "WITH ('kudu.primary-key-columns' = 'second')"));
+ assertThrows(TableException.class,
+ () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
+ "WITH ('kudu.hash-columns' = 'first')"));
+ assertThrows(TableException.class,
+ () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
+ "WITH ('kudu.primary-key-columns' = 'second', 'kudu.hash-columns' = 'first')"));
+ }
+
+ private void validateManyTypes(String tableName) throws Exception {
+ KuduTable kuduTable = harness.getClient().openTable(tableName);
+ Schema schema = kuduTable.getSchema();
+
+ assertEquals(Type.STRING, schema.getColumn("first").getType());
+ assertEquals(Type.BOOL, schema.getColumn("second").getType());
+ assertEquals(Type.BINARY, schema.getColumn("third").getType());
+ assertEquals(Type.INT8, schema.getColumn("fourth").getType());
+ assertEquals(Type.INT16, schema.getColumn("fifth").getType());
+ assertEquals(Type.INT32, schema.getColumn("sixth").getType());
+ assertEquals(Type.INT64, schema.getColumn("seventh").getType());
+ assertEquals(Type.FLOAT, schema.getColumn("eighth").getType());
+ assertEquals(Type.DOUBLE, schema.getColumn("ninth").getType());
+ assertEquals(Type.UNIXTIME_MICROS, schema.getColumn("tenth").getType());
+
+ KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+ List<RowResult> rows = new ArrayList<>();
+ scanner.forEach(rows::add);
+
+ assertEquals(1, rows.size());
+ assertEquals("f", rows.get(0).getString(0));
+ assertEquals(false, rows.get(0).getBoolean(1));
+ assertEquals(ByteBuffer.wrap("bbbb".getBytes()), rows.get(0).getBinary(2));
+ assertEquals(12, rows.get(0).getByte(3));
+ assertEquals(34, rows.get(0).getShort(4));
+ assertEquals(56, rows.get(0).getInt(5));
+ assertEquals(78, rows.get(0).getLong(6));
+ assertEquals(3.14, rows.get(0).getFloat(7), 0.01);
+ assertEquals(1.2345, rows.get(0).getDouble(8), 0.0001);
+ assertEquals(Timestamp.valueOf("2020-04-15 12:34:56.123"), rows.get(0).getTimestamp(9));
+ }
+
+ private void validateMultiKey(String tableName) throws Exception {
+ KuduTable kuduTable = harness.getClient().openTable(tableName);
+ Schema schema = kuduTable.getSchema();
+
+ assertEquals(2, schema.getPrimaryKeyColumnCount());
+ assertEquals(3, schema.getColumnCount());
+
+ assertTrue(schema.getColumn("first").isKey());
+ assertTrue(schema.getColumn("second").isKey());
+
+ assertFalse(schema.getColumn("third").isKey());
+
+ KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+ List<RowResult> rows = new ArrayList<>();
+ scanner.forEach(rows::add);
+
+ assertEquals(1, rows.size());
+ assertEquals("f", rows.get(0).getString("first"));
+ assertEquals(2, rows.get(0).getInt("second"));
+ assertEquals("t", rows.get(0).getString("third"));
+ }
+
+ public static class CollectionSink<T> implements SinkFunction<T> {
+
+ public static List<Object> output = Collections.synchronizedList(new ArrayList<>());
+
+ public void invoke(T value, SinkFunction.Context context) throws Exception {
+ output.add(value);
+ }
+
+ }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
new file mode 100644
index 0000000..398ba9f
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.shaded.com.google.common.collect.Lists;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class KuduTableFactoryTest extends KuduTestBase {
+ private StreamTableEnvironment tableEnv;
+ private String kuduMasters;
+
+ @BeforeEach
+ public void init() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
+ tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode(env);
+ kuduMasters = harness.getMasterAddressesAsString();
+ }
+
+ @Test
+ public void testMissingTable() throws Exception {
+ tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
+ "WITH ('connector.type'='kudu', 'kudu.masters'='" + kuduMasters + "')");
+ assertThrows(NullPointerException.class,
+ () -> tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)"));
+ }
+
+ @Test
+ public void testMissingMasters() throws Exception {
+ tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
+ "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11')");
+ assertThrows(NullPointerException.class,
+ () -> tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)"));
+ }
+
+ @Test
+ public void testNonExistingTable() throws Exception {
+ tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
+ "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "')");
+ tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)");
+ assertThrows(java.util.concurrent.ExecutionException.class,
+ () -> tableEnv.execute("test"));
+ }
+
+ @Test
+ public void testCreateTable() throws Exception {
+ tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` STRING) " +
+ "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "', " +
+ "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
+ tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 's')");
+ tableEnv.execute("test");
+
+ validateSingleKey("TestTable11");
+ }
+ @Test
+ public void testTimestamp() throws Exception {
+ // Timestamp should be bridged to sql.Timestamp
+ // Test it when creating the table...
+ tableEnv.sqlUpdate("CREATE TABLE TestTableTs (`first` STRING, `second` TIMESTAMP(3)) " +
+ "WITH ('connector.type'='kudu', 'kudu.table'='TestTableTs', 'kudu.masters'='" + kuduMasters + "', " +
+ "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
+ tableEnv.sqlUpdate("INSERT INTO TestTableTs values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')");
+ tableEnv.execute("test");
+
+ // And also when inserting into existing table
+ tableEnv.sqlUpdate("CREATE TABLE TestTableTsE (`first` STRING, `second` TIMESTAMP(3)) " +
+ "WITH ('connector.type'='kudu', 'kudu.table'='TestTableTs', 'kudu.masters'='" + kuduMasters + "')");
+
+ tableEnv.sqlUpdate("INSERT INTO TestTableTsE values ('s', TIMESTAMP '2020-02-02 23:23:23')");
+ tableEnv.execute("test");
+
+ KuduTable kuduTable = harness.getClient().openTable("TestTableTs");
+ assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());
+
+ KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+ HashSet<Timestamp> results = new HashSet<>();
+ scanner.forEach(sc -> results.add(sc.getTimestamp("second")));
+
+ assertEquals(2, results.size());
+ List<Timestamp> expected = Lists.newArrayList(
+ Timestamp.valueOf("2020-01-01 12:12:12.123"),
+ Timestamp.valueOf("2020-02-02 23:23:23"));
+ assertEquals(new HashSet<>(expected), results);
+ }
+
+ @Test
+ public void testExistingTable() throws Exception {
+ // Creating a table
+ tableEnv.sqlUpdate("CREATE TABLE TestTable12 (`first` STRING, `second` STRING) " +
+ "WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "', " +
+ "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
+
+ tableEnv.sqlUpdate("INSERT INTO TestTable12 values ('f', 's')");
+ tableEnv.execute("test");
+
+ // Then another one in SQL that refers to the previously created one
+ tableEnv.sqlUpdate("CREATE TABLE TestTable12b (`first` STRING, `second` STRING) " +
+ "WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "')");
+ tableEnv.sqlUpdate("INSERT INTO TestTable12b values ('f2','s2')");
+ tableEnv.execute("test2");
+
+ // Validate that both insertions were into the same table
+ KuduTable kuduTable = harness.getClient().openTable("TestTable12");
+ KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+ List<RowResult> rows = new ArrayList<>();
+ scanner.forEach(rows::add);
+
+ assertEquals(2, rows.size());
+ assertEquals("f", rows.get(0).getString("first"));
+ assertEquals("s", rows.get(0).getString("second"));
+ assertEquals("f2", rows.get(1).getString("first"));
+ assertEquals("s2", rows.get(1).getString("second"));
+ }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
new file mode 100644
index 0000000..b522c51
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+
+public class KuduTableTestUtils {
+
+ public static StreamTableEnvironment createTableEnvWithBlinkPlannerBatchMode(StreamExecutionEnvironment env) {
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+ return tableEnv;
+ }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
new file mode 100644
index 0000000..f37b40d
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.writer;
+
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.Delete;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Update;
+import org.apache.kudu.client.Upsert;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.mockito.Mockito.when;
+
+public abstract class AbstractOperationTest {
+
+ public static final Schema tableSchema = KuduTestBase.booksTableInfo("test_table", true).getSchema();
+ @Mock
+ Insert mockInsert;
+ @Mock
+ Upsert mockUpsert;
+ @Mock
+ Update mockUpdate;
+ @Mock
+ Delete mockDelete;
+ @Mock
+ KuduTable mockTable;
+ @Mock
+ PartialRow mockPartialRow;
+
+ @BeforeEach
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ when(mockInsert.getRow()).thenReturn(mockPartialRow);
+ when(mockUpsert.getRow()).thenReturn(mockPartialRow);
+ when(mockUpdate.getRow()).thenReturn(mockPartialRow);
+ when(mockDelete.getRow()).thenReturn(mockPartialRow);
+ when(mockTable.newInsert()).thenReturn(mockInsert);
+ when(mockTable.newUpsert()).thenReturn(mockUpsert);
+ when(mockTable.newUpdate()).thenReturn(mockUpdate);
+ when(mockTable.newDelete()).thenReturn(mockDelete);
+ when(mockTable.getSchema()).thenReturn(tableSchema);
+ }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
new file mode 100644
index 0000000..45e0b1b
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.writer;
+
+import org.apache.flink.connectors.kudu.connector.KuduTestBase.BookInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.PojoOperationMapper;
+
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class PojoOperationMapperTest extends AbstractOperationTest {
+
+ @Test
+ void testPojoMapper() {
+
+ PojoOperationMapper<BookInfo> mapper = new PojoOperationMapper<>(BookInfo.class, KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT);
+
+ BookInfo bookInfo = KuduTestBase.booksDataPojo().get(0);
+
+ assertEquals(bookInfo.id, mapper.getField(bookInfo, 0));
+ assertEquals(bookInfo.title, mapper.getField(bookInfo, 1));
+ assertEquals(bookInfo.author, mapper.getField(bookInfo, 2));
+ assertEquals(bookInfo.price, mapper.getField(bookInfo, 3));
+ assertEquals(bookInfo.quantity, mapper.getField(bookInfo, 4));
+
+ List<Operation> operations = mapper.createOperations(bookInfo, mockTable);
+ assertEquals(1, operations.size());
+
+ PartialRow row = operations.get(0).getRow();
+ Mockito.verify(row, Mockito.times(1)).addObject("id", bookInfo.id);
+ Mockito.verify(row, Mockito.times(1)).addObject("quantity", bookInfo.quantity);
+
+ Mockito.verify(row, Mockito.times(1)).addObject("title", bookInfo.title);
+ Mockito.verify(row, Mockito.times(1)).addObject("author", bookInfo.author);
+
+ Mockito.verify(row, Mockito.times(1)).addObject("price", bookInfo.price);
+ }
+
+ @Test
+ public void testFieldInheritance() {
+ PojoOperationMapper<Second> mapper = new PojoOperationMapper<>(Second.class, new String[]{"s1", "i1", "i2"}, AbstractSingleOperationMapper.KuduOperation.INSERT);
+ Second s = new Second();
+ assertEquals("s1", mapper.getField(s, 0));
+ assertEquals(1, mapper.getField(s, 1));
+ assertEquals(2, mapper.getField(s, 2));
+ }
+
+ private static class First {
+ private int i1 = 1;
+ public int i2 = 2;
+ private String s1 = "ignore";
+ }
+
+ private static class Second extends First {
+ private String s1 = "s1";
+ }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
new file mode 100644
index 0000000..e737063
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.writer;
+
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
+import org.apache.flink.types.Row;
+
+import org.apache.kudu.client.Operation;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+
+public class RowOperationMapperTest extends AbstractOperationTest {
+
+ @Test
+ void testGetField() {
+ RowOperationMapper mapper = new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT);
+ Row inputRow = KuduTestBase.booksDataRow().get(0);
+
+ for (int i = 0; i < inputRow.getArity(); i++) {
+ Assertions.assertEquals(inputRow.getField(i), mapper.getField(inputRow, i));
+ }
+ }
+
+ @Test
+ void testCorrectOperationInsert() {
+ RowOperationMapper mapper = new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT);
+ Row inputRow = KuduTestBase.booksDataRow().get(0);
+
+ List<Operation> operations = mapper.createOperations(inputRow, mockTable);
+
+ assertEquals(1, operations.size());
+ verify(mockTable).newInsert();
+ }
+
+ @Test
+ void testCorrectOperationUpsert() {
+ RowOperationMapper mapper = new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.UPSERT);
+ Row inputRow = KuduTestBase.booksDataRow().get(0);
+
+ List<Operation> operations = mapper.createOperations(inputRow, mockTable);
+
+ assertEquals(1, operations.size());
+ verify(mockTable).newUpsert();
+ }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java
new file mode 100644
index 0000000..308a011
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.writer;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.TupleOperationMapper;
+
+import org.apache.kudu.client.Operation;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+
+public class TupleOpertaionMapperTest extends AbstractOperationTest {
+ @Test
+ void testGetField() {
+ TupleOperationMapper<Tuple5<Integer, String, String, Double, Integer>> mapper =
+ new TupleOperationMapper<>(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT);
+ Tuple5<Integer, String, String, Double, Integer> inputTuple = KuduTestBase.booksDataTuple().get(0);
+
+ for (int i = 0; i < inputTuple.getArity(); i++) {
+ Assertions.assertEquals(inputTuple.getField(i), mapper.getField(inputTuple, i));
+ }
+ }
+
+ @Test
+ void testCorrectOperationInsert() {
+ TupleOperationMapper<Tuple5<Integer, String, String, Double, Integer>> mapper =
+ new TupleOperationMapper<>(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT);
+ Tuple5<Integer, String, String, Double, Integer> inputTuple = KuduTestBase.booksDataTuple().get(0);
+
+ List<Operation> operations = mapper.createOperations(inputTuple, mockTable);
+
+ assertEquals(1, operations.size());
+ verify(mockTable).newInsert();
+ }
+
+ @Test
+ void testCorrectOperationUpsert() {
+ TupleOperationMapper<Tuple5<Integer, String, String, Double, Integer>> mapper =
+ new TupleOperationMapper<>(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.UPSERT);
+ Tuple5<Integer, String, String, Double, Integer> inputTuple = KuduTestBase.booksDataTuple().get(0);
+
+ List<Operation> operations = mapper.createOperations(inputTuple, mockTable);
+
+ assertEquals(1, operations.size());
+ verify(mockTable).newUpsert();
+ }
+}