Merge pull request #1276 from shlxue/master
HOP-3683: Add support for timestamp and byte[] in HopRowCoder
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/coder/HopRowCoder.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/coder/HopRowCoder.java
index 11794d8..55444f6 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/coder/HopRowCoder.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/coder/HopRowCoder.java
@@ -24,6 +24,7 @@
import java.io.*;
import java.math.BigDecimal;
+import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.Date;
@@ -112,6 +113,12 @@
out.writeLong(lng);
}
break;
+ case IValueMeta.TYPE_TIMESTAMP:
+ {
+ out.writeLong(((Timestamp) object).getTime());
+ out.writeInt(((Timestamp) object).getNanos());
+ }
+ break;
case IValueMeta.TYPE_DATE:
{
Long lng = ((Date) object).getTime();
@@ -136,6 +143,21 @@
out.writeUTF(bd.toString());
}
break;
+ case IValueMeta.TYPE_BINARY:
+ {
+ byte[] bytes = (byte[]) object;
+ out.writeInt(bytes.length); // 4-byte length prefix; write(int) would emit only the low 8 bits, but the decoder uses readInt()
+ out.write(bytes);
+ }
+ break;
+ case IValueMeta.TYPE_INET:
+ {
+ InetAddress inetAddress = (InetAddress) object;
+ write(out, IValueMeta.TYPE_STRING, inetAddress.getHostName());
+ out.writeInt(inetAddress.getAddress().length == 4 ? 1 : 2);
+ out.write(inetAddress.getAddress());
+ }
+ break;
default:
throw new IOException(
"Data type not supported yet: " + objectType + " - " + object.toString());
@@ -159,6 +181,13 @@
return lng;
}
+ case IValueMeta.TYPE_TIMESTAMP:
+ {
+ Timestamp timestamp = new Timestamp(in.readLong());
+ timestamp.setNanos(in.readInt());
+ return timestamp;
+ }
+
case IValueMeta.TYPE_DATE:
{
Long lng = in.readLong();
@@ -182,6 +211,21 @@
String bd = in.readUTF();
return new BigDecimal(bd);
}
+
+ case IValueMeta.TYPE_BINARY:
+ {
+ byte[] bytes = new byte[in.readInt()];
+ in.readFully(bytes); // read(byte[]) may return early with a partially filled array; readFully blocks or throws EOFException
+ return bytes;
+ }
+
+ case IValueMeta.TYPE_INET:
+ {
+ String hostname = (String) read(in, IValueMeta.TYPE_STRING);
+ byte[] addr = new byte[in.readInt() == 1 ? 4 : 16]; // flag 1 = 4-byte (IPv4) address, anything else = 16-byte (IPv6)
+ in.readFully(addr); // read(byte[]) may stop short; getByAddress needs every address byte
+ return InetAddress.getByAddress(hostname, addr);
+ }
default:
throw new IOException("Data type not supported yet: " + objectType);
}
@@ -194,12 +238,12 @@
if (object instanceof Long) {
return IValueMeta.TYPE_INTEGER;
}
- if (object instanceof Date) {
- return IValueMeta.TYPE_DATE;
- }
if (object instanceof Timestamp) {
return IValueMeta.TYPE_TIMESTAMP;
}
+ if (object instanceof Date) {
+ return IValueMeta.TYPE_DATE;
+ }
if (object instanceof Boolean) {
return IValueMeta.TYPE_BOOLEAN;
}
@@ -209,6 +253,12 @@
if (object instanceof BigDecimal) {
return IValueMeta.TYPE_BIGNUMBER;
}
+ if (object instanceof byte[]) {
+ return IValueMeta.TYPE_BINARY;
+ }
+ if (object instanceof InetAddress) {
+ return IValueMeta.TYPE_INET;
+ }
throw new CoderException(
"Data type for object class " + object.getClass().getName() + " isn't supported yet");
}
diff --git a/plugins/engines/beam/src/test/java/org/apache/hop/beam/core/coder/HopRowCoderTest.java b/plugins/engines/beam/src/test/java/org/apache/hop/beam/core/coder/HopRowCoderTest.java
index e500ebe..0317c5f 100644
--- a/plugins/engines/beam/src/test/java/org/apache/hop/beam/core/coder/HopRowCoderTest.java
+++ b/plugins/engines/beam/src/test/java/org/apache/hop/beam/core/coder/HopRowCoderTest.java
@@ -24,6 +24,7 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.sql.Timestamp;
import java.util.Date;
public class HopRowCoderTest extends TestCase {
@@ -44,7 +45,7 @@
HopRow row1 =
new HopRow(
new Object[] {
- "AAA", "BBB", Long.valueOf(100), Double.valueOf(1.234), new Date(876876868)
+ "AAA", "BBB", Long.valueOf(100), Double.valueOf(1.234), new Date(876876868), new Timestamp(810311)
});
hopRowCoder.encode(row1, outputStream);