[BAHIR-214] Improve speed and solve eventual consistency issues (#64)
* resolve eventual consistency issues
* improve speed, especially on eventually consistent streams
* Update Readme
diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
index 9b75aa7..8692ca5 100644
--- a/flink-connector-kudu/README.md
+++ b/flink-connector-kudu/README.md
@@ -29,15 +29,19 @@
// create a table info object
KuduTableInfo tableInfo = KuduTableInfo.Builder
.create("books")
- .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
- .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
- .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+ .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
+ .addColumn(KuduColumnInfo.Builder.createString("title").build())
+ .addColumn(KuduColumnInfo.Builder.createString("author").build())
+ .addColumn(KuduColumnInfo.Builder.createDouble("price").build())
+ .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build())
.build();
-
+// create a reader configuration
+KuduReaderConfig readerConfig = KuduReaderConfig.Builder
+ .setMasters("172.25.0.6")
+ .setRowLimit(1000)
+ .build();
// Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips
-env.createInput(new KuduInputFormat<>("172.25.0.6", tableInfo))
+env.createInput(new KuduInputFormat<>(readerConfig, tableInfo, new DefaultSerDe()))
.count();
env.execute();
@@ -54,18 +58,23 @@
KuduTableInfo tableInfo = KuduTableInfo.Builder
.create("books")
.createIfNotExist(true)
- .replicas(1)
- .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
- .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
- .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+ .replicas(3)
+ .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
+ .addColumn(KuduColumnInfo.Builder.createString("title").build())
+ .addColumn(KuduColumnInfo.Builder.createString("author").build())
+ .addColumn(KuduColumnInfo.Builder.createDouble("price").build())
+ .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build())
.build();
-
+// create a writer configuration
+KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+ .setMasters("172.25.0.6")
+ .setUpsertWrite()
+ .setStrongConsistency()
+ .build();
...
env.fromCollection(books)
- .output(new KuduOutputFormat<>("172.25.0.6", tableInfo));
+ .output(new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe()));
env.execute();
```
@@ -81,18 +90,23 @@
KuduTableInfo tableInfo = KuduTableInfo.Builder
.create("books")
.createIfNotExist(true)
- .replicas(1)
- .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
- .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
- .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+ .replicas(3)
+ .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
+ .addColumn(KuduColumnInfo.Builder.createString("title").build())
+ .addColumn(KuduColumnInfo.Builder.createString("author").build())
+ .addColumn(KuduColumnInfo.Builder.createDouble("price").build())
+ .addColumn(KuduColumnInfo.Builder.createInteger("quantity").build())
.build();
-
+// create a writer configuration
+KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+ .setMasters("172.25.0.6")
+ .setUpsertWrite()
+ .setStrongConsistency()
+ .build();
...
env.fromCollection(books)
- .addSink(new KuduSink<>("172.25.0.6", tableInfo));
+ .addSink(new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe()));
env.execute();
```
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java
new file mode 100644
index 0000000..3a35e6a
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduInputFormat.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.batch;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
+import org.apache.flink.connectors.kudu.connector.serde.KuduDeserialization;
+import org.apache.kudu.client.KuduException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class KuduInputFormat<OUT> extends RichInputFormat<OUT, KuduInputSplit> {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final KuduReaderConfig readerConfig;
+ private final KuduTableInfo tableInfo;
+ private final KuduDeserialization<OUT> deserializer;
+
+ private List<KuduFilterInfo> tableFilters;
+ private List<String> tableProjections;
+
+ private boolean endReached;
+
+ private transient KuduReader kuduReader;
+ private transient KuduReaderIterator resultIterator;
+
+ public KuduInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, KuduDeserialization<OUT> deserializer) {
+ this(readerConfig, tableInfo, deserializer, new ArrayList<>(), new ArrayList<>());
+ }
+ public KuduInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, KuduDeserialization<OUT> deserializer, List<KuduFilterInfo> tableFilters, List<String> tableProjections) {
+
+ this.readerConfig = checkNotNull(readerConfig,"readerConfig could not be null");
+ this.tableInfo = checkNotNull(tableInfo,"tableInfo could not be null");
+ this.deserializer = checkNotNull(deserializer,"deserializer could not be null");
+ this.tableFilters = checkNotNull(tableFilters,"tableFilters could not be null");
+ this.tableProjections = checkNotNull(tableProjections,"tableProjections could not be null");
+
+ this.endReached = false;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ }
+
+ @Override
+ public void open(KuduInputSplit split) throws IOException {
+ endReached = false;
+ startKuduReader();
+
+ resultIterator = kuduReader.scanner(split.getScanToken());
+ }
+
+ private void startKuduReader() throws IOException {
+ if (kuduReader == null) {
+ kuduReader = new KuduReader(tableInfo, readerConfig, tableFilters, tableProjections);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (resultIterator != null) {
+ try {
+ resultIterator.close();
+ } catch (KuduException e) {
+ log.error("Error while closing reader iterator.", e);
+ }
+ }
+ if (kuduReader != null) {
+ kuduReader.close();
+ }
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+ return cachedStatistics;
+ }
+
+ @Override
+ public InputSplitAssigner getInputSplitAssigner(KuduInputSplit[] inputSplits) {
+ return new LocatableInputSplitAssigner(inputSplits);
+ }
+
+ @Override
+ public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+ startKuduReader();
+ return kuduReader.createInputSplits(minNumSplits);
+ }
+
+ @Override
+ public boolean reachedEnd() {
+ return endReached;
+ }
+
+ @Override
+ public OUT nextRecord(OUT reuse) throws IOException {
+ // check that current iterator has next rows
+ if (this.resultIterator.hasNext()) {
+ KuduRow row = resultIterator.next();
+ return deserializer.deserialize(row);
+ } else {
+ endReached = true;
+ return null;
+ }
+ }
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java
new file mode 100644
index 0000000..9d7d017
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.batch;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.serde.KuduSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class KuduOutputFormat<IN> extends RichOutputFormat<IN> implements CheckpointedFunction {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final KuduTableInfo tableInfo;
+ private final KuduWriterConfig writerConfig;
+ private final KuduFailureHandler failureHandler;
+ private final KuduSerialization<IN> serializer;
+
+ private transient KuduWriter kuduWriter;
+
+ public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<IN> serializer) {
+ this(writerConfig, tableInfo, serializer, new DefaultKuduFailureHandler());
+ }
+
+ public KuduOutputFormat(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<IN> serializer, KuduFailureHandler failureHandler) {
+ this.tableInfo = checkNotNull(tableInfo,"tableInfo could not be null");
+ this.writerConfig = checkNotNull(writerConfig,"config could not be null");
+ this.serializer = checkNotNull(serializer,"serializer could not be null");
+ this.failureHandler = checkNotNull(failureHandler,"failureHandler could not be null");
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ kuduWriter = new KuduWriter(tableInfo, writerConfig, failureHandler);
+
+ serializer.withSchema(kuduWriter.getSchema());
+ }
+
+ @Override
+ public void writeRecord(IN row) throws IOException {
+ final KuduRow kuduRow = serializer.serialize(row);
+ kuduWriter.write(kuduRow);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (kuduWriter != null) {
+ kuduWriter.close();
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+ kuduWriter.flushAndCheckErrors();
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java
similarity index 97%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java
index fa7472f..ff8a601 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduColumnInfo.java
@@ -14,13 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kudu.connector;
+package org.apache.flink.connectors.kudu.connector;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import java.io.Serializable;
+@PublicEvolving
public class KuduColumnInfo implements Serializable {
private String name;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
similarity index 97%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
index 1a7582d..c37bc9a 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
@@ -14,15 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kudu.connector;
+package org.apache.flink.connectors.kudu.connector;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduPredicate;
import java.util.List;
-
+@PublicEvolving
public class KuduFilterInfo {
private String column;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
similarity index 95%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
index 3c57a1b..af78361 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
@@ -14,8 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kudu.connector;
+package org.apache.flink.connectors.kudu.connector;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.types.Row;
import java.lang.reflect.Field;
@@ -26,6 +27,7 @@
import java.util.Map;
import java.util.stream.Stream;
+@PublicEvolving
public class KuduRow extends Row {
private Map<String, Integer> rowNames;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
similarity index 96%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
index dfea382..eb63b3f 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
@@ -14,8 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kudu.connector;
+package org.apache.flink.connectors.kudu.connector;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.CreateTableOptions;
@@ -24,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
+@PublicEvolving
public class KuduTableInfo implements Serializable {
private static final Integer DEFAULT_REPLICAS = 1;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java
new file mode 100644
index 0000000..7548033
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/DefaultKuduFailureHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.failure;
+
+import org.apache.kudu.client.RowError;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class DefaultKuduFailureHandler implements KuduFailureHandler {
+ @Override
+ public void onFailure(List<RowError> failure) throws IOException {
+ String errors = failure.stream()
+ .map(error -> error.toString() + System.lineSeparator())
+ .collect(Collectors.joining());
+ throw new IOException("Error while sending value. \n " + errors);
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
new file mode 100644
index 0000000..42de4f7
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.failure;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.kudu.client.RowError;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+@PublicEvolving
+public interface KuduFailureHandler extends Serializable {
+
+ /**
+ * Handles a list of failed {@link RowError}s.
+ *
+ * @param failure the cause of failure
+ * @throws IOException if the sink should fail on this failure, the implementation should rethrow the throwable or a custom one
+ */
+ void onFailure(List<RowError> failure) throws IOException;
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java
new file mode 100644
index 0000000..a809106
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduInputSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.reader;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+
+public class KuduInputSplit extends LocatableInputSplit {
+
+ private byte[] scanToken;
+
+ /**
+ * Creates a new KuduInputSplit
+ * @param splitNumber the number of the input split
+ * @param hostnames The names of the hosts storing the data this input split refers to.
+ */
+ public KuduInputSplit(byte[] scanToken, final int splitNumber, final String[] hostnames) {
+ super(splitNumber, hostnames);
+
+ this.scanToken = scanToken;
+ }
+
+ public byte[] getScanToken() {
+ return scanToken;
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
new file mode 100644
index 0000000..9c6e790
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.reader;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.kudu.client.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Internal
+public class KuduReader implements AutoCloseable {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final KuduTableInfo tableInfo;
+ private final KuduReaderConfig readerConfig;
+ private final List<KuduFilterInfo> tableFilters;
+ private final List<String> tableProjections;
+
+ private transient KuduClient client;
+ private transient KuduSession session;
+ private transient KuduTable table;
+
+ public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig) throws IOException {
+ this(tableInfo, readerConfig, new ArrayList<>(), new ArrayList<>());
+ }
+ public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters) throws IOException {
+ this(tableInfo, readerConfig, tableFilters, new ArrayList<>());
+ }
+ public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters, List<String> tableProjections) throws IOException {
+ this.tableInfo = tableInfo;
+ this.readerConfig = readerConfig;
+ this.tableFilters = tableFilters;
+ this.tableProjections = tableProjections;
+
+ this.client = obtainClient();
+ this.session = obtainSession();
+ this.table = obtainTable();
+ }
+
+ private KuduClient obtainClient() {
+ return new KuduClient.KuduClientBuilder(readerConfig.getMasters()).build();
+ }
+
+ private KuduSession obtainSession() {
+ return client.newSession();
+ }
+
+ private KuduTable obtainTable() throws IOException {
+ String tableName = tableInfo.getName();
+ if (client.tableExists(tableName)) {
+ return client.openTable(tableName);
+ }
+ if (tableInfo.createIfNotExist()) {
+ return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
+ }
+ throw new UnsupportedOperationException("table does not exist and is marked to not be created");
+ }
+
+ public KuduReaderIterator scanner(byte[] token) throws IOException {
+ return new KuduReaderIterator(KuduScanToken.deserializeIntoScanner(token, client));
+ }
+
+ public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Integer rowLimit) {
+ KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
+
+ if (CollectionUtils.isNotEmpty(tableProjections)) {
+ tokenBuilder.setProjectedColumnNames(tableProjections);
+ }
+
+ if (CollectionUtils.isNotEmpty(tableFilters)) {
+ tableFilters.stream()
+ .map(filter -> filter.toPredicate(table.getSchema()))
+ .forEach(tokenBuilder::addPredicate);
+ }
+
+ if (rowLimit !=null && rowLimit > 0) {
+ tokenBuilder.limit(rowLimit);
+ }
+
+ return tokenBuilder.build();
+ }
+
+ public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+
+ List<KuduScanToken> tokens = scanTokens(tableFilters, tableProjections, readerConfig.getRowLimit());
+
+ KuduInputSplit[] splits = new KuduInputSplit[tokens.size()];
+
+ for (int i = 0; i < tokens.size(); i++) {
+ KuduScanToken token = tokens.get(i);
+
+ List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size());
+
+ for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
+ locations.add(getLocation(replica.getRpcHost(), replica.getRpcPort()));
+ }
+
+ KuduInputSplit split = new KuduInputSplit(
+ token.serialize(),
+ i,
+ locations.toArray(new String[locations.size()])
+ );
+ splits[i] = split;
+ }
+
+ if (splits.length < minNumSplits) {
+ log.warn(" The minimum desired number of splits with your configured parallelism level " +
+ "is {}. Current kudu splits = {}. {} instances will remain idle.",
+ minNumSplits,
+ splits.length,
+ (minNumSplits - splits.length)
+ );
+ }
+
+ return splits;
+ }
+
+ /**
+ * Returns an endpoint URL in the following format: {@code host:port}
+ *
+ * @param host Hostname
+ * @param port Port
+ * @return Formatted URL
+ */
+ private String getLocation(String host, Integer port) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(host).append(":").append(port);
+ return builder.toString();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (session != null) {
+ session.close();
+ }
+ } catch (Exception e) {
+ log.error("Error while closing session.", e);
+ }
+ try {
+ if (client != null) {
+ client.close();
+ }
+ } catch (Exception e) {
+ log.error("Error while closing client.", e);
+ }
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
new file mode 100644
index 0000000..6f5f079
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.reader;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+@PublicEvolving
+public class KuduReaderConfig implements Serializable {
+
+ private final String masters;
+ private final Integer rowLimit;
+
+ private KuduReaderConfig(
+ String masters,
+ Integer rowLimit) {
+
+ this.masters = checkNotNull(masters, "Kudu masters cannot be null");
+ this.rowLimit = checkNotNull(rowLimit, "Kudu rowLimit cannot be null");
+ }
+
+ public String getMasters() {
+ return masters;
+ }
+
+ public Integer getRowLimit() {
+ return rowLimit;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("masters", masters)
+ .append("rowLimit", rowLimit)
+ .toString();
+ }
+
+ /**
+ * Builder for the {@link KuduReaderConfig}.
+ */
+ public static class Builder {
+ private String masters;
+ private Integer rowLimit = 0;
+
+ private Builder(String masters) {
+ this.masters = masters;
+ }
+
+ public static Builder setMasters(String masters) {
+ return new Builder(masters);
+ }
+
+ public Builder setRowLimit(Integer rowLimit) {
+ this.rowLimit = rowLimit;
+ return this;
+ }
+
+ public KuduReaderConfig build() {
+ return new KuduReaderConfig(
+ masters,
+ rowLimit);
+ }
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
new file mode 100644
index 0000000..4a8e69c
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+
+@Internal
+public class KuduReaderIterator {
+
+ private KuduScanner scanner;
+ private RowResultIterator rowIterator;
+
+ public KuduReaderIterator(KuduScanner scanner) throws KuduException {
+ this.scanner = scanner;
+ nextRows();
+ }
+
+ public void close() throws KuduException {
+ scanner.close();
+ }
+
+ public boolean hasNext() throws KuduException {
+ if (rowIterator.hasNext()) {
+ return true;
+ } else if (scanner.hasMoreRows()) {
+ nextRows();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public KuduRow next() {
+ RowResult row = this.rowIterator.next();
+ return toKuduRow(row);
+ }
+
+ private void nextRows() throws KuduException {
+ this.rowIterator = scanner.nextRows();
+ }
+
+ private KuduRow toKuduRow(RowResult row) {
+ Schema schema = row.getColumnProjection();
+
+ KuduRow values = new KuduRow(schema.getColumnCount());
+ schema.getColumns().forEach(column -> {
+ String name = column.getName();
+ int pos = schema.getColumnIndex(name);
+ if(row.isNull(name)) {
+ values.setField(pos, name, null);
+ } else {
+ Type type = column.getType();
+ switch (type) {
+ case BINARY:
+ values.setField(pos, name, row.getBinary(name));
+ break;
+ case STRING:
+ values.setField(pos, name, row.getString(name));
+ break;
+ case BOOL:
+ values.setField(pos, name, row.getBoolean(name));
+ break;
+ case DOUBLE:
+ values.setField(pos, name, row.getDouble(name));
+ break;
+ case FLOAT:
+ values.setField(pos, name, row.getFloat(name));
+ break;
+ case INT8:
+ values.setField(pos, name, row.getByte(name));
+ break;
+ case INT16:
+ values.setField(pos, name, row.getShort(name));
+ break;
+ case INT32:
+ values.setField(pos, name, row.getInt(name));
+ break;
+ case INT64:
+ values.setField(pos, name, row.getLong(name));
+ break;
+ case UNIXTIME_MICROS:
+ values.setField(pos, name, row.getLong(name) / 1000);
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal var type: " + type);
+ }
+ }
+ });
+ return values;
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java
similarity index 90%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java
index c12eb42..36584b5 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/DefaultSerDe.java
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kudu.serde;
+package org.apache.flink.connectors.kudu.connector.serde;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
import org.apache.kudu.Schema;
public class DefaultSerDe implements KuduSerialization<KuduRow>, KuduDeserialization<KuduRow> {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java
similarity index 87%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java
index 355a516..190c4c7 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduDeserialization.java
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kudu.serde;
+package org.apache.flink.connectors.kudu.connector.serde;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
import java.io.Serializable;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java
similarity index 88%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java
index 99db1dc..b13c59b 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/KuduSerialization.java
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kudu.serde;
+package org.apache.flink.connectors.kudu.connector.serde;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
import org.apache.kudu.Schema;
import java.io.Serializable;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java
similarity index 95%
rename from flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java
index 1063aa2..bc57174 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDe.java
@@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kudu.serde;
+package org.apache.flink.connectors.kudu.connector.serde;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.kudu.Schema;
import java.lang.reflect.Constructor;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
new file mode 100644
index 0000000..f4e2a8a
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+@Internal
+public class KuduWriter implements AutoCloseable {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final KuduTableInfo tableInfo;
+ private final KuduWriterConfig writerConfig;
+ private final KuduFailureHandler failureHandler;
+
+ private transient KuduClient client;
+ private transient KuduSession session;
+ private transient KuduTable table;
+
+
+ public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig) throws IOException {
+ this (tableInfo, writerConfig, new DefaultKuduFailureHandler());
+ }
+ public KuduWriter(KuduTableInfo tableInfo, KuduWriterConfig writerConfig, KuduFailureHandler failureHandler) throws IOException {
+ this.tableInfo = tableInfo;
+ this.writerConfig = writerConfig;
+ this.failureHandler = failureHandler;
+
+ this.client = obtainClient();
+ this.session = obtainSession();
+ this.table = obtainTable();
+ }
+
+ private KuduClient obtainClient() {
+ return new KuduClient.KuduClientBuilder(writerConfig.getMasters()).build();
+ }
+
+ private KuduSession obtainSession() {
+ KuduSession session = client.newSession();
+ session.setFlushMode(writerConfig.getFlushMode());
+ return session;
+ }
+
+ private KuduTable obtainTable() throws IOException {
+ String tableName = tableInfo.getName();
+ if (client.tableExists(tableName)) {
+ return client.openTable(tableName);
+ }
+ if (tableInfo.createIfNotExist()) {
+ return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions());
+ }
+ throw new UnsupportedOperationException("table does not exist and is marked to not be created");
+ }
+
+ public Schema getSchema() {
+ return table.getSchema();
+ }
+
+ public void write(KuduRow row) throws IOException {
+ checkAsyncErrors();
+
+ final Operation operation = mapToOperation(row);
+ final OperationResponse response = session.apply(operation);
+
+ checkErrors(response);
+ }
+
+ public void flushAndCheckErrors() throws IOException {
+ checkAsyncErrors();
+ flush();
+ checkAsyncErrors();
+ }
+
+ @VisibleForTesting
+ public DeleteTableResponse deleteTable() throws IOException {
+ String tableName = table.getName();
+ return client.deleteTable(tableName);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ flushAndCheckErrors();
+ } finally {
+ try {
+ if (session != null) {
+ session.close();
+ }
+ } catch (Exception e) {
+ log.error("Error while closing session.", e);
+ }
+ try {
+ if (client != null) {
+ client.close();
+ }
+ } catch (Exception e) {
+ log.error("Error while closing client.", e);
+ }
+ }
+ }
+
+ private void flush() throws IOException {
+ session.flush();
+ }
+
+ private void checkErrors(OperationResponse response) throws IOException {
+ if (response != null && response.hasRowError()) {
+ failureHandler.onFailure(Arrays.asList(response.getRowError()));
+ } else {
+ checkAsyncErrors();
+ }
+ }
+
+ private void checkAsyncErrors() throws IOException {
+ if (session.countPendingErrors() == 0) return;
+
+ List<RowError> errors = Arrays.asList(session.getPendingErrors().getRowErrors());
+ failureHandler.onFailure(errors);
+ }
+
+ private Operation mapToOperation(KuduRow row) {
+ final Operation operation = obtainOperation();
+ final PartialRow partialRow = operation.getRow();
+
+ table.getSchema().getColumns().forEach(column -> {
+ String columnName = column.getName();
+ Object value = row.getField(column.getName());
+
+ if (value == null) {
+ partialRow.setNull(columnName);
+ } else {
+ Type type = column.getType();
+ switch (type) {
+ case STRING:
+ partialRow.addString(columnName, (String) value);
+ break;
+ case FLOAT:
+ partialRow.addFloat(columnName, (Float) value);
+ break;
+ case INT8:
+ partialRow.addByte(columnName, (Byte) value);
+ break;
+ case INT16:
+ partialRow.addShort(columnName, (Short) value);
+ break;
+ case INT32:
+ partialRow.addInt(columnName, (Integer) value);
+ break;
+ case INT64:
+ partialRow.addLong(columnName, (Long) value);
+ break;
+ case DOUBLE:
+ partialRow.addDouble(columnName, (Double) value);
+ break;
+ case BOOL:
+ partialRow.addBoolean(columnName, (Boolean) value);
+ break;
+ case UNIXTIME_MICROS:
+ // *1000: value is epoch millis, Kudu UNIXTIME_MICROS expects microseconds (reader divides by 1000 symmetrically)
+ partialRow.addLong(columnName, ((Long) value) * 1000);
+ break;
+ case BINARY:
+ partialRow.addBinary(columnName, (byte[]) value);
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal var type: " + type);
+ }
+ }
+ });
+ return operation;
+ }
+
+ private Operation obtainOperation() {
+ switch (writerConfig.getWriteMode()) {
+ case INSERT: return table.newInsert();
+ case UPDATE: return table.newUpdate();
+ case UPSERT: return table.newUpsert();
+ }
+ return table.newUpsert();
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
new file mode 100644
index 0000000..13672d5
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.kudu.client.SessionConfiguration.FlushMode;
+
+@PublicEvolving
+public class KuduWriterConfig implements Serializable {
+
+ private final String masters;
+ private final FlushMode flushMode;
+ private final KuduWriterMode writeMode;
+
+ private KuduWriterConfig(
+ String masters,
+ FlushMode flushMode,
+ KuduWriterMode writeMode) {
+
+ this.masters = checkNotNull(masters, "Kudu masters cannot be null");
+ this.flushMode = checkNotNull(flushMode, "Kudu flush mode cannot be null");
+ this.writeMode = checkNotNull(writeMode, "Kudu write mode cannot be null");
+ }
+
+ public String getMasters() {
+ return masters;
+ }
+
+ public KuduWriterMode getWriteMode() {
+ return writeMode;
+ }
+
+ public FlushMode getFlushMode() {
+ return flushMode;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("masters", masters)
+ .append("flushMode", flushMode)
+ .append("writeMode", writeMode)
+ .toString();
+ }
+
+ /**
+ * Builder for the {@link KuduWriterConfig}.
+ */
+ public static class Builder {
+ private String masters;
+ private KuduWriterMode writeMode = KuduWriterMode.UPSERT;
+ private FlushMode flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
+
+ private Builder(String masters) {
+ this.masters = masters;
+ }
+
+ public static Builder setMasters(String masters) {
+ return new Builder(masters);
+ }
+
+ public Builder setWriteMode(KuduWriterMode writeMode) {
+ this.writeMode = writeMode;
+ return this;
+ }
+ public Builder setUpsertWrite() {
+ return setWriteMode(KuduWriterMode.UPSERT);
+ }
+ public Builder setInsertWrite() {
+ return setWriteMode(KuduWriterMode.INSERT);
+ }
+ public Builder setUpdateWrite() {
+ return setWriteMode(KuduWriterMode.UPDATE);
+ }
+
+ public Builder setConsistency(FlushMode flushMode) {
+ this.flushMode = flushMode;
+ return this;
+ }
+ public Builder setEventualConsistency() {
+ return setConsistency(FlushMode.AUTO_FLUSH_BACKGROUND);
+ }
+ public Builder setStrongConsistency() {
+ return setConsistency(FlushMode.AUTO_FLUSH_SYNC);
+ }
+
+ public KuduWriterConfig build() {
+ return new KuduWriterConfig(
+ masters,
+ flushMode,
+ writeMode);
+ }
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java
new file mode 100644
index 0000000..27b2ed3
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConsistency.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import static org.apache.kudu.client.SessionConfiguration.*;
+
+public enum KuduWriterConsistency {
+ EVENTUAL(FlushMode.AUTO_FLUSH_BACKGROUND),
+ STRONG(FlushMode.AUTO_FLUSH_SYNC),
+ //CHECKPOINT(FlushMode.MANUAL_FLUSH)
+ ;
+
+ public final FlushMode flushMode;
+
+ KuduWriterConsistency(FlushMode flushMode) {
+ this.flushMode = flushMode;
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java
new file mode 100644
index 0000000..8c9eab0
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterMode.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+public enum KuduWriterMode {
+ INSERT,
+ UPDATE,
+ UPSERT
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
new file mode 100644
index 0000000..d523b67
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.streaming;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.serde.KuduSerialization;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+@PublicEvolving
+public class KuduSink<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final KuduTableInfo tableInfo;
+ private final KuduWriterConfig writerConfig;
+ private final KuduFailureHandler failureHandler;
+ private final KuduSerialization<OUT> serializer;
+
+ private transient KuduWriter kuduWriter;
+
+ public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) {
+ this(writerConfig, tableInfo, serializer, new DefaultKuduFailureHandler());
+ }
+
+ public KuduSink(KuduWriterConfig writerConfig, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer, KuduFailureHandler failureHandler) {
+ this.tableInfo = checkNotNull(tableInfo,"tableInfo could not be null");
+ this.writerConfig = checkNotNull(writerConfig,"config could not be null");
+ this.serializer = checkNotNull(serializer,"serializer could not be null");
+ this.failureHandler = checkNotNull(failureHandler,"failureHandler could not be null");
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ kuduWriter = new KuduWriter(tableInfo, writerConfig, failureHandler);
+
+ serializer.withSchema(kuduWriter.getSchema());
+ }
+
+ @Override
+ public void invoke(OUT value) throws Exception {
+ final KuduRow kuduRow = serializer.serialize(value);
+ kuduWriter.write(kuduRow);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (kuduWriter != null) {
+ kuduWriter.close();
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+ kuduWriter.flushAndCheckErrors();
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+ }
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
deleted file mode 100644
index fd126d0..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu;
-
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.streaming.connectors.kudu.connector.*;
-import org.apache.flink.util.Preconditions;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduScanToken;
-import org.apache.kudu.client.LocatedTablet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class KuduInputFormat extends RichInputFormat<KuduRow, KuduInputFormat.KuduInputSplit> {
-
- private String kuduMasters;
- private KuduTableInfo tableInfo;
- private List<KuduFilterInfo> tableFilters;
- private List<String> tableProjections;
- private Long rowsLimit;
- private boolean endReached;
-
- private transient KuduConnector tableContext;
- private transient KuduRowIterator resultIterator;
-
- private static final Logger LOG = LoggerFactory.getLogger(KuduInputFormat.class);
-
- public KuduInputFormat(String kuduMasters, KuduTableInfo tableInfo) {
- Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
- this.kuduMasters = kuduMasters;
-
- Preconditions.checkNotNull(tableInfo,"tableInfo could not be null");
- this.tableInfo = tableInfo;
-
- this.endReached = false;
- }
-
- public KuduInputFormat withTableFilters(KuduFilterInfo... tableFilters) {
- return withTableFilters(Arrays.asList(tableFilters));
- }
-
- public KuduInputFormat withTableFilters(List<KuduFilterInfo> tableFilters) {
- this.tableFilters = tableFilters;
- return this;
- }
-
- public KuduInputFormat withTableProjections(String... tableProjections) {
- return withTableProjections(Arrays.asList(tableProjections));
- }
- public KuduInputFormat withTableProjections(List<String> tableProjections) {
- this.tableProjections = tableProjections;
- return this;
- }
-
- public KuduInputFormat withRowsLimit(Long rowsLimit) {
- this.rowsLimit = rowsLimit;
- return this;
- }
-
- @Override
- public void configure(Configuration parameters) {
-
- }
-
- @Override
- public void open(KuduInputSplit split) throws IOException {
- endReached = false;
- startTableContext();
-
- resultIterator = tableContext.scanner(split.getScanToken());
- }
-
- @Override
- public void close() {
- if (resultIterator != null) {
- try {
- resultIterator.close();
- } catch (KuduException e) {
- e.printStackTrace();
- }
- }
- }
-
- @Override
- public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
- return cachedStatistics;
- }
-
- @Override
- public InputSplitAssigner getInputSplitAssigner(KuduInputSplit[] inputSplits) {
- return new LocatableInputSplitAssigner(inputSplits);
- }
-
- private void startTableContext() throws IOException {
- if (tableContext == null) {
- tableContext = new KuduConnector(kuduMasters, tableInfo);
- }
- }
-
- @Override
- public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
- startTableContext();
- Preconditions.checkNotNull(tableContext,"tableContext should not be null");
-
- List<KuduScanToken> tokens = tableContext.scanTokens(tableFilters, tableProjections, rowsLimit);
-
- KuduInputSplit[] splits = new KuduInputSplit[tokens.size()];
-
- for (int i = 0; i < tokens.size(); i++) {
- KuduScanToken token = tokens.get(i);
-
- List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size());
-
- for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
- locations.add(getLocation(replica.getRpcHost(), replica.getRpcPort()));
- }
-
- KuduInputSplit split = new KuduInputSplit(
- token.serialize(),
- i,
- locations.toArray(new String[locations.size()])
- );
- splits[i] = split;
- }
-
- if (splits.length < minNumSplits) {
- LOG.warn(" The minimum desired number of splits with your configured parallelism level " +
- "is {}. Current kudu splits = {}. {} instances will remain idle.",
- minNumSplits,
- splits.length,
- (minNumSplits - splits.length)
- );
- }
-
- return splits;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return endReached;
- }
-
- @Override
- public KuduRow nextRecord(KuduRow reuse) throws IOException {
- // check that current iterator has next rows
- if (this.resultIterator.hasNext()) {
- return resultIterator.next();
- } else {
- endReached = true;
- return null;
- }
- }
-
- /**
- * Returns a endpoint url in the following format: <host>:<ip>
- *
- * @param host Hostname
- * @param port Port
- * @return Formatted URL
- */
- private String getLocation(String host, Integer port) {
- StringBuilder builder = new StringBuilder();
- builder.append(host).append(":").append(port);
- return builder.toString();
- }
-
-
- public class KuduInputSplit extends LocatableInputSplit {
-
- private byte[] scanToken;
-
- /**
- * Creates a new KuduInputSplit
- * @param splitNumber the number of the input split
- * @param hostnames The names of the hosts storing the data this input split refers to.
- */
- public KuduInputSplit(byte[] scanToken, final int splitNumber, final String[] hostnames) {
- super(splitNumber, hostnames);
-
- this.scanToken = scanToken;
- }
-
- public byte[] getScanToken() {
- return scanToken;
- }
- }
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
deleted file mode 100644
index c1301da..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu;
-
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
-import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
-import org.apache.flink.util.Preconditions;
-import org.apache.kudu.client.SessionConfiguration.FlushMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class KuduOutputFormat<OUT> extends RichOutputFormat<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
-
- private String kuduMasters;
- private KuduTableInfo tableInfo;
- private KuduConnector.Consistency consistency;
- private KuduConnector.WriteMode writeMode;
-
- private KuduSerialization<OUT> serializer;
-
- private transient KuduConnector connector;
-
-
- public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) {
- Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
- this.kuduMasters = kuduMasters;
-
- Preconditions.checkNotNull(tableInfo,"tableInfo could not be null");
- this.tableInfo = tableInfo;
- this.consistency = KuduConnector.Consistency.STRONG;
- this.writeMode = KuduConnector.WriteMode.UPSERT;
- this.serializer = serializer.withSchema(tableInfo.getSchema());
- }
-
-
- public KuduOutputFormat<OUT> withEventualConsistency() {
- this.consistency = KuduConnector.Consistency.EVENTUAL;
- return this;
- }
-
- public KuduOutputFormat<OUT> withStrongConsistency() {
- this.consistency = KuduConnector.Consistency.STRONG;
- return this;
- }
-
- public KuduOutputFormat<OUT> withUpsertWriteMode() {
- this.writeMode = KuduConnector.WriteMode.UPSERT;
- return this;
- }
-
- public KuduOutputFormat<OUT> withInsertWriteMode() {
- this.writeMode = KuduConnector.WriteMode.INSERT;
- return this;
- }
-
- public KuduOutputFormat<OUT> withUpdateWriteMode() {
- this.writeMode = KuduConnector.WriteMode.UPDATE;
- return this;
- }
-
- @Override
- public void configure(Configuration parameters) {
-
- }
-
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- if (connector != null) return;
- connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode,FlushMode.AUTO_FLUSH_SYNC);
- serializer = serializer.withSchema(tableInfo.getSchema());
- }
-
- @Override
- public void writeRecord(OUT row) throws IOException {
- boolean response;
- try {
- KuduRow kuduRow = serializer.serialize(row);
- response = connector.writeRow(kuduRow);
- } catch (Exception e) {
- throw new IOException(e.getLocalizedMessage(), e);
- }
-
- if(!response) {
- throw new IOException("error with some transaction");
- }
- }
-
- @Override
- public void close() throws IOException {
- if (this.connector == null) return;
- try {
- this.connector.close();
- } catch (Exception e) {
- throw new IOException(e.getLocalizedMessage(), e);
- }
- }
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
deleted file mode 100644
index b6dd9c8..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
-import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
-import org.apache.flink.util.Preconditions;
-import org.apache.kudu.client.SessionConfiguration.FlushMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class KuduSink<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
-
- private String kuduMasters;
- private KuduTableInfo tableInfo;
- private KuduConnector.Consistency consistency;
- private KuduConnector.WriteMode writeMode;
- private FlushMode flushMode;
-
- private KuduSerialization<OUT> serializer;
-
- private transient KuduConnector connector;
-
- public KuduSink(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) {
- Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
- this.kuduMasters = kuduMasters;
-
- Preconditions.checkNotNull(tableInfo,"tableInfo could not be null");
- this.tableInfo = tableInfo;
- this.consistency = KuduConnector.Consistency.STRONG;
- this.writeMode = KuduConnector.WriteMode.UPSERT;
- this.serializer = serializer.withSchema(tableInfo.getSchema());
- }
-
- public KuduSink<OUT> withEventualConsistency() {
- this.consistency = KuduConnector.Consistency.EVENTUAL;
- return this;
- }
-
- public KuduSink<OUT> withStrongConsistency() {
- this.consistency = KuduConnector.Consistency.STRONG;
- return this;
- }
-
- public KuduSink<OUT> withUpsertWriteMode() {
- this.writeMode = KuduConnector.WriteMode.UPSERT;
- return this;
- }
-
- public KuduSink<OUT> withInsertWriteMode() {
- this.writeMode = KuduConnector.WriteMode.INSERT;
- return this;
- }
-
- public KuduSink<OUT> withUpdateWriteMode() {
- this.writeMode = KuduConnector.WriteMode.UPDATE;
- return this;
- }
-
- public KuduSink<OUT> withSyncFlushMode() {
- this.flushMode = FlushMode.AUTO_FLUSH_SYNC;
- return this;
- }
-
- public KuduSink<OUT> withAsyncFlushMode() {
- this.flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
- return this;
- }
-
- @Override
- public void open(Configuration parameters) throws IOException {
- if (this.connector != null) return;
- this.connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode, getflushMode());
- this.serializer.withSchema(tableInfo.getSchema());
- }
-
- /**
- * If flink checkpoint is disable,synchronously write data to kudu.
- * <p>If flink checkpoint is enable, asynchronously write data to kudu by default.
- *
- * <p>(Note: async may result in out-of-order writes to Kudu.
- * you also can change to sync by explicitly calling {@link KuduSink#withSyncFlushMode()} when initializing KuduSink. )
- *
- * @return flushMode
- */
- private FlushMode getflushMode() {
- FlushMode flushMode = FlushMode.AUTO_FLUSH_SYNC;
- boolean enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
- if(enableCheckpoint && this.flushMode == null) {
- flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
- }
- if(enableCheckpoint && this.flushMode != null) {
- flushMode = this.flushMode;
- }
- return flushMode;
- }
-
- @Override
- public void invoke(OUT row) throws Exception {
- KuduRow kuduRow = serializer.serialize(row);
- boolean response = connector.writeRow(kuduRow);
-
- if(!response) {
- throw new IOException("error with some transaction");
- }
- }
-
- @Override
- public void close() throws Exception {
- if (this.connector == null) return;
- try {
- this.connector.close();
- } catch (Exception e) {
- throw new IOException(e.getLocalizedMessage(), e);
- }
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Snapshotting state {} ...", context.getCheckpointId());
- }
- this.connector.flush();
- }
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
deleted file mode 100644
index d45886c..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu.connector;
-
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.flink.api.common.time.Time;
-import org.apache.kudu.client.*;
-import org.apache.kudu.client.SessionConfiguration.FlushMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class KuduConnector implements AutoCloseable {
-
- private final Logger LOG = LoggerFactory.getLogger(this.getClass());
-
- private Callback<Boolean, OperationResponse> defaultCB;
-
- public enum Consistency {EVENTUAL, STRONG};
- public enum WriteMode {INSERT,UPDATE,UPSERT}
-
- private AsyncKuduClient client;
- private KuduTable table;
- private AsyncKuduSession session;
-
- private Consistency consistency;
- private WriteMode writeMode;
-
- private static AtomicInteger pendingTransactions = new AtomicInteger();
- private static AtomicBoolean errorTransactions = new AtomicBoolean(false);
-
- public KuduConnector(String kuduMasters, KuduTableInfo tableInfo) throws IOException {
- this(kuduMasters, tableInfo, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT,FlushMode.AUTO_FLUSH_SYNC);
- }
-
- public KuduConnector(String kuduMasters, KuduTableInfo tableInfo, Consistency consistency, WriteMode writeMode,FlushMode flushMode) throws IOException {
- this.client = client(kuduMasters);
- this.table = table(tableInfo);
- this.session = client.newSession();
- this.consistency = consistency;
- this.writeMode = writeMode;
- this.defaultCB = new ResponseCallback();
- this.session.setFlushMode(flushMode);
- }
-
- private AsyncKuduClient client(String kuduMasters) {
- return new AsyncKuduClient.AsyncKuduClientBuilder(kuduMasters).build();
- }
-
- private KuduTable table(KuduTableInfo infoTable) throws IOException {
- KuduClient syncClient = client.syncClient();
-
- String tableName = infoTable.getName();
- if (syncClient.tableExists(tableName)) {
- return syncClient.openTable(tableName);
- }
- if (infoTable.createIfNotExist()) {
- return syncClient.createTable(tableName, infoTable.getSchema(), infoTable.getCreateTableOptions());
- }
- throw new UnsupportedOperationException("table not exists and is marketed to not be created");
- }
-
- public boolean deleteTable() throws IOException {
- String tableName = table.getName();
- client.syncClient().deleteTable(tableName);
- return true;
- }
-
- public KuduRowIterator scanner(byte[] token) throws IOException {
- return new KuduRowIterator(KuduScanToken.deserializeIntoScanner(token, client.syncClient()));
- }
-
- public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Long rowLimit) {
- KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.syncClient().newScanTokenBuilder(table);
-
- if (CollectionUtils.isNotEmpty(tableProjections)) {
- tokenBuilder.setProjectedColumnNames(tableProjections);
- }
-
- if (CollectionUtils.isNotEmpty(tableFilters)) {
- tableFilters.stream()
- .map(filter -> filter.toPredicate(table.getSchema()))
- .forEach(tokenBuilder::addPredicate);
- }
-
- if (rowLimit !=null && rowLimit > 0) {
- tokenBuilder.limit(rowLimit);
- }
-
- return tokenBuilder.build();
- }
-
- public boolean writeRow(KuduRow row) throws Exception {
- final Operation operation = KuduMapper.toOperation(table, writeMode, row);
-
- Deferred<OperationResponse> response = session.apply(operation);
-
- if (KuduConnector.Consistency.EVENTUAL.equals(consistency)) {
- pendingTransactions.incrementAndGet();
- response.addCallback(defaultCB);
- } else {
- processResponse(response.join());
- }
-
- return !errorTransactions.get();
-
- }
-
- @Override
- public void close() throws Exception {
- while(pendingTransactions.get() > 0) {
- LOG.info("sleeping {}s by pending transactions", pendingTransactions.get());
- Thread.sleep(Time.seconds(pendingTransactions.get()).toMilliseconds());
- }
-
- if (session == null) return;
- session.close();
-
- if (client == null) return;
- client.close();
- }
-
- public void flush(){
- this.session.flush();
- }
-
- private class ResponseCallback implements Callback<Boolean, OperationResponse> {
- @Override
- public Boolean call(OperationResponse operationResponse) {
- pendingTransactions.decrementAndGet();
- processResponse(operationResponse);
- return errorTransactions.get();
- }
- }
-
- protected void processResponse(OperationResponse operationResponse) {
- if (operationResponse == null) return;
-
- if (operationResponse.hasRowError()) {
- logResponseError(operationResponse.getRowError());
- errorTransactions.set(true);
- }
- }
-
- private void logResponseError(RowError error) {
- LOG.error("Error {} on {}: {} ", error.getErrorStatus(), error.getOperation(), error.toString());
- }
-
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
deleted file mode 100644
index 86b683f..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu.connector;
-
-
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-import org.apache.kudu.client.RowResult;
-
-final class KuduMapper {
-
- private KuduMapper() { }
-
- static KuduRow toKuduRow(RowResult row) {
- Schema schema = row.getColumnProjection();
-
- KuduRow values = new KuduRow(schema.getColumnCount());
- schema.getColumns().forEach(column -> {
- String name = column.getName();
- int pos = schema.getColumnIndex(name);
- if(row.isNull(name)) {
- values.setField(pos, name, null);
- } else {
- Type type = column.getType();
- switch (type) {
- case BINARY:
- values.setField(pos, name, row.getBinary(name));
- break;
- case STRING:
- values.setField(pos, name, row.getString(name));
- break;
- case BOOL:
- values.setField(pos, name, row.getBoolean(name));
- break;
- case DOUBLE:
- values.setField(pos, name, row.getDouble(name));
- break;
- case FLOAT:
- values.setField(pos, name, row.getFloat(name));
- break;
- case INT8:
- values.setField(pos, name, row.getByte(name));
- break;
- case INT16:
- values.setField(pos, name, row.getShort(name));
- break;
- case INT32:
- values.setField(pos, name, row.getInt(name));
- break;
- case INT64:
- values.setField(pos, name, row.getLong(name));
- break;
- case UNIXTIME_MICROS:
- values.setField(pos, name, row.getLong(name) / 1000);
- break;
- default:
- throw new IllegalArgumentException("Illegal var type: " + type);
- }
- }
- });
- return values;
- }
-
-
- static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode, KuduRow row) {
- final Operation operation = toOperation(table, writeMode);
- final PartialRow partialRow = operation.getRow();
-
- table.getSchema().getColumns().forEach(column -> {
- String columnName = column.getName();
- Object value = row.getField(column.getName());
-
- if (value == null) {
- partialRow.setNull(columnName);
- } else {
- Type type = column.getType();
- switch (type) {
- case STRING:
- partialRow.addString(columnName, (String) value);
- break;
- case FLOAT:
- partialRow.addFloat(columnName, (Float) value);
- break;
- case INT8:
- partialRow.addByte(columnName, (Byte) value);
- break;
- case INT16:
- partialRow.addShort(columnName, (Short) value);
- break;
- case INT32:
- partialRow.addInt(columnName, (Integer) value);
- break;
- case INT64:
- partialRow.addLong(columnName, (Long) value);
- break;
- case DOUBLE:
- partialRow.addDouble(columnName, (Double) value);
- break;
- case BOOL:
- partialRow.addBoolean(columnName, (Boolean) value);
- break;
- case UNIXTIME_MICROS:
- //*1000 to correctly create date on kudu
- partialRow.addLong(columnName, ((Long) value) * 1000);
- break;
- case BINARY:
- partialRow.addBinary(columnName, (byte[]) value);
- break;
- default:
- throw new IllegalArgumentException("Illegal var type: " + type);
- }
- }
- });
- return operation;
- }
-
- static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode) {
- switch (writeMode) {
- case INSERT: return table.newInsert();
- case UPDATE: return table.newUpdate();
- case UPSERT: return table.newUpsert();
- }
- return table.newUpsert();
- }
-
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java
deleted file mode 100644
index 46cbff1..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed serialize the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file serialize You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed serialize in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu.connector;
-
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.RowResult;
-import org.apache.kudu.client.RowResultIterator;
-
-public class KuduRowIterator {
-
- private KuduScanner scanner;
- private RowResultIterator rowIterator;
-
- public KuduRowIterator(KuduScanner scanner) throws KuduException {
- this.scanner = scanner;
- nextRows();
- }
-
- public void close() throws KuduException {
- scanner.close();
- }
-
- public boolean hasNext() throws KuduException {
- if (rowIterator.hasNext()) {
- return true;
- } else if (scanner.hasMoreRows()) {
- nextRows();
- return true;
- } else {
- return false;
- }
- }
-
- public KuduRow next() {
- RowResult row = this.rowIterator.next();
- return KuduMapper.toKuduRow(row);
- }
-
- private void nextRows() throws KuduException {
- this.rowIterator = scanner.nextRows();
- }
-}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
similarity index 61%
rename from flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
index 041b77e..e22f40e 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
@@ -14,35 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kudu;
+package org.apache.flink.connectors.kudu.batch;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduDatabase;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-
-public class KuduInputFormatTest extends KuduDatabase {
+class KuduInputFormatTest extends KuduDatabase {
@Test
- public void testInvalidKuduMaster() throws IOException {
+ void testInvalidKuduMaster() {
KuduTableInfo tableInfo = booksTableInfo("books",false);
- Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat(null, tableInfo));
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat<>(null, tableInfo, new DefaultSerDe()));
}
@Test
- public void testInvalidTableInfo() throws IOException {
+ void testInvalidTableInfo() {
String masterAddresses = harness.getMasterAddressesAsString();
- Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat(masterAddresses, null));
+ KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat<>(readerConfig, null, new DefaultSerDe()));
}
@Test
- public void testInputFormat() throws Exception {
+ void testInputFormat() throws Exception {
KuduTableInfo tableInfo = booksTableInfo("books",true);
setUpDatabase(tableInfo);
@@ -53,7 +56,7 @@
}
@Test
- public void testInputFormatWithProjection() throws Exception {
+ void testInputFormatWithProjection() throws Exception {
KuduTableInfo tableInfo = booksTableInfo("books",true);
setUpDatabase(tableInfo);
@@ -68,14 +71,14 @@
}
- public static List<KuduRow> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
+ private List<KuduRow> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
String masterAddresses = harness.getMasterAddressesAsString();
- KuduInputFormat inputFormat = new KuduInputFormat(masterAddresses, tableInfo)
- .withTableProjections(fieldProjection);
+ KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
+ KuduInputFormat<KuduRow> inputFormat = new KuduInputFormat<>(readerConfig, tableInfo, new DefaultSerDe(), new ArrayList<>(), Arrays.asList(fieldProjection));
- KuduInputFormat.KuduInputSplit[] splits = inputFormat.createInputSplits(1);
+ KuduInputSplit[] splits = inputFormat.createInputSplits(1);
List<KuduRow> rows = new ArrayList<>();
- for (KuduInputFormat.KuduInputSplit split : splits) {
+ for (KuduInputSplit split : splits) {
inputFormat.open(split);
while(!inputFormat.reachedEnd()) {
KuduRow row = inputFormat.nextRecord(new KuduRow(5));
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
similarity index 63%
rename from flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
index 4e91310..963a8c0 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
@@ -14,48 +14,54 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kudu;
+package org.apache.flink.connectors.kudu.batch;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
-import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe;
+import org.apache.flink.connectors.kudu.connector.KuduDatabase;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.util.List;
import java.util.UUID;
-public class KuduOuputFormatTest extends KuduDatabase {
+class KuduOuputFormatTest extends KuduDatabase {
@Test
- public void testInvalidKuduMaster() throws IOException {
+ void testInvalidKuduMaster() {
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe()));
}
@Test
- public void testInvalidTableInfo() throws IOException {
+ void testInvalidTableInfo() {
String masterAddresses = harness.getMasterAddressesAsString();
- Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(masterAddresses, null, new DefaultSerDe()));
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(writerConfig, null, new DefaultSerDe()));
}
@Test
- public void testNotTableExist() throws IOException {
+ void testNotTableExist() {
String masterAddresses = harness.getMasterAddressesAsString();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
- KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe());
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+ KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe());
Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0,1));
}
@Test
- public void testOutputWithStrongConsistency() throws Exception {
+ void testOutputWithStrongConsistency() throws Exception {
String masterAddresses = harness.getMasterAddressesAsString();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
- KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe())
- .withStrongConsistency();
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+ .setMasters(masterAddresses)
+ .setStrongConsistency()
+ .build();
+ KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe());
+
outputFormat.open(0,1);
for (KuduRow kuduRow : booksDataRow()) {
@@ -63,19 +69,23 @@
}
outputFormat.close();
- List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
+ List<KuduRow> rows = readRows(tableInfo);
Assertions.assertEquals(5, rows.size());
cleanDatabase(tableInfo);
}
@Test
- public void testOutputWithEventualConsistency() throws Exception {
+ void testOutputWithEventualConsistency() throws Exception {
String masterAddresses = harness.getMasterAddressesAsString();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
- KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe())
- .withEventualConsistency();
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+ .setMasters(masterAddresses)
+ .setEventualConsistency()
+ .build();
+ KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new DefaultSerDe());
+
outputFormat.open(0,1);
for (KuduRow kuduRow : booksDataRow()) {
@@ -87,7 +97,7 @@
outputFormat.close();
- List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
+ List<KuduRow> rows = readRows(tableInfo);
Assertions.assertEquals(5, rows.size());
cleanDatabase(tableInfo);
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
similarity index 64%
rename from flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java
rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
index d22203d..3d02a1d 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
@@ -14,8 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kudu.connector;
+package org.apache.flink.connectors.kudu.connector;
+import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
import org.apache.kudu.Type;
import org.apache.kudu.test.KuduTestHarness;
import org.junit.Rule;
@@ -23,6 +29,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.migrationsupport.rules.ExternalResourceSupport;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -33,7 +40,7 @@
@Rule
public static KuduTestHarness harness = new KuduTestHarness();
- protected static final Object[][] booksTableData = {
+ private static final Object[][] booksTableData = {
{1001, "Java for dummies", "Tan Ah Teck", 11.11, 11},
{1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22},
{1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33},
@@ -68,17 +75,19 @@
.collect(Collectors.toList());
}
- public void setUpDatabase(KuduTableInfo tableInfo) {
+ protected void setUpDatabase(KuduTableInfo tableInfo) {
try {
String masterAddresses = harness.getMasterAddressesAsString();
- KuduConnector tableContext = new KuduConnector(masterAddresses, tableInfo);
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+ KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig);
booksDataRow().forEach(row -> {
try {
- tableContext.writeRow(row);
+ kuduWriter.write(row);
}catch (Exception e) {
e.printStackTrace();
}
});
+ kuduWriter.close();
} catch (Exception e) {
Assertions.fail();
}
@@ -87,11 +96,34 @@
protected void cleanDatabase(KuduTableInfo tableInfo) {
try {
String masterAddresses = harness.getMasterAddressesAsString();
- KuduConnector tableContext = new KuduConnector(masterAddresses, tableInfo);
- tableContext.deleteTable();
- tableContext.close();
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+ KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig);
+ kuduWriter.deleteTable();
+ kuduWriter.close();
} catch (Exception e) {
Assertions.fail();
}
}
+
+ protected List<KuduRow> readRows(KuduTableInfo tableInfo) throws Exception {
+ String masterAddresses = harness.getMasterAddressesAsString();
+ KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
+ KuduReader reader = new KuduReader(tableInfo, readerConfig);
+
+ KuduInputSplit[] splits = reader.createInputSplits(1);
+ List<KuduRow> rows = new ArrayList<>();
+ for (KuduInputSplit split : splits) {
+ KuduReaderIterator resultIterator = reader.scanner(split.getScanToken());
+ while(resultIterator.hasNext()) {
+ KuduRow row = resultIterator.next();
+ if(row != null) {
+ rows.add(row);
+ }
+ }
+ }
+ reader.close();
+
+ return rows;
+ }
+
}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java
similarity index 87%
rename from flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java
rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java
index afe57ca..6057113 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/serde/PojoSerDeTest.java
@@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kudu.serde;
+package org.apache.flink.connectors.kudu.connector.serde;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduColumnInfo;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduColumnInfo;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.kudu.Type;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
new file mode 100644
index 0000000..ea49a91
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.streaming;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.connectors.kudu.connector.KuduColumnInfo;
+import org.apache.flink.connectors.kudu.connector.KuduDatabase;
+import org.apache.flink.connectors.kudu.connector.KuduRow;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.serde.DefaultSerDe;
+import org.apache.kudu.Type;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+import java.util.UUID;
+
+class KuduSinkTest extends KuduDatabase {
+
+ private static StreamingRuntimeContext context;
+
+ @BeforeAll
+ static void start() {
+ context = Mockito.mock(StreamingRuntimeContext.class);
+ Mockito.when(context.isCheckpointingEnabled()).thenReturn(true);
+ }
+
+ @Test
+ void testInvalidKuduMaster() {
+ KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(null, tableInfo, new DefaultSerDe()));
+ }
+
+ @Test
+ void testInvalidTableInfo() {
+ String masterAddresses = harness.getMasterAddressesAsString();
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(writerConfig, null, new DefaultSerDe()));
+ }
+
+ @Test
+ void testNotTableExist() {
+ String masterAddresses = harness.getMasterAddressesAsString();
+ KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
+ KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+
+ sink.setRuntimeContext(context);
+ Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
+ }
+
+ @Test
+ void testOutputWithStrongConsistency() throws Exception {
+ String masterAddresses = harness.getMasterAddressesAsString();
+
+ KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+ .setMasters(masterAddresses)
+ .setStrongConsistency()
+ .build();
+ KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+
+ sink.setRuntimeContext(context);
+ sink.open(new Configuration());
+
+ for (KuduRow kuduRow : booksDataRow()) {
+ sink.invoke(kuduRow);
+ }
+ sink.close();
+
+ List<KuduRow> rows = readRows(tableInfo);
+ Assertions.assertEquals(5, rows.size());
+
+ }
+
+ @Test
+ void testOutputWithEventualConsistency() throws Exception {
+ String masterAddresses = harness.getMasterAddressesAsString();
+
+ KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+ .setMasters(masterAddresses)
+ .setEventualConsistency()
+ .build();
+ KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+
+ sink.setRuntimeContext(context);
+ sink.open(new Configuration());
+
+ for (KuduRow kuduRow : booksDataRow()) {
+ sink.invoke(kuduRow);
+ }
+
+ // sleep to allow eventual consistency to finish
+ Thread.sleep(1000);
+
+ sink.close();
+
+ List<KuduRow> rows = readRows(tableInfo);
+ Assertions.assertEquals(5, rows.size());
+ }
+
+
+ @Test
+ void testSpeed() throws Exception {
+ String masterAddresses = harness.getMasterAddressesAsString();
+
+ KuduTableInfo tableInfo = KuduTableInfo.Builder
+ .create("test_speed")
+ .createIfNotExist(true)
+ .replicas(3)
+ .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
+ .addColumn(KuduColumnInfo.Builder.create("uuid", Type.STRING).build())
+ .build();
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+ .setMasters(masterAddresses)
+ .setEventualConsistency()
+ .build();
+ KuduSink<KuduRow> sink = new KuduSink<>(writerConfig, tableInfo, new DefaultSerDe());
+
+ sink.setRuntimeContext(context);
+ sink.open(new Configuration());
+
+ int totalRecords = 100000;
+ for (int i=0; i < totalRecords; i++) {
+ KuduRow kuduRow = new KuduRow(2);
+ kuduRow.setField(0, "id", i);
+ kuduRow.setField(1, "uuid", UUID.randomUUID().toString());
+ sink.invoke(kuduRow);
+ }
+
+ // sleep to allow eventual consistency to finish
+ Thread.sleep(1000);
+
+ sink.close();
+
+ List<KuduRow> rows = readRows(tableInfo);
+ Assertions.assertEquals(totalRecords, rows.size());
+ }
+
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
deleted file mode 100644
index 225bf7c..0000000
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kudu;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
-import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
-import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-
-public class KuduSinkTest extends KuduDatabase {
-
- private static StreamingRuntimeContext context;
-
- @BeforeAll
- public static void start() {
- context = Mockito.mock(StreamingRuntimeContext.class);
- Mockito.when(context.isCheckpointingEnabled()).thenReturn(true);
- }
-
- @Test
- public void testInvalidKuduMaster() throws IOException {
- KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
- Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe()));
- }
-
- @Test
- public void testInvalidTableInfo() throws IOException {
- String masterAddresses = harness.getMasterAddressesAsString();
- Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(masterAddresses, null, new DefaultSerDe()));
- }
-
- @Test
- public void testNotTableExist() throws IOException {
- String masterAddresses = harness.getMasterAddressesAsString();
- KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
- KuduSink<KuduRow> sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe());
- sink.setRuntimeContext(context);
- Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
- }
-
- @Test
- public void testOutputWithStrongConsistency() throws Exception {
- String masterAddresses = harness.getMasterAddressesAsString();
-
- KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
- KuduSink<KuduRow> sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe())
- .withStrongConsistency();
- sink.setRuntimeContext(context);
- sink.open(new Configuration());
-
- for (KuduRow kuduRow : booksDataRow()) {
- sink.invoke(kuduRow);
- }
- sink.close();
-
- List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
- Assertions.assertEquals(5, rows.size());
-
- }
-
- @Test
- public void testOutputWithEventualConsistency() throws Exception {
- String masterAddresses = harness.getMasterAddressesAsString();
-
- KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
- KuduSink<KuduRow> sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe())
- .withEventualConsistency();
- sink.setRuntimeContext(context);
- sink.open(new Configuration());
-
- for (KuduRow kuduRow : booksDataRow()) {
- sink.invoke(kuduRow);
- }
-
- // sleep to allow eventual consistency to finish
- Thread.sleep(1000);
-
- sink.close();
-
- List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
- Assertions.assertEquals(5, rows.size());
- }
-
-}