TWILL-107 Add payload support for Discoverable

Signed-off-by: Gokul Gunasekaran <gokul@cask.co>

This closes #10 on Github
Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java b/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
index d8e4358..9513352 100644
--- a/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
@@ -27,7 +27,15 @@
   /**
    * Registers an endpoint that could be discovered by external party.
    * @param serviceName Name of the endpoint
-   * @param port Port of the endpoint.
+   * @param port Port of the endpoint
    */
   Cancellable announce(String serviceName, int port);
+
+  /**
+   * Registers an endpoint that could be discovered by external party with a payload
+   * @param serviceName Name of the endpoint
+   * @param port Port of the endpoint
+   * @param payload byte array payload
+   */
+  Cancellable announce(String serviceName, int port, byte[] payload);
 }
diff --git a/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
index 370c3d3..d9cca85 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
@@ -131,17 +131,12 @@
 
   @Override
   public Cancellable announce(final String serviceName, final int port) {
-    return discoveryService.register(new Discoverable() {
-      @Override
-      public String getName() {
-        return serviceName;
-      }
+    return announce(serviceName, port, new byte[]{});
+  }
 
-      @Override
-      public InetSocketAddress getSocketAddress() {
-        return new InetSocketAddress(getHost(), port);
-      }
-    });
+  @Override
+  public Cancellable announce(final String serviceName, final int port, final byte[] payload) {
+    return discoveryService.register(new Discoverable(serviceName, new InetSocketAddress(getHost(), port), payload));
   }
 
   @Override
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
index a5529fe..f36a3db 100644
--- a/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
@@ -19,19 +19,70 @@
 package org.apache.twill.discovery;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Objects;
 
 /**
  * Discoverable defines the attributes of service to be discovered.
  */
-public interface Discoverable {
+public class Discoverable {
+  private final String name;
+  private final InetSocketAddress address;
+  private final byte[] payload;
+
+  public Discoverable(String name, InetSocketAddress address, byte[] payload) {
+    this.name = name;
+    this.address = address;
+    this.payload = payload;
+  }
+
+  public Discoverable(String name, InetSocketAddress address) {
+    this(name, address, new byte[]{});
+  }
 
   /**
    * @return Name of the service
    */
-  String getName();
+  public String getName() {
+    return name;
+  }
 
   /**
    * @return An {@link InetSocketAddress} representing the host+port of the service.
    */
-  InetSocketAddress getSocketAddress();
+  public InetSocketAddress getSocketAddress() {
+    return address;
+  }
+
+  /**
+   * @return A payload represented as a byte array
+   */
+  public byte[] getPayload() {
+    return payload;
+  }
+
+  @Override
+  public String toString() {
+    return "{name=" + name + ", address=" + address + ", payload=" + payload + "}";
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    Discoverable other = (Discoverable) o;
+
+    return name.equals(other.getName()) && address.equals(other.getSocketAddress()) &&
+      Arrays.equals(payload, other.getPayload());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, address, payload);
+  }
 }
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
index 63df8ad..dbb0f7c 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
@@ -96,17 +96,7 @@
   @Override
   public boolean contains(Discoverable discoverable) {
     // If the name doesn't match, it shouldn't be in the list.
-    if (!discoverable.getName().equals(name)) {
-      return false;
-    }
-
-    // Wrap it if necessary for hashCode/equals comparison.
-    Discoverable target = discoverable;
-    if (!(target instanceof DiscoverableWrapper)) {
-      target = new DiscoverableWrapper(target);
-    }
-
-    return discoverables.get().contains(target);
+    return discoverable.getName().equals(name) && discoverables.get().contains(discoverable);
   }
 
   @Override
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
index 0e4bcc3..b020dc7 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.twill.discovery;
 
 import com.google.common.base.Charsets;
+import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonDeserializationContext;
@@ -38,6 +39,7 @@
 
   private static final Gson GSON =
     new GsonBuilder().registerTypeAdapter(Discoverable.class, new DiscoverableCodec()).create();
