[BAHIR-214] Improve speed and solve eventual consistency issues (#64)

* Resolve eventual consistency issues
* Improve write speed, especially in eventual consistency mode
* Update README
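
The consistency modes map directly to Kudu session flush modes: eventual
consistency uses AUTO_FLUSH_BACKGROUND (writes are batched and flushed in the
background, which is faster), while strong consistency uses AUTO_FLUSH_SYNC
(each write is applied and checked synchronously). A minimal sketch of
selecting a mode with the builder introduced by this patch:

    KuduWriterConfig writerConfig = KuduWriterConfig.Builder
            .setMasters("172.25.0.6")      // Kudu master address, as in the README examples
            .setEventualConsistency()      // or setStrongConsistency()
            .build();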
diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
index 9b75aa7..8692ca5 100644
--- a/flink-connector-kudu/README.md
+++ b/flink-connector-kudu/README.md
@@ -29,15 +29,19 @@
 // create a table info object
 KuduTableInfo tableInfo = KuduTableInfo.Builder
         .create("books")
-        .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
-        .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
-        .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
-        .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
-        .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+        .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
+        .addColumn(KuduColumnInfo.Builder.createString("title").build())
+        .addColumn(KuduColumnInfo.Builder.createString("author").build())
+        .addColumn(KuduColumnInfo.Builder.createDouble("price").build())
+        .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build())
         .build();
-    
+// create a reader configuration
+KuduReaderConfig readerConfig = KuduReaderConfig.Builder
+        .setMasters("172.25.0.6")
+        .setRowLimit(1000)
+        .build();
-// Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips
-env.createInput(new KuduInputFormat<>("172.25.0.6", tableInfo))
+// pass the readerConfig and tableInfo to the KuduInputFormat
+env.createInput(new KuduInputFormat<>(readerConfig, tableInfo, new DefaultSerDe()))
         .count();
         
 env.execute();
@@ -54,18 +58,23 @@
 KuduTableInfo tableInfo = KuduTableInfo.Builder
         .create("books")
         .createIfNotExist(true)
-        .replicas(1)
-        .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
-        .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
-        .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
-        .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
-        .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+        .replicas(3)
+        .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
+        .addColumn(KuduColumnInfo.Builder.createString("title").build())
+        .addColumn(KuduColumnInfo.Builder.createString("author").build())
+        .addColumn(KuduColumnInfo.Builder.createDouble("price").build())
+        .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build())
         .build();
-
+// create a writer configuration
+KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+        .setMasters("172.25.0.6")
+        .setUpsertWrite()
+        .setStrongConsistency()
+        .build();
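+// use setEventualConsistency() instead for batched background flushes (higher throughput)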
 ...
 
 env.fromCollection(books)
-        .output(new KuduOutputFormat<>("172.25.0.6", tableInfo));
+        .output(new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe()));
 
 env.execute();
 ```
@@ -81,18 +90,23 @@
 KuduTableInfo tableInfo = KuduTableInfo.Builder
         .create("books")
         .createIfNotExist(true)
-        .replicas(1)
-        .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
-        .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
-        .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
-        .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
-        .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+        .replicas(3)
+        .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
+        .addColumn(KuduColumnInfo.Builder.createString("title").build())
+        .addColumn(KuduColumnInfo.Builder.createString("author").build())
+        .addColumn(KuduColumnInfo.Builder.createDouble("price").build())
+        .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build())
         .build();
-
+// create a writer configuration
+KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+        .setMasters("172.25.0.6")
+        .setUpsertWrite()
+        .setStrongConsistency()
+        .build();
 ...
 
 env.fromCollection(books)
-    .addSink(new KuduSink<>("172.25.0.6", tableInfo));
+    .addSink(new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe()));
 
 env.execute();
 ```
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java
new file mode 100644
index 0000000..3a35e6a
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.batch;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
+import org.apache.flink.connectors.kudu.connector.serde.KuduDeserialization;
+import org.apache.kudu.client.KuduException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
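+/**
+ * {@link RichInputFormat} for reading a Kudu table into records of type {@code OUT}.
+ * One input split is created per Kudu scan token, and each scanned row is
+ * converted to an {@code OUT} record by the configured {@link KuduDeserialization}.
+ */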
+public class KuduInputFormat<OUT> extends RichInputFormat<OUT, KuduInputSplit> {
+
+    private static final Logger log = LoggerFactory.getLogger(KuduInputFormat.class);
+
+    private final KuduReaderConfig readerConfig;
+    private final KuduTableInfo tableInfo;
+    private final KuduDeserialization<OUT> deserializer;
+
+    private List<KuduFilterInfo> tableFilters;
+    private List<String> tableProjections;
+
+    private boolean endReached;
+
+    private transient KuduReader kuduReader;
+    private transient KuduReaderIterator resultIterator;
+
+    public KuduInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, KuduDeserialization<OUT> deserializer) {
+        this(readerConfig, tableInfo, deserializer, new ArrayList<>(), new ArrayList<>());
+    }
+
+    public KuduInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, KuduDeserialization<OUT> deserializer, List<KuduFilterInfo> tableFilters, List<String> tableProjections) {
+        this.readerConfig = checkNotNull(readerConfig, "readerConfig cannot be null");
+        this.tableInfo = checkNotNull(tableInfo, "tableInfo cannot be null");
+        this.deserializer = checkNotNull(deserializer, "deserializer cannot be null");
+        this.tableFilters = checkNotNull(tableFilters, "tableFilters cannot be null");
+        this.tableProjections = checkNotNull(tableProjections, "tableProjections cannot be null");
+
+        this.endReached = false;
+    }
+
+    @Override
+    public void configure(Configuration parameters) {
+    }
+
+    @Override
+    public void open(KuduInputSplit split) throws IOException {
+        endReached = false;
+        startKuduReader();
+
+        resultIterator = kuduReader.scanner(split.getScanToken());
+    }
+
+    private void startKuduReader() throws IOException {
+        if (kuduReader == null) {
+            kuduReader = new KuduReader(tableInfo, readerConfig, tableFilters, tableProjections);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (resultIterator != null) {
+            try {
+                resultIterator.close();
+            } catch (KuduException e) {
+                log.error("Error while closing reader iterator.", e);
+            }
+        }
+        if (kuduReader != null) {
+            kuduReader.close();
+        }
+    }
+
+    @Override
+    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+        return cachedStatistics;
+    }
+
+    @Override
+    public InputSplitAssigner getInputSplitAssigner(KuduInputSplit[] inputSplits) {
+        return new LocatableInputSplitAssigner(inputSplits);
+    }
+
+    @Override
+    public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+        startKuduReader();
+        return kuduReader.createInputSplits(minNumSplits);
+    }
+
+    @Override
+    public boolean reachedEnd() {
+        return endReached;
+    }
+
+    @Override
+    public OUT nextRecord(OUT reuse) throws IOException {
+        // check that current iterator has next rows
+        if (this.resultIterator.hasNext()) {
+            KuduRow row = resultIterator.next();
+            return deserializer.deserialize(row);
+        } else {
+            endReached = true;
+            return null;
+        }
+    }
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java
new file mode 100644
index 0000000..9d7d017
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.batch;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.serde.KuduSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
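+/**
+ * {@link RichOutputFormat} that writes records to a Kudu table through a
+ * {@link KuduWriter}. Records are converted with the configured
+ * {@link KuduSerialization} and row errors are delegated to the {@link KuduFailureHandler}.
+ */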
+public class KuduOutputFormat<IN> extends RichOutputFormat<IN> implements CheckpointedFunction {
+
+    private static final Logger log = LoggerFactory.getLogger(KuduOutputFormat.class);
+
+    private final KuduTableInfo tableInfo;
+    private final KuduWriterConfig writerConfig;
+    private final KuduFailureHandler failureHandler;
+    private final KuduSerialization<IN> serializer;
+
+    private transient KuduWriter kuduWriter;
+
+    public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<IN> serializer) {
+        this(writerConfig, tableInfo, serializer, new DefaultKuduFailureHandler());
+    }
+
+    public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<IN> serializer, KuduFailureHandler failureHandler) {
+        this.tableInfo = checkNotNull(tableInfo, "tableInfo cannot be null");
+        this.writerConfig = checkNotNull(writerConfig, "writerConfig cannot be null");
+        this.serializer = checkNotNull(serializer, "serializer cannot be null");
+        this.failureHandler = checkNotNull(failureHandler, "failureHandler cannot be null");
+    }
+
+    @Override
+    public void configure(Configuration parameters) {
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        kuduWriter = new KuduWriter(tableInfo, writerConfig, failureHandler);
+
+        serializer.withSchema(kuduWriter.getSchema());
+    }
+
+    @Override
+    public void writeRecord(IN row) throws IOException {
+        final KuduRow kuduRow = serializer.serialize(row);
+        kuduWriter.write(kuduRow);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (kuduWriter != null) {
+            kuduWriter.close();
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        kuduWriter.flushAndCheckErrors();
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java
similarity index 97%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java
index fa7472f..ff8a601 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java
@@ -14,13 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kudu.connector;
+package org.apache.flink.connectors.kudu.connector;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Type;
 
 import java.io.Serializable;
 
+@PublicEvolving
 public class KuduColumnInfo implements Serializable {
 
     private String name;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
similarity index 97%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
index 1a7582d..c37bc9a 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
@@ -14,15 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kudu.connector;
+package org.apache.flink.connectors.kudu.connector;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduPredicate;
 
 import java.util.List;
 
-
+@PublicEvolving
 public class KuduFilterInfo {
 
     private String column;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
similarity index 95%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
index 3c57a1b..af78361 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
@@ -14,8 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kudu.connector;
+package org.apache.flink.connectors.kudu.connector;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.types.Row;
 
 import java.lang.reflect.Field;
@@ -26,6 +27,7 @@
 import java.util.Map;
 import java.util.stream.Stream;
 
+@PublicEvolving
 public class KuduRow extends Row {
 
     private Map<String, Integer> rowNames;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
similarity index 96%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
index dfea382..eb63b3f 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
@@ -14,8 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kudu.connector;
+package org.apache.flink.connectors.kudu.connector;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.CreateTableOptions;
@@ -24,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+@PublicEvolving
 public class KuduTableInfo implements Serializable {
 
     private static final Integer DEFAULT_REPLICAS = 1;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java
new file mode 100644
index 0000000..7548033
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.failure;
+
+import org.apache.kudu.client.RowError;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
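+/**
+ * Default {@link KuduFailureHandler}: fails the writer by rethrowing all
+ * row errors as a single {@link IOException}.
+ */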
+public class DefaultKuduFailureHandler implements KuduFailureHandler {
+    @Override
+    public void onFailure(List<RowError> failure) throws IOException {
+        String errors = failure.stream()
+                .map(error -> error.toString() + System.lineSeparator())
+                .collect(Collectors.joining());
+        throw new IOException("Error while sending value. \n " + errors);
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
new file mode 100644
index 0000000..42de4f7
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.failure;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.kudu.client.RowError;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+@PublicEvolving
+public interface KuduFailureHandler extends Serializable {
+
+    /**
+     * Handles a list of failed {@link RowError}s.
+     *
+     * @param failure the row errors that caused the failure
+     * @throws IOException if the sink should fail on this failure; implementations should rethrow the error or wrap it in a custom exception
+     */
+    void onFailure(List<RowError> failure) throws IOException;
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java
new file mode 100644
index 0000000..a809106
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.reader;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+
+public class KuduInputSplit extends LocatableInputSplit {
+
+    private byte[] scanToken;
+
+    /**
+     * Creates a new KuduInputSplit.
+     *
+     * @param scanToken the serialized scan token describing the Kudu scan for this split
+     * @param splitNumber the number of the input split
+     * @param hostnames the names of the hosts storing the data this input split refers to
+     */
+    public KuduInputSplit(byte[] scanToken, final int splitNumber, final String[] hostnames) {
+        super(splitNumber, hostnames);
+
+        this.scanToken = scanToken;
+    }
+
+    public byte[] getScanToken() {
+        return scanToken;
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
new file mode 100644
index 0000000..9c6e790
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.reader;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.kudu.client.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
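+/**
+ * Wraps a Kudu client and session for reading a {@link KuduTableInfo}:
+ * builds scan-token based input splits and opens scanners for them,
+ * optionally applying filters, column projections and a row limit.
+ */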
+@Internal
+public class KuduReader implements AutoCloseable {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final KuduTableInfo tableInfo;
+    private final KuduReaderConfig readerConfig;
+    private final List<KuduFilterInfo> tableFilters;
+    private final List<String> tableProjections;
+
+    private transient KuduClient client;
+    private transient KuduSession session;
+    private transient KuduTable table;
+
+    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig) throws IOException {
+        this(tableInfo, readerConfig, new ArrayList<>(), new ArrayList<>());
+    }
+
+    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters) throws IOException {
+        this(tableInfo, readerConfig, tableFilters, new ArrayList<>());
+    }
+
+    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters, List<String> tableProjections) throws IOException {
+        this.tableInfo = tableInfo;
+        this.readerConfig = readerConfig;
+        this.tableFilters = tableFilters;
+        this.tableProjections = tableProjections;
+
+        this.client = obtainClient();
+        this.session = obtainSession();
+        this.table = obtainTable();
+    }
+
+    private KuduClient obtainClient() {
+        return new KuduClient.KuduClientBuilder(readerConfig.getMasters()).build();
+    }
+
+    private KuduSession obtainSession() {
+        return client.newSession();
+    }
+
+    private KuduTable obtainTable() throws IOException {
+        String tableName = tableInfo.getName();
+        if (client.tableExists(tableName)) {
+            return client.openTable(tableName);
+        }
+        if (tableInfo.createIfNotExist()) {
+            return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
+        }
+        throw new UnsupportedOperationException("table not exists and is marketed to not be created");
+    }
+
+    public KuduReaderIterator scanner(byte[] token) throws IOException {
+        return new KuduReaderIterator(KuduScanToken.deserializeIntoScanner(token, client));
+    }
+
+    public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Integer rowLimit) {
+        KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
+
+        if (CollectionUtils.isNotEmpty(tableProjections)) {
+            tokenBuilder.setProjectedColumnNames(tableProjections);
+        }
+
+        if (CollectionUtils.isNotEmpty(tableFilters)) {
+            tableFilters.stream()
+                    .map(filter -> filter.toPredicate(table.getSchema()))
+                    .forEach(tokenBuilder::addPredicate);
+        }
+
+        if (rowLimit != null && rowLimit > 0) {
+            tokenBuilder.limit(rowLimit);
+        }
+
+        return tokenBuilder.build();
+    }
+
+    public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+
+        List<KuduScanToken> tokens = scanTokens(tableFilters, tableProjections, readerConfig.getRowLimit());
+
+        KuduInputSplit[] splits = new KuduInputSplit[tokens.size()];
+
+        for (int i = 0; i < tokens.size(); i++) {
+            KuduScanToken token = tokens.get(i);
+
+            List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size());
+
+            for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
+                locations.add(getLocation(replica.getRpcHost(), replica.getRpcPort()));
+            }
+
+            KuduInputSplit split = new KuduInputSplit(
+                    token.serialize(),
+                    i,
+                    locations.toArray(new String[locations.size()])
+            );
+            splits[i] = split;
+        }
+
+        if (splits.length < minNumSplits) {
+            log.warn(" The minimum desired number of splits with your configured parallelism level " +
+                            "is {}. Current kudu splits = {}. {} instances will remain idle.",
+                    minNumSplits,
+                    splits.length,
+                    (minNumSplits - splits.length)
+            );
+        }
+
+        return splits;
+    }
+
+    /**
+     * Returns an endpoint in {@code host:port} format.
+     *
+     * @param host hostname
+     * @param port port
+     * @return formatted endpoint
+     */
+    private String getLocation(String host, Integer port) {
+        return host + ":" + port;
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            if (session != null) {
+                session.close();
+            }
+        } catch (Exception e) {
+            log.error("Error while closing session.", e);
+        }
+        try {
+            if (client != null) {
+                client.close();
+            }
+        } catch (Exception e) {
+            log.error("Error while closing client.", e);
+        }
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
new file mode 100644
index 0000000..6f5f079
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.reader;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+@PublicEvolving
+public class KuduReaderConfig implements Serializable {
+
+    private final String masters;
+    private final Integer rowLimit;
+
+    private KuduReaderConfig(
+            String masters,
+            Integer rowLimit) {
+
+        this.masters = checkNotNull(masters, "Kudu masters cannot be null");
+        this.rowLimit = checkNotNull(rowLimit, "Kudu rowLimit cannot be null");
+    }
+
+    public String getMasters() {
+        return masters;
+    }
+
+    public Integer getRowLimit() {
+        return rowLimit;
+    }
+
+    @Override
+    public String toString() {
+        return new ToStringBuilder(this)
+                .append("masters", masters)
+                .append("rowLimit", rowLimit)
+                .toString();
+    }
+
+    /**
+     * Builder for the {@link KuduReaderConfig}.
+     */
+    public static class Builder {
+        private String masters;
+        private Integer rowLimit = 0;
+
+        private Builder(String masters) {
+            this.masters = masters;
+        }
+
+        public static Builder setMasters(String masters) {
+            return new Builder(masters);
+        }
+
+        public Builder setRowLimit(Integer rowLimit) {
+            this.rowLimit = rowLimit;
+            return this;
+        }
+
+        public KuduReaderConfig build() {
+            return new KuduReaderConfig(
+                    masters,
+                    rowLimit);
+        }
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
new file mode 100644
index 0000000..4a8e69c
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+
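+/**
+ * Iterates over the {@link RowResult}s of a {@link KuduScanner}, fetching the
+ * next row batch transparently and converting each result into a {@link KuduRow}.
+ */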
+@Internal
+public class KuduReaderIterator {
+
+    private KuduScanner scanner;
+    private RowResultIterator rowIterator;
+
+    public KuduReaderIterator(KuduScanner scanner) throws KuduException {
+        this.scanner = scanner;
+        nextRows();
+    }
+
+    public void close() throws KuduException {
+        scanner.close();
+    }
+
+    public boolean hasNext() throws KuduException {
+        if (rowIterator.hasNext()) {
+            return true;
+        } else if (scanner.hasMoreRows()) {
+            nextRows();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public KuduRow next() {
+        RowResult row = this.rowIterator.next();
+        return toKuduRow(row);
+    }
+
+    private void nextRows() throws KuduException {
+        this.rowIterator = scanner.nextRows();
+    }
+
+    private KuduRow toKuduRow(RowResult row) {
+        Schema schema = row.getColumnProjection();
+
+        KuduRow values = new KuduRow(schema.getColumnCount());
+        schema.getColumns().forEach(column -> {
+            String name = column.getName();
+            int pos = schema.getColumnIndex(name);
+            if (row.isNull(name)) {
+                values.setField(pos, name, null);
+            } else {
+                Type type = column.getType();
+                switch (type) {
+                    case BINARY:
+                        values.setField(pos, name, row.getBinary(name));
+                        break;
+                    case STRING:
+                        values.setField(pos, name, row.getString(name));
+                        break;
+                    case BOOL:
+                        values.setField(pos, name, row.getBoolean(name));
+                        break;
+                    case DOUBLE:
+                        values.setField(pos, name, row.getDouble(name));
+                        break;
+                    case FLOAT:
+                        values.setField(pos, name, row.getFloat(name));
+                        break;
+                    case INT8:
+                        values.setField(pos, name, row.getByte(name));
+                        break;
+                    case INT16:
+                        values.setField(pos, name, row.getShort(name));
+                        break;
+                    case INT32:
+                        values.setField(pos, name, row.getInt(name));
+                        break;
+                    case INT64:
+                        values.setField(pos, name, row.getLong(name));
+                        break;
+                    case UNIXTIME_MICROS:
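+                        // Kudu returns timestamps in microseconds; convert to milliseconds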
+                        values.setField(pos, name, row.getLong(name) / 1000);
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Illegal var type: " + type);
+                }
+            }
+        });
+        return values;
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java
similarity index 90%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java
index c12eb42..36584b5 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kudu.serde;
+package org.apache.flink.connectors.kudu.connector.serde;
 
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
 import org.apache.kudu.Schema;
 
 public class DefaultSerDe implements KuduSerialization<KuduRow>, KuduDeserialization<KuduRow> {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java
similarity index 87%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java
index 355a516..190c4c7 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kudu.serde;
+package org.apache.flink.connectors.kudu.connector.serde;
 
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
 
 import java.io.Serializable;
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java
similarity index 88%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java
index 99db1dc..b13c59b 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kudu.serde;
+package org.apache.flink.connectors.kudu.connector.serde;
 
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
 import org.apache.kudu.Schema;
 
 import java.io.Serializable;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java
similarity index 95%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java
index 1063aa2..bc57174 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kudu.serde;
+package org.apache.flink.connectors.kudu.connector.serde;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.kudu.Schema;
 
 import java.lang.reflect.Constructor;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
new file mode 100644
index 0000000..f4e2a8a
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
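+/**
+ * Wraps a Kudu client and session for writing to a {@link KuduTableInfo}:
+ * maps each {@link KuduRow} to an insert, update or upsert operation and
+ * reports row errors to the configured {@link KuduFailureHandler}.
+ */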
+@Internal
+public class KuduWriter implements AutoCloseable {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final KuduTableInfo tableInfo;
+    private final KuduWriterConfig writerConfig;
+    private final KuduFailureHandler failureHandler;
+
+    private transient KuduClient client;
+    private transient KuduSession session;
+    private transient KuduTable table;
+
+    public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig) throws IOException {
+        this(tableInfo, writerConfig, new DefaultKuduFailureHandler());
+    }
+
+    public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig, KuduFailureHandler failureHandler) throws IOException {
+        this.tableInfo = tableInfo;
+        this.writerConfig = writerConfig;
+        this.failureHandler = failureHandler;
+
+        this.client = obtainClient();
+        this.session = obtainSession();
+        this.table = obtainTable();
+    }
+
+    private KuduClient obtainClient() {
+        return new KuduClient.KuduClientBuilder(writerConfig.getMasters()).build();
+    }
+
+    private KuduSession obtainSession() {
+        KuduSession session = client.newSession();
+        session.setFlushMode(writerConfig.getFlushMode());
+        return session;
+    }
+
+    private KuduTable obtainTable() throws IOException {
+        String tableName = tableInfo.getName();
+        if (client.tableExists(tableName)) {
+            return client.openTable(tableName);
+        }
+        if (tableInfo.createIfNotExist()) {
+            return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
+        }
+        throw new UnsupportedOperationException("table not exists and is marketed to not be created");
+    }
+
+    public Schema getSchema() {
+        return table.getSchema();
+    }
+
+    public void write(KuduRow row) throws IOException {
+        checkAsyncErrors();
+
+        final Operation operation = mapToOperation(row);
+        final OperationResponse response = session.apply(operation);
+
+        checkErrors(response);
+    }
+
+    public void flushAndCheckErrors() throws IOException {
+        checkAsyncErrors();
+        flush();
+        checkAsyncErrors();
+    }
+
+    @VisibleForTesting
+    public DeleteTableResponse deleteTable() throws IOException {
+        String tableName = table.getName();
+        return client.deleteTable(tableName);
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            flushAndCheckErrors();
+        } finally {
+            try {
+                if (session != null) {
+                    session.close();
+                }
+            } catch (Exception e) {
+                log.error("Error while closing session.", e);
+            }
+            try {
+                if (client != null) {
+                    client.close();
+                }
+            } catch (Exception e) {
+                log.error("Error while closing client.", e);
+            }
+        }
+    }
+
+    private void flush() throws IOException {
+        session.flush();
+    }
+
+    private void checkErrors(OperationResponse response) throws IOException {
+        if (response != null && response.hasRowError()) {
+            failureHandler.onFailure(Arrays.asList(response.getRowError()));
+        } else {
+            checkAsyncErrors();
+        }
+    }
+
+    private void checkAsyncErrors() throws IOException {
+        if (session.countPendingErrors() == 0) {
+            return;
+        }
+
+        List<RowError> errors = Arrays.asList(session.getPendingErrors().getRowErrors());
+        failureHandler.onFailure(errors);
+    }
+
+    private Operation mapToOperation(KuduRow row) {
+        final Operation operation = obtainOperation();
+        final PartialRow partialRow = operation.getRow();
+
+        table.getSchema().getColumns().forEach(column -> {
+            String columnName = column.getName();
+            Object value = row.getField(column.getName());
+
+            if (value == null) {
+                partialRow.setNull(columnName);
+            } else {
+                Type type = column.getType();
+                switch (type) {
+                    case STRING:
+                        partialRow.addString(columnName, (String) value);
+                        break;
+                    case FLOAT:
+                        partialRow.addFloat(columnName, (Float) value);
+                        break;
+                    case INT8:
+                        partialRow.addByte(columnName, (Byte) value);
+                        break;
+                    case INT16:
+                        partialRow.addShort(columnName, (Short) value);
+                        break;
+                    case INT32:
+                        partialRow.addInt(columnName, (Integer) value);
+                        break;
+                    case INT64:
+                        partialRow.addLong(columnName, (Long) value);
+                        break;
+                    case DOUBLE:
+                        partialRow.addDouble(columnName, (Double) value);
+                        break;
+                    case BOOL:
+                        partialRow.addBoolean(columnName, (Boolean) value);
+                        break;
+                    case UNIXTIME_MICROS:
+                        // multiply by 1000: KuduRow carries milliseconds, Kudu expects microseconds
+                        partialRow.addLong(columnName, ((Long) value) * 1000);
+                        break;
+                    case BINARY:
+                        partialRow.addBinary(columnName, (byte[]) value);
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Illegal var type: " + type);
+                }
+            }
+        });
+        return operation;
+    }
+
+    private Operation obtainOperation() {
+        switch (writerConfig.getWriteMode()) {
+            case INSERT: return table.newInsert();
+            case UPDATE: return table.newUpdate();
+            case UPSERT: return table.newUpsert();
+        }
+        return table.newUpsert();
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
new file mode 100644
index 0000000..13672d5
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.kudu.client.SessionConfiguration.FlushMode;
+
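+/**
+ * Configuration for the Kudu writer: master addresses, write mode
+ * (insert, update or upsert) and the consistency level, expressed as the
+ * session {@link FlushMode} (eventual = AUTO_FLUSH_BACKGROUND, strong = AUTO_FLUSH_SYNC).
+ */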
+@PublicEvolving
+public class KuduWriterConfig implements Serializable {
+
+    private final String masters;
+    private final FlushMode flushMode;
+    private final KuduWriterMode writeMode;
+
+    private KuduWriterConfig(
+            String masters,
+            FlushMode flushMode,
+            KuduWriterMode writeMode) {
+
+        this.masters = checkNotNull(masters, "Kudu masters cannot be null");
+        this.flushMode = checkNotNull(flushMode, "Kudu flush mode cannot be null");
+        this.writeMode = checkNotNull(writeMode, "Kudu write mode cannot be null");
+    }
+
+    public String getMasters() {
+        return masters;
+    }
+
+    public KuduWriterMode getWriteMode() {
+        return writeMode;
+    }
+
+    public FlushMode getFlushMode() {
+        return flushMode;
+    }
+
+    @Override
+    public String toString() {
+        return new ToStringBuilder(this)
+                .append("masters", masters)
+                .append("flushMode", flushMode)
+                .append("writeMode", writeMode)
+                .toString();
+    }
+
+    /**
+     * Builder for the {@link KuduWriterConfig}.
+     */
+    public static class Builder {
+        private String masters;
+        private KuduWriterMode writeMode = KuduWriterMode.UPSERT;
+        private FlushMode flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
+
+        private Builder(String masters) {
+            this.masters = masters;
+        }
+
+        public static Builder setMasters(String masters) {
+            return new Builder(masters);
+        }
+
+        public Builder setWriteMode(KuduWriterMode writeMode) {
+            this.writeMode = writeMode;
+            return this;
+        }
+
+        public Builder setUpsertWrite() {
+            return setWriteMode(KuduWriterMode.UPSERT);
+        }
+
+        public Builder setInsertWrite() {
+            return setWriteMode(KuduWriterMode.INSERT);
+        }
+
+        public Builder setUpdateWrite() {
+            return setWriteMode(KuduWriterMode.UPDATE);
+        }
+
+        public Builder setConsistency(FlushMode flushMode) {
+            this.flushMode = flushMode;
+            return this;
+        }
+
+        public Builder setEventualConsistency() {
+            return setConsistency(FlushMode.AUTO_FLUSH_BACKGROUND);
+        }
+
+        public Builder setStrongConsistency() {
+            return setConsistency(FlushMode.AUTO_FLUSH_SYNC);
+        }
+
+        public KuduWriterConfig build() {
+            return new KuduWriterConfig(
+                    masters,
+                    flushMode,
+                    writeMode);
+        }
+    }
+}
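
The builder above replaces the raw master-address string the old constructors took; options left unset fall back to UPSERT writes with background flushing. A minimal usage sketch, assuming a reachable Kudu master (the address below is a placeholder, not part of this patch):

```java
// Sketch only: assemble a writer configuration with the builder defined above.
KuduWriterConfig writerConfig = KuduWriterConfig.Builder
        .setMasters("localhost:7051")  // hypothetical master address
        .setInsertWrite()              // writeMode = KuduWriterMode.INSERT
        .setEventualConsistency()      // flushMode = FlushMode.AUTO_FLUSH_BACKGROUND
        .build();
```

For the common case, `KuduWriterConfig.Builder.setMasters(masters).build()` is enough, since the defaults already cover it.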
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java
new file mode 100644
index 0000000..27b2ed3
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import static org.apache.kudu.client.SessionConfiguration.FlushMode;
+
+public enum KuduWriterConsistency {
+    EVENTUAL(FlushMode.AUTO_FLUSH_BACKGROUND),
+    STRONG(FlushMode.AUTO_FLUSH_SYNC),
+    //CHECKPOINT(FlushMode.MANUAL_FLUSH)
+    ;
+
+    public final FlushMode flushMode;
+
+    KuduWriterConsistency(FlushMode flushMode) {
+        this.flushMode = flushMode;
+    }
+}
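
Each constant pins a consistency level to the Kudu session flush mode that implements it, and the commented-out CHECKPOINT constant reserves MANUAL_FLUSH for a possible future checkpoint-aligned mode. A short sketch of reading the mapping through the public `flushMode` field:

```java
// Sketch only: resolve the FlushMode behind each consistency level.
FlushMode eventual = KuduWriterConsistency.EVENTUAL.flushMode; // AUTO_FLUSH_BACKGROUND
FlushMode strong = KuduWriterConsistency.STRONG.flushMode;     // AUTO_FLUSH_SYNC
```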
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java
new file mode 100644
index 0000000..8c9eab0
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+public enum KuduWriterMode {
+    INSERT,
+    UPDATE,
+    UPSERT
+}
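
`KuduWriterMode` replaces the nested `KuduConnector.WriteMode` enum deleted further down. As an illustrative sketch (not part of this patch; the class and method names are hypothetical), a writer can translate the mode into a Kudu `Operation` the same way the `switch` removed from `KuduMapper` below did:

```java
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;

// Hypothetical helper, named for illustration only.
final class WriteModeMapper {
    // Map a write mode to the matching Kudu operation on the given table.
    static Operation toOperation(KuduTable table, KuduWriterMode writeMode) {
        switch (writeMode) {
            case INSERT: return table.newInsert();
            case UPDATE: return table.newUpdate();
            case UPSERT:
            default:     return table.newUpsert();
        }
    }
}
```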
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
new file mode 100644
index 0000000..d523b67
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.streaming;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.serde.KuduSerialization;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+@PublicEvolving
+public class KuduSink<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final KuduTableInfo tableInfo;
+    private final KuduWriterConfig writerConfig;
+    private final KuduFailureHandler failureHandler;
+    private final KuduSerialization<OUT> serializer;
+
+    private transient KuduWriter kuduWriter;
+
+    public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) {
+        this(writerConfig, tableInfo, serializer, new DefaultKuduFailureHandler());
+    }
+
+    public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer, KuduFailureHandler failureHandler) {
+        this.tableInfo = checkNotNull(tableInfo, "tableInfo cannot be null");
+        this.writerConfig = checkNotNull(writerConfig, "writerConfig cannot be null");
+        this.serializer = checkNotNull(serializer, "serializer cannot be null");
+        this.failureHandler = checkNotNull(failureHandler, "failureHandler cannot be null");
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        kuduWriter = new KuduWriter(tableInfo, writerConfig, failureHandler);
+
+        serializer.withSchema(kuduWriter.getSchema());
+    }
+
+    @Override
+    public void invoke(OUT value) throws Exception {
+        final KuduRow kuduRow = serializer.serialize(value);
+        kuduWriter.write(kuduRow);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (kuduWriter != null) {
+            kuduWriter.close();
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        kuduWriter.flushAndCheckErrors();
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+    }
+
+}
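
The new sink delegates all session handling to `KuduWriter` and, because it implements `CheckpointedFunction`, flushes pending operations on every checkpoint via `flushAndCheckErrors()`; this is what closes the eventual-consistency window this change targets. A hypothetical wiring into a streaming job (`books`, `tableInfo`, and `writerConfig` are assumed from earlier examples, and the checkpoint interval is illustrative):

```java
// Sketch only: wire the sink into a checkpointed streaming job.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // snapshotState() flushes buffered writes per checkpoint

env.fromCollection(books)
        .addSink(new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe()));

env.execute();
```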
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
deleted file mode 100644
index fd126d0..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu;
-
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.streaming.connectors.kudu.connector.*;
-import org.apache.flink.util.Preconditions;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduScanToken;
-import org.apache.kudu.client.LocatedTablet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class KuduInputFormat extends RichInputFormat<KuduRow, KuduInputFormat.KuduInputSplit> {
-
-    private String kuduMasters;
-    private KuduTableInfo tableInfo;
-    private List<KuduFilterInfo> tableFilters;
-    private List<String> tableProjections;
-    private Long rowsLimit;
-    private boolean endReached;
-
-    private transient KuduConnector tableContext;
-    private transient KuduRowIterator resultIterator;
-
-    private static final Logger LOG = LoggerFactory.getLogger(KuduInputFormat.class);
-
-    public KuduInputFormat(String kuduMasters, KuduTableInfo tableInfo) {
-        Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
-        this.kuduMasters = kuduMasters;
-
-        Preconditions.checkNotNull(tableInfo,"tableInfo could not be null");
-        this.tableInfo = tableInfo;
-
-        this.endReached = false;
-    }
-
-    public KuduInputFormat withTableFilters(KuduFilterInfo... tableFilters) {
-        return withTableFilters(Arrays.asList(tableFilters));
-    }
-
-    public KuduInputFormat withTableFilters(List<KuduFilterInfo> tableFilters) {
-        this.tableFilters = tableFilters;
-        return this;
-    }
-
-    public KuduInputFormat withTableProjections(String... tableProjections) {
-        return withTableProjections(Arrays.asList(tableProjections));
-    }
-    public KuduInputFormat withTableProjections(List<String> tableProjections) {
-        this.tableProjections = tableProjections;
-        return this;
-    }
-
-    public KuduInputFormat withRowsLimit(Long rowsLimit) {
-        this.rowsLimit = rowsLimit;
-        return this;
-    }
-
-    @Override
-    public void configure(Configuration parameters) {
-
-    }
-
-    @Override
-    public void open(KuduInputSplit split) throws IOException {
-        endReached = false;
-        startTableContext();
-
-        resultIterator = tableContext.scanner(split.getScanToken());
-    }
-
-    @Override
-    public void close() {
-        if (resultIterator != null) {
-            try {
-                resultIterator.close();
-            } catch (KuduException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    @Override
-    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
-        return cachedStatistics;
-    }
-
-    @Override
-    public InputSplitAssigner getInputSplitAssigner(KuduInputSplit[] inputSplits) {
-        return new LocatableInputSplitAssigner(inputSplits);
-    }
-
-    private void startTableContext() throws IOException {
-        if (tableContext == null) {
-            tableContext = new KuduConnector(kuduMasters, tableInfo);
-        }
-    }
-
-    @Override
-    public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
-        startTableContext();
-        Preconditions.checkNotNull(tableContext,"tableContext should not be null");
-
-        List<KuduScanToken> tokens = tableContext.scanTokens(tableFilters, tableProjections, rowsLimit);
-
-        KuduInputSplit[] splits = new KuduInputSplit[tokens.size()];
-
-        for (int i = 0; i < tokens.size(); i++) {
-            KuduScanToken token = tokens.get(i);
-
-            List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size());
-
-            for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
-                locations.add(getLocation(replica.getRpcHost(), replica.getRpcPort()));
-            }
-
-            KuduInputSplit split = new KuduInputSplit(
-                    token.serialize(),
-                    i,
-                    locations.toArray(new String[locations.size()])
-            );
-            splits[i] = split;
-        }
-
-        if (splits.length < minNumSplits) {
-            LOG.warn(" The minimum desired number of splits with your configured parallelism level " +
-                            "is {}. Current kudu splits = {}. {} instances will remain idle.",
-                    minNumSplits,
-                    splits.length,
-                    (minNumSplits - splits.length)
-            );
-        }
-
-        return splits;
-    }
-
-    @Override
-    public boolean reachedEnd() throws IOException {
-        return endReached;
-    }
-
-    @Override
-    public KuduRow nextRecord(KuduRow reuse) throws IOException {
-        // check that current iterator has next rows
-        if (this.resultIterator.hasNext()) {
-            return resultIterator.next();
-        } else {
-            endReached = true;
-            return null;
-        }
-    }
-
-    /**
-     * Returns a endpoint url in the following format: <host>:<ip>
-     *
-     * @param host Hostname
-     * @param port Port
-     * @return Formatted URL
-     */
-    private String getLocation(String host, Integer port) {
-        StringBuilder builder = new StringBuilder();
-        builder.append(host).append(":").append(port);
-        return builder.toString();
-    }
-
-
-    public class KuduInputSplit extends LocatableInputSplit {
-
-        private byte[] scanToken;
-
-        /**
-         * Creates a new KuduInputSplit
-         * @param splitNumber the number of the input split
-         * @param hostnames The names of the hosts storing the data this input split refers to.
-         */
-        public KuduInputSplit(byte[] scanToken, final int splitNumber, final String[] hostnames) {
-            super(splitNumber, hostnames);
-
-            this.scanToken = scanToken;
-        }
-
-        public byte[] getScanToken() {
-            return scanToken;
-        }
-    }
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
deleted file mode 100644
index c1301da..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu;
-
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
-import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
-import org.apache.flink.util.Preconditions;
-import org.apache.kudu.client.SessionConfiguration.FlushMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class KuduOutputFormat<OUT> extends RichOutputFormat<OUT> {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
-
-    private String kuduMasters;
-    private KuduTableInfo tableInfo;
-    private KuduConnector.Consistency consistency;
-    private KuduConnector.WriteMode writeMode;
-
-    private KuduSerialization<OUT> serializer;
-
-    private transient KuduConnector connector;
-
-
-    public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) {
-        Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
-        this.kuduMasters = kuduMasters;
-
-        Preconditions.checkNotNull(tableInfo,"tableInfo could not be null");
-        this.tableInfo = tableInfo;
-        this.consistency = KuduConnector.Consistency.STRONG;
-        this.writeMode = KuduConnector.WriteMode.UPSERT;
-        this.serializer = serializer.withSchema(tableInfo.getSchema());
-    }
-
-
-    public KuduOutputFormat<OUT> withEventualConsistency() {
-        this.consistency = KuduConnector.Consistency.EVENTUAL;
-        return this;
-    }
-
-    public KuduOutputFormat<OUT> withStrongConsistency() {
-        this.consistency = KuduConnector.Consistency.STRONG;
-        return this;
-    }
-
-    public KuduOutputFormat<OUT> withUpsertWriteMode() {
-        this.writeMode = KuduConnector.WriteMode.UPSERT;
-        return this;
-    }
-
-    public KuduOutputFormat<OUT> withInsertWriteMode() {
-        this.writeMode = KuduConnector.WriteMode.INSERT;
-        return this;
-    }
-
-    public KuduOutputFormat<OUT> withUpdateWriteMode() {
-        this.writeMode = KuduConnector.WriteMode.UPDATE;
-        return this;
-    }
-
-    @Override
-    public void configure(Configuration parameters) {
-
-    }
-
-    @Override
-    public void open(int taskNumber, int numTasks) throws IOException {
-        if (connector != null) return;
-        connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode,FlushMode.AUTO_FLUSH_SYNC);
-        serializer = serializer.withSchema(tableInfo.getSchema());
-    }
-
-    @Override
-    public void writeRecord(OUT row) throws IOException {
-        boolean response;
-        try {
-            KuduRow kuduRow = serializer.serialize(row);
-            response = connector.writeRow(kuduRow);
-        } catch (Exception e) {
-            throw new IOException(e.getLocalizedMessage(), e);
-        }
-
-        if(!response) {
-            throw new IOException("error with some transaction");
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (this.connector == null) return;
-        try {
-            this.connector.close();
-        } catch (Exception e) {
-            throw new IOException(e.getLocalizedMessage(), e);
-        }
-    }
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
deleted file mode 100644
index b6dd9c8..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
-import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
-import org.apache.flink.util.Preconditions;
-import org.apache.kudu.client.SessionConfiguration.FlushMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class KuduSink<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
-
-    private String kuduMasters;
-    private KuduTableInfo tableInfo;
-    private KuduConnector.Consistency consistency;
-    private KuduConnector.WriteMode writeMode;
-    private FlushMode flushMode;
-
-    private KuduSerialization<OUT> serializer;
-
-    private transient KuduConnector connector;
-
-    public KuduSink(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) {
-        Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
-        this.kuduMasters = kuduMasters;
-
-        Preconditions.checkNotNull(tableInfo,"tableInfo could not be null");
-        this.tableInfo = tableInfo;
-        this.consistency = KuduConnector.Consistency.STRONG;
-        this.writeMode = KuduConnector.WriteMode.UPSERT;
-        this.serializer = serializer.withSchema(tableInfo.getSchema());
-    }
-
-    public KuduSink<OUT> withEventualConsistency() {
-        this.consistency = KuduConnector.Consistency.EVENTUAL;
-        return this;
-    }
-
-    public KuduSink<OUT> withStrongConsistency() {
-        this.consistency = KuduConnector.Consistency.STRONG;
-        return this;
-    }
-
-    public KuduSink<OUT> withUpsertWriteMode() {
-        this.writeMode = KuduConnector.WriteMode.UPSERT;
-        return this;
-    }
-
-    public KuduSink<OUT> withInsertWriteMode() {
-        this.writeMode = KuduConnector.WriteMode.INSERT;
-        return this;
-    }
-
-    public KuduSink<OUT> withUpdateWriteMode() {
-        this.writeMode = KuduConnector.WriteMode.UPDATE;
-        return this;
-    }
-
-    public KuduSink<OUT> withSyncFlushMode() {
-        this.flushMode = FlushMode.AUTO_FLUSH_SYNC;
-        return this;
-    }
-
-    public KuduSink<OUT> withAsyncFlushMode() {
-        this.flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
-        return this;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws IOException {
-        if (this.connector != null) return;
-        this.connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode, getflushMode());
-        this.serializer.withSchema(tableInfo.getSchema());
-    }
-
-    /**
-     * If flink checkpoint is disable,synchronously write data to kudu.
-     * <p>If flink checkpoint is enable, asynchronously write data to kudu by default.
-     *
-     * <p>(Note: async may result in out-of-order writes to Kudu.
-     *  you also can change to sync by explicitly calling {@link KuduSink#withSyncFlushMode()} when initializing KuduSink. )
-     *
-     * @return flushMode
-     */
-    private FlushMode getflushMode() {
-        FlushMode flushMode = FlushMode.AUTO_FLUSH_SYNC;
-        boolean enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
-        if(enableCheckpoint && this.flushMode == null) {
-            flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
-        }
-        if(enableCheckpoint && this.flushMode != null) {
-            flushMode = this.flushMode;
-        }
-        return flushMode;
-    }
-
-    @Override
-    public void invoke(OUT row) throws Exception {
-        KuduRow kuduRow = serializer.serialize(row);
-        boolean response = connector.writeRow(kuduRow);
-
-        if(!response) {
-            throw new IOException("error with some transaction");
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (this.connector == null) return;
-        try {
-            this.connector.close();
-        } catch (Exception e) {
-            throw new IOException(e.getLocalizedMessage(), e);
-        }
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Snapshotting state {} ...", context.getCheckpointId());
-        }
-        this.connector.flush();
-    }
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
deleted file mode 100644
index d45886c..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu.connector;
-
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.flink.api.common.time.Time;
-import org.apache.kudu.client.*;
-import org.apache.kudu.client.SessionConfiguration.FlushMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class KuduConnector implements AutoCloseable {
-
-    private final Logger LOG = LoggerFactory.getLogger(this.getClass());
-
-    private Callback<Boolean, OperationResponse> defaultCB;
-
-    public enum Consistency {EVENTUAL, STRONG};
-    public enum WriteMode {INSERT,UPDATE,UPSERT}
-
-    private AsyncKuduClient client;
-    private KuduTable table;
-    private AsyncKuduSession session;
-
-    private Consistency consistency;
-    private WriteMode writeMode;
-
-    private static AtomicInteger pendingTransactions = new AtomicInteger();
-    private static AtomicBoolean errorTransactions = new AtomicBoolean(false);
-
-    public KuduConnector(String kuduMasters, KuduTableInfo tableInfo) throws IOException {
-        this(kuduMasters, tableInfo, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT,FlushMode.AUTO_FLUSH_SYNC);
-    }
-
-    public KuduConnector(String kuduMasters, KuduTableInfo tableInfo, Consistency consistency, WriteMode writeMode,FlushMode flushMode) throws IOException {
-        this.client = client(kuduMasters);
-        this.table = table(tableInfo);
-        this.session = client.newSession();
-        this.consistency = consistency;
-        this.writeMode = writeMode;
-        this.defaultCB = new ResponseCallback();
-        this.session.setFlushMode(flushMode);
-    }
-
-    private AsyncKuduClient client(String kuduMasters) {
-        return new AsyncKuduClient.AsyncKuduClientBuilder(kuduMasters).build();
-    }
-
-    private KuduTable table(KuduTableInfo infoTable) throws IOException {
-        KuduClient syncClient = client.syncClient();
-
-        String tableName = infoTable.getName();
-        if (syncClient.tableExists(tableName)) {
-            return syncClient.openTable(tableName);
-        }
-        if (infoTable.createIfNotExist()) {
-            return syncClient.createTable(tableName, infoTable.getSchema(), infoTable.getCreateTableOptions());
-        }
-        throw new UnsupportedOperationException("table not exists and is marketed to not be created");
-    }
-
-    public boolean deleteTable() throws IOException {
-        String tableName = table.getName();
-        client.syncClient().deleteTable(tableName);
-        return true;
-    }
-
-    public KuduRowIterator scanner(byte[] token) throws IOException {
-        return new KuduRowIterator(KuduScanToken.deserializeIntoScanner(token, client.syncClient()));
-    }
-
-    public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Long rowLimit) {
-        KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.syncClient().newScanTokenBuilder(table);
-
-        if (CollectionUtils.isNotEmpty(tableProjections)) {
-            tokenBuilder.setProjectedColumnNames(tableProjections);
-        }
-
-        if (CollectionUtils.isNotEmpty(tableFilters)) {
-            tableFilters.stream()
-                    .map(filter -> filter.toPredicate(table.getSchema()))
-                    .forEach(tokenBuilder::addPredicate);
-        }
-
-        if (rowLimit !=null && rowLimit > 0) {
-            tokenBuilder.limit(rowLimit);
-        }
-
-        return tokenBuilder.build();
-    }
-
-    public boolean writeRow(KuduRow row) throws Exception {
-        final Operation operation = KuduMapper.toOperation(table, writeMode, row);
-
-        Deferred<OperationResponse> response = session.apply(operation);
-
-        if (KuduConnector.Consistency.EVENTUAL.equals(consistency)) {
-            pendingTransactions.incrementAndGet();
-            response.addCallback(defaultCB);
-        } else {
-            processResponse(response.join());
-        }
-
-        return !errorTransactions.get();
-
-    }
-
-    @Override
-    public void close() throws Exception {
-        while(pendingTransactions.get() > 0) {
-            LOG.info("sleeping {}s by pending transactions", pendingTransactions.get());
-            Thread.sleep(Time.seconds(pendingTransactions.get()).toMilliseconds());
-        }
-
-        if (session == null) return;
-        session.close();
-
-        if (client == null) return;
-        client.close();
-    }
-
-    public void flush(){
-        this.session.flush();
-    }
-
-    private class ResponseCallback implements Callback<Boolean, OperationResponse> {
-        @Override
-        public Boolean call(OperationResponse operationResponse) {
-            pendingTransactions.decrementAndGet();
-            processResponse(operationResponse);
-            return errorTransactions.get();
-        }
-    }
-
-    protected void processResponse(OperationResponse operationResponse) {
-        if (operationResponse == null) return;
-
-        if (operationResponse.hasRowError()) {
-            logResponseError(operationResponse.getRowError());
-            errorTransactions.set(true);
-        }
-    }
-
-    private void logResponseError(RowError error) {
-        LOG.error("Error {} on {}: {} ", error.getErrorStatus(), error.getOperation(), error.toString());
-    }
-
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
deleted file mode 100644
index 86b683f..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu.connector;
-
-
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-import org.apache.kudu.client.RowResult;
-
-final class KuduMapper {
-
-    private KuduMapper() { }
-
-    static KuduRow toKuduRow(RowResult row) {
-        Schema schema = row.getColumnProjection();
-
-        KuduRow values = new KuduRow(schema.getColumnCount());
-        schema.getColumns().forEach(column -> {
-            String name = column.getName();
-            int pos = schema.getColumnIndex(name);
-            if(row.isNull(name)) {
-                values.setField(pos, name, null);
-            } else {
-                Type type = column.getType();
-                switch (type) {
-                    case BINARY:
-                        values.setField(pos, name, row.getBinary(name));
-                        break;
-                    case STRING:
-                        values.setField(pos, name, row.getString(name));
-                        break;
-                    case BOOL:
-                        values.setField(pos, name, row.getBoolean(name));
-                        break;
-                    case DOUBLE:
-                        values.setField(pos, name, row.getDouble(name));
-                        break;
-                    case FLOAT:
-                        values.setField(pos, name, row.getFloat(name));
-                        break;
-                    case INT8:
-                        values.setField(pos, name, row.getByte(name));
-                        break;
-                    case INT16:
-                        values.setField(pos, name, row.getShort(name));
-                        break;
-                    case INT32:
-                        values.setField(pos, name, row.getInt(name));
-                        break;
-                    case INT64:
-                        values.setField(pos, name, row.getLong(name));
-                        break;
-                    case UNIXTIME_MICROS:
-                        values.setField(pos, name, row.getLong(name) / 1000);
-                        break;
-                    default:
-                        throw new IllegalArgumentException("Illegal var type: " + type);
-                }
-            }
-        });
-        return values;
-    }
-
-
-    static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode, KuduRow row) {
-        final Operation operation = toOperation(table, writeMode);
-        final PartialRow partialRow = operation.getRow();
-
-        table.getSchema().getColumns().forEach(column -> {
-            String columnName = column.getName();
-            Object value = row.getField(column.getName());
-
-            if (value == null) {
-                partialRow.setNull(columnName);
-            } else {
-                Type type = column.getType();
-                switch (type) {
-                    case STRING:
-                        partialRow.addString(columnName, (String) value);
-                        break;
-                    case FLOAT:
-                        partialRow.addFloat(columnName, (Float) value);
-                        break;
-                    case INT8:
-                        partialRow.addByte(columnName, (Byte) value);
-                        break;
-                    case INT16:
-                        partialRow.addShort(columnName, (Short) value);
-                        break;
-                    case INT32:
-                        partialRow.addInt(columnName, (Integer) value);
-                        break;
-                    case INT64:
-                        partialRow.addLong(columnName, (Long) value);
-                        break;
-                    case DOUBLE:
-                        partialRow.addDouble(columnName, (Double) value);
-                        break;
-                    case BOOL:
-                        partialRow.addBoolean(columnName, (Boolean) value);
-                        break;
-                    case UNIXTIME_MICROS:
-                        //*1000 to correctly create date on kudu
-                        partialRow.addLong(columnName, ((Long) value) * 1000);
-                        break;
-                    case BINARY:
-                        partialRow.addBinary(columnName, (byte[]) value);
-                        break;
-                    default:
-                        throw new IllegalArgumentException("Illegal var type: " + type);
-                }
-            }
-        });
-        return operation;
-    }
-
-    static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode) {
-        switch (writeMode) {
-            case INSERT: return table.newInsert();
-            case UPDATE: return table.newUpdate();
-            case UPSERT: return table.newUpsert();
-        }
-        return table.newUpsert();
-    }
-
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java
deleted file mode 100644
index 46cbff1..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed serialize the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file serialize You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed serialize in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu.connector;
-
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.RowResult;
-import org.apache.kudu.client.RowResultIterator;
-
-public class KuduRowIterator {
-
-    private KuduScanner scanner;
-    private RowResultIterator rowIterator;
-
-    public KuduRowIterator(KuduScanner scanner) throws KuduException {
-        this.scanner = scanner;
-        nextRows();
-    }
-
-    public void close() throws KuduException {
-        scanner.close();
-    }
-
-    public boolean hasNext() throws KuduException {
-        if (rowIterator.hasNext()) {
-            return true;
-        } else if (scanner.hasMoreRows()) {
-            nextRows();
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    public KuduRow next() {
-        RowResult row = this.rowIterator.next();
-        return KuduMapper.toKuduRow(row);
-    }
-
-    private void nextRows() throws KuduException {
-        this.rowIterator = scanner.nextRows();
-    }
-}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
similarity index 61%
rename from flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
index 041b77e..e22f40e 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
@@ -14,35 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kudu;
+package org.apache.flink.connectors.kudu.batch;
 
-import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduDatabase;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
-
-public class KuduInputFormatTest extends KuduDatabase {
+class KuduInputFormatTest extends KuduDatabase {
 
     @Test
-    public void testInvalidKuduMaster() throws IOException {
+    void testInvalidKuduMaster() {
         KuduTableInfo tableInfo = booksTableInfo("books",false);
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat(null, tableInfo));
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat<>(null, tableInfo, new DefaultSerDe()));
     }
 
     @Test
-    public void testInvalidTableInfo() throws IOException {
+    void testInvalidTableInfo() {
         String masterAddresses = harness.getMasterAddressesAsString();
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat(masterAddresses, null));
+        KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat<>(readerConfig, null, new DefaultSerDe()));
     }
 
     @Test
-    public void testInputFormat() throws Exception {
+    void testInputFormat() throws Exception {
         KuduTableInfo tableInfo = booksTableInfo("books",true);
         setUpDatabase(tableInfo);
 
@@ -53,7 +56,7 @@
     }
 
     @Test
-    public void testInputFormatWithProjection() throws Exception {
+    void testInputFormatWithProjection() throws Exception {
         KuduTableInfo tableInfo = booksTableInfo("books",true);
         setUpDatabase(tableInfo);
 
@@ -68,14 +71,14 @@
     }
 
 
-    public static List<KuduRow> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
+    private List<KuduRow> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
         String masterAddresses = harness.getMasterAddressesAsString();
-        KuduInputFormat inputFormat = new KuduInputFormat(masterAddresses, tableInfo)
-                .withTableProjections(fieldProjection);
+        KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
+        KuduInputFormat<KuduRow> inputFormat = new KuduInputFormat<>(readerConfig, tableInfo, new DefaultSerDe(), new ArrayList<>(), Arrays.asList(fieldProjection));
 
-        KuduInputFormat.KuduInputSplit[] splits = inputFormat.createInputSplits(1);
+        KuduInputSplit[] splits = inputFormat.createInputSplits(1);
         List<KuduRow> rows = new ArrayList<>();
-        for (KuduInputFormat.KuduInputSplit split : splits) {
+        for (KuduInputSplit split : splits) {
             inputFormat.open(split);
             while(!inputFormat.reachedEnd()) {
                 KuduRow row = inputFormat.nextRecord(new KuduRow(5));
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
similarity index 63%
rename from flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
index 4e91310..963a8c0 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
@@ -14,48 +14,54 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kudu;
+package org.apache.flink.connectors.kudu.batch;
 
-import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
-import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe;
+import org.apache.flink.connectors.kudu.connector.KuduDatabase;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 
-public class KuduOuputFormatTest extends KuduDatabase {
+class KuduOuputFormatTest extends KuduDatabase {
 
     @Test
-    public void testInvalidKuduMaster() throws IOException {
+    void testInvalidKuduMaster() {
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
         Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe()));
     }
 
     @Test
-    public void testInvalidTableInfo() throws IOException {
+    void testInvalidTableInfo() {
         String masterAddresses = harness.getMasterAddressesAsString();
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(masterAddresses, null, new DefaultSerDe()));
+        KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(writerConfig, null, new DefaultSerDe()));
     }
 
     @Test
-    public void testNotTableExist() throws IOException {
+    void testNotTableExist() {
         String masterAddresses = harness.getMasterAddressesAsString();
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
-        KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe());
+        KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+        KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe());
         Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0,1));
     }
 
     @Test
-    public void testOutputWithStrongConsistency() throws Exception {
+    void testOutputWithStrongConsistency() throws Exception {
         String masterAddresses = harness.getMasterAddressesAsString();
 
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
-        KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe())
-                .withStrongConsistency();
+        KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+                .setMasters(masterAddresses)
+                .setStrongConsistency()
+                .build();
+        KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe());
+
         outputFormat.open(0,1);
 
         for (KuduRow kuduRow : booksDataRow()) {
@@ -63,19 +69,23 @@
         }
         outputFormat.close();
 
-        List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
+        List<KuduRow> rows = readRows(tableInfo);
         Assertions.assertEquals(5, rows.size());
 
         cleanDatabase(tableInfo);
     }
 
     @Test
-    public void testOutputWithEventualConsistency() throws Exception {
+    void testOutputWithEventualConsistency() throws Exception {
         String masterAddresses = harness.getMasterAddressesAsString();
 
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
-        KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe())
-                .withEventualConsistency();
+        KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+                .setMasters(masterAddresses)
+                .setEventualConsistency()
+                .build();
+        KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe());
+
         outputFormat.open(0,1);
 
         for (KuduRow kuduRow : booksDataRow()) {
@@ -87,7 +97,7 @@
 
         outputFormat.close();
 
-        List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
+        List<KuduRow> rows = readRows(tableInfo);
         Assertions.assertEquals(5, rows.size());
 
         cleanDatabase(tableInfo);
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
similarity index 64%
rename from flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java
rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
index d22203d..3d02a1d 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
@@ -14,8 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kudu.connector;
+package org.apache.flink.connectors.kudu.connector;
 
+import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
 import org.apache.kudu.Type;
 import org.apache.kudu.test.KuduTestHarness;
 import org.junit.Rule;
@@ -23,6 +29,7 @@
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.migrationsupport.rules.ExternalResourceSupport;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -33,7 +40,7 @@
     @Rule
     public static KuduTestHarness harness = new KuduTestHarness();
 
-    protected static final Object[][] booksTableData = {
+    private static final Object[][] booksTableData = {
             {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11},
             {1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22},
             {1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33},
@@ -68,17 +75,19 @@
                 .collect(Collectors.toList());
     }
 
-    public void setUpDatabase(KuduTableInfo tableInfo) {
+    protected void setUpDatabase(KuduTableInfo tableInfo) {
         try {
             String masterAddresses = harness.getMasterAddressesAsString();
-            KuduConnector tableContext = new KuduConnector(masterAddresses, tableInfo);
+            KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+            KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig);
             booksDataRow().forEach(row -> {
                 try {
-                    tableContext.writeRow(row);
+                    kuduWriter.write(row);
                 }catch (Exception e) {
                     e.printStackTrace();
                 }
             });
+            kuduWriter.close();
         } catch (Exception e) {
             Assertions.fail();
         }
@@ -87,11 +96,34 @@
     protected void cleanDatabase(KuduTableInfo tableInfo) {
         try {
             String masterAddresses = harness.getMasterAddressesAsString();
-            KuduConnector tableContext = new KuduConnector(masterAddresses, tableInfo);
-            tableContext.deleteTable();
-            tableContext.close();
+            KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+            KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig);
+            kuduWriter.deleteTable();
+            kuduWriter.close();
         } catch (Exception e) {
             Assertions.fail();
         }
     }
+
+    protected List<KuduRow> readRows(KuduTableInfo tableInfo) throws Exception {
+        String masterAddresses = harness.getMasterAddressesAsString();
+        KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
+        KuduReader reader = new KuduReader(tableInfo, readerConfig);
+
+        KuduInputSplit[] splits = reader.createInputSplits(1);
+        List<KuduRow> rows = new ArrayList<>();
+        for (KuduInputSplit split : splits) {
+            KuduReaderIterator resultIterator = reader.scanner(split.getScanToken());
+            while(resultIterator.hasNext()) {
+                KuduRow row = resultIterator.next();
+                if(row != null) {
+                    rows.add(row);
+                }
+            }
+        }
+        reader.close();
+
+        return rows;
+    }
+
 }
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java
similarity index 87%
rename from flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java
rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java
index afe57ca..6057113 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kudu.serde;
+package org.apache.flink.connectors.kudu.connector.serde;
 
-import org.apache.flink.streaming.connectors.kudu.connector.KuduColumnInfo;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduColumnInfo;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.kudu.Type;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
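The serde tests move as part of the package rename from `org.apache.flink.streaming.connectors.kudu` to `org.apache.flink.connectors.kudu`; downstream code only needs its imports updated to the new locations, for example:

```java
// New package locations after the rename (DefaultSerDe as used in KuduSinkTest below)
import org.apache.flink.connectors.kudu.connector.KuduColumnInfo;
import org.apache.flink.connectors.kudu.connector.KuduRow;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe;
```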
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
new file mode 100644
index 0000000..ea49a91
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.streaming;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.connectors.kudu.connector.KuduColumnInfo;
+import org.apache.flink.connectors.kudu.connector.KuduDatabase;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe;
+import org.apache.kudu.Type;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+import java.util.UUID;
+
+class KuduSinkTest extends KuduDatabase {
+
+    private static StreamingRuntimeContext context;
+
+    @BeforeAll
+    static void start() {
+        context = Mockito.mock(StreamingRuntimeContext.class);
+        Mockito.when(context.isCheckpointingEnabled()).thenReturn(true);
+    }
+
+    @Test
+    void testInvalidKuduMaster() {
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(null, tableInfo, new DefaultSerDe()));
+    }
+
+    @Test
+    void testInvalidTableInfo() {
+        String masterAddresses = harness.getMasterAddressesAsString();
+        KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(writerConfig, null, new DefaultSerDe()));
+    }
+
+    @Test
+    void testNotTableExist() {
+        String masterAddresses = harness.getMasterAddressesAsString();
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
+        KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+        KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+
+        sink.setRuntimeContext(context);
+        Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
+    }
+
+    @Test
+    void testOutputWithStrongConsistency() throws Exception {
+        String masterAddresses = harness.getMasterAddressesAsString();
+
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
+        KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+                .setMasters(masterAddresses)
+                .setStrongConsistency()
+                .build();
+        KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+
+        sink.setRuntimeContext(context);
+        sink.open(new Configuration());
+
+        for (KuduRow kuduRow : booksDataRow()) {
+            sink.invoke(kuduRow);
+        }
+        sink.close();
+
+        List<KuduRow> rows = readRows(tableInfo);
+        Assertions.assertEquals(5, rows.size());
+
+    }
+
+    @Test
+    void testOutputWithEventualConsistency() throws Exception {
+        String masterAddresses = harness.getMasterAddressesAsString();
+
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
+        KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+                .setMasters(masterAddresses)
+                .setEventualConsistency()
+                .build();
+        KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+
+        sink.setRuntimeContext(context);
+        sink.open(new Configuration());
+
+        for (KuduRow kuduRow : booksDataRow()) {
+            sink.invoke(kuduRow);
+        }
+
+        // sleep to give the eventual-consistency background flush time to finish
+        Thread.sleep(1000);
+
+        sink.close();
+
+        List<KuduRow> rows = readRows(tableInfo);
+        Assertions.assertEquals(5, rows.size());
+    }
+
+
+    @Test
+    void testSpeed() throws Exception {
+        String masterAddresses = harness.getMasterAddressesAsString();
+
+        KuduTableInfo tableInfo = KuduTableInfo.Builder
+                .create("test_speed")
+                .createIfNotExist(true)
+                .replicas(3)
+                .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
+                .addColumn(KuduColumnInfo.Builder.create("uuid", Type.STRING).build())
+                .build();
+        KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+                .setMasters(masterAddresses)
+                .setEventualConsistency()
+                .build();
+        KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+
+        sink.setRuntimeContext(context);
+        sink.open(new Configuration());
+
+        int totalRecords = 100000;
+        for (int i = 0; i < totalRecords; i++) {
+            KuduRow kuduRow = new KuduRow(2);
+            kuduRow.setField(0, "id", i);
+            kuduRow.setField(1, "uuid", UUID.randomUUID().toString());
+            sink.invoke(kuduRow);
+        }
+
+        // sleep to give the eventual-consistency background flush time to finish
+        Thread.sleep(1000);
+
+        sink.close();
+
+        List<KuduRow> rows = readRows(tableInfo);
+        Assertions.assertEquals(totalRecords, rows.size());
+    }
+
+}
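The sink tests above differ mainly in one builder call on `KuduWriterConfig`. A side-by-side sketch of the two modes; the flush-semantics comments are inferred from the tests themselves (the eventual-consistency paths sleep before reading back, the strong-consistency path does not) rather than taken from documented behavior:

```java
import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;

class ConsistencyModes {

    // Strong consistency: inferred to flush each operation before invoke(...)
    // returns, so the test reads its rows back immediately after close().
    static KuduWriterConfig strong(String masterAddresses) {
        return KuduWriterConfig.Builder
                .setMasters(masterAddresses)
                .setStrongConsistency()
                .build();
    }

    // Eventual consistency: inferred to buffer writes and flush in the background,
    // which is why the tests sleep for a second before asserting on readRows(...).
    static KuduWriterConfig eventual(String masterAddresses) {
        return KuduWriterConfig.Builder
                .setMasters(masterAddresses)
                .setEventualConsistency()
                .build();
    }
}
```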
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
deleted file mode 100644
index 225bf7c..0000000
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
-import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-
-public class KuduSinkTest extends KuduDatabase {
-
-    private static StreamingRuntimeContext context;
-
-    @BeforeAll
-    public static void start() {
-        context = Mockito.mock(StreamingRuntimeContext.class);
-        Mockito.when(context.isCheckpointingEnabled()).thenReturn(true);
-    }
-
-    @Test
-    public void testInvalidKuduMaster() throws IOException {
-        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe()));
-    }
-
-    @Test
-    public void testInvalidTableInfo() throws IOException {
-        String masterAddresses = harness.getMasterAddressesAsString();
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(masterAddresses, null, new DefaultSerDe()));
-    }
-
-    @Test
-    public void testNotTableExist() throws IOException {
-        String masterAddresses = harness.getMasterAddressesAsString();
-        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
-        KuduSink<KuduRow> sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe());
-        sink.setRuntimeContext(context);
-        Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
-    }
-
-    @Test
-    public void testOutputWithStrongConsistency() throws Exception {
-        String masterAddresses = harness.getMasterAddressesAsString();
-
-        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
-        KuduSink<KuduRow> sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe())
-                .withStrongConsistency();
-        sink.setRuntimeContext(context);
-        sink.open(new Configuration());
-
-        for (KuduRow kuduRow : booksDataRow()) {
-            sink.invoke(kuduRow);
-        }
-        sink.close();
-
-        List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
-        Assertions.assertEquals(5, rows.size());
-
-    }
-
-    @Test
-    public void testOutputWithEventualConsistency() throws Exception {
-        String masterAddresses = harness.getMasterAddressesAsString();
-
-        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
-        KuduSink<KuduRow> sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe())
-                .withEventualConsistency();
-        sink.setRuntimeContext(context);
-        sink.open(new Configuration());
-
-        for (KuduRow kuduRow : booksDataRow()) {
-            sink.invoke(kuduRow);
-        }
-
-        // sleep to allow eventual consistency to finish
-        Thread.sleep(1000);
-
-        sink.close();
-
-        List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
-        Assertions.assertEquals(5, rows.size());
-    }
-
-}
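Taken together with the new `KuduSinkTest` above, this deletion shows the sink API change in the commit: master addresses and the consistency mode move off the sink itself and into `KuduWriterConfig`. Both fragments below are copied from the respective test versions; only the second compiles after this patch:

```java
// Before (old API, removed above): configuration chained on the sink
KuduSink<KuduRow> oldSink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe())
        .withEventualConsistency();

// After (new API): all writer options are carried by KuduWriterConfig
KuduWriterConfig writerConfig = KuduWriterConfig.Builder
        .setMasters(masterAddresses)
        .setEventualConsistency()
        .build();
KuduSink<KuduRow> newSink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
```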