TWILL-107 Add payload support for Discoverable
Signed-off-by: Gokul Gunasekaran <gokul@cask.co>
This closes #10 on Github
Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java b/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
index d8e4358..9513352 100644
--- a/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
@@ -27,7 +27,15 @@
/**
* Registers an endpoint that could be discovered by external party.
* @param serviceName Name of the endpoint
- * @param port Port of the endpoint.
+ * @param port Port of the endpoint
*/
Cancellable announce(String serviceName, int port);
+
+ /**
+ * Registers an endpoint that could be discovered by external party with a payload
+ * @param serviceName Name of the endpoint
+ * @param port Port of the endpoint
+ * @param payload byte array payload
+ */
+ Cancellable announce(String serviceName, int port, byte[] payload);
}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
index 370c3d3..d9cca85 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
@@ -131,17 +131,12 @@
@Override
public Cancellable announce(final String serviceName, final int port) {
- return discoveryService.register(new Discoverable() {
- @Override
- public String getName() {
- return serviceName;
- }
+ return announce(serviceName, port, new byte[]{});
+ }
- @Override
- public InetSocketAddress getSocketAddress() {
- return new InetSocketAddress(getHost(), port);
- }
- });
+ @Override
+ public Cancellable announce(final String serviceName, final int port, final byte[] payload) {
+ return discoveryService.register(new Discoverable(serviceName, new InetSocketAddress(getHost(), port), payload));
}
@Override
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
index a5529fe..f36a3db 100644
--- a/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
@@ -19,19 +19,70 @@
package org.apache.twill.discovery;
import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Objects;
/**
* Discoverable defines the attributes of service to be discovered.
*/
-public interface Discoverable {
+public class Discoverable {
+ private final String name;
+ private final InetSocketAddress address;
+ private final byte[] payload;
+
+ public Discoverable(String name, InetSocketAddress address, byte[] payload) {
+ this.name = name;
+ this.address = address;
+ this.payload = payload;
+ }
+
+ public Discoverable(String name, InetSocketAddress address) {
+ this(name, address, new byte[]{});
+ }
/**
* @return Name of the service
*/
- String getName();
+ public String getName() {
+ return name;
+ }
/**
* @return An {@link InetSocketAddress} representing the host+port of the service.
*/
- InetSocketAddress getSocketAddress();
+ public InetSocketAddress getSocketAddress() {
+ return address;
+ }
+
+ /**
+ * @return A payload represented as a byte array
+ */
+ public byte[] getPayload() {
+ return payload;
+ }
+
+ @Override
+ public String toString() {
+ return "{name=" + name + ", address=" + address + ", payload=" + payload + "}";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Discoverable other = (Discoverable) o;
+
+ return name.equals(other.getName()) && address.equals(other.getSocketAddress()) &&
+ Arrays.equals(payload, other.getPayload());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, address, payload);
+ }
}
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
index 63df8ad..dbb0f7c 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
@@ -96,17 +96,7 @@
@Override
public boolean contains(Discoverable discoverable) {
// If the name doesn't match, it shouldn't be in the list.
- if (!discoverable.getName().equals(name)) {
- return false;
- }
-
- // Wrap it if necessary for hashCode/equals comparison.
- Discoverable target = discoverable;
- if (!(target instanceof DiscoverableWrapper)) {
- target = new DiscoverableWrapper(target);
- }
-
- return discoverables.get().contains(target);
+ return discoverable.getName().equals(name) && discoverables.get().contains(discoverable);
}
@Override
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
index 0e4bcc3..b020dc7 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
@@ -18,6 +18,7 @@
package org.apache.twill.discovery;
import com.google.common.base.Charsets;
+import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
@@ -38,6 +39,7 @@
private static final Gson GSON =
new GsonBuilder().registerTypeAdapter(Discoverable.class, new DiscoverableCodec()).create();
+ private static final Type BYTE_ARRAY_TYPE = new TypeToken<byte[]>() { }.getType();
/**
* Helper function for encoding an instance of {@link Discoverable} into array of bytes.
@@ -73,21 +75,12 @@
public Discoverable deserialize(JsonElement json, Type typeOfT,
JsonDeserializationContext context) throws JsonParseException {
JsonObject jsonObj = json.getAsJsonObject();
- final String service = jsonObj.get("service").getAsString();
+ String service = jsonObj.get("service").getAsString();
String hostname = jsonObj.get("hostname").getAsString();
int port = jsonObj.get("port").getAsInt();
- final InetSocketAddress address = new InetSocketAddress(hostname, port);
- return new DiscoverableWrapper(new Discoverable() {
- @Override
- public String getName() {
- return service;
- }
-
- @Override
- public InetSocketAddress getSocketAddress() {
- return address;
- }
- });
+ byte[] payload = context.deserialize(jsonObj.get("payload"), BYTE_ARRAY_TYPE);
+ InetSocketAddress address = new InetSocketAddress(hostname, port);
+ return new Discoverable(service, address, payload);
}
@Override
@@ -96,6 +89,7 @@
jsonObj.addProperty("service", src.getName());
jsonObj.addProperty("hostname", src.getSocketAddress().getHostName());
jsonObj.addProperty("port", src.getSocketAddress().getPort());
+ jsonObj.add("payload", context.serialize(src.getPayload()));
return jsonObj;
}
}
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
deleted file mode 100644
index 5fa97d1..0000000
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-import java.net.InetSocketAddress;
-
-/**
- * Wrapper for a discoverable.
- */
-final class DiscoverableWrapper implements Discoverable {
- private final String name;
- private final InetSocketAddress address;
-
- DiscoverableWrapper(Discoverable discoverable) {
- this.name = discoverable.getName();
- this.address = discoverable.getSocketAddress();
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public InetSocketAddress getSocketAddress() {
- return address;
- }
-
- @Override
- public String toString() {
- return "{name=" + name + ", address=" + address;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- Discoverable other = (Discoverable) o;
-
- return name.equals(other.getName()) && address.equals(other.getSocketAddress());
- }
-
- @Override
- public int hashCode() {
- int result = name.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
-}
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
index 6a71855..83a6681 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
@@ -37,12 +37,11 @@
@Override
public Cancellable register(final Discoverable discoverable) {
- final Discoverable wrapper = new DiscoverableWrapper(discoverable);
- final String serviceName = wrapper.getName();
+ final String serviceName = discoverable.getName();
lock.lock();
try {
- services.put(serviceName, wrapper);
+ services.put(serviceName, discoverable);
DefaultServiceDiscovered serviceDiscovered = serviceDiscoveredMap.get(serviceName);
if (serviceDiscovered != null) {
@@ -57,7 +56,7 @@
public void cancel() {
lock.lock();
try {
- services.remove(serviceName, wrapper);
+ services.remove(serviceName, discoverable);
DefaultServiceDiscovered serviceDiscovered = serviceDiscoveredMap.get(serviceName);
if (serviceDiscovered != null) {
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
index c563bab..4f2d0f7 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -146,23 +146,22 @@
*/
@Override
public Cancellable register(final Discoverable discoverable) {
- final Discoverable wrapper = new DiscoverableWrapper(discoverable);
final SettableFuture<String> future = SettableFuture.create();
- final DiscoveryCancellable cancellable = new DiscoveryCancellable(wrapper);
+ final DiscoveryCancellable cancellable = new DiscoveryCancellable(discoverable);
// Create the zk ephemeral node.
- Futures.addCallback(doRegister(wrapper), new FutureCallback<String>() {
+ Futures.addCallback(doRegister(discoverable), new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
// Set the sequence node path to cancellable for future cancellation.
cancellable.setPath(result);
lock.lock();
try {
- discoverables.put(wrapper, cancellable);
+ discoverables.put(discoverable, cancellable);
} finally {
lock.unlock();
}
- LOG.debug("Service registered: {} {}", wrapper, result);
+ LOG.debug("Service registered: {} {}", discoverable, result);
future.set(result);
}
@@ -171,7 +170,7 @@
if (t instanceof KeeperException.NodeExistsException) {
handleRegisterFailure(discoverable, future, this, t);
} else {
- LOG.warn("Failed to register: {}", wrapper, t);
+ LOG.warn("Failed to register: {}", discoverable, t);
future.setException(t);
}
}
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
index 23deddc..8fe5a37 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
@@ -45,28 +45,19 @@
@Test
public void simpleDiscoverable() throws Exception {
+ final String payload = "data";
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
DiscoveryService discoveryService = entry.getKey();
DiscoveryServiceClient discoveryServiceClient = entry.getValue();
// Register one service running on one host:port
- Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
+ Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090, payload.getBytes());
// Discover that registered host:port.
ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("foo");
Assert.assertTrue(waitTillExpected(1, serviceDiscovered));
- Discoverable discoverable = new Discoverable() {
- @Override
- public String getName() {
- return "foo";
- }
-
- @Override
- public InetSocketAddress getSocketAddress() {
- return new InetSocketAddress("localhost", 8090);
- }
- };
+ Discoverable discoverable = new Discoverable("foo", new InetSocketAddress("localhost", 8090), payload.getBytes());
// Check it exists.
Assert.assertTrue(serviceDiscovered.contains(discoverable));
@@ -259,17 +250,7 @@
Thread t = new Thread() {
@Override
public void run() {
- service.register(new Discoverable() {
- @Override
- public String getName() {
- return serviceName;
- }
-
- @Override
- public InetSocketAddress getSocketAddress() {
- return new InetSocketAddress(12345);
- }
- });
+ service.register(new Discoverable(serviceName, new InetSocketAddress(12345), new byte[]{}));
}
};
@@ -282,17 +263,12 @@
}
protected Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
- return service.register(new Discoverable() {
- @Override
- public String getName() {
- return name;
- }
+ return register(service, name, host, port, new byte[]{});
+ }
- @Override
- public InetSocketAddress getSocketAddress() {
- return new InetSocketAddress(host, port);
- }
- });
+ protected Cancellable register(DiscoveryService service, final String name, final String host, final int port,
+ final byte[] payload) {
+ return service.register(new Discoverable(name, new InetSocketAddress(host, port), payload));
}
protected boolean waitTillExpected(final int expected, ServiceDiscovered serviceDiscovered) {