+  private static final Type BYTE_ARRAY_TYPE = new TypeToken<byte[]>() { }.getType();
 
   /**
    * Helper function for encoding an instance of {@link Discoverable} into array of bytes.
@@ -73,21 +75,12 @@
     public Discoverable deserialize(JsonElement json, Type typeOfT,
                                     JsonDeserializationContext context) throws JsonParseException {
       JsonObject jsonObj = json.getAsJsonObject();
-      final String service = jsonObj.get("service").getAsString();
+      String service = jsonObj.get("service").getAsString();
       String hostname = jsonObj.get("hostname").getAsString();
       int port = jsonObj.get("port").getAsInt();
-      final InetSocketAddress address = new InetSocketAddress(hostname, port);
-      return new DiscoverableWrapper(new Discoverable() {
-        @Override
-        public String getName() {
-          return service;
-        }
-
-        @Override
-        public InetSocketAddress getSocketAddress() {
-          return address;
-        }
-      });
+      byte[] payload = context.deserialize(jsonObj.get("payload"), BYTE_ARRAY_TYPE);
+      InetSocketAddress address = new InetSocketAddress(hostname, port);
+      return new Discoverable(service, address, payload);
     }
 
     @Override
@@ -96,6 +89,7 @@
       jsonObj.addProperty("service", src.getName());
       jsonObj.addProperty("hostname", src.getSocketAddress().getHostName());
       jsonObj.addProperty("port", src.getSocketAddress().getPort());
+      jsonObj.add("payload", context.serialize(src.getPayload()));
       return jsonObj;
     }
   }
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
deleted file mode 100644
index 5fa97d1..0000000
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-import java.net.InetSocketAddress;
-
-/**
- * Wrapper for a discoverable.
- */
-final class DiscoverableWrapper implements Discoverable {
-  private final String name;
-  private final InetSocketAddress address;
-
-  DiscoverableWrapper(Discoverable discoverable) {
-    this.name = discoverable.getName();
-    this.address = discoverable.getSocketAddress();
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public InetSocketAddress getSocketAddress() {
-    return address;
-  }
-
-  @Override
-  public String toString() {
-    return "{name=" + name + ", address=" + address;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    Discoverable other = (Discoverable) o;
-
-    return name.equals(other.getName()) && address.equals(other.getSocketAddress());
-  }
-
-  @Override
-  public int hashCode() {
-    int result = name.hashCode();
-    result = 31 * result + address.hashCode();
-    return result;
-  }
-}
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
index 6a71855..83a6681 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
@@ -37,12 +37,11 @@
 
   @Override
   public Cancellable register(final Discoverable discoverable) {
-    final Discoverable wrapper = new DiscoverableWrapper(discoverable);
-    final String serviceName = wrapper.getName();
+    final String serviceName = discoverable.getName();
 
     lock.lock();
     try {
-      services.put(serviceName, wrapper);
+      services.put(serviceName, discoverable);
 
       DefaultServiceDiscovered serviceDiscovered = serviceDiscoveredMap.get(serviceName);
       if (serviceDiscovered != null) {
@@ -57,7 +56,7 @@
       public void cancel() {
         lock.lock();
         try {
-          services.remove(serviceName, wrapper);
+          services.remove(serviceName, discoverable);
 
           DefaultServiceDiscovered serviceDiscovered = serviceDiscoveredMap.get(serviceName);
           if (serviceDiscovered != null) {
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
index c563bab..4f2d0f7 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -146,23 +146,22 @@
    */
   @Override
   public Cancellable register(final Discoverable discoverable) {
-    final Discoverable wrapper = new DiscoverableWrapper(discoverable);
     final SettableFuture<String> future = SettableFuture.create();
-    final DiscoveryCancellable cancellable = new DiscoveryCancellable(wrapper);
+    final DiscoveryCancellable cancellable = new DiscoveryCancellable(discoverable);
 
     // Create the zk ephemeral node.
-    Futures.addCallback(doRegister(wrapper), new FutureCallback<String>() {
+    Futures.addCallback(doRegister(discoverable), new FutureCallback<String>() {
       @Override
       public void onSuccess(String result) {
         // Set the sequence node path to cancellable for future cancellation.
         cancellable.setPath(result);
         lock.lock();
         try {
-          discoverables.put(wrapper, cancellable);
+          discoverables.put(discoverable, cancellable);
         } finally {
           lock.unlock();
         }
-        LOG.debug("Service registered: {} {}", wrapper, result);
+        LOG.debug("Service registered: {} {}", discoverable, result);
         future.set(result);
       }
 
@@ -171,7 +170,7 @@
         if (t instanceof KeeperException.NodeExistsException) {
           handleRegisterFailure(discoverable, future, this, t);
         } else {
-          LOG.warn("Failed to register: {}", wrapper, t);
+          LOG.warn("Failed to register: {}", discoverable, t);
           future.setException(t);
         }
       }
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
index 23deddc..8fe5a37 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
@@ -45,28 +45,19 @@
 
   @Test
   public void simpleDiscoverable() throws Exception {
+    final String payload = "data";
     Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
     DiscoveryService discoveryService = entry.getKey();
     DiscoveryServiceClient discoveryServiceClient = entry.getValue();
 
     // Register one service running on one host:port
-    Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
+    Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090, payload.getBytes());
 
     // Discover that registered host:port.
     ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("foo");
     Assert.assertTrue(waitTillExpected(1, serviceDiscovered));
 
-    Discoverable discoverable = new Discoverable() {
-      @Override
-      public String getName() {
-        return "foo";
-      }
-
-      @Override
-      public InetSocketAddress getSocketAddress() {
-        return new InetSocketAddress("localhost", 8090);
-      }
-    };
+    Discoverable discoverable = new Discoverable("foo", new InetSocketAddress("localhost", 8090), payload.getBytes());
 
     // Check it exists.
     Assert.assertTrue(serviceDiscovered.contains(discoverable));
@@ -259,17 +250,7 @@
     Thread t = new Thread() {
       @Override
       public void run() {
-        service.register(new Discoverable() {
-          @Override
-          public String getName() {
-            return serviceName;
-          }
-
-          @Override
-          public InetSocketAddress getSocketAddress() {
-            return new InetSocketAddress(12345);
-          }
-        });
+        service.register(new Discoverable(serviceName, new InetSocketAddress(12345), new byte[]{}));
       }
     };
 
@@ -282,17 +263,12 @@
   }
 
   protected Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
-    return service.register(new Discoverable() {
-      @Override
-      public String getName() {
-        return name;
-      }
+    return register(service, name, host, port, new byte[]{});
+  }
 
-      @Override
-      public InetSocketAddress getSocketAddress() {
-        return new InetSocketAddress(host, port);
-      }
-    });
+  protected Cancellable register(DiscoveryService service, final String name, final String host, final int port,
+                                 final byte[] payload) {
+    return service.register(new Discoverable(name, new InetSocketAddress(host, port), payload));
   }
 
   protected boolean waitTillExpected(final int expected, ServiceDiscovered serviceDiscovered) {