[BAHIR-180] Improve eventual consistency for Kudu connector
Closes #40
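This change replaces the KuduRow-bound writer API with a pluggable serde layer (KuduSerialization / KuduDeserialization, plus DefaultSerDe and a reflection-based PojoSerDe), moves consistency and write-mode handling into KuduConnector, tracks pending asynchronous operations so that eventually consistent writes are flushed before the connector closes, and reads through the new KuduRowIterator, which hides the scanner's batch handling.
A minimal usage sketch of the new sink API (the table, columns, POJO and master address below are illustrative and not part of this change):
    import org.apache.flink.streaming.connectors.kudu.KuduSink;
    import org.apache.flink.streaming.connectors.kudu.connector.KuduColumnInfo;
    import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
    import org.apache.flink.streaming.connectors.kudu.serde.PojoSerDe;
    public class KuduSinkExample {
        // Illustrative POJO; field names must match the Kudu column names.
        public static class Book {
            private Integer id;
            private String title;
            public Book() { }
        }
        public static void main(String[] args) {
            KuduTableInfo tableInfo = KuduTableInfo.Builder.create("books")
                    .addColumn(KuduColumnInfo.Builder.createInteger("id").asKey().asHashKey().build())
                    .addColumn(KuduColumnInfo.Builder.createString("title").build())
                    .build();
            // Rows are serialized via PojoSerDe and written asynchronously; close() waits
            // for all pending operations before shutting down the Kudu client.
            KuduSink<Book> sink = new KuduSink<>("localhost:7051", tableInfo, new PojoSerDe<>(Book.class))
                    .withEventualConsistency();
            // In a Flink job: stream.addSink(sink);
        }
    }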
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 5540fc1..61ab4a6 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -30,7 +30,7 @@
<packaging>jar</packaging>
<properties>
- <kudu.version>1.8.0</kudu.version>
+ <kudu.version>1.7.1</kudu.version>
<junit.groups>!DockerTest</junit.groups>
</properties>
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
index 617e317..fd126d0 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
@@ -24,7 +24,9 @@
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.streaming.connectors.kudu.connector.*;
import org.apache.flink.util.Preconditions;
-import org.apache.kudu.client.*;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.LocatedTablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,8 +45,7 @@
private boolean endReached;
private transient KuduConnector tableContext;
- private transient KuduScanner scanner;
- private transient RowResultIterator resultIterator;
+ private transient KuduRowIterator resultIterator;
private static final Logger LOG = LoggerFactory.getLogger(KuduInputFormat.class);
@@ -90,15 +91,14 @@
endReached = false;
startTableContext();
- scanner = tableContext.scanner(split.getScanToken());
- resultIterator = scanner.nextRows();
+ resultIterator = tableContext.scanner(split.getScanToken());
}
@Override
public void close() {
- if (scanner != null) {
+ if (resultIterator != null) {
try {
- scanner.close();
+ resultIterator.close();
} catch (KuduException e) {
e.printStackTrace();
}
@@ -168,18 +168,11 @@
public KuduRow nextRecord(KuduRow reuse) throws IOException {
// check that current iterator has next rows
if (this.resultIterator.hasNext()) {
- RowResult row = this.resultIterator.next();
- return KuduMapper.toKuduRow(row);
- }
- // if not, check that current scanner has more iterators
- else if (scanner.hasMoreRows()) {
- this.resultIterator = scanner.nextRows();
- return nextRecord(reuse);
- }
- else {
+ return resultIterator.next();
+ } else {
endReached = true;
+ return null;
}
- return null;
}
/**
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
index 5c23f36..9d12710 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
@@ -16,18 +16,19 @@
*/
package org.apache.flink.streaming.connectors.kudu;
-import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-public class KuduOutputFormat<OUT extends KuduRow> implements OutputFormat<OUT> {
+public class KuduOutputFormat<OUT> extends RichOutputFormat<OUT> {
private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
@@ -36,10 +37,12 @@
private KuduConnector.Consistency consistency;
private KuduConnector.WriteMode writeMode;
- private transient KuduConnector tableContext;
+ private KuduSerialization<OUT> serializer;
+
+ private transient KuduConnector connector;
- public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo) {
+ public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) {
Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
this.kuduMasters = kuduMasters;
@@ -47,8 +50,10 @@
this.tableInfo = tableInfo;
this.consistency = KuduConnector.Consistency.STRONG;
this.writeMode = KuduConnector.WriteMode.UPSERT;
+ this.serializer = serializer.withSchema(tableInfo.getSchema());
}
+
public KuduOutputFormat<OUT> withEventualConsistency() {
this.consistency = KuduConnector.Consistency.EVENTUAL;
return this;
@@ -81,28 +86,31 @@
@Override
public void open(int taskNumber, int numTasks) throws IOException {
- startTableContext();
- }
-
- private void startTableContext() throws IOException {
- if (tableContext != null) return;
- tableContext = new KuduConnector(kuduMasters, tableInfo);
+ if (connector != null) return;
+ connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode);
+ serializer = serializer.withSchema(tableInfo.getSchema());
}
@Override
- public void writeRecord(OUT kuduRow) throws IOException {
+ public void writeRecord(OUT row) throws IOException {
+ boolean response;
try {
- tableContext.writeRow(kuduRow, consistency, writeMode);
+ KuduRow kuduRow = serializer.serialize(row);
+ response = connector.writeRow(kuduRow);
} catch (Exception e) {
throw new IOException(e.getLocalizedMessage(), e);
}
+
+ if (!response) {
+ throw new IOException("At least one Kudu write operation failed");
+ }
}
@Override
public void close() throws IOException {
- if (this.tableContext == null) return;
+ if (this.connector == null) return;
try {
- this.tableContext.close();
+ this.connector.close();
} catch (Exception e) {
throw new IOException(e.getLocalizedMessage(), e);
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
index 120d5c5..53cf249 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
@@ -21,13 +21,14 @@
import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-public class KuduSink<OUT extends KuduRow> extends RichSinkFunction<OUT> {
+public class KuduSink<OUT> extends RichSinkFunction<OUT> {
private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
@@ -36,10 +37,11 @@
private KuduConnector.Consistency consistency;
private KuduConnector.WriteMode writeMode;
- private transient KuduConnector tableContext;
+ private KuduSerialization<OUT> serializer;
+ private transient KuduConnector connector;
- public KuduSink(String kuduMasters, KuduTableInfo tableInfo) {
+ public KuduSink(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization<OUT> serializer) {
Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
this.kuduMasters = kuduMasters;
@@ -47,6 +49,7 @@
this.tableInfo = tableInfo;
this.consistency = KuduConnector.Consistency.STRONG;
this.writeMode = KuduConnector.WriteMode.UPSERT;
+ this.serializer = serializer.withSchema(tableInfo.getSchema());
}
public KuduSink<OUT> withEventualConsistency() {
@@ -76,29 +79,26 @@
@Override
public void open(Configuration parameters) throws IOException {
- startTableContext();
+ if (connector != null) return;
+ connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode);
+ serializer.withSchema(tableInfo.getSchema());
}
- private void startTableContext() throws IOException {
- if (tableContext != null) return;
- tableContext = new KuduConnector(kuduMasters, tableInfo);
- }
-
-
@Override
- public void invoke(OUT kuduRow) throws Exception {
- try {
- tableContext.writeRow(kuduRow, consistency, writeMode);
- } catch (Exception e) {
- throw new IOException(e.getLocalizedMessage(), e);
+ public void invoke(OUT row) throws Exception {
+ KuduRow kuduRow = serializer.serialize(row);
+ boolean response = connector.writeRow(kuduRow);
+
+ if (!response) {
+ throw new IOException("At least one Kudu write operation failed");
}
}
@Override
public void close() throws Exception {
- if (this.tableContext == null) return;
+ if (this.connector == null) return;
try {
- this.tableContext.close();
+ this.connector.close();
} catch (Exception e) {
throw new IOException(e.getLocalizedMessage(), e);
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
index 4dfc0b8..fa7472f 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
@@ -80,22 +80,72 @@
public static Builder create(String name, Type type) {
return new Builder(name, type);
}
+ public static Builder createByte(String name) {
+ return create(name, Type.INT8);
+ }
+ public static Builder createShort(String name) {
+ return create(name, Type.INT16);
+ }
+ public static Builder createInteger(String name) {
+ return create(name, Type.INT32);
+ }
+ public static Builder createLong(String name) {
+ return create(name, Type.INT64);
+ }
+ public static Builder createDouble(String name) {
+ return create(name, Type.DOUBLE);
+ }
+ public static Builder createFloat(String name) {
+ return create(name, Type.FLOAT);
+ }
+ public static Builder createString(String name) {
+ return create(name, Type.STRING);
+ }
+ public static Builder createBool(String name) {
+ return create(name, Type.BOOL);
+ }
+ public static Builder createByteArray(String name) {
+ return create(name, Type.BINARY);
+ }
+ public static Builder createUnixTime(String name) {
+ return create(name, Type.UNIXTIME_MICROS);
+ }
+
+ public Builder asKey() {
+ return key(true);
+ }
public Builder key(boolean key) {
this.column.key = key;
return this;
}
+ public Builder asRangeKey() {
+ return rangeKey(true);
+ }
+
public Builder rangeKey(boolean rangeKey) {
this.column.rangeKey = rangeKey;
return this;
}
+ public Builder asHashKey() {
+ return hashKey(true);
+ }
+
public Builder hashKey(boolean hashKey) {
this.column.hashKey = hashKey;
return this;
}
+ public Builder asNullable() {
+ return nullable(true);
+ }
+
+ public Builder asNotNullable() {
+ return nullable(false);
+ }
+
public Builder nullable(boolean nullable) {
this.column.nullable = nullable;
return this;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
index 0e2e6bc..a3851c4 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
@@ -17,27 +17,46 @@
package org.apache.flink.streaming.connectors.kudu.connector;
import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.common.time.Time;
import org.apache.kudu.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
public class KuduConnector implements AutoCloseable {
private final Logger LOG = LoggerFactory.getLogger(this.getClass());
+ private Callback<Boolean, OperationResponse> defaultCB;
+
public enum Consistency {EVENTUAL, STRONG};
public enum WriteMode {INSERT,UPDATE,UPSERT}
private AsyncKuduClient client;
private KuduTable table;
+ private Consistency consistency;
+ private WriteMode writeMode;
+
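+ // Asynchronous (eventual-consistency) writes applied but not yet acknowledged, and
+ // whether any acknowledged write reported a row error.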
+ private static AtomicInteger pendingTransactions = new AtomicInteger();
+ private static AtomicBoolean errorTransactions = new AtomicBoolean(false);
+
public KuduConnector(String kuduMasters, KuduTableInfo tableInfo) throws IOException {
- client = client(kuduMasters);
- table = table(tableInfo);
+ this(kuduMasters, tableInfo, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT);
+ }
+
+ public KuduConnector(String kuduMasters, KuduTableInfo tableInfo, Consistency consistency, WriteMode writeMode) throws IOException {
+ this.client = client(kuduMasters);
+ this.table = table(tableInfo);
+ this.consistency = consistency;
+ this.writeMode = writeMode;
+ this.defaultCB = new ResponseCallback();
}
private AsyncKuduClient client(String kuduMasters) {
@@ -63,8 +82,8 @@
return true;
}
- public KuduScanner scanner(byte[] token) throws IOException {
- return KuduScanToken.deserializeIntoScanner(token, client.syncClient());
+ public KuduRowIterator scanner(byte[] token) throws IOException {
+ return new KuduRowIterator(KuduScanToken.deserializeIntoScanner(token, client.syncClient()));
}
public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Long rowLimit) {
@@ -82,52 +101,60 @@
if (rowLimit !=null && rowLimit > 0) {
tokenBuilder.limit(rowLimit);
- // FIXME: https://issues.apache.org/jira/browse/KUDU-16
- // Server side limit() operator for java-based scanners are not implemented yet
}
return tokenBuilder.build();
}
- public boolean writeRow(KuduRow row, Consistency consistency, WriteMode writeMode) throws Exception {
+ public boolean writeRow(KuduRow row) throws Exception {
final Operation operation = KuduMapper.toOperation(table, writeMode, row);
- if (Consistency.EVENTUAL.equals(consistency)) {
- AsyncKuduSession session = client.newSession();
- session.apply(operation);
- session.flush();
- return session.close().addCallback(new ResponseCallback()).join();
+ AsyncKuduSession session = client.newSession();
+ Deferred<OperationResponse> response = session.apply(operation);
+
+ if (KuduConnector.Consistency.EVENTUAL.equals(consistency)) {
+ pendingTransactions.incrementAndGet();
+ response.addCallback(defaultCB);
} else {
- KuduSession session = client.syncClient().newSession();
- session.apply(operation);
- session.flush();
- return processResponse(session.close());
+ processResponse(response.join());
}
+
+ session.close();
+ return !errorTransactions.get();
+
}
@Override
public void close() throws Exception {
- if (client == null) return;
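+ // Eventually consistent writes are acknowledged via callbacks; wait until all
+ // pending operations have completed before closing the client.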
+ while(pendingTransactions.get() > 0) {
+ LOG.info("waiting {}s for pending transactions to complete", pendingTransactions.get());
+ Thread.sleep(Time.seconds(pendingTransactions.get()).toMilliseconds());
+ }
+ if (client == null) return;
client.close();
}
- private Boolean processResponse(List<OperationResponse> operationResponses) {
- Boolean isOk = operationResponses.isEmpty();
- for(OperationResponse operationResponse : operationResponses) {
- logResponseError(operationResponse.getRowError());
+ private class ResponseCallback implements Callback<Boolean, OperationResponse> {
+ @Override
+ public Boolean call(OperationResponse operationResponse) {
+ pendingTransactions.decrementAndGet();
+ processResponse(operationResponse);
+ return errorTransactions.get();
}
- return isOk;
+ }
+
+ protected void processResponse(OperationResponse operationResponse) {
+ if (operationResponse == null) return;
+
+ if (operationResponse.hasRowError()) {
+ logResponseError(operationResponse.getRowError());
+ errorTransactions.set(true);
+ }
}
private void logResponseError(RowError error) {
LOG.error("Error {} on {}: {} ", error.getErrorStatus(), error.getOperation(), error.toString());
}
- private class ResponseCallback implements Callback<Boolean, List<OperationResponse>> {
- @Override
- public Boolean call(List<OperationResponse> operationResponses) {
- return processResponse(operationResponses);
- }
- }
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
index b1366ba..86b683f 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.connectors.kudu.connector;
-import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduTable;
@@ -25,75 +24,73 @@
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowResult;
-import java.util.List;
-
-public final class KuduMapper {
+final class KuduMapper {
private KuduMapper() { }
- public static KuduRow toKuduRow(RowResult row) {
+ static KuduRow toKuduRow(RowResult row) {
Schema schema = row.getColumnProjection();
- List<ColumnSchema> columns = schema.getColumns();
- KuduRow values = new KuduRow(columns.size());
- for (int i = 0; i < columns.size(); i++) {
- String name = schema.getColumnByIndex(i).getName();
- if(row.isNull(i)) {
- values.setField(i, name, null);
+ KuduRow values = new KuduRow(schema.getColumnCount());
+ schema.getColumns().forEach(column -> {
+ String name = column.getName();
+ int pos = schema.getColumnIndex(name);
+ if(row.isNull(name)) {
+ values.setField(pos, name, null);
} else {
- Type type = schema.getColumnByIndex(i).getType();
+ Type type = column.getType();
switch (type) {
case BINARY:
- values.setField(i, name, row.getBinary(i));
+ values.setField(pos, name, row.getBinary(name));
break;
case STRING:
- values.setField(i, name, row.getString(i));
+ values.setField(pos, name, row.getString(name));
break;
case BOOL:
- values.setField(i, name, row.getBoolean(i));
+ values.setField(pos, name, row.getBoolean(name));
break;
case DOUBLE:
- values.setField(i, name, row.getDouble(i));
+ values.setField(pos, name, row.getDouble(name));
break;
case FLOAT:
- values.setField(i, name, row.getFloat(i));
+ values.setField(pos, name, row.getFloat(name));
break;
case INT8:
- values.setField(i, name, row.getByte(i));
+ values.setField(pos, name, row.getByte(name));
break;
case INT16:
- values.setField(i, name, row.getShort(i));
+ values.setField(pos, name, row.getShort(name));
break;
case INT32:
- values.setField(i, name, row.getInt(i));
+ values.setField(pos, name, row.getInt(name));
break;
case INT64:
+ values.setField(pos, name, row.getLong(name));
+ break;
case UNIXTIME_MICROS:
- values.setField(i, name, row.getLong(i));
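+ // UNIXTIME_MICROS is microseconds since the epoch; expose it as milliseconds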
+ values.setField(pos, name, row.getLong(name) / 1000);
break;
default:
throw new IllegalArgumentException("Illegal var type: " + type);
}
}
- }
+ });
return values;
}
- public static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode, KuduRow row) {
+ static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode, KuduRow row) {
final Operation operation = toOperation(table, writeMode);
final PartialRow partialRow = operation.getRow();
- Schema schema = table.getSchema();
- List<ColumnSchema> columns = schema.getColumns();
+ table.getSchema().getColumns().forEach(column -> {
+ String columnName = column.getName();
+ Object value = row.getField(column.getName());
- for (int i = 0; i < columns.size(); i++) {
- String columnName = schema.getColumnByIndex(i).getName();
- Object value = row.getField(i);
if (value == null) {
partialRow.setNull(columnName);
} else {
- Type type = schema.getColumnByIndex(i).getType();
+ Type type = column.getType();
switch (type) {
case STRING:
partialRow.addString(columnName, (String) value);
@@ -130,11 +127,11 @@
throw new IllegalArgumentException("Illegal var type: " + type);
}
}
- }
+ });
return operation;
}
- public static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode) {
+ static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode) {
switch (writeMode) {
case INSERT: return table.newInsert();
case UPDATE: return table.newUpdate();
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
index 03f5e5c..3c57a1b 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
@@ -17,11 +17,13 @@
package org.apache.flink.streaming.connectors.kudu.connector;
import org.apache.flink.types.Row;
-import org.apache.kudu.Schema;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.stream.Stream;
public class KuduRow extends Row {
@@ -33,24 +35,6 @@
rowNames = new LinkedHashMap<>();
}
- public KuduRow(Object object, Schema schema) {
- super(validFields(object));
- for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) {
- basicValidation(c.getDeclaredFields())
- .filter(field -> schema.getColumn(field.getName()) != null)
- .forEach(cField -> {
- try {
- cField.setAccessible(true);
- setField(schema.getColumnIndex(cField.getName()), cField.getName(), cField.get(object));
- } catch (IllegalAccessException e) {
- String error = String.format("Cannot get value for %s", cField.getName());
- throw new IllegalArgumentException(error, e);
- }
- });
- }
- }
-
-
public Object getField(String name) {
return super.getField(rowNames.get(name));
}
@@ -60,6 +44,10 @@
this.rowNames.put(name, pos);
}
+ public boolean isNull(String name) {
+ return isNull(rowNames.get(name));
+ }
+
public boolean isNull(int pos) {
return getField(pos) == null;
}
@@ -86,50 +74,6 @@
return toRet;
}
- public <P> P blind(Class<P> clazz) {
- P o = createInstance(clazz);
-
- for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
- Field[] fields = c.getDeclaredFields();
- for (Field cField : fields) {
- try {
- if(rowNames.containsKey(cField.getName())
- && !Modifier.isStatic(cField.getModifiers())
- && !Modifier.isTransient(cField.getModifiers())) {
-
- cField.setAccessible(true);
- Object value = getField(cField.getName());
- if (value != null) {
- if (cField.getType() == value.getClass()) {
- cField.set(o, value);
- } else if (cField.getType() == Long.class && value.getClass() == Date.class) {
- cField.set(o, ((Date) value).getTime());
- } else {
- cField.set(o, value);
- }
- }
- }
- } catch (IllegalAccessException e) {
- String error = String.format("Cannot get value for %s", cField.getName());
- throw new IllegalArgumentException(error, e);
- }
- }
- }
-
- return o;
-
- }
-
-
- private <P> P createInstance(Class<P> clazz) {
- try {
- return clazz.getConstructor().newInstance();
- } catch (ReflectiveOperationException e) {
- String error = String.format("Cannot create instance for %s", clazz.getSimpleName());
- throw new IllegalArgumentException(error, e);
- }
- }
-
@Override
public String toString() {
return blindMap().toString();
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java
new file mode 100644
index 0000000..46cbff1
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRowIterator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+
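+/**
+ * Iterates over the rows returned by a {@link KuduScanner}, transparently fetching
+ * the next batch when the current {@link RowResultIterator} is exhausted.
+ */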
+public class KuduRowIterator {
+
+ private KuduScanner scanner;
+ private RowResultIterator rowIterator;
+
+ public KuduRowIterator(KuduScanner scanner) throws KuduException {
+ this.scanner = scanner;
+ nextRows();
+ }
+
+ public void close() throws KuduException {
+ scanner.close();
+ }
+
+ public boolean hasNext() throws KuduException {
+ if (rowIterator.hasNext()) {
+ return true;
+ } else if (scanner.hasMoreRows()) {
+ nextRows();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public KuduRow next() {
+ RowResult row = this.rowIterator.next();
+ return KuduMapper.toKuduRow(row);
+ }
+
+ private void nextRows() throws KuduException {
+ this.rowIterator = scanner.nextRows();
+ }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java
new file mode 100644
index 0000000..c12eb42
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/DefaultSerDe.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.serde;
+
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.kudu.Schema;
+
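+/**
+ * Identity serde for pipelines that already produce {@link KuduRow} records.
+ */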
+public class DefaultSerDe implements KuduSerialization<KuduRow>, KuduDeserialization<KuduRow> {
+
+ @Override
+ public KuduRow deserialize(KuduRow row) {
+ return row;
+ }
+
+ @Override
+ public KuduRow serialize(KuduRow value) {
+ return value;
+ }
+
+ @Override
+ public DefaultSerDe withSchema(Schema schema) {
+ return this;
+ }
+
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java
new file mode 100644
index 0000000..355a516
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduDeserialization.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.serde;
+
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+
+import java.io.Serializable;
+
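+/**
+ * Converts a {@link KuduRow} read from Kudu into a value of type T.
+ */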
+public interface KuduDeserialization<T> extends Serializable {
+ T deserialize(KuduRow row);
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java
new file mode 100644
index 0000000..99db1dc
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/KuduSerialization.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.serde;
+
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.kudu.Schema;
+
+import java.io.Serializable;
+
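+/**
+ * Converts values of type T into {@link KuduRow}s to be written to Kudu;
+ * {@link #withSchema(Schema)} is invoked with the target table schema before use.
+ */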
+public interface KuduSerialization<T> extends Serializable {
+ KuduRow serialize(T value);
+
+ KuduSerialization<T> withSchema(Schema schema);
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java
new file mode 100644
index 0000000..1063aa2
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDe.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.serde;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.kudu.Schema;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.stream.Stream;
+
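+/**
+ * Maps POJOs to and from {@link KuduRow} by reflectively matching field names to the
+ * schema columns (case-insensitive); static, transient and unmatched fields are skipped.
+ */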
+public class PojoSerDe<P> implements KuduSerialization<P>, KuduDeserialization<P> {
+
+
+ private Class<P> clazz;
+
+ public transient KuduTableInfo tableInfo;
+ public transient Schema schema;
+
+
+ public PojoSerDe(Class<P> clazz) {
+ this.clazz = clazz;
+ }
+
+ @Override
+ public PojoSerDe<P> withSchema(Schema schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ @Override
+ public KuduRow serialize(P object) {
+ return mapTo(object);
+ }
+
+ private KuduRow mapTo(P object) {
+ if (schema == null) throw new IllegalArgumentException("schema must be set to serialize");
+
+ KuduRow row = new KuduRow(schema.getRowSize());
+
+ for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) {
+ basicValidation(c.getDeclaredFields())
+ .forEach(cField -> {
+ try {
+ cField.setAccessible(true);
+ row.setField(schema.getColumnIndex(cField.getName()), cField.getName(), cField.get(object));
+ } catch (IllegalAccessException e) {
+ String error = String.format("Cannot get value for %s", cField.getName());
+ throw new IllegalArgumentException(error, e);
+ }
+ });
+ }
+
+ return row;
+ }
+
+ private Stream<Field> basicValidation(Field[] fields) {
+ return Arrays.stream(fields)
+ .filter(field -> schemaHasColumn(field.getName()))
+ .filter(field -> !Modifier.isStatic(field.getModifiers()))
+ .filter(field -> !Modifier.isTransient(field.getModifiers()));
+ }
+
+ private boolean schemaHasColumn(String field) {
+ return schema.getColumns().stream().anyMatch(col -> StringUtils.equalsIgnoreCase(col.getName(),field));
+ }
+
+ @Override
+ public P deserialize(KuduRow row) {
+ return mapFrom(row);
+ }
+
+ private P mapFrom(KuduRow row) {
+ P o = createInstance(clazz);
+
+ for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
+ Field[] fields = c.getDeclaredFields();
+
+ basicValidation(fields)
+ .forEach(cField -> {
+ try {
+ cField.setAccessible(true);
+ Object value = row.getField(cField.getName());
+ if (value != null) {
+ if (cField.getType() == value.getClass()) {
+ cField.set(o, value);
+ } else if (cField.getType() == Long.class && value.getClass() == Date.class) {
+ cField.set(o, ((Date) value).getTime());
+ } else {
+ cField.set(o, value);
+ }
+ }
+ } catch (IllegalAccessException e) {
+ String error = String.format("Cannot get value for %s", cField.getName());
+ throw new IllegalArgumentException(error, e);
+ }
+ });
+ }
+
+ return o;
+
+ }
+
+ private P createInstance(Class<P> clazz) {
+ try {
+ Constructor<P> constructor = clazz.getDeclaredConstructor();
+ constructor.setAccessible(true);
+ return constructor.newInstance();
+ } catch (ReflectiveOperationException e) {
+ String error = String.format("Cannot create instance for %s", clazz.getSimpleName());
+ throw new IllegalArgumentException(error, e);
+ }
+ }
+
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
index e282185..35982f4 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
@@ -19,6 +19,7 @@
import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -32,18 +33,18 @@
@Test
public void testInvalidKuduMaster() throws IOException {
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
- Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo));
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe()));
}
@Test
public void testInvalidTableInfo() throws IOException {
- Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null));
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null, new DefaultSerDe()));
}
@Test
public void testNotTableExist() throws IOException {
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
- KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo);
+ KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo, new DefaultSerDe());
Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0,1));
}
@@ -51,7 +52,7 @@
public void testOutputWithStrongConsistency() throws Exception {
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
- KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo)
+ KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo, new DefaultSerDe())
.withStrongConsistency();
outputFormat.open(0,1);
@@ -70,7 +71,7 @@
public void testOutputWithEventualConsistency() throws Exception {
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
- KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo)
+ KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo, new DefaultSerDe())
.withEventualConsistency();
outputFormat.open(0,1);
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
index a89580f..3ca9b9a 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
@@ -20,6 +20,7 @@
import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -33,18 +34,18 @@
@Test
public void testInvalidKuduMaster() throws IOException {
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
- Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo));
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo, new DefaultSerDe()));
}
@Test
public void testInvalidTableInfo() throws IOException {
- Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null));
+ Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null, new DefaultSerDe()));
}
@Test
public void testNotTableExist() throws IOException {
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
- KuduSink sink = new KuduSink<>(hostsCluster, tableInfo);
+ KuduSink sink = new KuduSink<>(hostsCluster, tableInfo, new DefaultSerDe());
Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
}
@@ -52,7 +53,7 @@
public void testOutputWithStrongConsistency() throws Exception {
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
- KuduSink sink = new KuduSink<>(hostsCluster, tableInfo)
+ KuduSink sink = new KuduSink<>(hostsCluster, tableInfo, new DefaultSerDe())
.withStrongConsistency();
sink.open(new Configuration());
@@ -69,7 +70,7 @@
@Test
public void testOutputWithEventualConsistency() throws Exception {
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
- KuduSink sink = new KuduSink<>(hostsCluster, tableInfo)
+ KuduSink sink = new KuduSink<>(hostsCluster, tableInfo, new DefaultSerDe())
.withEventualConsistency();
sink.open(new Configuration());
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java
index 41e59b7..99efbd1 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java
@@ -67,7 +67,7 @@
KuduConnector tableContext = new KuduConnector(hostsCluster, tableInfo);
booksDataRow().forEach(row -> {
try {
- tableContext.writeRow(row, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT);
+ tableContext.writeRow(row);
}catch (Exception e) {
e.printStackTrace();
}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java
new file mode 100644
index 0000000..afe57ca
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/serde/PojoSerDeTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.serde;
+
+import org.apache.flink.streaming.connectors.kudu.connector.KuduColumnInfo;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.kudu.Type;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class PojoSerDeTest {
+
+ public class TestPojo {
+ private String field1;
+ private String field2;
+ private String field3;
+
+ public TestPojo() {
+ field1 = "field1";
+ field2 = "field2";
+ field3 = "field3";
+ }
+ }
+
+ @Test
+ public void testFieldsNotInSchema() {
+
+ TestPojo pojo = new TestPojo();
+
+ KuduTableInfo tableInfo = KuduTableInfo.Builder.create("test")
+ .addColumn(KuduColumnInfo.Builder.create("field1", Type.STRING).key(true).hashKey(true).build())
+ .addColumn(KuduColumnInfo.Builder.create("field2", Type.STRING).build())
+ .build();
+
+ KuduRow row = new PojoSerDe<>(TestPojo.class).withSchema(tableInfo.getSchema()).serialize(pojo);
+
+ Assertions.assertEquals(2, row.blindMap().size());
+ Assertions.assertEquals("field1", row.getField("field1"));
+ Assertions.assertEquals("field2", row.getField("field2"));
+
+ }
+}