Merge pull request #1276 from shlxue/master

HOP-3683: Add support for timestamp and byte[] in HopRowCoder
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/coder/HopRowCoder.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/coder/HopRowCoder.java
index 11794d8..55444f6 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/coder/HopRowCoder.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/coder/HopRowCoder.java
@@ -24,6 +24,7 @@
 
 import java.io.*;
 import java.math.BigDecimal;
+import java.net.InetAddress;
 import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.Date;
@@ -112,6 +113,12 @@
           out.writeLong(lng);
         }
         break;
+      case IValueMeta.TYPE_TIMESTAMP:
+        {
+          out.writeLong(((Timestamp) object).getTime());
+          out.writeInt(((Timestamp) object).getNanos());
+        }
+        break;
       case IValueMeta.TYPE_DATE:
         {
           Long lng = ((Date) object).getTime();
@@ -136,6 +143,21 @@
           out.writeUTF(bd.toString());
         }
         break;
+      case IValueMeta.TYPE_BINARY:
+        {
+          byte[] bytes = (byte[]) object;
+          out.writeInt(bytes.length); // 4-byte length prefix; the decode side uses readInt()
+          out.write(bytes);
+        }
+        break;
+      case IValueMeta.TYPE_INET:
+        {
+          InetAddress inetAddress = (InetAddress) object;
+          write(out, IValueMeta.TYPE_STRING, inetAddress.getHostName());
+          out.writeInt(inetAddress.getAddress().length == 4 ? 1 : 2);
+          out.write(inetAddress.getAddress());
+        }
+        break;
       default:
         throw new IOException(
             "Data type not supported yet: " + objectType + " - " + object.toString());
@@ -159,6 +181,13 @@
           return lng;
         }
 
+      case IValueMeta.TYPE_TIMESTAMP:
+        {
+          Timestamp timestamp = new Timestamp(in.readLong());
+          timestamp.setNanos(in.readInt());
+          return timestamp;
+        }
+
       case IValueMeta.TYPE_DATE:
         {
           Long lng = in.readLong();
@@ -182,6 +211,21 @@
           String bd = in.readUTF();
           return new BigDecimal(bd);
         }
+
+      case IValueMeta.TYPE_BINARY:
+      {
+        byte[] bytes = new byte[in.readInt()]; // payload length is read as a 4-byte int
+        in.readFully(bytes); // read(byte[]) may return early with the array only partly filled
+        return bytes;
+      }
+
+      case IValueMeta.TYPE_INET:
+      {
+        String hostname = (String) read(in, IValueMeta.TYPE_STRING);
+        byte[] addr = new byte[in.readInt() == 1 ? 4 : 16]; // flag 1 -> 4 bytes, else 16 bytes
+        in.readFully(addr); // read(byte[]) may stop short; readFully fills the array or throws
+        return InetAddress.getByAddress(hostname, addr);
+      }
       default:
         throw new IOException("Data type not supported yet: " + objectType);
     }
@@ -194,12 +238,12 @@
     if (object instanceof Long) {
       return IValueMeta.TYPE_INTEGER;
     }
-    if (object instanceof Date) {
-      return IValueMeta.TYPE_DATE;
-    }
     if (object instanceof Timestamp) {
       return IValueMeta.TYPE_TIMESTAMP;
     }
+    if (object instanceof Date) {
+      return IValueMeta.TYPE_DATE;
+    }
     if (object instanceof Boolean) {
       return IValueMeta.TYPE_BOOLEAN;
     }
@@ -209,6 +253,12 @@
     if (object instanceof BigDecimal) {
       return IValueMeta.TYPE_BIGNUMBER;
     }
+    if (object instanceof byte[]) {
+      return IValueMeta.TYPE_BINARY;
+    }
+    if (object instanceof InetAddress) {
+      return IValueMeta.TYPE_INET;
+    }
     throw new CoderException(
         "Data type for object class " + object.getClass().getName() + " isn't supported yet");
   }
diff --git a/plugins/engines/beam/src/test/java/org/apache/hop/beam/core/coder/HopRowCoderTest.java b/plugins/engines/beam/src/test/java/org/apache/hop/beam/core/coder/HopRowCoderTest.java
index e500ebe..0317c5f 100644
--- a/plugins/engines/beam/src/test/java/org/apache/hop/beam/core/coder/HopRowCoderTest.java
+++ b/plugins/engines/beam/src/test/java/org/apache/hop/beam/core/coder/HopRowCoderTest.java
@@ -24,6 +24,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.sql.Timestamp;
 import java.util.Date;
 
 public class HopRowCoderTest extends TestCase {
@@ -44,7 +45,7 @@
     HopRow row1 =
         new HopRow(
             new Object[] {
-              "AAA", "BBB", Long.valueOf(100), Double.valueOf(1.234), new Date(876876868)
+              "AAA", "BBB", Long.valueOf(100), Double.valueOf(1.234), new Date(876876868), new Timestamp(810311)
             });
 
     hopRowCoder.encode(row1, outputStream);