Kudu Connector rework (#78)

Kudu connector rework, including the addition of a
Table API connector.

Co-authored-by: Gyula Fora <gyula@cloudera.com>
Co-authored-by: Balazs Varga <bvarga@cloudera.com>
diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
index 8692ca5..14c13eb 100644
--- a/flink-connector-kudu/README.md
+++ b/flink-connector-kudu/README.md
@@ -1,7 +1,11 @@
 # Flink Kudu Connector
 
-This connector provides a source (```KuduInputFormat```) and a sink/output (```KuduSink``` and ```KuduOutputFormat```, respectively) that can read and write to [Kudu](https://kudu.apache.org/). To use this connector, add the
-following dependency to your project:
+This connector provides a source (```KuduInputFormat```), a sink/output
+(```KuduSink``` and ```KuduOutputFormat```, respectively),
+as well as a table source (`KuduTableSource`), an upsert table sink (`KuduTableSink`), and a catalog (`KuduCatalog`),
+to allow reading from and writing to [Kudu](https://kudu.apache.org/).
+
+To use this connector, add the following dependency to your project:
 
     <dependency>
       <groupId>org.apache.bahir</groupId>
@@ -9,104 +13,283 @@
       <version>1.1-SNAPSHOT</version>
     </dependency>
 
-*Version Compatibility*: This module is compatible with Apache Kudu *1.9.0* (last stable version).
+*Version Compatibility*: This module is compatible with Apache Kudu *1.11.1* (last stable version) and Apache Flink 1.10+.
 
 Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
-See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-stable/start/dependencies.html).
+See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/dependencies.html).
 
 ## Installing Kudu
 
 Follow the instructions from the [Kudu Installation Guide](https://kudu.apache.org/docs/installation.html).
-Optionally, you can use the docker images provided in dockers folder. 
+Optionally, you can use the Docker images provided in the dockers folder.
 
-## KuduInputFormat
+## SQL and Table API
+
+The Kudu connector is fully integrated with the Flink Table and SQL APIs. Once we configure the Kudu catalog (see next section),
+we can start querying or inserting into existing Kudu tables using the Flink SQL or Table API.
+
+For more information about the possible queries, please check the [official documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/).
+
+### Kudu Catalog
+
+The connector comes with a catalog implementation to handle metadata about your Kudu setup and perform table management.
+By using the Kudu catalog, you can access all the tables already created in Kudu from Flink SQL queries. The Kudu catalog only
+allows users to create or access existing Kudu tables. Tables using other data sources must be defined in other catalogs, such as
+the in-memory catalog or the Hive catalog.
+
+When using the SQL CLI, you can easily add the Kudu catalog to your environment YAML file:
 
 ```
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-env.setParallelism(PARALLELISM);
-
-// create a table info object
-KuduTableInfo tableInfo = KuduTableInfo.Builder
-        .create("books")
-        .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
-        .addColumn(KuduColumnInfo.Builder.createString("title").build())
-        .addColumn(KuduColumnInfo.Builder.createString("author").build())
-        .addColumn(KuduColumnInfo.Builder.createDouble("price").build())
-        .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build())
-        .build();
-// create a reader configuration
-KuduReaderConfig readerConfig = KuduReaderConfig.Builder
-        .setMasters("172.25.0.6")
-        .setRowLimit(1000)
-        .build();    
-// Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips
-env.createInput(new KuduInputFormat<>(readerConfig, tableInfo, new DefaultSerDe()))
-        .count();
-        
-env.execute();
+catalogs:
+  - name: kudu
+    type: kudu
+    kudu.masters: <host>:7051
 ```
 
-## KuduOutputFormat
+Once the SQL CLI is started, you can switch to the Kudu catalog by calling `USE CATALOG kudu;`.
 
-```
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+You can also create and use the KuduCatalog directly in the Table environment:
 
-env.setParallelism(PARALLELISM);
-
-// create a table info object
-KuduTableInfo tableInfo = KuduTableInfo.Builder
-        .create("books")
-        .createIfNotExist(true)
-        .replicas(3)
-        .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
-        .addColumn(KuduColumnInfo.Builder.createString("title").build())
-        .addColumn(KuduColumnInfo.Builder.createString("author").build())
-        .addColumn(KuduColumnInfo.Builder.createDouble("price").build())
-        .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build())
-        .build();
-// create a writer configuration
-KuduWriterConfig writerConfig = KuduWriterConfig.Builder
-        .setMasters("172.25.0.6")
-        .setUpsertWrite()
-        .setStrongConsistency()
-        .build();
-...
-
-env.fromCollection(books)
-        .output(new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe()));
-
-env.execute();
+```java
+String KUDU_MASTERS = "host1:port1,host2:port2";
+KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS);
+tableEnv.registerCatalog("kudu", catalog);
+tableEnv.useCatalog("kudu");
 ```
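+
+With the catalog in use, existing Kudu tables can be queried or written to directly. A minimal sketch (the table
+names here are hypothetical):
+
+```java
+// Query an existing Kudu table through the catalog.
+Table table = tableEnv.sqlQuery("SELECT * FROM MyKuduTable");
+
+// Insert into another Kudu table; sqlUpdate is the Flink 1.10 API for such statements.
+tableEnv.sqlUpdate("INSERT INTO MyOtherKuduTable SELECT * FROM MyKuduTable");
+```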
 
-## KuduSink
+### DDL operations using SQL
+
+It is possible to manipulate Kudu tables using SQL DDL.
+
+When not using the Kudu catalog, the following additional properties must be specified in the `WITH` clause:
+* `'connector.type'='kudu'`
+* `'kudu.masters'='host1:port1,host2:port2,...'`: comma-delimited list of Kudu masters
+* `'kudu.table'='...'`: the table's name within the Kudu database
+
+If you have registered and are using the Kudu catalog, these properties are handled automatically.
+
+To create a table, the additional properties `kudu.primary-key-columns` and `kudu.hash-columns` must be specified
+as comma-delimited lists. Optionally, you can set the `kudu.replicas` property (defaults to 1).
+Other properties, such as range partitioning, cannot be configured here. For more flexibility, please use
+`catalog.createTable` as described in [this](#creating-a-kudutable-directly-with-kuducatalog) section or create the table directly in Kudu.
+
+The `NOT NULL` constraint can be added to any of the column definitions.
+By setting a column as a primary key, it will automatically be created with the `NOT NULL` constraint.
+Hash columns must be a subset of primary key columns.
+
+Kudu catalog:
 
 ```
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-env.setParallelism(PARALLELISM);
-
-// create a table info object
-KuduTableInfo tableInfo = KuduTableInfo.Builder
-        .create("books")
-        .createIfNotExist(true)
-        .replicas(3)
-        .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
-        .addColumn(KuduColumnInfo.Builder.createString("title").build())
-        .addColumn(KuduColumnInfo.Builder.createString("author").build())
-        .addColumn(KuduColumnInfo.Builder.createDouble("price").build())
-        .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build())
-        .build();
-// create a writer configuration
-KuduWriterConfig writerConfig = KuduWriterConfig.Builder
-        .setMasters("172.25.0.6")
-        .setUpsertWrite()
-        .setStrongConsistency()
-        .build();
-...
-
-env.fromCollection(books)
-    .addSink(new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe()));
-
-env.execute();
+CREATE TABLE TestTable (
+  first STRING,
+  second STRING,
+  third INT NOT NULL
+) WITH (
+  'kudu.hash-columns' = 'first',
+  'kudu.primary-key-columns' = 'first,second'
+)
 ```
+
+Other catalogs:
+```sql
+CREATE TABLE TestTable (
+  first STRING,
+  second STRING,
+  third INT NOT NULL
+) WITH (
+  'connector.type' = 'kudu',
+  'kudu.masters' = '...',
+  'kudu.table' = 'TestTable',
+  'kudu.hash-columns' = 'first',
+  'kudu.primary-key-columns' = 'first,second'
+)
+```
+
+Renaming a table:
+```sql
+ALTER TABLE TestTable RENAME TO TestTableRen
+```
+
+Dropping a table:
+```sql
+DROP TABLE TestTableRen
+```
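+
+The same statements can also be issued programmatically from the Table environment. A minimal sketch, assuming
+the Flink 1.10 `sqlUpdate` API and the Kudu catalog registered as shown above:
+
+```java
+// Create, rename, and drop a Kudu table through the registered catalog.
+tableEnv.sqlUpdate("CREATE TABLE TestTable (first STRING, second STRING, third INT NOT NULL) "
+    + "WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first,second')");
+tableEnv.sqlUpdate("ALTER TABLE TestTable RENAME TO TestTableRen");
+tableEnv.sqlUpdate("DROP TABLE TestTableRen");
+```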
+
+#### Creating a KuduTable directly with KuduCatalog
+
+The KuduCatalog also exposes a simple `createTable` method that requires only a `KuduTableInfo` object, through which
+the table configuration, including schema, partitioning, replication, etc., can be specified.
+
+Use the `createTableIfNotExists` method, which takes a `ColumnSchemasFactory` and
+a `CreateTableOptionsFactory` parameter. These implement `getColumnSchemas()`,
+returning a list of Kudu [ColumnSchema](https://kudu.apache.org/apidocs/org/apache/kudu/ColumnSchema.html) objects,
+and `getCreateTableOptions()`, returning a
+[CreateTableOptions](https://kudu.apache.org/apidocs/org/apache/kudu/client/CreateTableOptions.html) object, respectively.
+
+This example shows the creation of a table called `ExampleTable` with two columns,
+`first` being a primary key, and the configuration of replicas and hash partitioning.
+
+```java
+KuduTableInfo tableInfo = KuduTableInfo
+    .forTable("ExampleTable")
+    .createTableIfNotExists(
+        () ->
+            Lists.newArrayList(
+                new ColumnSchema
+                    .ColumnSchemaBuilder("first", Type.INT32)
+                    .key(true)
+                    .build(),
+                new ColumnSchema
+                    .ColumnSchemaBuilder("second", Type.STRING)
+                    .build()
+            ),
+        () -> new CreateTableOptions()
+            .setNumReplicas(1)
+            .addHashPartitions(Lists.newArrayList("first"), 2));
+
+catalog.createTable(tableInfo, false);
+```
+The example uses lambda expressions to implement the functional interfaces.
+
+Read more about Kudu schema design in the [Kudu docs](https://kudu.apache.org/docs/schema_design.html).
+
+### Supported data types
+| Flink/SQL     | Kudu           | 
+| ------------- |:-------------:| 
+|    STRING     | STRING        | 
+| BOOLEAN       |    BOOL       | 
+| TINYINT       |   INT8        | 
+| SMALLINT      |  INT16        | 
+| INT           |  INT32        | 
+| BIGINT        |   INT64     |
+| FLOAT         |  FLOAT      |
+| DOUBLE        |    DOUBLE    |
+| BYTES        |    BINARY    |
+| TIMESTAMP(3)     |    UNIXTIME_MICROS |
+
+Note:
+* `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java conversion class is `java.sql.Timestamp`.
+* `BINARY` and `VARBINARY` are not yet supported - use `BYTES`, which is a `VARBINARY(2147483647)`.
+* `CHAR` and `VARCHAR` are not yet supported - use `STRING`, which is a `VARCHAR(2147483647)`.
+* `DECIMAL` types are not yet supported.
+
+### Known limitations
+* Data type limitations (see above).
+* SQL Create table: primary keys can only be set by the `kudu.primary-key-columns` property; using the
+`PRIMARY KEY` constraint is not yet possible.
+* SQL Create table: range partitioning is not supported.
+* When getting a table through the Catalog, NOT NULL and PRIMARY KEY constraints are ignored. All columns
+are described as being nullable, and not being primary keys.
+* Kudu tables cannot be altered through the catalog other than simple renaming.
+
+## DataStream API
+
+It is also possible to use the Kudu connector directly from the DataStream API; however, we
+encourage all users to explore the Table API, as it provides a lot of useful tooling when working
+with Kudu data.
+
+### Reading tables into DataStreams
+
+There are two main ways of reading a Kudu table into a DataStream:
+ 1. Using the `KuduCatalog` and the Table API
+ 2. Using the `KuduRowInputFormat` directly
+
+Using the `KuduCatalog` and the Table API is the recommended way of reading tables, as it automatically
+guarantees type safety and takes care of configuring the readers.
+
+This is how it works in practice:
+```java
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableSettings);
+
+tableEnv.registerCatalog("kudu", new KuduCatalog("master:port"));
+tableEnv.useCatalog("kudu");
+
+Table table = tableEnv.sqlQuery("SELECT * FROM MyKuduTable");
+DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);
+```
+
+The second way of achieving the same thing is to use the `KuduRowInputFormat` directly.
+In this case, we have to manually provide all information about our table:
+
+```java
+KuduTableInfo tableInfo = ...
+KuduReaderConfig readerConfig = ...
+KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo);
+
+DataStream<Row> rowStream = env.createInput(inputFormat, rowTypeInfo);
+```
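+
+For reference, here is a minimal sketch of how these pieces could be wired together (the master address, table
+name, and column layout are hypothetical):
+
+```java
+// Reader configuration pointing at the Kudu masters (placeholder address).
+KuduReaderConfig readerConfig = KuduReaderConfig.Builder
+    .setMasters("master:7051")
+    .build();
+
+// Reference an existing table by name.
+KuduTableInfo tableInfo = KuduTableInfo.forTable("ExampleTable");
+
+KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo);
+
+// The RowTypeInfo must match the schema and column order of the Kudu table.
+RowTypeInfo rowTypeInfo = new RowTypeInfo(
+    new TypeInformation[]{Types.INT, Types.STRING},
+    new String[]{"first", "second"});
+
+DataStream<Row> rowStream = env.createInput(inputFormat, rowTypeInfo);
+```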
+
+Under the hood, the `KuduTableSource` is just a convenient wrapper around the `KuduRowInputFormat`.
+
+### Kudu Sink
+The connector provides a `KuduSink` class that can be used to consume DataStreams
+and write the results into a Kudu table.
+
+The constructor takes 3 or 4 arguments:
+ * `KuduWriterConfig` is used to specify the Kudu masters and the flush mode.
+ * `KuduTableInfo` identifies the table to be written.
+ * `KuduOperationMapper` maps the records coming from the DataStream to a list of Kudu operations.
+ * `KuduFailureHandler` (optional): provides custom logic for handling write failures.
+
+The example below shows the creation of a sink for Row type records of 3 fields. It upserts each record.
+It is assumed that a Kudu table called `AlreadyExistingTable` with columns `col1, col2, col3` exists. Note that if this were not the case,
+we could pass a `KuduTableInfo` as described in the [Creating a KuduTable directly with KuduCatalog](#creating-a-kudutable-directly-with-kuducatalog) section,
+and the sink would create the table with the provided configuration.
+
+```java
+KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(KUDU_MASTERS).build();
+
+KuduSink<Row> sink = new KuduSink<>(
+    writerConfig,
+    KuduTableInfo.forTable("AlreadyExistingTable"),
+    new RowOperationMapper<>(
+            new String[]{"col1", "col2", "col3"},
+            AbstractSingleOperationMapper.KuduOperation.UPSERT)
+);
+```
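+
+The sink can then be attached to a `DataStream<Row>` in the usual way, for example by calling `stream.addSink(sink)`.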
+
+#### KuduOperationMapper
+
+This section describes the operation mapping logic in more detail.
+
+The connector supports insert, upsert, update, and delete operations.
+The operation to be performed can vary dynamically based on the record.
+To allow for more flexibility, it is also possible for one record to trigger
+0, 1, or more operations.
+For the highest level of control, implement the `KuduOperationMapper` interface.
+
+If one record from the DataStream corresponds to one table operation,
+extend the `AbstractSingleOperationMapper` class. An array of column
+names must be provided that matches the Kudu table's schema.
+
+Override the `getField` method, which extracts the value for the table column whose name is
+at the `i`th position in the `columnNames` array.
+If the operation is one of (`CREATE, UPSERT, UPDATE, DELETE`)
+and doesn't depend on the input record (constant during the life of the sink), it can be set in the constructor
+of `AbstractSingleOperationMapper`.
+It is also possible to implement your own logic by overriding the
+`createBaseOperation` method that returns a Kudu [Operation](https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html).
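+
+As a minimal sketch of a 1-to-1 mapper for a hypothetical `Event` type (the class and its column layout are
+assumptions for illustration):
+
+```java
+// One Kudu upsert is produced per incoming Event record.
+public class EventOperationMapper extends AbstractSingleOperationMapper<Event> {
+
+    public EventOperationMapper(String[] columnNames) {
+        // The operation is constant for the life of the sink, so it is set in the constructor.
+        super(columnNames, KuduOperation.UPSERT);
+    }
+
+    @Override
+    public Object getField(Event event, int i) {
+        // Return the value for the column at position i of the columnNames array.
+        switch (i) {
+            case 0: return event.getId();
+            case 1: return event.getPayload();
+            default: throw new IllegalArgumentException("Unknown column index: " + i);
+        }
+    }
+}
+```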
+
+There are pre-defined operation mappers for POJO, Flink Row, and Flink Tuple types for constant-operation, 1-to-1 sinks.
+* `PojoOperationMapper`: each table column must correspond to a POJO field
+with the same name. The `columnNames` array should contain those fields of the POJO that
+are present as table columns (the POJO fields can be a superset of the table columns).
+* `RowOperationMapper` and `TupleOperationMapper`: the mapping is based on position. The
+`i`th field of the Row/Tuple corresponds to the column of the table at the `i`th
+position in the `columnNames` array.
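+
+For example, a POJO-based upsert sink mapper might look as follows (a sketch; the `Person` class is hypothetical
+and the exact `PojoOperationMapper` constructor shown here is an assumption):
+
+```java
+// Person is assumed to be a POJO with fields "name" and "age" matching the table columns.
+PojoOperationMapper<Person> operationMapper = new PojoOperationMapper<>(
+    Person.class,
+    new String[]{"name", "age"},
+    AbstractSingleOperationMapper.KuduOperation.UPSERT);
+
+KuduSink<Person> sink = new KuduSink<>(writerConfig, KuduTableInfo.forTable("People"), operationMapper);
+```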
+
+## Building the connector
+
+The connector can be easily built using Maven:
+
+```
+cd bahir-flink
+mvn clean install
+```
+
+### Running the tests
+
+The integration tests rely on the Kudu test harness, which requires the current user to be able to ssh to localhost.
+
+This might not work out of the box on some operating systems (such as Mac OS X).
+To solve this problem, go to *System Preferences/Sharing* and enable Remote Login for your user.
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index bd42c55..fe7887c 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -30,8 +30,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <kudu.version>1.10.0</kudu.version>
-
+    <kudu.version>1.11.1</kudu.version>
     <mockito.version>1.10.19</mockito.version>
     <junit.groups>!DockerTest</junit.groups>
   </properties>
@@ -39,17 +38,32 @@
   <dependencies>
 
     <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.kudu</groupId>
       <artifactId>kudu-client</artifactId>
       <version>${kudu.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner-blink_2.11</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-common</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
     <!--test dependencies-->
     <dependency>
       <groupId>org.apache.kudu</groupId>
@@ -65,6 +79,7 @@
       <classifier>${os.detected.classifier}</classifier>
       <scope>test</scope>
     </dependency>
+
     <!-- this is added because test cluster use @Rule from junit4 -->
     <dependency>
       <groupId>org.junit.jupiter</groupId>
@@ -72,13 +87,28 @@
       <version>${junit.jupiter.version}</version>
       <scope>test</scope>
     </dependency>
+
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
-  </dependencies>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+  </dependencies>
 
   <build>
     <extensions>
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java
index 9d7d017..4bf81fe 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java
@@ -16,18 +16,19 @@
  */
 package org.apache.flink.connectors.kudu.batch;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
 import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.writer.KuduOperationMapper;
 import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
 import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
-import org.apache.flink.connectors.kudu.connector.serde.KuduSerialization;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +36,11 @@
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * Output format for writing data into a Kudu table (defined by the provided {@link KuduTableInfo}) in both batch
+ * and stream programs.
+ */
+@PublicEvolving
 public class KuduOutputFormat<IN> extends RichOutputFormat<IN> implements CheckpointedFunction {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
@@ -42,19 +48,19 @@
     private final KuduTableInfo tableInfo;
     private final KuduWriterConfig writerConfig;
     private final KuduFailureHandler failureHandler;
-    private final KuduSerialization<IN> serializer;
+    private final KuduOperationMapper<IN> opsMapper;
 
     private transient KuduWriter kuduWriter;
 
-    public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<IN> serializer) {
-        this(writerConfig, tableInfo, serializer, new DefaultKuduFailureHandler());
+    public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduOperationMapper<IN> opsMapper) {
+        this(writerConfig, tableInfo, opsMapper, new DefaultKuduFailureHandler());
     }
 
-    public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<IN> serializer, KuduFailureHandler failureHandler) {
-        this.tableInfo = checkNotNull(tableInfo,"tableInfo could not be null");
-        this.writerConfig = checkNotNull(writerConfig,"config could not be null");
-        this.serializer = checkNotNull(serializer,"serializer could not be null");
-        this.failureHandler = checkNotNull(failureHandler,"failureHandler could not be null");
+    public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduOperationMapper<IN> opsMapper, KuduFailureHandler failureHandler) {
+        this.tableInfo = checkNotNull(tableInfo, "tableInfo could not be null");
+        this.writerConfig = checkNotNull(writerConfig, "config could not be null");
+        this.opsMapper = checkNotNull(opsMapper, "opsMapper could not be null");
+        this.failureHandler = checkNotNull(failureHandler, "failureHandler could not be null");
     }
 
     @Override
@@ -63,15 +69,12 @@
 
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
-        kuduWriter = new KuduWriter(tableInfo, writerConfig, failureHandler);
-
-        serializer.withSchema(kuduWriter.getSchema());
+        kuduWriter = new KuduWriter(tableInfo, writerConfig, opsMapper, failureHandler);
     }
 
     @Override
     public void writeRecord(IN row) throws IOException {
-        final KuduRow kuduRow = serializer.serialize(row);
-        kuduWriter.write(kuduRow);
+        kuduWriter.write(row);
     }
 
     @Override
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduRowInputFormat.java
similarity index 70%
rename from flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduRowInputFormat.java
index 98877d8..dc17ed4 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduRowInputFormat.java
@@ -16,19 +16,20 @@
  */
 package org.apache.flink.connectors.kudu.batch;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
-import org.apache.flink.connectors.kudu.connector.serde.KuduDeserialization;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+
 import org.apache.kudu.client.KuduException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,13 +40,21 @@
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-public class KuduInputFormat<OUT> extends RichInputFormat<OUT, KuduInputSplit> {
+/**
+ * Input format for reading the contents of a Kudu table (defined by the provided {@link KuduTableInfo}) in both batch
+ * and stream programs. Rows of the Kudu table are mapped to {@link Row} instances that can be converted to other data
+ * types by the user later if necessary.
+ *
+ * <p> For programmatic access to the schema of the input rows, users can use the {@link org.apache.flink.connectors.kudu.table.KuduCatalog}
+ * or overwrite the column order manually by providing a list of projected column names.
+ */
+@PublicEvolving
+public class KuduRowInputFormat extends RichInputFormat<Row, KuduInputSplit> {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final KuduReaderConfig readerConfig;
     private final KuduTableInfo tableInfo;
-    private final KuduDeserialization<OUT> deserializer;
 
     private List<KuduFilterInfo> tableFilters;
     private List<String> tableProjections;
@@ -55,16 +64,20 @@
     private transient KuduReader kuduReader;
     private transient KuduReaderIterator resultIterator;
 
-    public KuduInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, KuduDeserialization<OUT> deserializer) {
-        this(readerConfig, tableInfo, deserializer, new ArrayList<>(), new ArrayList<>());
+    public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo) {
+        this(readerConfig, tableInfo, new ArrayList<>(), null);
     }
-    public KuduInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, KuduDeserialization<OUT> deserializer, List<KuduFilterInfo> tableFilters, List<String> tableProjections) {
 
-        this.readerConfig = checkNotNull(readerConfig,"readerConfig could not be null");
-        this.tableInfo = checkNotNull(tableInfo,"tableInfo could not be null");
-        this.deserializer = checkNotNull(deserializer,"deserializer could not be null");
-        this.tableFilters = checkNotNull(tableFilters,"tableFilters could not be null");
-        this.tableProjections = checkNotNull(tableProjections,"tableProjections could not be null");
+    public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, List<String> tableProjections) {
+        this(readerConfig, tableInfo, new ArrayList<>(), tableProjections);
+    }
+
+    public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, List<KuduFilterInfo> tableFilters, List<String> tableProjections) {
+
+        this.readerConfig = checkNotNull(readerConfig, "readerConfig could not be null");
+        this.tableInfo = checkNotNull(tableInfo, "tableInfo could not be null");
+        this.tableFilters = checkNotNull(tableFilters, "tableFilters could not be null");
+        this.tableProjections = tableProjections;
 
         this.endReached = false;
     }
@@ -124,11 +137,10 @@
     }
 
     @Override
-    public OUT nextRecord(OUT reuse) throws IOException {
+    public Row nextRecord(Row reuse) throws IOException {
         // check that current iterator has next rows
         if (this.resultIterator.hasNext()) {
-            KuduRow row = resultIterator.next();
-            return deserializer.deserialize(row);
+            return resultIterator.next();
         } else {
             endReached = true;
             return null;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
new file mode 100644
index 0000000..b178308
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.connector;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kudu.ColumnSchema;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Factory for creating {@link ColumnSchema}s to be used when creating a table that
+ * does not currently exist in Kudu. Usable through {@link KuduTableInfo#createTableIfNotExists}.
+ *
+ * <p> This factory implementation must be Serializable as it will be used directly in the Flink sources
+ * and sinks.
+ */
+@PublicEvolving
+public interface ColumnSchemasFactory extends Serializable {
+
+    /**
+     * Creates the columns of the Kudu table that will be used during the createTable operation.
+     *
+     * @return List of columns.
+     */
+    List<ColumnSchema> getColumnSchemas();
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
new file mode 100644
index 0000000..4a475e9
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.connector;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kudu.client.CreateTableOptions;
+
+import java.io.Serializable;
+
+/**
+ * Factory for creating {@link CreateTableOptions} to be used when creating a table that
+ * does not currently exist in Kudu. Usable through {@link KuduTableInfo#createTableIfNotExists}.
+ *
+ * <p> This factory implementation must be Serializable as it will be used directly in the Flink sources
+ * and sinks.
+ */
+@PublicEvolving
+public interface CreateTableOptionsFactory extends Serializable {
+
+    /**
+     * Creates the {@link CreateTableOptions} that will be used during the createTable operation.
+     *
+     * @return CreateTableOptions for creating the table.
+     */
+    CreateTableOptions getCreateTableOptions();
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java
deleted file mode 100644
index ff8a601..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Type;
-
-import java.io.Serializable;
-
-@PublicEvolving
-public class KuduColumnInfo implements Serializable {
-
-    private String name;
-    private Type type;
-    private boolean key;
-    private boolean rangeKey;
-    private boolean hashKey;
-    private boolean nullable;
-    private Object defaultValue;
-    private int blockSize;
-    private Encoding encoding;
-    private Compression compression;
-
-    private KuduColumnInfo(String name, Type type) {
-        this.name = name;
-        this.type = type;
-        this.blockSize = 0;
-        this.key = false;
-        this.rangeKey = false;
-        this.hashKey = false;
-        this.nullable = false;
-        this.defaultValue = null;
-        this.encoding = Encoding.AUTO;
-        this.compression = Compression.DEFAULT;
-    }
-
-    protected String name() {
-        return name;
-    }
-
-    protected boolean isRangeKey() {
-        return rangeKey;
-    }
-
-    protected boolean isHashKey() {
-        return hashKey;
-    }
-
-    protected ColumnSchema columnSchema() {
-        return new ColumnSchema.ColumnSchemaBuilder(name, type)
-                    .key(key)
-                    .nullable(nullable)
-                    .defaultValue(defaultValue)
-                    .desiredBlockSize(blockSize)
-                    .encoding(encoding.encode)
-                    .compressionAlgorithm(compression.algorithm)
-                    .build();
-    }
-
-    public static class Builder {
-        private KuduColumnInfo column;
-
-        private Builder(String name, Type type) {
-            this.column = new KuduColumnInfo(name, type);
-        }
-
-        public static Builder create(String name, Type type) {
-            return new Builder(name, type);
-        }
-        public static Builder createByte(String name) {
-            return create(name, Type.INT8);
-        }
-        public static Builder createShort(String name) {
-            return create(name, Type.INT16);
-        }
-        public static Builder createInteger(String name) {
-            return create(name, Type.INT32);
-        }
-        public static Builder createLong(String name) {
-            return create(name, Type.INT64);
-        }
-        public static Builder createDouble(String name) {
-            return create(name, Type.DOUBLE);
-        }
-        public static Builder createFloat(String name) {
-            return create(name, Type.FLOAT);
-        }
-        public static Builder createString(String name) {
-            return create(name, Type.STRING);
-        }
-        public static Builder createBool(String name) {
-            return create(name, Type.BOOL);
-        }
-        public static Builder createByteArray(String name) {
-            return create(name, Type.BINARY);
-        }
-        public static Builder createUnixTime(String name) {
-            return create(name, Type.UNIXTIME_MICROS);
-        }
-
-        public Builder asKey() {
-            return key(true);
-        }
-
-        public Builder key(boolean key) {
-            this.column.key = key;
-            return this;
-        }
-
-        public Builder asRangeKey() {
-            return rangeKey(true);
-        }
-
-        public Builder rangeKey(boolean rangeKey) {
-            this.column.rangeKey = rangeKey;
-            return this;
-        }
-
-        public Builder asHashKey() {
-            return hashKey(true);
-        }
-
-        public Builder hashKey(boolean hashKey) {
-            this.column.hashKey = hashKey;
-            return this;
-        }
-
-        public Builder asNullable() {
-            return nullable(true);
-        }
-
-        public Builder asNotNullable() {
-            return nullable(false);
-        }
-
-        public Builder nullable(boolean nullable) {
-            this.column.nullable = nullable;
-            return this;
-        }
-
-        public Builder defaultValue(Object defaultValue) {
-            this.column.defaultValue = defaultValue;
-            return this;
-        }
-
-        public Builder desiredBlockSize(int blockSize) {
-            this.column.blockSize = blockSize;
-            return this;
-        }
-
-        public Builder encoding(Encoding encoding) {
-            this.column.encoding = encoding;
-            return this;
-        }
-
-        public Builder compressionAlgorithm(Compression compression) {
-            this.column.compression = compression;
-            return this;
-        }
-
-        public KuduColumnInfo build() {
-            return column;
-        }
-    }
-
-    public enum Compression {
-        UNKNOWN(ColumnSchema.CompressionAlgorithm.UNKNOWN),
-        DEFAULT(ColumnSchema.CompressionAlgorithm.DEFAULT_COMPRESSION),
-        WITHOUT(ColumnSchema.CompressionAlgorithm.NO_COMPRESSION),
-        SNAPPY(ColumnSchema.CompressionAlgorithm.SNAPPY),
-        LZ4(ColumnSchema.CompressionAlgorithm.LZ4),
-        ZLIB(ColumnSchema.CompressionAlgorithm.ZLIB);
-
-        final ColumnSchema.CompressionAlgorithm algorithm;
-
-        Compression(ColumnSchema.CompressionAlgorithm algorithm) {
-            this.algorithm = algorithm;
-        }
-    }
-
-    public enum Encoding {
-        UNKNOWN(ColumnSchema.Encoding.UNKNOWN),
-        AUTO(ColumnSchema.Encoding.AUTO_ENCODING),
-        PLAIN(ColumnSchema.Encoding.PLAIN_ENCODING),
-        PREFIX(ColumnSchema.Encoding.PREFIX_ENCODING),
-        GROUP_VARINT(ColumnSchema.Encoding.GROUP_VARINT),
-        RLE(ColumnSchema.Encoding.RLE),
-        DICT(ColumnSchema.Encoding.DICT_ENCODING),
-        BIT_SHUFFLE(ColumnSchema.Encoding.BIT_SHUFFLE);
-
-        final ColumnSchema.Encoding encode;
-
-        Encoding(ColumnSchema.Encoding encode) {
-            this.encode = encode;
-        }
-    }
-
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
index c7ae4a4..0a89cad 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
@@ -17,6 +17,7 @@
 package org.apache.flink.connectors.kudu.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
+
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduPredicate;
@@ -36,6 +37,7 @@
     public KuduPredicate toPredicate(Schema schema) {
         return toPredicate(schema.getColumn(this.column));
     }
+
     public KuduPredicate toPredicate(ColumnSchema column) {
         KuduPredicate predicate;
         switch (this.type) {
@@ -63,35 +65,35 @@
 
         switch (column.getType()) {
             case STRING:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String)this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String) this.value);
                 break;
             case FLOAT:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Float)this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
                 break;
             case INT8:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Byte)this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
                 break;
             case INT16:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Short)this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
                 break;
             case INT32:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Integer)this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
                 break;
             case INT64:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Long)this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
                 break;
             case DOUBLE:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Double)this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
                 break;
             case BOOL:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Boolean)this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
                 break;
             case UNIXTIME_MICROS:
-                Long time = (Long)this.value;
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, time*1000);
+                Long time = (Long) this.value;
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, time * 1000);
                 break;
             case BINARY:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (byte[])this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (byte[]) this.value);
                 break;
             default:
                 throw new IllegalArgumentException("Illegal var type: " + column.getType());
@@ -99,6 +101,24 @@
         return predicate;
     }
 
+    public enum FilterType {
+        GREATER(KuduPredicate.ComparisonOp.GREATER),
+        GREATER_EQUAL(KuduPredicate.ComparisonOp.GREATER_EQUAL),
+        EQUAL(KuduPredicate.ComparisonOp.EQUAL),
+        LESS(KuduPredicate.ComparisonOp.LESS),
+        LESS_EQUAL(KuduPredicate.ComparisonOp.LESS_EQUAL),
+        IS_NOT_NULL(null),
+        IS_NULL(null),
+        IS_IN(null);
+
+        final KuduPredicate.ComparisonOp comparator;
+
+        FilterType(KuduPredicate.ComparisonOp comparator) {
+            this.comparator = comparator;
+        }
+
+    }
+
     public static class Builder {
         private KuduFilterInfo filter;
 
@@ -154,22 +174,4 @@
         }
     }
 
-    public enum FilterType {
-        GREATER(KuduPredicate.ComparisonOp.GREATER),
-        GREATER_EQUAL(KuduPredicate.ComparisonOp.GREATER_EQUAL),
-        EQUAL(KuduPredicate.ComparisonOp.EQUAL),
-        LESS(KuduPredicate.ComparisonOp.LESS),
-        LESS_EQUAL(KuduPredicate.ComparisonOp.LESS_EQUAL),
-        IS_NOT_NULL(null),
-        IS_NULL(null),
-        IS_IN(null);
-
-        final KuduPredicate.ComparisonOp comparator;
-
-        FilterType(KuduPredicate.ComparisonOp comparator) {
-            this.comparator = comparator;
-        }
-
-    }
-
 }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
deleted file mode 100644
index 78e6e6e..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.types.Row;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.stream.Stream;
-
-@PublicEvolving
-public class KuduRow extends Row {
-
-    private Map<String, Integer> rowNames;
-
-    public KuduRow(Integer arity) {
-        super(arity);
-        rowNames = new LinkedHashMap<>();
-    }
-
-    public Object getField(String name) {
-        return super.getField(rowNames.get(name));
-    }
-
-    public boolean hasField(String name) {
-        return rowNames.get(name) != null;
-    }
-
-    public void setField(int pos, String name, Object value) {
-        super.setField(pos, value);
-        this.rowNames.put(name, pos);
-    }
-
-    public boolean isNull(String name) {
-        return isNull(rowNames.get(name));
-    }
-
-    public boolean isNull(int pos) {
-        return getField(pos) == null;
-    }
-
-    private static int validFields(Object object) {
-        Long validField = 0L;
-        for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) {
-            validField += basicValidation(c.getDeclaredFields()).count();
-        }
-        return validField.intValue();
-    }
-
-    private static Stream<Field> basicValidation(Field[] fields) {
-        return Arrays.stream(fields)
-                .filter(cField -> !Modifier.isStatic(cField.getModifiers()))
-                .filter(cField -> !Modifier.isTransient(cField.getModifiers()));
-    }
-
-    public Map<String,Object> blindMap() {
-        Map<String,Object> toRet = new LinkedHashMap<>();
-        rowNames.entrySet().stream()
-                .sorted(Comparator.comparing(Map.Entry::getValue))
-                .forEach(entry -> toRet.put(entry.getKey(), super.getField(entry.getValue())));
-        return  toRet;
-    }
-
-    @Override
-    public String toString() {
-        return blindMap().toString();
-    }
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
index eb63b3f..83c7dde 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
@@ -17,119 +17,90 @@
 package org.apache.flink.connectors.kudu.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.kudu.ColumnSchema;
+
+import org.apache.commons.lang3.Validate;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.CreateTableOptions;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
 
+/**
+ * Describes which table should be used in sources and sinks along with specifications
+ * on how to create it if it does not exist.
+ *
+ * <p> For sources and sinks reading from already existing tables, simply use {@link KuduTableInfo#forTable(String)};
+ * if you want the system to create the table when it does not exist, you need to specify the column and options
+ * factories through {@link KuduTableInfo#createTableIfNotExists}.
+ */
 @PublicEvolving
 public class KuduTableInfo implements Serializable {
 
-    private static final Integer DEFAULT_REPLICAS = 1;
-    private static final boolean DEFAULT_CREATE_IF_NOT_EXIST = false;
-
-    private Integer replicas;
     private String name;
-    private boolean createIfNotExist;
-    private List<KuduColumnInfo> columns;
+    private CreateTableOptionsFactory createTableOptionsFactory = null;
+    private ColumnSchemasFactory schemasFactory = null;
 
-    private KuduTableInfo(String name){
-        this.name = name;
-        this.replicas = DEFAULT_REPLICAS;
-        this.createIfNotExist = DEFAULT_CREATE_IF_NOT_EXIST;
-        this.columns = new ArrayList<>();
+    private KuduTableInfo(String name) {
+        this.name = Validate.notNull(name);
     }
 
+    /**
+     * Creates a new {@link KuduTableInfo} that is sufficient for reading/writing to existing Kudu Tables.
+     * For creating new tables call {@link #createTableIfNotExists} afterwards.
+     *
+     * @param name Table name in Kudu
+     * @return KuduTableInfo for the given table name
+     */
+    public static KuduTableInfo forTable(String name) {
+        return new KuduTableInfo(name);
+    }
+
+    /**
+     * Defines table parameters to be used when creating the Kudu table if it does not exist (read or write).
+     *
+     * @param schemasFactory            factory for defining columns
+     * @param createTableOptionsFactory factory for defining create table options
+     * @return KuduTableInfo that will create tables that do not exist with the given settings.
+     */
+    public KuduTableInfo createTableIfNotExists(ColumnSchemasFactory schemasFactory, CreateTableOptionsFactory createTableOptionsFactory) {
+        this.createTableOptionsFactory = Validate.notNull(createTableOptionsFactory);
+        this.schemasFactory = Validate.notNull(schemasFactory);
+        return this;
+    }
+
+    /**
+     * Returns the {@link Schema} of the table. Only works if {@link #createTableIfNotExists} was specified; otherwise throws an error.
+     *
+     * @return Schema of the target table.
+     */
+    public Schema getSchema() {
+        if (!getCreateTableIfNotExists()) {
+            throw new RuntimeException("Cannot access schema for KuduTableInfo. Use createTableIfNotExists to specify the columns.");
+        }
+
+        return new Schema(schemasFactory.getColumnSchemas());
+    }
+
+    /**
+     * @return Name of the table.
+     */
     public String getName() {
         return name;
     }
 
-    public Schema getSchema() {
-        if(hasNotColumns()) return null;
-        List<ColumnSchema> schemaColumns = new ArrayList<>();
-        for(KuduColumnInfo column : columns){
-            schemaColumns.add(column.columnSchema());
-        }
-        return new Schema(schemaColumns);
+    /**
+     * @return True if table creation is enabled if target table does not exist.
+     */
+    public boolean getCreateTableIfNotExists() {
+        return createTableOptionsFactory != null;
     }
 
-    public boolean createIfNotExist() {
-        return createIfNotExist;
-    }
-
+    /**
+     * @return CreateTableOptions if {@link #createTableIfNotExists} was specified.
+     */
     public CreateTableOptions getCreateTableOptions() {
-        CreateTableOptions options = new CreateTableOptions();
-        if(replicas!=null){
-            options.setNumReplicas(replicas);
+        if (!getCreateTableIfNotExists()) {
+            throw new RuntimeException("Cannot access CreateTableOptions for KuduTableInfo. Use createTableIfNotExists to specify.");
         }
-        if(hasColummns()) {
-            List<String> rangeKeys = new ArrayList<>();
-            List<String> hashKeys = new ArrayList<>();
-            for(KuduColumnInfo column : columns){
-                if(column.isRangeKey()){
-                    rangeKeys.add(column.name());
-                }
-                if(column.isHashKey()){
-                    hashKeys.add(column.name());
-                }
-            }
-            options.setRangePartitionColumns(rangeKeys);
-            options.addHashPartitions(hashKeys, replicas*2);
-        }
-
-        return options;
-    }
-
-    public boolean hasNotColumns(){
-        return !hasColummns();
-    }
-    public boolean hasColummns(){
-        return (columns!=null && columns.size()>0);
-    }
-
-    public static class Builder {
-        KuduTableInfo table;
-
-        private Builder(String name) {
-            table = new KuduTableInfo(name);
-        }
-
-        public static Builder create(String name) {
-            return new Builder(name);
-        }
-
-        public static Builder open(String name) {
-            return new Builder(name);
-        }
-
-        public Builder createIfNotExist(boolean createIfNotExist) {
-            this.table.createIfNotExist = createIfNotExist;
-            return this;
-        }
-
-        public Builder replicas(int replicas) {
-            if (replicas == 0) return this;
-            this.table.replicas = replicas;
-            return this;
-        }
-
-        public Builder columns(List<KuduColumnInfo> columns) {
-            if(columns==null) return this;
-            this.table.columns.addAll(columns);
-            return this;
-        }
-
-        public Builder addColumn(KuduColumnInfo column) {
-            if(column==null) return this;
-            this.table.columns.add(column);
-            return this;
-        }
-
-        public KuduTableInfo build() {
-            return table;
-        }
+        return createTableOptionsFactory.getCreateTableOptions();
     }
 }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java
index 7548033..5f45137 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java
@@ -22,12 +22,18 @@
 import java.util.List;
 import java.util.stream.Collectors;
 
+/**
+ * Default failure handling logic that doesn't do any handling but throws
+ * an error.
+ */
 public class DefaultKuduFailureHandler implements KuduFailureHandler {
+
     @Override
     public void onFailure(List<RowError> failure) throws IOException {
         String errors = failure.stream()
                 .map(error -> error.toString() + System.lineSeparator())
                 .collect(Collectors.joining());
+
         throw new IOException("Error while sending value. \n " + errors);
     }
 }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
index 42de4f7..3c8954f 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
@@ -17,12 +17,17 @@
 package org.apache.flink.connectors.kudu.connector.failure;
 
 import org.apache.flink.annotation.PublicEvolving;
+
 import org.apache.kudu.client.RowError;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
+/**
+ * Custom handling logic for errors resulting from trying to execute Kudu operations in the
+ * {@link org.apache.flink.connectors.kudu.connector.writer.KuduWriter}.
+ */
 @PublicEvolving
 public interface KuduFailureHandler extends Serializable {
 
@@ -34,4 +39,13 @@
      */
     void onFailure(List<RowError> failure) throws IOException;
 
+    /**
+     * Handle a ClassCastException. The default implementation wraps the exception
+     * in an IOException and rethrows it.
+     *
+     * @param e the cause of failure
+     * @throws IOException if the casting failed
+     */
+    default void onTypeMismatch(ClassCastException e) throws IOException {
+        throw new IOException("Class casting failed.", e);
+    }
 }
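
For jobs that should tolerate bad records rather than fail, the interface above can be implemented with custom logic. Below is a minimal sketch (not part of this patch) of a hypothetical handler that logs and drops failed operations instead of throwing:

```
import org.apache.kudu.client.RowError;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

/** Hypothetical handler that logs row errors and keeps the job running. */
public class LoggingKuduFailureHandler implements KuduFailureHandler {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingKuduFailureHandler.class);

    @Override
    public void onFailure(List<RowError> failure) throws IOException {
        // Log each failed operation instead of failing the stream.
        failure.forEach(error -> LOG.warn("Dropping failed Kudu operation: {}", error));
    }
}
```
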
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java
index a809106..5d85d6c 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java
@@ -16,16 +16,19 @@
  */
 package org.apache.flink.connectors.kudu.connector.reader;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.io.LocatableInputSplit;
 
+@Internal
 public class KuduInputSplit extends LocatableInputSplit {
 
     private byte[] scanToken;
 
     /**
      * Creates a new KuduInputSplit
+     *
      * @param splitNumber the number of the input split
-     * @param hostnames The names of the hosts storing the data this input split refers to.
+     * @param hostnames   The names of the hosts storing the data this input split refers to.
      */
     public KuduInputSplit(byte[] scanToken, final int splitNumber, final String[] hostnames) {
         super(splitNumber, hostnames);
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
index 9c6e790..51ab748 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
@@ -16,11 +16,16 @@
  */
 package org.apache.flink.connectors.kudu.connector.reader;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-import org.apache.kudu.client.*;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.LocatedTablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,13 +47,15 @@
     private transient KuduSession session;
     private transient KuduTable table;
 
-    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig)  throws IOException {
-        this(tableInfo, readerConfig, new ArrayList<>(), new ArrayList<>());
+    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig) throws IOException {
+        this(tableInfo, readerConfig, new ArrayList<>(), null);
     }
-    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters)  throws IOException {
-        this(tableInfo, readerConfig, tableFilters, new ArrayList<>());
+
+    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters) throws IOException {
+        this(tableInfo, readerConfig, tableFilters, null);
     }
-    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters, List<String> tableProjections)  throws IOException {
+
+    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters, List<String> tableProjections) throws IOException {
         this.tableInfo = tableInfo;
         this.readerConfig = readerConfig;
         this.tableFilters = tableFilters;
@@ -72,7 +79,7 @@
         if (client.tableExists(tableName)) {
             return client.openTable(tableName);
         }
-        if (tableInfo.createIfNotExist()) {
+        if (tableInfo.getCreateTableIfNotExists()) {
             return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
         }
         throw new UnsupportedOperationException("table not exists and is marketed to not be created");
@@ -85,7 +92,7 @@
     public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Integer rowLimit) {
         KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
 
-        if (CollectionUtils.isNotEmpty(tableProjections)) {
+        if (tableProjections != null) {
             tokenBuilder.setProjectedColumnNames(tableProjections);
         }
 
@@ -95,7 +102,7 @@
                     .forEach(tokenBuilder::addPredicate);
         }
 
-        if (rowLimit !=null && rowLimit > 0) {
+        if (rowLimit != null && rowLimit > 0) {
             tokenBuilder.limit(rowLimit);
         }
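
To illustrate how the reader above is wired up, here is a hedged usage sketch (not part of the patch); `tableInfo` and `readerConfig` are assumed to exist, and the column names are placeholders:

```
// Assumes tableInfo and readerConfig already exist; "id" and "title" are placeholder columns.
List<String> projection = Arrays.asList("id", "title");
KuduReader reader = new KuduReader(tableInfo, readerConfig, new ArrayList<>(), projection);

// Scan tokens can then be turned into input splits, with a row limit of 1000.
List<KuduScanToken> tokens = reader.scanTokens(new ArrayList<>(), projection, 1000);
```
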
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
index 6f5f079..96fde1b 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
@@ -16,32 +16,36 @@
  */
 package org.apache.flink.connectors.kudu.connector.reader;
 
-import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.flink.annotation.PublicEvolving;
 
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
 import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * Configuration used by {@link org.apache.flink.connectors.kudu.batch.KuduRowInputFormat}. Specifies connection and other necessary properties.
+ */
 @PublicEvolving
 public class KuduReaderConfig implements Serializable {
 
     private final String masters;
-    private final Integer rowLimit;
+    private final int rowLimit;
 
     private KuduReaderConfig(
             String masters,
-            Integer rowLimit) {
+            int rowLimit) {
 
         this.masters = checkNotNull(masters, "Kudu masters cannot be null");
-        this.rowLimit = checkNotNull(rowLimit, "Kudu rowLimit cannot be null");;
+        this.rowLimit = rowLimit;
     }
 
     public String getMasters() {
         return masters;
     }
 
-    public Integer getRowLimit() {
+    public int getRowLimit() {
         return rowLimit;
     }
 
@@ -57,20 +61,26 @@
      * Builder for the {@link KuduReaderConfig}.
      */
     public static class Builder {
-        private String masters;
-        private Integer rowLimit = 0;
+        private static final int DEFAULT_ROW_LIMIT = 0;
+
+        private final String masters;
+        private final int rowLimit;
 
         private Builder(String masters) {
+            this(masters, DEFAULT_ROW_LIMIT);
+        }
+
+        private Builder(String masters, Integer rowLimit) {
             this.masters = masters;
+            this.rowLimit = rowLimit;
         }
 
         public static Builder setMasters(String masters) {
             return new Builder(masters);
         }
 
-        public Builder setRowLimit(Integer rowLimit) {
-            this.rowLimit = rowLimit;
-            return this;
+        public Builder setRowLimit(int rowLimit) {
+            return new Builder(masters, rowLimit);
         }
 
         public KuduReaderConfig build() {
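
A minimal sketch of building the reader configuration with the builder above; the master address is a placeholder:

```
KuduReaderConfig readerConfig = KuduReaderConfig.Builder
        .setMasters("kudu-master:7051")   // placeholder master address
        .setRowLimit(1000)
        .build();
```
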
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
index 4a8e69c..1ea4690 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
@@ -17,9 +17,9 @@
 package org.apache.flink.connectors.kudu.connector.reader;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.types.Row;
+
 import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduScanner;
 import org.apache.kudu.client.RowResult;
@@ -51,61 +51,23 @@
         }
     }
 
-    public KuduRow next() {
+    public Row next() {
         RowResult row = this.rowIterator.next();
-        return toKuduRow(row);
+        return toFlinkRow(row);
     }
 
     private void nextRows() throws KuduException {
         this.rowIterator = scanner.nextRows();
     }
 
-    private KuduRow toKuduRow(RowResult row) {
+    private Row toFlinkRow(RowResult row) {
         Schema schema = row.getColumnProjection();
 
-        KuduRow values = new KuduRow(schema.getColumnCount());
+        Row values = new Row(schema.getColumnCount());
         schema.getColumns().forEach(column -> {
             String name = column.getName();
             int pos = schema.getColumnIndex(name);
-            if(row.isNull(name)) {
-                values.setField(pos, name, null);
-            } else {
-                Type type = column.getType();
-                switch (type) {
-                    case BINARY:
-                        values.setField(pos, name, row.getBinary(name));
-                        break;
-                    case STRING:
-                        values.setField(pos, name, row.getString(name));
-                        break;
-                    case BOOL:
-                        values.setField(pos, name, row.getBoolean(name));
-                        break;
-                    case DOUBLE:
-                        values.setField(pos, name, row.getDouble(name));
-                        break;
-                    case FLOAT:
-                        values.setField(pos, name, row.getFloat(name));
-                        break;
-                    case INT8:
-                        values.setField(pos, name, row.getByte(name));
-                        break;
-                    case INT16:
-                        values.setField(pos, name, row.getShort(name));
-                        break;
-                    case INT32:
-                        values.setField(pos, name, row.getInt(name));
-                        break;
-                    case INT64:
-                        values.setField(pos, name, row.getLong(name));
-                        break;
-                    case UNIXTIME_MICROS:
-                        values.setField(pos, name, row.getLong(name) / 1000);
-                        break;
-                    default:
-                        throw new IllegalArgumentException("Illegal var type: " + type);
-                }
-            }
+            values.setField(pos, row.getObject(name));
         });
         return values;
     }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java
deleted file mode 100644
index 36584b5..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed serialize the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file serialize You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed serialize in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector.serde;
-
-import org.apache.flink.connectors.kudu.connector.KuduRow;
-import org.apache.kudu.Schema;
-
-public class DefaultSerDe implements KuduSerialization<KuduRow>, KuduDeserialization<KuduRow> {
-
-    @Override
-    public KuduRow deserialize(KuduRow row) {
-        return row;
-    }
-
-    @Override
-    public KuduRow serialize(KuduRow value) {
-        return value;
-    }
-
-    @Override
-    public DefaultSerDe withSchema(Schema schema) {
-        return this;
-    }
-
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java
deleted file mode 100644
index 190c4c7..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed serialize the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file serialize You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed serialize in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector.serde;
-
-import org.apache.flink.connectors.kudu.connector.KuduRow;
-
-import java.io.Serializable;
-
-public interface KuduDeserialization<T> extends Serializable {
-    T deserialize(KuduRow row);
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java
deleted file mode 100644
index b13c59b..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed serialize the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file serialize You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed serialize in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector.serde;
-
-import org.apache.flink.connectors.kudu.connector.KuduRow;
-import org.apache.kudu.Schema;
-
-import java.io.Serializable;
-
-public interface KuduSerialization<T> extends Serializable {
-    KuduRow serialize(T value);
-
-    KuduSerialization<T> withSchema(Schema schema);
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java
deleted file mode 100644
index bc57174..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed serialize the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file serialize You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed serialize in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector.serde;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
-import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-import org.apache.kudu.Schema;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.stream.Stream;
-
-public class PojoSerDe<P> implements KuduSerialization<P>, KuduDeserialization<P> {
-
-
-    private Class<P> clazz;
-
-    public transient KuduTableInfo tableInfo;
-    public transient Schema schema;
-
-
-    public PojoSerDe(Class<P> clazz) {
-        this.clazz = clazz;
-    }
-
-    @Override
-    public PojoSerDe<P> withSchema(Schema schema) {
-        this.schema = schema;
-        return this;
-    }
-
-    @Override
-    public KuduRow serialize(P object) {
-        return mapTo(object);
-    }
-
-    private KuduRow mapTo(P object) {
-        if (schema == null) throw new IllegalArgumentException("schema must be set to serialize");
-
-        KuduRow row = new KuduRow(schema.getRowSize());
-
-        for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) {
-            basicValidation(c.getDeclaredFields())
-                    .forEach(cField -> {
-                        try {
-                            cField.setAccessible(true);
-                            row.setField(schema.getColumnIndex(cField.getName()), cField.getName(), cField.get(object));
-                        } catch (IllegalAccessException e) {
-                            String error = String.format("Cannot get value for %s", cField.getName());
-                            throw new IllegalArgumentException(error, e);
-                        }
-                    });
-        }
-
-        return row;
-    }
-
-    private Stream<Field> basicValidation(Field[] fields) {
-        return Arrays.stream(fields)
-                .filter(field -> schemaHasColumn(field.getName()))
-                .filter(field -> !Modifier.isStatic(field.getModifiers()))
-                .filter(field -> !Modifier.isTransient(field.getModifiers()));
-    }
-
-    private boolean schemaHasColumn(String field) {
-        return schema.getColumns().stream().anyMatch(col -> StringUtils.equalsIgnoreCase(col.getName(),field));
-    }
-
-    @Override
-    public P deserialize(KuduRow row) {
-        return mapFrom(row);
-    }
-
-    private P mapFrom(KuduRow row) {
-        P o = createInstance(clazz);
-
-        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
-            Field[] fields = c.getDeclaredFields();
-
-            basicValidation(fields)
-                    .forEach(cField -> {
-                        try {
-                            cField.setAccessible(true);
-                            Object value = row.getField(cField.getName());
-                            if (value != null) {
-                                if (cField.getType() == value.getClass()) {
-                                    cField.set(o, value);
-                                } else if (cField.getType() == Long.class && value.getClass() == Date.class) {
-                                    cField.set(o, ((Date) value).getTime());
-                                } else {
-                                    cField.set(o, value);
-                                }
-                            }
-                        } catch (IllegalAccessException e) {
-                            String error = String.format("Cannot get value for %s", cField.getName());
-                            throw new IllegalArgumentException(error, e);
-                        }
-                    });
-        }
-
-        return o;
-
-    }
-
-    private P createInstance(Class<P> clazz) {
-        try {
-            Constructor<P> constructor = clazz.getDeclaredConstructor();
-            constructor.setAccessible(true);
-            return constructor.newInstance();
-        } catch (ReflectiveOperationException e) {
-            String error = String.format("Cannot create instance for %s", clazz.getSimpleName());
-            throw new IllegalArgumentException(error, e);
-        }
-    }
-
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
new file mode 100644
index 0000000..d9f8219
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Base implementation for {@link KuduOperationMapper}s that have one-to-one input to
+ * Kudu operation mapping. It requires a fixed table schema to be provided at construction
+ * time and only requires users to implement a getter for a specific column index (relative
+ * to the ones provided in the constructor).
+ * <br>
+ * Supports either a fixed operation type per record, specified through the {@link KuduOperation},
+ * or a custom way of creating the base {@link Operation} by overriding the
+ * {@link #createBaseOperation(Object, KuduTable)} method.
+ *
+ * @param <T> Input type
+ */
+@PublicEvolving
+public abstract class AbstractSingleOperationMapper<T> implements KuduOperationMapper<T> {
+
+    protected final String[] columnNames;
+    private final KuduOperation operation;
+
+    protected AbstractSingleOperationMapper(String[] columnNames) {
+        this(columnNames, null);
+    }
+
+    public AbstractSingleOperationMapper(String[] columnNames, KuduOperation operation) {
+        this.columnNames = columnNames;
+        this.operation = operation;
+    }
+
+    /**
+     * Returns the object corresponding to the given column index.
+     *
+     * @param input Input element
+     * @param i     Column index
+     * @return Column value
+     */
+    public abstract Object getField(T input, int i);
+
+    public Optional<Operation> createBaseOperation(T input, KuduTable table) {
+        if (operation == null) {
+            throw new UnsupportedOperationException("createBaseOperation must be overridden if no operation specified in constructor");
+        }
+        switch (operation) {
+            case INSERT:
+                return Optional.of(table.newInsert());
+            case UPDATE:
+                return Optional.of(table.newUpdate());
+            case UPSERT:
+                return Optional.of(table.newUpsert());
+            case DELETE:
+                return Optional.of(table.newDelete());
+            default:
+                throw new RuntimeException("Unknown operation " + operation);
+        }
+    }
+
+    @Override
+    public List<Operation> createOperations(T input, KuduTable table) {
+        Optional<Operation> operationOpt = createBaseOperation(input, table);
+        if (!operationOpt.isPresent()) {
+            return Collections.emptyList();
+        }
+
+        Operation operation = operationOpt.get();
+        PartialRow partialRow = operation.getRow();
+
+        for (int i = 0; i < columnNames.length; i++) {
+            partialRow.addObject(columnNames[i], getField(input, i));
+        }
+
+        return Collections.singletonList(operation);
+    }
+
+    public enum KuduOperation {
+        INSERT,
+        UPDATE,
+        UPSERT,
+        DELETE
+    }
+}
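
To show how the abstract class above is meant to be extended, here is a hedged sketch for a hypothetical `Book` type (the class and its getters are assumptions, not part of this patch):

```
/** Maps a hypothetical Book record to a single UPSERT per element. */
public class BookOperationMapper extends AbstractSingleOperationMapper<Book> {

    public BookOperationMapper(String[] columnNames) {
        super(columnNames, KuduOperation.UPSERT);
    }

    @Override
    public Object getField(Book input, int i) {
        // columnNames is the protected array passed to the constructor.
        switch (columnNames[i]) {
            case "id":
                return input.getId();
            case "title":
                return input.getTitle();
            default:
                return null;
        }
    }
}
```
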
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
new file mode 100644
index 0000000..4878ab3
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Encapsulates the logic of mapping input records (of a DataStream) to operations
+ * executed in Kudu. Returning a list of operations gives implementers the
+ * flexibility to provide more sophisticated logic.
+ *
+ * @param <T> Type of the input data
+ */
+@PublicEvolving
+public interface KuduOperationMapper<T> extends Serializable {
+
+    /**
+     * Create a list of operations to be executed by the {@link KuduWriter} for the
+     * current input.
+     *
+     * @param input input element
+     * @param table table for which the operations should be created
+     * @return List of operations to be executed on the table
+     */
+    List<Operation> createOperations(T input, KuduTable table);
+
+}
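
Because the interface returns a list, an implementation is free to emit several operations per input element. The following sketch is hypothetical (it assumes an input type that carries multiple rows as Object arrays matching the column order):

```
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.PartialRow;

import java.util.ArrayList;
import java.util.List;

/** Hypothetical mapper that turns one input element into several upserts. */
public class MultiRowOperationMapper implements KuduOperationMapper<List<Object[]>> {

    private final String[] columnNames;  // assumed to match the Kudu table schema

    public MultiRowOperationMapper(String[] columnNames) {
        this.columnNames = columnNames;
    }

    @Override
    public List<Operation> createOperations(List<Object[]> input, KuduTable table) {
        List<Operation> operations = new ArrayList<>();
        for (Object[] row : input) {
            Operation upsert = table.newUpsert();
            PartialRow partialRow = upsert.getRow();
            for (int i = 0; i < columnNames.length; i++) {
                partialRow.addObject(columnNames[i], row[i]);
            }
            operations.add(upsert);
        }
        return operations;
    }
}
```
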
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index 57c0741..7233478 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -18,13 +18,17 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
 import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.*;
+
+import org.apache.kudu.client.DeleteTableResponse;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.RowError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,23 +37,24 @@
 import java.util.List;
 
 @Internal
-public class KuduWriter implements AutoCloseable {
+public class KuduWriter<T> implements AutoCloseable {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final KuduTableInfo tableInfo;
     private final KuduWriterConfig writerConfig;
     private final KuduFailureHandler failureHandler;
+    private final KuduOperationMapper<T> operationMapper;
 
     private transient KuduClient client;
     private transient KuduSession session;
     private transient KuduTable table;
 
-
-    public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig) throws IOException {
-        this (tableInfo, writerConfig, new DefaultKuduFailureHandler());
+    public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig, KuduOperationMapper<T> operationMapper) throws IOException {
+        this(tableInfo, writerConfig, operationMapper, new DefaultKuduFailureHandler());
     }
-    public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig, KuduFailureHandler failureHandler) throws IOException {
+
+    public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig, KuduOperationMapper<T> operationMapper, KuduFailureHandler failureHandler) throws IOException {
         this.tableInfo = tableInfo;
         this.writerConfig = writerConfig;
         this.failureHandler = failureHandler;
@@ -57,6 +62,7 @@
         this.client = obtainClient();
         this.session = obtainSession();
         this.table = obtainTable();
+        this.operationMapper = operationMapper;
     }
 
     private KuduClient obtainClient() {
@@ -74,23 +80,18 @@
         if (client.tableExists(tableName)) {
             return client.openTable(tableName);
         }
-        if (tableInfo.createIfNotExist()) {
+        if (tableInfo.getCreateTableIfNotExists()) {
             return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
         }
         throw new UnsupportedOperationException("table not exists and is marketed to not be created");
     }
 
-    public Schema getSchema() {
-        return table.getSchema();
-    }
-
-    public void write(KuduRow row) throws IOException {
+    public void write(T input) throws IOException {
         checkAsyncErrors();
 
-        final Operation operation = mapToOperation(row);
-        final OperationResponse response = session.apply(operation);
-
-        checkErrors(response);
+        for (Operation operation : operationMapper.createOperations(input, table)) {
+            checkErrors(session.apply(operation));
+        }
     }
 
     public void flushAndCheckErrors() throws IOException {
@@ -140,73 +141,9 @@
     }
 
     private void checkAsyncErrors() throws IOException {
-        if (session.countPendingErrors() == 0) return;
+        if (session.countPendingErrors() == 0) { return; }
 
         List<RowError> errors = Arrays.asList(session.getPendingErrors().getRowErrors());
         failureHandler.onFailure(errors);
     }
-
-    private Operation mapToOperation(KuduRow row) {
-        final Operation operation = obtainOperation();
-        final PartialRow partialRow = operation.getRow();
-
-        table.getSchema().getColumns().forEach(column -> {
-            String columnName = column.getName();
-            if (!row.hasField(columnName)) {
-                return;
-            }
-            Object value = row.getField(columnName);
-
-            if (value == null) {
-                partialRow.setNull(columnName);
-            } else {
-                Type type = column.getType();
-                switch (type) {
-                    case STRING:
-                        partialRow.addString(columnName, (String) value);
-                        break;
-                    case FLOAT:
-                        partialRow.addFloat(columnName, (Float) value);
-                        break;
-                    case INT8:
-                        partialRow.addByte(columnName, (Byte) value);
-                        break;
-                    case INT16:
-                        partialRow.addShort(columnName, (Short) value);
-                        break;
-                    case INT32:
-                        partialRow.addInt(columnName, (Integer) value);
-                        break;
-                    case INT64:
-                        partialRow.addLong(columnName, (Long) value);
-                        break;
-                    case DOUBLE:
-                        partialRow.addDouble(columnName, (Double) value);
-                        break;
-                    case BOOL:
-                        partialRow.addBoolean(columnName, (Boolean) value);
-                        break;
-                    case UNIXTIME_MICROS:
-                        //*1000 to correctly create date on kudu
-                        partialRow.addLong(columnName, ((Long) value) * 1000);
-                        break;
-                    case BINARY:
-                        partialRow.addBinary(columnName, (byte[]) value);
-                        break;
-                    default:
-                        throw new IllegalArgumentException("Illegal var type: " + type);
-                }
-            }
-        });
-        return operation;
-    }
-
-    private Operation obtainOperation() {
-        switch (writerConfig.getWriteMode()) {
-            case INSERT: return table.newInsert();
-            case UPDATE: return table.newUpdate();
-            case UPSERT: return table.newUpsert();
-        }
-        return table.newUpsert();
-    }
 }
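
Putting the writer together with a mapper, a minimal sketch (assuming `tableInfo` and `writerConfig` exist, and using the `RowOperationMapper` introduced later in this patch; the column names and values are placeholders):

```
KuduWriter<Row> writer = new KuduWriter<>(tableInfo, writerConfig,
        new RowOperationMapper(new String[]{"id", "title"},
                AbstractSingleOperationMapper.KuduOperation.UPSERT));

Row row = new Row(2);
row.setField(0, 1001);            // placeholder "id" value
row.setField(1, "Kudu basics");   // placeholder "title" value

writer.write(row);
writer.flushAndCheckErrors();
writer.close();
```
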
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
index 13672d5..598f8d0 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
@@ -16,39 +16,37 @@
  */
 package org.apache.flink.connectors.kudu.connector.writer;
 
-import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.flink.annotation.PublicEvolving;
 
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
 import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.kudu.client.SessionConfiguration.FlushMode;
 
+/**
+ * Configuration used by {@link org.apache.flink.connectors.kudu.streaming.KuduSink} and {@link org.apache.flink.connectors.kudu.batch.KuduOutputFormat}.
+ * Specifies connection and other necessary properties.
+ */
 @PublicEvolving
 public class KuduWriterConfig implements Serializable {
 
     private final String masters;
     private final FlushMode flushMode;
-    private final KuduWriterMode writeMode;
 
     private KuduWriterConfig(
             String masters,
-            FlushMode flushMode,
-            KuduWriterMode writeMode) {
+            FlushMode flushMode) {
 
         this.masters = checkNotNull(masters, "Kudu masters cannot be null");
         this.flushMode = checkNotNull(flushMode, "Kudu flush mode cannot be null");
-        this.writeMode = checkNotNull(writeMode, "Kudu write mode cannot be null");
     }
 
     public String getMasters() {
         return masters;
     }
 
-    public KuduWriterMode getWriteMode() {
-        return writeMode;
-    }
-
     public FlushMode getFlushMode() {
         return flushMode;
     }
@@ -58,7 +56,6 @@
         return new ToStringBuilder(this)
                 .append("masters", masters)
                 .append("flushMode", flushMode)
-                .append("writeMode", writeMode)
                 .toString();
     }
 
@@ -67,7 +64,6 @@
      */
     public static class Builder {
         private String masters;
-        private KuduWriterMode writeMode = KuduWriterMode.UPSERT;
         private FlushMode flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
 
         private Builder(String masters) {
@@ -78,27 +74,15 @@
             return new Builder(masters);
         }
 
-        public Builder setWriteMode(KuduWriterMode writeMode) {
-            this.writeMode = writeMode;
-            return this;
-        }
-        public Builder setUpsertWrite() {
-            return setWriteMode(KuduWriterMode.UPSERT);
-        }
-        public Builder setInsertWrite() {
-            return setWriteMode(KuduWriterMode.INSERT);
-        }
-        public Builder setUpdateWrite() {
-            return setWriteMode(KuduWriterMode.UPDATE);
-        }
-
         public Builder setConsistency(FlushMode flushMode) {
             this.flushMode = flushMode;
             return this;
         }
+
         public Builder setEventualConsistency() {
             return setConsistency(FlushMode.AUTO_FLUSH_BACKGROUND);
         }
+
         public Builder setStrongConsistency() {
             return setConsistency(FlushMode.AUTO_FLUSH_SYNC);
         }
@@ -106,8 +90,7 @@
         public KuduWriterConfig build() {
             return new KuduWriterConfig(
                     masters,
-                    flushMode,
-                    writeMode);
+                    flushMode);
         }
     }
 }
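
A minimal builder sketch matching the API above; the master address is a placeholder:

```
KuduWriterConfig writerConfig = KuduWriterConfig.Builder
        .setMasters("kudu-master:7051")   // placeholder master address
        .setStrongConsistency()           // i.e. FlushMode.AUTO_FLUSH_SYNC
        .build();
```
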
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
new file mode 100644
index 0000000..db44eec
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@PublicEvolving
+public class PojoOperationMapper<T> extends AbstractSingleOperationMapper<T> {
+
+    private final Field[] fields;
+
+    protected PojoOperationMapper(Class<T> pojoClass, String[] columnNames) { this(pojoClass, columnNames, null); }
+
+    public PojoOperationMapper(Class<T> pojoClass, String[] columnNames, KuduOperation operation) {
+        super(columnNames, operation);
+        fields = initFields(pojoClass, columnNames);
+    }
+
+    public static List<Field> getAllFields(List<Field> fields, Class<?> type) {
+        fields.addAll(Arrays.asList(type.getDeclaredFields()));
+
+        if (type.getSuperclass() != null) {
+            getAllFields(fields, type.getSuperclass());
+        }
+
+        return fields;
+    }
+
+    private Field[] initFields(Class<T> pojoClass, String[] columnNames) {
+        Map<String, Field> allFields = new HashMap<>();
+        getAllFields(new ArrayList<>(), pojoClass).stream().forEach(f -> {
+            if (!allFields.containsKey(f.getName())) {
+                allFields.put(f.getName(), f);
+            }
+        });
+
+        Field[] fields = new Field[columnNames.length];
+
+        for (int i = 0; i < columnNames.length; i++) {
+            Field f = allFields.get(columnNames[i]);
+            if (f == null) {
+                throw new RuntimeException("Cannot find field " + columnNames[i] + ". List of detected fields: " + allFields.keySet());
+            }
+            f.setAccessible(true);
+            fields[i] = f;
+        }
+
+        return fields;
+    }
+
+    @Override
+    public Object getField(T input, int i) {
+        try {
+            return fields[i].get(input);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException("Could not read field " + fields[i].getName() +
+                    " even though it was set accessible, this is a bug", e);
+        }
+    }
+}
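
Usage sketch for the reflective mapper above, with a hypothetical `Book` POJO whose field names match the Kudu columns:

```
PojoOperationMapper<Book> mapper = new PojoOperationMapper<>(
        Book.class,
        new String[]{"id", "title", "author"},   // placeholder column/field names
        AbstractSingleOperationMapper.KuduOperation.UPSERT);
```
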
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/RowOperationMapper.java
similarity index 63%
rename from flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/RowOperationMapper.java
index 8c9eab0..2e90598 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/RowOperationMapper.java
@@ -16,8 +16,22 @@
  */
 package org.apache.flink.connectors.kudu.connector.writer;
 
-public enum KuduWriterMode {
-    INSERT,
-    UPDATE,
-    UPSERT
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.types.Row;
+
+@PublicEvolving
+public class RowOperationMapper extends AbstractSingleOperationMapper<Row> {
+
+    protected RowOperationMapper(String[] columnNames) {
+        super(columnNames);
+    }
+
+    public RowOperationMapper(String[] columnNames, KuduOperation operation) {
+        super(columnNames, operation);
+    }
+
+    @Override
+    public Object getField(Row input, int i) {
+        return input.getField(i);
+    }
 }
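
Usage sketch for the Row-based mapper; positions in the Flink Row must line up with the column name array:

```
RowOperationMapper mapper = new RowOperationMapper(
        new String[]{"id", "title"},
        AbstractSingleOperationMapper.KuduOperation.INSERT);

Row row = new Row(2);
row.setField(0, 1);
row.setField(1, "Hello Kudu");
```
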
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/TupleOperationMapper.java
similarity index 62%
rename from flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/TupleOperationMapper.java
index 27b2ed3..491168b 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/TupleOperationMapper.java
@@ -16,17 +16,22 @@
  */
 package org.apache.flink.connectors.kudu.connector.writer;
 
-import static org.apache.kudu.client.SessionConfiguration.*;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.tuple.Tuple;
 
-public enum KuduWriterConsistency {
-    EVENTUAL(FlushMode.AUTO_FLUSH_BACKGROUND),
-    STRONG(FlushMode.AUTO_FLUSH_SYNC),
-    //CHECKPOINT(FlushMode.MANUAL_FLUSH)
-    ;
+@PublicEvolving
+public class TupleOperationMapper<T extends Tuple> extends AbstractSingleOperationMapper<T> {
 
-    public final FlushMode flushMode;
+    protected TupleOperationMapper(String[] columnNames) {
+        super(columnNames);
+    }
 
-    KuduWriterConsistency(FlushMode flushMode) {
-        this.flushMode = flushMode;
+    public TupleOperationMapper(String[] columnNames, KuduOperation operation) {
+        super(columnNames, operation);
+    }
+
+    @Override
+    public Object getField(T input, int i) {
+        return input.getField(i);
     }
 }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
index d523b67..a671408 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
@@ -18,56 +18,83 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.writer.KuduOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
 import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
-import org.apache.flink.connectors.kudu.connector.serde.KuduSerialization;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
-import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * Streaming Sink that executes Kudu operations based on the incoming elements.
+ * The target Kudu table is defined in the {@link KuduTableInfo} object together with parameters for table
+ * creation in case the table does not exist.
+ * <p>
+ * Incoming records are mapped to Kudu table operations using the provided {@link KuduOperationMapper} logic,
+ * while failures resulting from the operations are handled by the {@link KuduFailureHandler} instance.
+ *
+ * @param <IN> Type of the input records
+ */
 @PublicEvolving
-public class KuduSink<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
+public class KuduSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final KuduTableInfo tableInfo;
     private final KuduWriterConfig writerConfig;
     private final KuduFailureHandler failureHandler;
-    private final KuduSerialization<OUT> serializer;
-
+    private final KuduOperationMapper<IN> opsMapper;
     private transient KuduWriter kuduWriter;
 
-    public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) {
-        this(writerConfig, tableInfo, serializer, new DefaultKuduFailureHandler());
+    /**
+     * Creates a new {@link KuduSink} that will execute operations against the specified Kudu table (defined in {@link KuduTableInfo})
+     * for the incoming stream elements.
+     *
+     * @param writerConfig Writer configuration
+     * @param tableInfo    Table information for the target table
+     * @param opsMapper    Mapping logic from inputs to Kudu operations
+     */
+    public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduOperationMapper<IN> opsMapper) {
+        this(writerConfig, tableInfo, opsMapper, new DefaultKuduFailureHandler());
     }
 
-    public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer, KuduFailureHandler failureHandler) {
-        this.tableInfo = checkNotNull(tableInfo,"tableInfo could not be null");
-        this.writerConfig = checkNotNull(writerConfig,"config could not be null");
-        this.serializer = checkNotNull(serializer,"serializer could not be null");
-        this.failureHandler = checkNotNull(failureHandler,"failureHandler could not be null");
+    /**
+     * Creates a new {@link KuduSink} that will execute operations against the specified Kudu table (defined in {@link KuduTableInfo})
+     * for the incoming stream elements.
+     *
+     * @param writerConfig   Writer configuration
+     * @param tableInfo      Table information for the target table
+     * @param opsMapper      Mapping logic from inputs to Kudu operations
+     * @param failureHandler Custom failure handler instance
+     */
+    public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduOperationMapper<IN> opsMapper, KuduFailureHandler failureHandler) {
+        this.tableInfo = checkNotNull(tableInfo, "tableInfo cannot be null");
+        this.writerConfig = checkNotNull(writerConfig, "config cannot be null");
+        this.opsMapper = checkNotNull(opsMapper, "opsMapper cannot be null");
+        this.failureHandler = checkNotNull(failureHandler, "failureHandler cannot be null");
     }
 
     @Override
     public void open(Configuration parameters) throws Exception {
-        kuduWriter = new KuduWriter(tableInfo, writerConfig, failureHandler);
-
-        serializer.withSchema(kuduWriter.getSchema());
+        kuduWriter = new KuduWriter<>(tableInfo, writerConfig, opsMapper, failureHandler);
     }
 
     @Override
-    public void invoke(OUT value) throws Exception {
-        final KuduRow kuduRow = serializer.serialize(value);
-        kuduWriter.write(kuduRow);
+    public void invoke(IN value) throws Exception {
+        try {
+            kuduWriter.write(value);
+        } catch (ClassCastException e) {
+            failureHandler.onTypeMismatch(e);
+        }
     }
 
     @Override
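
Wiring the sink into a streaming job, as a hedged end-to-end sketch (assumes `writerConfig` and `tableInfo` are built as shown earlier; the example stream and column names are placeholders):

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Row> input = env.fromElements(Row.of(1, "title-1"), Row.of(2, "title-2"));

input.addSink(new KuduSink<>(
        writerConfig,
        tableInfo,
        new RowOperationMapper(new String[]{"id", "title"},
                AbstractSingleOperationMapper.KuduOperation.UPSERT)));

env.execute("Kudu sink example");
```
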
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
new file mode 100644
index 0000000..2e1c63e
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+
+/**
+ * Read-only catalog that throws a {@link CatalogException} for any attempt to modify its contents.
+ */
+@Internal
+public abstract class AbstractReadOnlyCatalog extends AbstractCatalog {
+
+    private static final CatalogException UNSUPPORTED_ERR = new CatalogException("This action is not supported for read-only catalogs");
+
+    public AbstractReadOnlyCatalog(String name, String defaultDatabase) {
+        super(name, defaultDatabase);
+    }
+
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException {
+        throw UNSUPPORTED_ERR;
+    }
+
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException {
+        throw UNSUPPORTED_ERR;
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
new file mode 100644
index 0000000..7b3c987
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.AlterTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.shaded.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_HASH_COLS;
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_MASTERS;
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_PRIMARY_KEY_COLS;
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_REPLICAS;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Catalog for reading and creating Kudu tables.
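+ *
+ * <p>A minimal usage sketch (the master address below is illustrative):
+ * <pre>{@code
+ * KuduCatalog catalog = new KuduCatalog("localhost:7051");
+ * tableEnv.registerCatalog("kudu", catalog);
+ * tableEnv.useCatalog("kudu");
+ * }</pre>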
+ */
+@PublicEvolving
+public class KuduCatalog extends AbstractReadOnlyCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduCatalog.class);
+    private final KuduTableFactory tableFactory = new KuduTableFactory();
+    private final String kuduMasters;
+    private KuduClient kuduClient;
+
+    /**
+     * Create a new {@link KuduCatalog} with the specified catalog name and kudu master addresses.
+     *
+     * @param catalogName Name of the catalog (used by the table environment)
+     * @param kuduMasters Connection address to Kudu
+     */
+    public KuduCatalog(String catalogName, String kuduMasters) {
+        super(catalogName, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE);
+        this.kuduMasters = kuduMasters;
+        this.kuduClient = createClient();
+    }
+
+    /**
+     * Create a new {@link KuduCatalog} with the specified kudu master addresses.
+     *
+     * @param kuduMasters Connection address to Kudu
+     */
+    public KuduCatalog(String kuduMasters) {
+        this("kudu", kuduMasters);
+    }
+
+    @Override
+    public Optional<TableFactory> getTableFactory() {
+        return Optional.of(getKuduTableFactory());
+    }
+
+    public KuduTableFactory getKuduTableFactory() {
+        return tableFactory;
+    }
+
+    private KuduClient createClient() {
+        return new KuduClient.KuduClientBuilder(kuduMasters).build();
+    }
+
+    @Override
+    public void open() {}
+
+    @Override
+    public void close() {
+        try {
+            if (kuduClient != null) {
+                kuduClient.close();
+            }
+        } catch (KuduException e) {
+            LOG.error("Error while closing kudu client", e);
+        }
+    }
+
+    public ObjectPath getObjectPath(String tableName) {
+        return new ObjectPath(getDefaultDatabase(), tableName);
+    }
+
+    @Override
+    public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
+
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        try {
+            return kuduClient.getTablesList().getTablesList();
+        } catch (Throwable t) {
+            throw new CatalogException("Could not list tables", t);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) {
+        checkNotNull(tablePath);
+        try {
+            return kuduClient.tableExists(tablePath.getObjectName());
+        } catch (KuduException e) {
+            throw new CatalogException(e);
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException {
+        checkNotNull(tablePath);
+
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        String tableName = tablePath.getObjectName();
+
+        try {
+            KuduTable kuduTable = kuduClient.openTable(tableName);
+
+            return new CatalogTableImpl(
+                    KuduTableUtils.kuduToFlinkSchema(kuduTable.getSchema()),
+                    createTableProperties(tableName, kuduTable.getSchema().getPrimaryKeyColumns()),
+                    tableName);
+        } catch (KuduException e) {
+            throw new CatalogException(e);
+        }
+    }
+
+    protected Map<String, String> createTableProperties(String tableName, List<ColumnSchema> primaryKeyColumns) {
+        Map<String, String> props = new HashMap<>();
+        props.put(KUDU_MASTERS, kuduMasters);
+        String primaryKeyNames = primaryKeyColumns.stream().map(ColumnSchema::getName).collect(Collectors.joining(","));
+        props.put(KUDU_PRIMARY_KEY_COLS, primaryKeyNames);
+        props.put(KuduTableFactory.KUDU_TABLE, tableName);
+        return props;
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException {
+        String tableName = tablePath.getObjectName();
+        try {
+            if (tableExists(tablePath)) {
+                kuduClient.deleteTable(tableName);
+            } else if (!ignoreIfNotExists) {
+                throw new TableNotExistException(getName(), tablePath);
+            }
+        } catch (KuduException e) {
+            throw new CatalogException("Could not delete table " + tableName, e);
+        }
+    }
+
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException {
+        String tableName = tablePath.getObjectName();
+        try {
+            if (tableExists(tablePath)) {
+                kuduClient.alterTable(tableName, new AlterTableOptions().renameTable(newTableName));
+            } else if (!ignoreIfNotExists) {
+                throw new TableNotExistException(getName(), tablePath);
+            }
+        } catch (KuduException e) {
+            throw new CatalogException("Could not rename table " + tableName, e);
+        }
+    }
+
+    public void createTable(KuduTableInfo tableInfo, boolean ignoreIfExists) throws CatalogException, TableAlreadyExistException {
+        ObjectPath path = getObjectPath(tableInfo.getName());
+        if (tableExists(path)) {
+            if (ignoreIfExists) {
+                return;
+            } else {
+                throw new TableAlreadyExistException(getName(), path);
+            }
+        }
+
+        try {
+            kuduClient.createTable(tableInfo.getName(), tableInfo.getSchema(), tableInfo.getCreateTableOptions());
+        } catch (KuduException e) {
+            throw new CatalogException("Could not create table " + tableInfo.getName(), e);
+        }
+    }
+
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException {
+        Map<String, String> tableProperties = table.getProperties();
+        TableSchema tableSchema = table.getSchema();
+
+        Set<String> optionalProperties = new HashSet<>(Arrays.asList(KUDU_REPLICAS));
+        Set<String> requiredProperties = new HashSet<>(Arrays.asList(KUDU_HASH_COLS));
+
+        if (!tableSchema.getPrimaryKey().isPresent()) {
+            requiredProperties.add(KUDU_PRIMARY_KEY_COLS);
+        }
+
+        if (!tableProperties.keySet().containsAll(requiredProperties)) {
+            throw new CatalogException("Missing required property. The following properties must be provided: " +
+                    requiredProperties.toString());
+        }
+
+        Set<String> permittedProperties = Sets.union(requiredProperties, optionalProperties);
+        if (!permittedProperties.containsAll(tableProperties.keySet())) {
+            throw new CatalogException("Unpermitted properties were given. The following properties are allowed:" +
+                    permittedProperties.toString());
+        }
+
+        String tableName = tablePath.getObjectName();
+
+        KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, tableSchema, tableProperties);
+
+        createTable(tableInfo, ignoreIfExists);
+    }
+
+    @Override
+    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return Lists.newArrayList(getDefaultDatabase());
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+        if (databaseName.equals(getDefaultDatabase())) {
+            return new CatalogDatabaseImpl(new HashMap<>(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        return listDatabases().contains(databaseName);
+    }
+
+    @Override
+    public List<String> listViews(String databaseName) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        return false;
+    }
+
+    @Override
+    public List<String> listFunctions(String dbName) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+        throw new FunctionNotExistException(getName(), functionPath);
+    }
+
+    @Override
+    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+        return false;
+    }
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
new file mode 100644
index 0000000..1018cae
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.CatalogFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
+
+/**
+ * Factory for {@link KuduCatalog}.
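+ *
+ * <p>As a sketch, a Kudu catalog can be declared in the SQL CLI environment file like the
+ * following (the master address is illustrative):
+ * <pre>
+ * catalogs:
+ *   - name: kudu
+ *     type: kudu
+ *     kudu.masters: localhost:7051
+ * </pre>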
+ */
+@Internal
+public class KuduCatalogFactory implements CatalogFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduCatalogFactory.class);
+
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> context = new HashMap<>();
+        context.put(CATALOG_TYPE, KuduTableFactory.KUDU);
+        context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility
+        return context;
+    }
+
+    @Override
+    public List<String> supportedProperties() {
+        List<String> properties = new ArrayList<>();
+
+        properties.add(KuduTableFactory.KUDU_MASTERS);
+
+        return properties;
+    }
+
+    @Override
+    public Catalog createCatalog(String name, Map<String, String> properties) {
+        final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+        return new KuduCatalog(name, descriptorProperties.getString(KuduTableFactory.KUDU_MASTERS));
+    }
+
+    private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
+        final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+        descriptorProperties.putProperties(properties);
+        descriptorProperties.validateString(KuduTableFactory.KUDU_MASTERS, false);
+
+        return descriptorProperties;
+    }
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
new file mode 100644
index 0000000..49b09a2
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
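+/**
+ * Table factory for the Kudu connector: creates {@link KuduTableSource} and {@link KuduTableSink}
+ * instances for table descriptors declared with {@code connector.type = 'kudu'}.
+ */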
+public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Tuple2<Boolean, Row>> {
+
+    public static final String KUDU_TABLE = "kudu.table";
+    public static final String KUDU_MASTERS = "kudu.masters";
+    public static final String KUDU_HASH_COLS = "kudu.hash-columns";
+    public static final String KUDU_PRIMARY_KEY_COLS = "kudu.primary-key-columns";
+    public static final String KUDU_REPLICAS = "kudu.replicas";
+    public static final String KUDU = "kudu";
+
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> context = new HashMap<>();
+        context.put(CONNECTOR_TYPE, KUDU);
+        return context;
+    }
+
+    @Override
+    public List<String> supportedProperties() {
+        List<String> properties = new ArrayList<>();
+        properties.add(KUDU_TABLE);
+        properties.add(KUDU_MASTERS);
+        properties.add(KUDU_HASH_COLS);
+        properties.add(KUDU_PRIMARY_KEY_COLS);
+        // schema
+        properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
+        properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
+        properties.add(SCHEMA + ".#." + SCHEMA_NAME);
+        properties.add(SCHEMA + ".#." + SCHEMA_FROM);
+        // computed column
+        properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+
+        // time attributes
+        properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
+        properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
+        properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
+        properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS);
+        properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED);
+        properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
+        properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS);
+        properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
+        properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);
+
+        // watermark
+        properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
+        properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
+        properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+        return properties;
+    }
+
+    private DescriptorProperties getValidatedProps(Map<String, String> properties) {
+        checkNotNull(properties.get(KUDU_MASTERS), "Missing required property " + KUDU_MASTERS);
+        checkNotNull(properties.get(KUDU_TABLE), "Missing required property " + KUDU_TABLE);
+        final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+        descriptorProperties.putProperties(properties);
+        new SchemaValidator(true, false, false).validate(descriptorProperties);
+        return descriptorProperties;
+    }
+
+    @Override
+    public KuduTableSource createTableSource(Map<String, String> properties) {
+        DescriptorProperties descriptorProperties = getValidatedProps(properties);
+        String tableName = descriptorProperties.getString(KUDU_TABLE);
+        TableSchema schema = descriptorProperties.getTableSchema(SCHEMA);
+        return createTableSource(tableName, schema, properties);
+    }
+
+    @Override
+    public KuduTableSource createTableSource(ObjectPath tablePath, CatalogTable table) {
+        String tableName = tablePath.getObjectName();
+        return createTableSource(tableName, table.getSchema(), table.getProperties());
+    }
+
+    private KuduTableSource createTableSource(String tableName, TableSchema schema, Map<String, String> props) {
+        String masterAddresses = props.get(KUDU_MASTERS);
+        TableSchema physicalSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
+        KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, schema, props);
+
+        KuduReaderConfig.Builder configBuilder = KuduReaderConfig.Builder
+                .setMasters(masterAddresses);
+
+        return new KuduTableSource(configBuilder, tableInfo, physicalSchema, null);
+    }
+
+    @Override
+    public KuduTableSink createTableSink(Map<String, String> properties) {
+        DescriptorProperties descriptorProperties = getValidatedProps(properties);
+        String tableName = descriptorProperties.getString(KUDU_TABLE);
+        TableSchema schema = descriptorProperties.getTableSchema(SCHEMA);
+
+        return createTableSink(tableName, schema, properties);
+    }
+
+    @Override
+    public KuduTableSink createTableSink(ObjectPath tablePath, CatalogTable table) {
+        return createTableSink(tablePath.getObjectName(), table.getSchema(), table.getProperties());
+    }
+
+    private KuduTableSink createTableSink(String tableName, TableSchema schema, Map<String, String> props) {
+        String masterAddresses = props.get(KUDU_MASTERS);
+        TableSchema physicalSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
+        KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, schema, props);
+
+        KuduWriterConfig.Builder configBuilder = KuduWriterConfig.Builder
+                .setMasters(masterAddresses);
+
+        return new KuduTableSink(configBuilder, tableInfo, physicalSchema);
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
new file mode 100644
index 0000000..8ff517a
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.streaming.KuduSink;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
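+/**
+ * An {@link UpsertStreamTableSink} that writes a stream of (change-flag, row) tuples to Kudu:
+ * rows flagged {@code true} become upserts and rows flagged {@code false} become deletes.
+ */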
+public class KuduTableSink implements UpsertStreamTableSink<Row> {
+
+    private final KuduWriterConfig.Builder writerConfigBuilder;
+    private final TableSchema flinkSchema;
+    private final KuduTableInfo tableInfo;
+
+    public KuduTableSink(KuduWriterConfig.Builder configBuilder, KuduTableInfo tableInfo, TableSchema flinkSchema) {
+        this.writerConfigBuilder = configBuilder;
+        this.tableInfo = tableInfo;
+        this.flinkSchema = flinkSchema;
+    }
+
+    @Override
+    public void setKeyFields(String[] keyFields) { /* this has no effect */ }
+
+    @Override
+    public void setIsAppendOnly(Boolean isAppendOnly) { /* this has no effect */ }
+
+    @Override
+    public TypeInformation<Row> getRecordType() { return flinkSchema.toRowType(); }
+
+    @Override
+    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStreamTuple) { consumeDataStream(dataStreamTuple); }
+
+    @Override
+    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStreamTuple) {
+        KuduSink upsertKuduSink = new KuduSink(writerConfigBuilder.build(), tableInfo, new UpsertOperationMapper(getTableSchema().getFieldNames()));
+
+        return dataStreamTuple
+                .addSink(upsertKuduSink)
+                .setParallelism(dataStreamTuple.getParallelism())
+                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getTableSchema().getFieldNames()));
+    }
+
+    @Override
+    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+        return new KuduTableSink(writerConfigBuilder, tableInfo, flinkSchema);
+    }
+
+    @Override
+    public TableSchema getTableSchema() { return flinkSchema; }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
new file mode 100644
index 0000000..2c972fb
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connectors.kudu.batch.KuduRowInputFormat;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.LimitableTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import java.util.Arrays;
+import java.util.List;
+
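+/**
+ * A bounded {@link StreamTableSource} for Kudu that supports limit and projection push-down.
+ */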
+public class KuduTableSource implements StreamTableSource<Row>, LimitableTableSource<Row>, ProjectableTableSource<Row> {
+
+    private final KuduReaderConfig.Builder configBuilder;
+    private final KuduTableInfo tableInfo;
+    private final TableSchema flinkSchema;
+    private final String[] projectedFields;
+
+    public KuduTableSource(KuduReaderConfig.Builder configBuilder, KuduTableInfo tableInfo, TableSchema flinkSchema, String[] projectedFields) {
+        this.configBuilder = configBuilder;
+        this.tableInfo = tableInfo;
+        this.flinkSchema = flinkSchema;
+        this.projectedFields = projectedFields;
+    }
+
+    @Override
+    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
+
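+        // The produced DataType is converted back to legacy TypeInformation, which the
+        // DataStream API still expects for createInput.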
+        KuduRowInputFormat inputFormat = new KuduRowInputFormat(configBuilder.build(), tableInfo, projectedFields == null ? null : Lists.newArrayList(projectedFields));
+
+        return env.createInput(inputFormat, (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType())).name(explainSource());
+    }
+
+    @Override
+    public TableSchema getTableSchema() {
+        return flinkSchema;
+    }
+
+    @Override
+    public DataType getProducedDataType() {
+        if (projectedFields == null) {
+            return flinkSchema.toRowDataType();
+        } else {
+            DataTypes.Field[] fields = new DataTypes.Field[projectedFields.length];
+            for (int i = 0; i < fields.length; i++) {
+                String fieldName = projectedFields[i];
+                fields[i] = DataTypes.FIELD(
+                        fieldName,
+                        flinkSchema
+                                .getTableColumn(fieldName)
+                                .get()
+                                .getType()
+                );
+            }
+            return DataTypes.ROW(fields);
+        }
+    }
+
+    @Override
+    public boolean isBounded() {
+        return true;
+    }
+
+    @Override
+    public boolean isLimitPushedDown() {
+        return true;
+    }
+
+    @Override
+    public TableSource<Row> applyLimit(long l) {
+        return new KuduTableSource(configBuilder.setRowLimit((int) l), tableInfo, flinkSchema, projectedFields);
+    }
+
+    @Override
+    public TableSource<Row> projectFields(int[] ints) {
+        String[] fieldNames = new String[ints.length];
+        RowType producedDataType = (RowType) getProducedDataType().getLogicalType();
+        List<String> prevFieldNames = producedDataType.getFieldNames();
+        for (int i = 0; i < ints.length; i++) {
+            fieldNames[i] = prevFieldNames.get(ints[i]);
+        }
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+    }
+
+    @Override
+    public String explainSource() {
+        return "KuduStreamTableSource[schema=" + Arrays.toString(getTableSchema().getFieldNames())
+                + (projectedFields != null ?", projectFields=" + Arrays.toString(projectedFields) + "]" : "]");
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
new file mode 100644
index 0000000..847dad4
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.types.Row;
+
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+
+import java.util.Optional;
+
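+/**
+ * Operation mapper for upsert streams: maps a (change-flag, row) tuple to a Kudu upsert when
+ * the flag is {@code true}, and to a delete when it is {@code false}.
+ */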
+@Internal
+public class UpsertOperationMapper extends AbstractSingleOperationMapper<Tuple2<Boolean, Row>> {
+
+    public UpsertOperationMapper(String[] columnNames) {
+        super(columnNames);
+    }
+
+    @Override
+    public Object getField(Tuple2<Boolean, Row> input, int i) {
+        return input.f1.getField(i);
+    }
+
+    @Override
+    public Optional<Operation> createBaseOperation(Tuple2<Boolean, Row> input, KuduTable table) {
+        return Optional.of(input.f0 ? table.newUpsert() : table.newDelete());
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
new file mode 100644
index 0000000..5fc8cca
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.kudu.connector.ColumnSchemasFactory;
+import org.apache.flink.connectors.kudu.connector.CreateTableOptionsFactory;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.table.KuduTableFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.CreateTableOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_HASH_COLS;
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_PRIMARY_KEY_COLS;
+
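+/**
+ * Utilities for converting between Flink table schemas/properties and Kudu table metadata.
+ */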
+public class KuduTableUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduTableUtils.class);
+
+    public static KuduTableInfo createTableInfo(String tableName, TableSchema schema, Map<String, String> props) {
+        // Since KUDU_HASH_COLS is a required property for table creation, we use it to infer whether to create table
+        boolean createIfMissing = props.containsKey(KUDU_HASH_COLS);
+
+        KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);
+
+        if (createIfMissing) {
+
+            List<Tuple2<String, DataType>> columns = getSchemaWithSqlTimestamp(schema)
+                    .getTableColumns()
+                    .stream()
+                    .map(tc -> Tuple2.of(tc.getName(), tc.getType()))
+                    .collect(Collectors.toList());
+
+            List<String> keyColumns = getPrimaryKeyColumns(props, schema);
+            ColumnSchemasFactory schemasFactory = () -> toKuduConnectorColumns(columns, keyColumns);
+            List<String> hashColumns = getHashColumns(props);
+            int replicas = Optional.ofNullable(props.get(KuduTableFactory.KUDU_REPLICAS)).map(Integer::parseInt).orElse(1);
+
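+            // Bucket count heuristic: create twice as many hash partitions as replicas.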
+            CreateTableOptionsFactory optionsFactory = () -> new CreateTableOptions()
+                    .setNumReplicas(replicas)
+                    .addHashPartitions(hashColumns, replicas * 2);
+
+            tableInfo.createTableIfNotExists(schemasFactory, optionsFactory);
+        } else {
+            LOG.debug("Property {} is missing, assuming the table is already created.", KUDU_HASH_COLS);
+        }
+
+        return tableInfo;
+    }
+
+    public static List<ColumnSchema> toKuduConnectorColumns(List<Tuple2<String, DataType>> columns, Collection<String> keyColumns) {
+        return columns.stream()
+                .map(t -> {
+                            ColumnSchema.ColumnSchemaBuilder builder = new ColumnSchema
+                                    .ColumnSchemaBuilder(t.f0, KuduTypeUtils.toKuduType(t.f1))
+                                    .key(keyColumns.contains(t.f0))
+                                    .nullable(!keyColumns.contains(t.f0) && t.f1.getLogicalType().isNullable());
+                            if (t.f1.getLogicalType() instanceof DecimalType) {
+                                DecimalType decimalType = (DecimalType) t.f1.getLogicalType();
+                                builder.typeAttributes(new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
+                                    .precision(decimalType.getPrecision())
+                                    .scale(decimalType.getScale())
+                                    .build());
+                            }
+                            return builder.build();
+                        }
+                ).collect(Collectors.toList());
+    }
+
+    public static TableSchema kuduToFlinkSchema(Schema schema) {
+        TableSchema.Builder builder = TableSchema.builder();
+
+        for (ColumnSchema column : schema.getColumns()) {
+            DataType flinkType = KuduTypeUtils.toFlinkType(column.getType(), column.getTypeAttributes()).nullable();
+            builder.field(column.getName(), flinkType);
+        }
+
+        return builder.build();
+    }
+
+    public static List<String> getPrimaryKeyColumns(Map<String, String> tableProperties, TableSchema tableSchema) {
+        return tableProperties.containsKey(KUDU_PRIMARY_KEY_COLS)
+                ? Arrays.asList(tableProperties.get(KUDU_PRIMARY_KEY_COLS).split(","))
+                : tableSchema.getPrimaryKey().get().getColumns();
+    }
+
+    public static List<String> getHashColumns(Map<String, String> tableProperties) {
+        return Lists.newArrayList(tableProperties.get(KUDU_HASH_COLS).split(","));
+    }
+
+    public static TableSchema getSchemaWithSqlTimestamp(TableSchema schema) {
+        TableSchema.Builder builder = new TableSchema.Builder();
+        TableSchemaUtils.getPhysicalSchema(schema).getTableColumns().forEach(
+                tableColumn -> {
+                    if (tableColumn.getType().getLogicalType() instanceof TimestampType) {
+                        builder.field(tableColumn.getName(), tableColumn.getType().bridgedTo(Timestamp.class));
+                    } else {
+                        builder.field(tableColumn.getName(), tableColumn.getType());
+                    }
+                });
+        return builder.build();
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
new file mode 100644
index 0000000..c445465
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.table.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.Type;
+
+import java.sql.Timestamp;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
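+/**
+ * Conversions between Flink {@link DataType}s and Kudu column {@link Type}s.
+ */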
+public class KuduTypeUtils {
+
+    public static DataType toFlinkType(Type type, ColumnTypeAttributes typeAttributes) {
+        switch (type) {
+            case STRING:
+                return DataTypes.STRING();
+            case FLOAT:
+                return DataTypes.FLOAT();
+            case INT8:
+                return DataTypes.TINYINT();
+            case INT16:
+                return DataTypes.SMALLINT();
+            case INT32:
+                return DataTypes.INT();
+            case INT64:
+                return DataTypes.BIGINT();
+            case DOUBLE:
+                return DataTypes.DOUBLE();
+            case DECIMAL:
+                return DataTypes.DECIMAL(typeAttributes.getPrecision(), typeAttributes.getScale());
+            case BOOL:
+                return DataTypes.BOOLEAN();
+            case BINARY:
+                return DataTypes.BYTES();
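+            // Kudu stores timestamps as microseconds; expose them as TIMESTAMP(3) bridged to java.sql.Timestamp.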
+            case UNIXTIME_MICROS:
+                return new AtomicDataType(new TimestampType(3), Timestamp.class);
+            default:
+                throw new IllegalArgumentException("Unsupported Kudu type: " + type);
+        }
+    }
+
+    public static Type toKuduType(DataType dataType) {
+        checkNotNull(dataType, "type cannot be null");
+        LogicalType logicalType = dataType.getLogicalType();
+        return logicalType.accept(new KuduTypeLogicalTypeVisitor(dataType));
+    }
+
+    private static class KuduTypeLogicalTypeVisitor extends LogicalTypeDefaultVisitor<Type> {
+
+        private final DataType dataType;
+
+        KuduTypeLogicalTypeVisitor(DataType dataType) {
+            this.dataType = dataType;
+        }
+
+        @Override
+        public Type visit(BooleanType booleanType) {
+            return Type.BOOL;
+        }
+
+        @Override
+        public Type visit(TinyIntType tinyIntType) {
+            return Type.INT8;
+        }
+
+        @Override
+        public Type visit(SmallIntType smallIntType) {
+            return Type.INT16;
+        }
+
+        @Override
+        public Type visit(IntType intType) {
+            return Type.INT32;
+        }
+
+        @Override
+        public Type visit(BigIntType bigIntType) {
+            return Type.INT64;
+        }
+
+        @Override
+        public Type visit(FloatType floatType) {
+            return Type.FLOAT;
+        }
+
+        @Override
+        public Type visit(DoubleType doubleType) {
+            return Type.DOUBLE;
+        }
+
+        @Override
+        public Type visit(DecimalType decimalType) {
+            return Type.DECIMAL;
+        }
+
+        @Override
+        public Type visit(TimestampType timestampType) {
+            return Type.UNIXTIME_MICROS;
+        }
+
+        @Override
+        public Type visit(VarCharType varCharType) {
+            return Type.STRING;
+        }
+
+        @Override
+        public Type visit(VarBinaryType varBinaryType) {
+            return Type.BINARY;
+        }
+
+        @Override
+        protected Type defaultMethod(LogicalType logicalType) {
+            throw new UnsupportedOperationException(
+                    String.format("Flink doesn't support converting type %s to Kudu type yet.", dataType.toString()));
+        }
+
+    }
+}
diff --git a/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000..487f2d9
--- /dev/null
+++ b/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connectors.kudu.table.KuduTableFactory
+org.apache.flink.connectors.kudu.table.KuduCatalogFactory
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
index e22f40e..fb7d8e3 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
@@ -16,12 +16,12 @@
  */
 package org.apache.flink.connectors.kudu.batch;
 
-import org.apache.flink.connectors.kudu.connector.KuduDatabase;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
-import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe;
+import org.apache.flink.types.Row;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -29,27 +29,27 @@
 import java.util.Arrays;
 import java.util.List;
 
-class KuduInputFormatTest extends KuduDatabase {
+class KuduInputFormatTest extends KuduTestBase {
 
     @Test
     void testInvalidKuduMaster() {
-        KuduTableInfo tableInfo = booksTableInfo("books",false);
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat<>(null, tableInfo, new DefaultSerDe()));
+        KuduTableInfo tableInfo = booksTableInfo("books", false);
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(null, tableInfo));
     }
 
     @Test
     void testInvalidTableInfo() {
         String masterAddresses = harness.getMasterAddressesAsString();
         KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat<>(readerConfig, null, new DefaultSerDe()));
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(readerConfig, null));
     }
 
     @Test
     void testInputFormat() throws Exception {
-        KuduTableInfo tableInfo = booksTableInfo("books",true);
+        KuduTableInfo tableInfo = booksTableInfo("books", true);
         setUpDatabase(tableInfo);
 
-        List<KuduRow> rows = readRows(tableInfo);
+        List<Row> rows = readRows(tableInfo);
         Assertions.assertEquals(5, rows.size());
 
         cleanDatabase(tableInfo);
@@ -57,32 +57,32 @@
 
     @Test
     void testInputFormatWithProjection() throws Exception {
-        KuduTableInfo tableInfo = booksTableInfo("books",true);
+        KuduTableInfo tableInfo = booksTableInfo("books", true);
         setUpDatabase(tableInfo);
 
-        List<KuduRow> rows = readRows(tableInfo,"title","id");
+        List<Row> rows = readRows(tableInfo, "title", "id");
         Assertions.assertEquals(5, rows.size());
 
-        for (KuduRow row: rows) {
+        for (Row row : rows) {
             Assertions.assertEquals(2, row.getArity());
         }
 
         cleanDatabase(tableInfo);
     }
 
-
-    private List<KuduRow> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
+    private List<Row> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
         String masterAddresses = harness.getMasterAddressesAsString();
         KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
-        KuduInputFormat<KuduRow> inputFormat = new KuduInputFormat<>(readerConfig, tableInfo, new DefaultSerDe(), new ArrayList<>(), Arrays.asList(fieldProjection));
+        KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo, new ArrayList<>(),
+                fieldProjection == null ? null : Arrays.asList(fieldProjection));
 
         KuduInputSplit[] splits = inputFormat.createInputSplits(1);
-        List<KuduRow> rows = new ArrayList<>();
+        List<Row> rows = new ArrayList<>();
         for (KuduInputSplit split : splits) {
             inputFormat.open(split);
-            while(!inputFormat.reachedEnd()) {
-                KuduRow row = inputFormat.nextRecord(new KuduRow(5));
-                if(row != null) {
+            while (!inputFormat.reachedEnd()) {
+                Row row = inputFormat.nextRecord(new Row(5));
+                if (row != null) {
                     rows.add(row);
                 }
             }
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
index f14eaa0..abf8a30 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
@@ -16,60 +16,62 @@
  */
 package org.apache.flink.connectors.kudu.batch;
 
-import org.apache.flink.connectors.kudu.connector.KuduDatabase;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
 import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
-import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe;
+import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
+import org.apache.flink.types.Row;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
 import java.util.UUID;
 
-class KuduOuputFormatTest extends KuduDatabase {
+class KuduOuputFormatTest extends KuduTestBase {
 
     @Test
     void testInvalidKuduMaster() {
-        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe()));
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, null));
     }
 
     @Test
     void testInvalidTableInfo() {
         String masterAddresses = harness.getMasterAddressesAsString();
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(writerConfig, null, new DefaultSerDe()));
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(writerConfig, null, null));
     }
 
     @Test
     void testNotTableExist() {
         String masterAddresses = harness.getMasterAddressesAsString();
-        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
-        KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe());
-        Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0,1));
+        KuduOutputFormat<Row> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
+        Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0, 1));
     }
 
     @Test
     void testOutputWithStrongConsistency() throws Exception {
         String masterAddresses = harness.getMasterAddressesAsString();
 
-        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder
                 .setMasters(masterAddresses)
                 .setStrongConsistency()
                 .build();
-        KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe());
+        KuduOutputFormat<Row> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
 
-        outputFormat.open(0,1);
+        outputFormat.open(0, 1);
 
-        for (KuduRow kuduRow : booksDataRow()) {
+        for (Row kuduRow : booksDataRow()) {
             outputFormat.writeRecord(kuduRow);
         }
         outputFormat.close();
 
-        List<KuduRow> rows = readRows(tableInfo);
+        List<Row> rows = readRows(tableInfo);
         Assertions.assertEquals(5, rows.size());
         kuduRowsTest(rows);
 
@@ -80,16 +82,16 @@
     void testOutputWithEventualConsistency() throws Exception {
         String masterAddresses = harness.getMasterAddressesAsString();
 
-        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder
                 .setMasters(masterAddresses)
                 .setEventualConsistency()
                 .build();
-        KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe());
+        KuduOutputFormat<Row> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
 
-        outputFormat.open(0,1);
+        outputFormat.open(0, 1);
 
-        for (KuduRow kuduRow : booksDataRow()) {
+        for (Row kuduRow : booksDataRow()) {
             outputFormat.writeRecord(kuduRow);
         }
 
@@ -98,7 +100,7 @@
 
         outputFormat.close();
 
-        List<KuduRow> rows = readRows(tableInfo);
+        List<Row> rows = readRows(tableInfo);
         Assertions.assertEquals(5, rows.size());
         kuduRowsTest(rows);
 
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
deleted file mode 100644
index cda8c21..0000000
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector;
-
-import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
-import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
-import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
-import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
-import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
-import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
-import org.apache.kudu.Type;
-import org.apache.kudu.test.KuduTestHarness;
-import org.junit.Rule;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.migrationsupport.rules.ExternalResourceSupport;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-@ExtendWith(ExternalResourceSupport.class)
-public class KuduDatabase {
-
-    @Rule
-    public static KuduTestHarness harness = new KuduTestHarness();
-
-    private static final Object[][] booksTableData = {
-            {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11},
-            {1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22},
-            {1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33},
-            {1004, "A Cup of Java", "Kumar", 44.44, 44},
-            {1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55}};
-
-
-    protected static KuduTableInfo booksTableInfo(String tableName, boolean createIfNotExist) {
-        return KuduTableInfo.Builder
-                .create(tableName)
-                .createIfNotExist(createIfNotExist)
-                .replicas(1)
-                .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
-                .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
-                .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
-                .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).asNullable().build())
-                .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).asNullable().build())
-                .build();
-    }
-
-    protected static List<KuduRow> booksDataRow() {
-        return Arrays.stream(booksTableData)
-                .map(row -> {
-                    Integer rowId = (Integer)row[0];
-                    if (rowId % 2 == 1) {
-                        KuduRow values = new KuduRow(5);
-                        values.setField(0, "id", row[0]);
-                        values.setField(1, "title", row[1]);
-                        values.setField(2, "author", row[2]);
-                        values.setField(3, "price", row[3]);
-                        values.setField(4, "quantity", row[4]);
-                        return values;
-                    } else {
-                        KuduRow values = new KuduRow(3);
-                        values.setField(0, "id", row[0]);
-                        values.setField(1, "title", row[1]);
-                        values.setField(2, "author", row[2]);
-                        return values;
-                    }
-                })
-                .collect(Collectors.toList());
-    }
-
-    protected void setUpDatabase(KuduTableInfo tableInfo) {
-        try {
-            String masterAddresses = harness.getMasterAddressesAsString();
-            KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
-            KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig);
-            booksDataRow().forEach(row -> {
-                try {
-                    kuduWriter.write(row);
-                }catch (Exception e) {
-                    e.printStackTrace();
-                }
-            });
-            kuduWriter.close();
-        } catch (Exception e) {
-            Assertions.fail();
-        }
-    }
-
-    protected void cleanDatabase(KuduTableInfo tableInfo) {
-        try {
-            String masterAddresses = harness.getMasterAddressesAsString();
-            KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
-            KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig);
-            kuduWriter.deleteTable();
-            kuduWriter.close();
-        } catch (Exception e) {
-            Assertions.fail();
-        }
-    }
-
-    protected List<KuduRow> readRows(KuduTableInfo tableInfo) throws Exception {
-        String masterAddresses = harness.getMasterAddressesAsString();
-        KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
-        KuduReader reader = new KuduReader(tableInfo, readerConfig);
-
-        KuduInputSplit[] splits = reader.createInputSplits(1);
-        List<KuduRow> rows = new ArrayList<>();
-        for (KuduInputSplit split : splits) {
-            KuduReaderIterator resultIterator = reader.scanner(split.getScanToken());
-            while(resultIterator.hasNext()) {
-                KuduRow row = resultIterator.next();
-                if(row != null) {
-                    rows.add(row);
-                }
-            }
-        }
-        reader.close();
-
-        return rows;
-    }
-
-    protected void kuduRowsTest(List<KuduRow> rows) {
-        for (KuduRow row: rows) {
-            Integer rowId = (Integer)row.getField("id");
-            if (rowId % 2 == 1) {
-                Assertions.assertNotEquals(null, row.getField("price"));
-                Assertions.assertNotEquals(null, row.getField("quantity"));
-            }
-            else {
-                Assertions.assertNull(row.getField("price"));
-                Assertions.assertNull(row.getField("quantity"));
-            }
-        }
-    }
-}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
new file mode 100644
index 0000000..36572c6
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
+import org.apache.flink.types.Row;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.shaded.com.google.common.collect.Lists;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.migrationsupport.rules.ExternalResourceSupport;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(ExternalResourceSupport.class)
+public class KuduTestBase {
+
+    private static final Object[][] booksTableData = {
+            {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11},
+            {1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22},
+            {1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33},
+            {1004, "A Cup of Java", "Kumar", 44.44, 44},
+            {1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55}};
+
+    public static KuduTestHarness harness;
+    public static String[] columns = new String[]{"id", "title", "author", "price", "quantity"};
+
+    @BeforeAll
+    public static void beforeClass() throws Exception {
+        harness = new KuduTestHarness();
+        harness.before();
+    }
+
+    @AfterAll
+    public static void afterClass() throws Exception {
+        harness.after();
+    }
+
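+    // Describes the five-column "books" test table; the schema and the
+    // partitioning options are supplied as factories via createTableIfNotExists.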
+    public static KuduTableInfo booksTableInfo(String tableName, boolean createIfNotExist) {
+
+        KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);
+
+        if (createIfNotExist) {
+            ColumnSchemasFactory schemasFactory = () -> {
+                List<ColumnSchema> schemas = new ArrayList<>();
+                schemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
+                schemas.add(new ColumnSchema.ColumnSchemaBuilder("title", Type.STRING).build());
+                schemas.add(new ColumnSchema.ColumnSchemaBuilder("author", Type.STRING).build());
+                schemas.add(new ColumnSchema.ColumnSchemaBuilder("price", Type.DOUBLE).nullable(true).build());
+                schemas.add(new ColumnSchema.ColumnSchemaBuilder("quantity", Type.INT32).nullable(true).build());
+                return schemas;
+            };
+
+            tableInfo.createTableIfNotExists(
+                    schemasFactory,
+                    () -> new CreateTableOptions()
+                            .setNumReplicas(1)
+                            .addHashPartitions(Lists.newArrayList("id"), 2));
+        }
+
+        return tableInfo;
+    }
+
+    public static List<Tuple5<Integer, String, String, Double, Integer>> booksDataTuple() {
+        return Arrays.stream(booksTableData)
+                .map(row -> {
+                    Integer rowId = (Integer) row[0];
+                    if (rowId % 2 == 1) {
+                        Tuple5<Integer, String, String, Double, Integer> values =
+                                Tuple5.of((Integer) row[0],
+                                        (String) row[1],
+                                        (String) row[2],
+                                        (Double) row[3],
+                                        (Integer) row[4]);
+                        return values;
+                    } else {
+                        Tuple5<Integer, String, String, Double, Integer> values =
+                                Tuple5.of((Integer) row[0],
+                                        (String) row[1],
+                                        (String) row[2],
+                                        null, null);
+                        return values;
+                    }
+                })
+                .collect(Collectors.toList());
+    }
+
+    public static List<Row> booksDataRow() {
+        return Arrays.stream(booksTableData)
+                .map(row -> {
+                    Integer rowId = (Integer) row[0];
+                    if (rowId % 2 == 1) {
+                        Row values = new Row(5);
+                        values.setField(0, row[0]);
+                        values.setField(1, row[1]);
+                        values.setField(2, row[2]);
+                        values.setField(3, row[3]);
+                        values.setField(4, row[4]);
+                        return values;
+                    } else {
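+                        // Even ids leave price and quantity unset (null) to exercise the nullable columns.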
+                        Row values = new Row(5);
+                        values.setField(0, row[0]);
+                        values.setField(1, row[1]);
+                        values.setField(2, row[2]);
+                        return values;
+                    }
+                })
+                .collect(Collectors.toList());
+    }
+
+    public static List<BookInfo> booksDataPojo() {
+        return Arrays.stream(booksTableData).map(row -> new BookInfo(
+                (int) row[0],
+                (String) row[1],
+                (String) row[2],
+                (Double) row[3],
+                (int) row[4]))
+                .collect(Collectors.toList());
+    }
+
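+    // Populates the table with the sample rows through a KuduWriter before a test runs.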
+    protected void setUpDatabase(KuduTableInfo tableInfo) {
+        try {
+            String masterAddresses = harness.getMasterAddressesAsString();
+            KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+            KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
+            booksDataRow().forEach(row -> {
+                try {
+                    kuduWriter.write(row);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            });
+            kuduWriter.close();
+        } catch (Exception e) {
+            Assertions.fail();
+        }
+    }
+
+    protected void cleanDatabase(KuduTableInfo tableInfo) {
+        try {
+            String masterAddresses = harness.getMasterAddressesAsString();
+            KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+            KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
+            kuduWriter.deleteTable();
+            kuduWriter.close();
+        } catch (Exception e) {
+            Assertions.fail();
+        }
+    }
+
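+    // Reads the whole table back, split by split, through the low-level KuduReader.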
+    protected List<Row> readRows(KuduTableInfo tableInfo) throws Exception {
+        String masterAddresses = harness.getMasterAddressesAsString();
+        KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
+        KuduReader reader = new KuduReader(tableInfo, readerConfig);
+
+        KuduInputSplit[] splits = reader.createInputSplits(1);
+        List<Row> rows = new ArrayList<>();
+        for (KuduInputSplit split : splits) {
+            KuduReaderIterator resultIterator = reader.scanner(split.getScanToken());
+            while (resultIterator.hasNext()) {
+                Row row = resultIterator.next();
+                if (row != null) {
+                    rows.add(row);
+                }
+            }
+        }
+        reader.close();
+
+        return rows;
+    }
+
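+    // Odd ids must carry price and quantity; even ids were written without them and must read back as null.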
+    protected void kuduRowsTest(List<Row> rows) {
+        for (Row row : rows) {
+            Integer rowId = (Integer) row.getField(0);
+            if (rowId % 2 == 1) {
+                Assertions.assertNotEquals(null, row.getField(3));
+                Assertions.assertNotEquals(null, row.getField(4));
+            } else {
+                Assertions.assertNull(row.getField(3));
+                Assertions.assertNull(row.getField(4));
+            }
+        }
+    }
+
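+    // Verifies through the raw Kudu client that "first" is the single primary key
+    // and that the expected ('f', 's') row is present.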
+    protected void validateSingleKey(String tableName) throws Exception {
+        KuduTable kuduTable = harness.getClient().openTable(tableName);
+        Schema schema = kuduTable.getSchema();
+
+        assertEquals(1, schema.getPrimaryKeyColumnCount());
+        assertEquals(2, schema.getColumnCount());
+
+        assertTrue(schema.getColumn("first").isKey());
+        assertFalse(schema.getColumn("second").isKey());
+
+        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+        List<RowResult> rows = new ArrayList<>();
+        scanner.forEach(rows::add);
+
+        assertEquals(1, rows.size());
+        assertEquals("f", rows.get(0).getString("first"));
+        assertEquals("s", rows.get(0).getString("second"));
+    }
+
+    public static class BookInfo {
+        public int id, quantity;
+        public String title, author;
+        public Double price;
+
+        public BookInfo() {
+        }
+
+        public BookInfo(int id, String title, String author, Double price, int quantity) {
+            this.id = id;
+            this.title = title;
+            this.author = author;
+            this.price = price;
+            this.quantity = quantity;
+        }
+    }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java
deleted file mode 100644
index 6057113..0000000
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed serialize the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file serialize You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed serialize in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.connector.serde;
-
-import org.apache.flink.connectors.kudu.connector.KuduColumnInfo;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
-import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-import org.apache.kudu.Type;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class PojoSerDeTest {
-
-    public class TestPojo {
-        private String field1;
-        private String field2;
-        private String field3;
-
-        public TestPojo() {
-            field1 = "field1";
-            field2 = "field2";
-            field3 = "field3";
-        }
-    }
-
-    @Test
-    public void testFieldsNotInSchema() {
-
-        TestPojo pojo = new TestPojo();
-
-        KuduTableInfo tableInfo = KuduTableInfo.Builder.create("test")
-                .addColumn(KuduColumnInfo.Builder.create("field1", Type.STRING).key(true).hashKey(true).build())
-                .addColumn(KuduColumnInfo.Builder.create("field2", Type.STRING).build())
-                .build();
-
-        KuduRow row = new PojoSerDe<>(TestPojo.class).withSchema(tableInfo.getSchema()).serialize(pojo);
-
-        Assertions.assertEquals(2, row.blindMap().size());
-        Assertions.assertEquals("field1", row.getField("field1"));
-        Assertions.assertEquals("field2", row.getField("field2"));
-
-    }
-}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
index 38d0115..077306d 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
@@ -17,24 +17,32 @@
 package org.apache.flink.connectors.kudu.streaming;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.connectors.kudu.connector.KuduColumnInfo;
-import org.apache.flink.connectors.kudu.connector.KuduDatabase;
-import org.apache.flink.connectors.kudu.connector.KuduRow;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.types.Row;
+
+import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.shaded.com.google.common.collect.Lists;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.UUID;
 
-class KuduSinkTest extends KuduDatabase {
+public class KuduSinkTest extends KuduTestBase {
 
+    private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTest.class);
+    private static final String[] columns = new String[]{"id", "uuid"};
     private static StreamingRuntimeContext context;
 
     @BeforeAll
@@ -45,23 +53,24 @@
 
     @Test
     void testInvalidKuduMaster() {
-        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(null, tableInfo, new DefaultSerDe()));
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(null, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT)));
     }
 
     @Test
     void testInvalidTableInfo() {
+        harness.getClient();
         String masterAddresses = harness.getMasterAddressesAsString();
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(writerConfig, null, new DefaultSerDe()));
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(writerConfig, null, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT)));
     }
 
     @Test
     void testNotTableExist() {
         String masterAddresses = harness.getMasterAddressesAsString();
-        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
-        KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+        KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
 
         sink.setRuntimeContext(context);
         Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
@@ -71,22 +80,22 @@
     void testOutputWithStrongConsistency() throws Exception {
         String masterAddresses = harness.getMasterAddressesAsString();
 
-        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder
                 .setMasters(masterAddresses)
                 .setStrongConsistency()
                 .build();
-        KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+        KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
 
         sink.setRuntimeContext(context);
         sink.open(new Configuration());
 
-        for (KuduRow kuduRow : booksDataRow()) {
+        for (Row kuduRow : booksDataRow()) {
             sink.invoke(kuduRow);
         }
         sink.close();
 
-        List<KuduRow> rows = readRows(tableInfo);
+        List<Row> rows = readRows(tableInfo);
         Assertions.assertEquals(5, rows.size());
         kuduRowsTest(rows);
     }
@@ -95,17 +104,17 @@
     void testOutputWithEventualConsistency() throws Exception {
         String masterAddresses = harness.getMasterAddressesAsString();
 
-        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder
                 .setMasters(masterAddresses)
                 .setEventualConsistency()
                 .build();
-        KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+        KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
 
         sink.setRuntimeContext(context);
         sink.open(new Configuration());
 
-        for (KuduRow kuduRow : booksDataRow()) {
+        for (Row kuduRow : booksDataRow()) {
             sink.invoke(kuduRow);
         }
 
@@ -114,37 +123,46 @@
 
         sink.close();
 
-        List<KuduRow> rows = readRows(tableInfo);
+        List<Row> rows = readRows(tableInfo);
         Assertions.assertEquals(5, rows.size());
         kuduRowsTest(rows);
     }
 
-
     @Test
     void testSpeed() throws Exception {
         String masterAddresses = harness.getMasterAddressesAsString();
 
-        KuduTableInfo tableInfo = KuduTableInfo.Builder
-                .create("test_speed")
-                .createIfNotExist(true)
-                .replicas(3)
-                .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
-                .addColumn(KuduColumnInfo.Builder.create("uuid", Type.STRING).build())
-                .build();
+        KuduTableInfo tableInfo = KuduTableInfo
+                .forTable("test_speed")
+                .createTableIfNotExists(
+                        () ->
+                                Lists.newArrayList(
+                                        new ColumnSchema
+                                                .ColumnSchemaBuilder("id", Type.INT32)
+                                                .key(true)
+                                                .build(),
+                                        new ColumnSchema
+                                                .ColumnSchemaBuilder("uuid", Type.STRING)
+                                                .build()
+                                ),
+                        () -> new CreateTableOptions()
+                                .setNumReplicas(3)
+                                .addHashPartitions(Lists.newArrayList("id"), 6));
+
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder
                 .setMasters(masterAddresses)
                 .setEventualConsistency()
                 .build();
-        KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+        KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
 
         sink.setRuntimeContext(context);
         sink.open(new Configuration());
 
         int totalRecords = 100000;
-        for (int i=0; i < totalRecords; i++) {
-            KuduRow kuduRow = new KuduRow(2);
-            kuduRow.setField(0, "id", i);
-            kuduRow.setField(1, "uuid", UUID.randomUUID().toString());
+        for (int i = 0; i < totalRecords; i++) {
+            Row kuduRow = new Row(2);
+            kuduRow.setField(0, i);
+            kuduRow.setField(1, UUID.randomUUID().toString());
             sink.invoke(kuduRow);
         }
 
@@ -153,7 +171,7 @@
 
         sink.close();
 
-        List<KuduRow> rows = readRows(tableInfo);
+        List<Row> rows = readRows(tableInfo);
         Assertions.assertEquals(totalRecords, rows.size());
     }
 
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
new file mode 100644
index 0000000..f694108
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.writer.TupleOperationMapper;
+import org.apache.flink.connectors.kudu.streaming.KuduSink;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.types.Row;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.shaded.com.google.common.collect.Lists;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KuduCatalogTest extends KuduTestBase {
+
+    private KuduCatalog catalog;
+    private StreamTableEnvironment tableEnv;
+
+    @BeforeEach
+    public void init() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+        tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode(env);
+        tableEnv.registerCatalog("kudu", catalog);
+        tableEnv.useCatalog("kudu");
+    }
+
+    @Test
+    public void testCreateAlterDrop() throws Exception {
+        tableEnv.sqlUpdate("CREATE TABLE TestTable1 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+        tableEnv.sqlUpdate("INSERT INTO TestTable1 VALUES ('f', 's')");
+
+        // Add this once Primary key support has been enabled
+        // tableEnv.sqlUpdate("CREATE TABLE TestTable2 (`first` STRING, `second` String, PRIMARY KEY(`first`)) WITH ('kudu.hash-columns' = 'first')");
+        // tableEnv.sqlUpdate("INSERT INTO TestTable2 VALUES ('f', 's')");
+
+        tableEnv.execute("test");
+
+        validateSingleKey("TestTable1");
+        // validateSingleKey("TestTable2");
+
+        tableEnv.sqlUpdate("ALTER TABLE TestTable1 RENAME TO TestTable1R");
+        validateSingleKey("TestTable1R");
+
+        tableEnv.sqlUpdate("DROP TABLE TestTable1R");
+        assertFalse(harness.getClient().tableExists("TestTable1R"));
+    }
+
+    @Test
+    public void testCreateAndInsertMultiKey() throws Exception {
+        tableEnv.sqlUpdate("CREATE TABLE TestTable3 (`first` STRING, `second` INT, third STRING) WITH ('kudu.hash-columns' = 'first,second', 'kudu.primary-key-columns' = 'first,second')");
+        tableEnv.sqlUpdate("INSERT INTO TestTable3 VALUES ('f', 2, 't')");
+
+        // Add this once Primary key support has been enabled
+        // tableEnv.sqlUpdate("CREATE TABLE TestTable4 (`first` STRING, `second` INT, `third` STRING) PRIMARY KEY (`first`, `second`) WITH ('kudu.hash-columns' = 'first,second')");
+        // tableEnv.sqlUpdate("INSERT INTO TestTable4 VALUES ('f', 2, 't')");
+
+        tableEnv.execute("test");
+
+        validateMultiKey("TestTable3");
+        // validateMultiKey("TestTable4");
+    }
+
+    @Test
+    public void testSourceProjection() throws Exception {
+        tableEnv.sqlUpdate("CREATE TABLE TestTable5 (`second` String, `first` STRING, `third` String) WITH ('kudu.hash-columns' = 'second', 'kudu.primary-key-columns' = 'second')");
+        tableEnv.sqlUpdate("INSERT INTO TestTable5 VALUES ('s', 'f', 't')");
+        tableEnv.execute("test");
+
+        tableEnv.sqlUpdate("CREATE TABLE TestTable6 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+        tableEnv.sqlUpdate("INSERT INTO TestTable6 (SELECT `first`, `second` FROM  TestTable5)");
+        tableEnv.execute("test");
+
+        validateSingleKey("TestTable6");
+    }
+
+    @Test
+    public void testEmptyProjection() throws Exception {
+        tableEnv.sqlUpdate("CREATE TABLE TestTableEP (`first` STRING, `second` STRING) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+        tableEnv.sqlUpdate("INSERT INTO TestTableEP VALUES ('f','s')");
+        tableEnv.sqlUpdate("INSERT INTO TestTableEP VALUES ('f2','s2')");
+        tableEnv.execute("test");
+
+        Table result = tableEnv.sqlQuery("SELECT COUNT(*) FROM TestTableEP");
+
+        DataStream<Tuple2<Boolean, Row>> resultDataStream = tableEnv.toRetractStream(result, Types.ROW(Types.LONG));
+
+        CollectionSink.output.clear();
+
+        resultDataStream
+                .map(t -> Tuple2.of(t.f0, t.f1.getField(0)))
+                .returns(Types.TUPLE(Types.BOOLEAN, Types.LONG))
+                .addSink(new CollectionSink<>()).setParallelism(1);
+
+        tableEnv.execute("test");
+
+        List<Tuple2<Boolean, Long>> expected = Lists.newArrayList(Tuple2.of(true, 1L), Tuple2.of(false, 1L), Tuple2.of(true, 2L));
+
+        assertEquals(new HashSet<>(expected), new HashSet<>(CollectionSink.output));
+        CollectionSink.output.clear();
+
+    }
+
+    @Test
+    public void dataStreamEndToEndTest() throws Exception {
+        KuduCatalog catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+        // Creating table through catalog
+        KuduTableFactory tableFactory = catalog.getKuduTableFactory();
+
+        KuduTableInfo tableInfo = KuduTableInfo.forTable("TestTable7").createTableIfNotExists(
+                () ->
+                        Lists.newArrayList(
+                                new ColumnSchema
+                                        .ColumnSchemaBuilder("k", Type.INT32)
+                                        .key(true)
+                                        .build(),
+                                new ColumnSchema
+                                        .ColumnSchemaBuilder("v", Type.STRING)
+                                        .build()
+                        ),
+                () -> new CreateTableOptions()
+                        .setNumReplicas(1)
+                        .addHashPartitions(Lists.newArrayList("k"), 2));
+
+        catalog.createTable(tableInfo, false);
+
+        ObjectPath path = catalog.getObjectPath("TestTable7");
+        CatalogTable table = catalog.getTable(path);
+
+        List<Tuple2<Integer, String>> input = Lists.newArrayList(Tuple2.of(1, "one"), Tuple2.of(2, "two"), Tuple2.of(3, "three"));
+
+        // Writing with simple sink
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(harness.getMasterAddressesAsString()).build();
+        env.fromCollection(input).addSink(
+                new KuduSink<>(
+                        writerConfig,
+                        tableInfo,
+                        new TupleOperationMapper<>(
+                                new String[]{"k", "v"},
+                                AbstractSingleOperationMapper.KuduOperation.INSERT)
+                )
+        );
+        env.execute();
+        // Reading and validating data
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+
+        CollectionSink.output.clear();
+        tableFactory.createTableSource(path, table)
+                .getDataStream(env)
+                .map(row -> Tuple2.of((int) row.getField(0), (String) row.getField(1)))
+                .returns(new TypeHint<Tuple2<Integer, String>>() {
+                })
+                .addSink(new CollectionSink<>()).setParallelism(1);
+        env.execute();
+
+        List<Tuple2<Integer, String>> expected = Lists.newArrayList(Tuple2.of(1, "one"), Tuple2.of(2, "two"), Tuple2.of(3, "three"));
+        assertEquals(new HashSet<>(expected), new HashSet<>(CollectionSink.output));
+        CollectionSink.output.clear();
+    }
+
+    @Test
+    public void testTimestamp() throws Exception {
+        tableEnv.sqlUpdate("CREATE TABLE TestTableTsC (`first` STRING, `second` TIMESTAMP(3)) " +
+                "WITH ('kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
+        tableEnv.sqlUpdate("INSERT INTO TestTableTsC values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')");
+        tableEnv.execute("test");
+
+        KuduTable kuduTable = harness.getClient().openTable("TestTableTsC");
+        assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());
+
+        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+        List<RowResult> rows = new ArrayList<>();
+        scanner.forEach(rows::add);
+
+        assertEquals(1, rows.size());
+        assertEquals("f", rows.get(0).getString(0));
+        assertEquals(Timestamp.valueOf("2020-01-01 12:12:12.123"), rows.get(0).getTimestamp(1));
+    }
+
+    @Test
+    public void testDatatypes() throws Exception {
+        tableEnv.sqlUpdate("CREATE TABLE TestTable8 (`first` STRING, `second` BOOLEAN, `third` BYTES," +
+                "`fourth` TINYINT, `fifth` SMALLINT, `sixth` INT, `seventh` BIGINT, `eighth` FLOAT, `ninth` DOUBLE, " +
+                "`tenth` TIMESTAMP)" +
+                "WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
+
+        tableEnv.sqlUpdate("INSERT INTO TestTable8 values ('f', false, cast('bbbb' as BYTES), cast(12 as TINYINT)," +
+                "cast(34 as SMALLINT), 56, cast(78 as BIGINT), cast(3.14 as FLOAT), cast(1.2345 as DOUBLE)," +
+                "TIMESTAMP '2020-04-15 12:34:56.123') ");
+
+        tableEnv.execute("test");
+        validateManyTypes("TestTable8");
+    }
+
+    @Test
+    public void testMissingPropertiesCatalog() throws Exception {
+        assertThrows(TableException.class,
+                () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9a (`first` STRING, `second` String) " +
+                        "WITH ('kudu.primary-key-columns' = 'second')"));
+        assertThrows(TableException.class,
+                () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
+                        "WITH ('kudu.hash-columns' = 'first')"));
+        assertThrows(TableException.class,
+                () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
+                        "WITH ('kudu.primary-key-columns' = 'second', 'kudu.hash-columns' = 'first')"));
+    }
+
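+    // Checks the Flink SQL to Kudu type mapping column by column, then validates the inserted values.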
+    private void validateManyTypes(String tableName) throws Exception {
+        KuduTable kuduTable = harness.getClient().openTable(tableName);
+        Schema schema = kuduTable.getSchema();
+
+        assertEquals(Type.STRING, schema.getColumn("first").getType());
+        assertEquals(Type.BOOL, schema.getColumn("second").getType());
+        assertEquals(Type.BINARY, schema.getColumn("third").getType());
+        assertEquals(Type.INT8, schema.getColumn("fourth").getType());
+        assertEquals(Type.INT16, schema.getColumn("fifth").getType());
+        assertEquals(Type.INT32, schema.getColumn("sixth").getType());
+        assertEquals(Type.INT64, schema.getColumn("seventh").getType());
+        assertEquals(Type.FLOAT, schema.getColumn("eighth").getType());
+        assertEquals(Type.DOUBLE, schema.getColumn("ninth").getType());
+        assertEquals(Type.UNIXTIME_MICROS, schema.getColumn("tenth").getType());
+
+        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+        List<RowResult> rows = new ArrayList<>();
+        scanner.forEach(rows::add);
+
+        assertEquals(1, rows.size());
+        assertEquals("f", rows.get(0).getString(0));
+        assertEquals(false, rows.get(0).getBoolean(1));
+        assertEquals(ByteBuffer.wrap("bbbb".getBytes()), rows.get(0).getBinary(2));
+        assertEquals(12, rows.get(0).getByte(3));
+        assertEquals(34, rows.get(0).getShort(4));
+        assertEquals(56, rows.get(0).getInt(5));
+        assertEquals(78, rows.get(0).getLong(6));
+        assertEquals(3.14, rows.get(0).getFloat(7), 0.01);
+        assertEquals(1.2345, rows.get(0).getDouble(8), 0.0001);
+        assertEquals(Timestamp.valueOf("2020-04-15 12:34:56.123"), rows.get(0).getTimestamp(9));
+    }
+
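+    // Verifies that "first" and "second" together form the composite primary key.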
+    private void validateMultiKey(String tableName) throws Exception {
+        KuduTable kuduTable = harness.getClient().openTable(tableName);
+        Schema schema = kuduTable.getSchema();
+
+        assertEquals(2, schema.getPrimaryKeyColumnCount());
+        assertEquals(3, schema.getColumnCount());
+
+        assertTrue(schema.getColumn("first").isKey());
+        assertTrue(schema.getColumn("second").isKey());
+
+        assertFalse(schema.getColumn("third").isKey());
+
+        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+        List<RowResult> rows = new ArrayList<>();
+        scanner.forEach(rows::add);
+
+        assertEquals(1, rows.size());
+        assertEquals("f", rows.get(0).getString("first"));
+        assertEquals(2, rows.get(0).getInt("second"));
+        assertEquals("t", rows.get(0).getString("third"));
+    }
+
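+    // Collects sink output into a static synchronized list so tests can assert on it after the job finishes.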
+    public static class CollectionSink<T> implements SinkFunction<T> {
+
+        public static List<Object> output = Collections.synchronizedList(new ArrayList<>());
+
+        public void invoke(T value, SinkFunction.Context context) throws Exception {
+            output.add(value);
+        }
+
+    }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
new file mode 100644
index 0000000..398ba9f
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.shaded.com.google.common.collect.Lists;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class KuduTableFactoryTest extends KuduTestBase {
+    private StreamTableEnvironment tableEnv;
+    private String kuduMasters;
+
+    @BeforeEach
+    public void init() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
+        tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode(env);
+        kuduMasters = harness.getMasterAddressesAsString();
+    }
+
+    @Test
+    public void testMissingTable() throws Exception {
+        tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
+                "WITH ('connector.type'='kudu', 'kudu.masters'='" + kuduMasters + "')");
+        assertThrows(NullPointerException.class,
+                () -> tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)"));
+    }
+
+    @Test
+    public void testMissingMasters() throws Exception {
+        tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
+                "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11')");
+        assertThrows(NullPointerException.class,
+                () -> tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)"));
+    }
+
+    @Test
+    public void testNonExistingTable() throws Exception {
+        tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
+                "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "')");
+        tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)");
+        assertThrows(java.util.concurrent.ExecutionException.class,
+                () -> tableEnv.execute("test"));
+    }
+
+    @Test
+    public void testCreateTable() throws Exception {
+        tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` STRING) " +
+                "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "', " +
+                "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
+        tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 's')");
+        tableEnv.execute("test");
+
+        validateSingleKey("TestTable11");
+    }
+
+    @Test
+    public void testTimestamp() throws Exception {
+        // Timestamp should be bridged to sql.Timestamp
+        // Test it when creating the table...
+        tableEnv.sqlUpdate("CREATE TABLE TestTableTs (`first` STRING, `second` TIMESTAMP(3)) " +
+                "WITH ('connector.type'='kudu', 'kudu.table'='TestTableTs', 'kudu.masters'='" + kuduMasters + "', " +
+                "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
+        tableEnv.sqlUpdate("INSERT INTO TestTableTs values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')");
+        tableEnv.execute("test");
+
+        // And also when inserting into existing table
+        tableEnv.sqlUpdate("CREATE TABLE TestTableTsE (`first` STRING, `second` TIMESTAMP(3)) " +
+                "WITH ('connector.type'='kudu', 'kudu.table'='TestTableTs', 'kudu.masters'='" + kuduMasters + "')");
+
+        tableEnv.sqlUpdate("INSERT INTO TestTableTsE values ('s', TIMESTAMP '2020-02-02 23:23:23')");
+        tableEnv.execute("test");
+
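+        // Kudu stores the TIMESTAMP column as UNIXTIME_MICROS internally.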
+        KuduTable kuduTable = harness.getClient().openTable("TestTableTs");
+        assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());
+
+        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+        HashSet<Timestamp> results = new HashSet<>();
+        scanner.forEach(sc -> results.add(sc.getTimestamp("second")));
+
+        assertEquals(2, results.size());
+        List<Timestamp> expected = Arrays.asList(
+                Timestamp.valueOf("2020-01-01 12:12:12.123"),
+                Timestamp.valueOf("2020-02-02 23:23:23"));
+        assertEquals(new HashSet<>(expected), results);
+    }
+
+    @Test
+    public void testExistingTable() throws Exception {
+        // Creating a table
+        tableEnv.sqlUpdate("CREATE TABLE TestTable12 (`first` STRING, `second` STRING) " +
+                "WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "', " +
+                "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
+
+        tableEnv.sqlUpdate("INSERT INTO TestTable12 values ('f', 's')");
+        tableEnv.execute("test");
+
+        // Then another one in SQL that refers to the previously created one
+        tableEnv.sqlUpdate("CREATE TABLE TestTable12b (`first` STRING, `second` STRING) " +
+                "WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "')");
+        tableEnv.sqlUpdate("INSERT INTO TestTable12b values ('f2','s2')");
+        tableEnv.execute("test2");
+
+        // Validate that both insertions were into the same table
+        KuduTable kuduTable = harness.getClient().openTable("TestTable12");
+        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+        List<RowResult> rows = new ArrayList<>();
+        scanner.forEach(rows::add);
+
+        assertEquals(2, rows.size());
+        assertEquals("f", rows.get(0).getString("first"));
+        assertEquals("s", rows.get(0).getString("second"));
+        assertEquals("f2", rows.get(1).getString("first"));
+        assertEquals("s2", rows.get(1).getString("second"));
+    }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
new file mode 100644
index 0000000..b522c51
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+
+public class KuduTableTestUtils {
+
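+    // Blink planner in streaming mode; parallelism is pinned to 1 so test results are deterministic.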
+    public static StreamTableEnvironment createTableEnvWithBlinkPlannerStreamingMode(StreamExecutionEnvironment env) {
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+        return tableEnv;
+    }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
new file mode 100644
index 0000000..f37b40d
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.writer;
+
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.Delete;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Update;
+import org.apache.kudu.client.Upsert;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.mockito.Mockito.when;
+
+public abstract class AbstractOperationTest {
+
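+    // All mocked operations return the same mock PartialRow, so mapper tests can
+    // verify addObject(...) calls without a running Kudu cluster.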
+    public static final Schema tableSchema = KuduTestBase.booksTableInfo("test_table", true).getSchema();
+    @Mock
+    Insert mockInsert;
+    @Mock
+    Upsert mockUpsert;
+    @Mock
+    Update mockUpdate;
+    @Mock
+    Delete mockDelete;
+    @Mock
+    KuduTable mockTable;
+    @Mock
+    PartialRow mockPartialRow;
+
+    @BeforeEach
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+        when(mockInsert.getRow()).thenReturn(mockPartialRow);
+        when(mockUpsert.getRow()).thenReturn(mockPartialRow);
+        when(mockUpdate.getRow()).thenReturn(mockPartialRow);
+        when(mockDelete.getRow()).thenReturn(mockPartialRow);
+        when(mockTable.newInsert()).thenReturn(mockInsert);
+        when(mockTable.newUpsert()).thenReturn(mockUpsert);
+        when(mockTable.newUpdate()).thenReturn(mockUpdate);
+        when(mockTable.newDelete()).thenReturn(mockDelete);
+        when(mockTable.getSchema()).thenReturn(tableSchema);
+    }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
new file mode 100644
index 0000000..45e0b1b
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.writer;
+
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase.BookInfo;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.PojoOperationMapper;
+
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class PojoOperationMapperTest extends AbstractOperationTest {
+
+    @Test
+    void testPojoMapper() {
+
+        PojoOperationMapper<BookInfo> mapper = new PojoOperationMapper<>(BookInfo.class, KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT);
+
+        BookInfo bookInfo = KuduTestBase.booksDataPojo().get(0);
+
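+        // Field values are resolved by position, following the order of KuduTestBase.columns.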
+        assertEquals(bookInfo.id, mapper.getField(bookInfo, 0));
+        assertEquals(bookInfo.title, mapper.getField(bookInfo, 1));
+        assertEquals(bookInfo.author, mapper.getField(bookInfo, 2));
+        assertEquals(bookInfo.price, mapper.getField(bookInfo, 3));
+        assertEquals(bookInfo.quantity, mapper.getField(bookInfo, 4));
+
+        List<Operation> operations = mapper.createOperations(bookInfo, mockTable);
+        assertEquals(1, operations.size());
+
+        PartialRow row = operations.get(0).getRow();
+        Mockito.verify(row, Mockito.times(1)).addObject("id", bookInfo.id);
+        Mockito.verify(row, Mockito.times(1)).addObject("quantity", bookInfo.quantity);
+
+        Mockito.verify(row, Mockito.times(1)).addObject("title", bookInfo.title);
+        Mockito.verify(row, Mockito.times(1)).addObject("author", bookInfo.author);
+
+        Mockito.verify(row, Mockito.times(1)).addObject("price", bookInfo.price);
+    }
+
+    @Test
+    public void testFieldInheritance() {
+        PojoOperationMapper<Second> mapper = new PojoOperationMapper<>(Second.class, new String[]{"s1", "i1", "i2"}, AbstractSingleOperationMapper.KuduOperation.INSERT);
+        Second s = new Second();
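+        // Second.s1 shadows First.s1; the mapper must pick the subclass field first.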
+        assertEquals("s1", mapper.getField(s, 0));
+        assertEquals(1, mapper.getField(s, 1));
+        assertEquals(2, mapper.getField(s, 2));
+    }
+
+    private static class First {
+        private int i1 = 1;
+        public int i2 = 2;
+        private String s1 = "ignore";
+    }
+
+    private static class Second extends First {
+        private String s1 = "s1";
+    }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
new file mode 100644
index 0000000..e737063
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.writer;
+
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
+import org.apache.flink.types.Row;
+
+import org.apache.kudu.client.Operation;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.verify;
+
+public class RowOperationMapperTest extends AbstractOperationTest {
+
+    @Test
+    void testGetField() {
+        RowOperationMapper mapper = new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT);
+        Row inputRow = KuduTestBase.booksDataRow().get(0);
+
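+        // Row fields map to Kudu columns by position, mirroring Row.getField(i).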
+        for (int i = 0; i < inputRow.getArity(); i++) {
+            Assertions.assertEquals(inputRow.getField(i), mapper.getField(inputRow, i));
+        }
+    }
+
+    @Test
+    void testCorrectOperationInsert() {
+        RowOperationMapper mapper = new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT);
+        Row inputRow = KuduTestBase.booksDataRow().get(0);
+
+        List<Operation> operations = mapper.createOperations(inputRow, mockTable);
+
+        assertEquals(1, operations.size());
+        verify(mockTable).newInsert();
+    }
+
+    @Test
+    void testCorrectOperationUpsert() {
+        RowOperationMapper mapper = new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.UPSERT);
+        Row inputRow = KuduTestBase.booksDataRow().get(0);
+
+        List<Operation> operations = mapper.createOperations(inputRow, mockTable);
+
+        assertEquals(1, operations.size());
+        verify(mockTable).newUpsert();
+    }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOperationMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOperationMapperTest.java
new file mode 100644
index 0000000..308a011
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOperationMapperTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.writer;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connectors.kudu.connector.writer.TupleOperationMapper;
+
+import org.apache.kudu.client.Operation;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.verify;
+
+public class TupleOperationMapperTest extends AbstractOperationTest {
+    @Test
+    void testGetField() {
+        TupleOperationMapper<Tuple5<Integer, String, String, Double, Integer>> mapper =
+                new TupleOperationMapper<>(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT);
+        Tuple5<Integer, String, String, Double, Integer> inputTuple = KuduTestBase.booksDataTuple().get(0);
+
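+        // Tuple fields map to Kudu columns by position, mirroring Tuple5.getField(i).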
+        for (int i = 0; i < inputTuple.getArity(); i++) {
+            Assertions.assertEquals(inputTuple.getField(i), mapper.getField(inputTuple, i));
+        }
+    }
+
+    @Test
+    void testCorrectOperationInsert() {
+        TupleOperationMapper<Tuple5<Integer, String, String, Double, Integer>> mapper =
+                new TupleOperationMapper<>(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT);
+        Tuple5<Integer, String, String, Double, Integer> inputTuple = KuduTestBase.booksDataTuple().get(0);
+
+        List<Operation> operations = mapper.createOperations(inputTuple, mockTable);
+
+        assertEquals(1, operations.size());
+        verify(mockTable).newInsert();
+    }
+
+    @Test
+    void testCorrectOperationUpsert() {
+        TupleOperationMapper<Tuple5<Integer, String, String, Double, Integer>> mapper =
+                new TupleOperationMapper<>(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.UPSERT);
+        Tuple5<Integer, String, String, Double, Integer> inputTuple = KuduTestBase.booksDataTuple().get(0);
+
+        List<Operation> operations = mapper.createOperations(inputTuple, mockTable);
+
+        assertEquals(1, operations.size());
+        verify(mockTable).newUpsert();
+    }
+}