blob: 8fe5a37ac56ad3e6a4a490444baa51c4b89f39d5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.discovery;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.junit.Assert;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
* Base class for testing different discovery service implementation.
*/
public abstract class DiscoveryServiceTestBase {
protected abstract Map.Entry<DiscoveryService, DiscoveryServiceClient> create();
@Test
public void simpleDiscoverable() throws Exception {
final String payload = "data";
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
DiscoveryService discoveryService = entry.getKey();
DiscoveryServiceClient discoveryServiceClient = entry.getValue();
// Register one service running on one host:port
Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090, payload.getBytes());
// Discover that registered host:port.
ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("foo");
Assert.assertTrue(waitTillExpected(1, serviceDiscovered));
Discoverable discoverable = new Discoverable("foo", new InetSocketAddress("localhost", 8090), payload.getBytes());
// Check it exists.
Assert.assertTrue(serviceDiscovered.contains(discoverable));
// Remove the service
cancellable.cancel();
// There should be no service.
Assert.assertTrue(waitTillExpected(0, serviceDiscovered));
Assert.assertFalse(serviceDiscovered.contains(discoverable));
}
@Test
public void testChangeListener() throws InterruptedException {
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
DiscoveryService discoveryService = entry.getKey();
DiscoveryServiceClient discoveryServiceClient = entry.getValue();
// Start discovery
String serviceName = "listener_test";
ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
// Watch for changes.
final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
@Override
public void onChange(ServiceDiscovered serviceDiscovered) {
events.add(ImmutableList.copyOf(serviceDiscovered));
}
}, Threads.SAME_THREAD_EXECUTOR);
// An empty list will be received first, as no endpoint has been registered.
List<Discoverable> discoverables = events.poll(20, TimeUnit.SECONDS);
Assert.assertNotNull(discoverables);
Assert.assertTrue(discoverables.isEmpty());
// Register a service
Cancellable cancellable = register(discoveryService, serviceName, "localhost", 10000);
discoverables = events.poll(20, TimeUnit.SECONDS);
Assert.assertNotNull(discoverables);
Assert.assertEquals(1, discoverables.size());
// Register another service endpoint
Cancellable cancellable2 = register(discoveryService, serviceName, "localhost", 10001);
discoverables = events.poll(20, TimeUnit.SECONDS);
Assert.assertNotNull(discoverables);
Assert.assertEquals(2, discoverables.size());
// Cancel both of them
cancellable.cancel();
cancellable2.cancel();
// There could be more than one event triggered, but the last event should be an empty list.
discoverables = events.poll(20, TimeUnit.SECONDS);
Assert.assertNotNull(discoverables);
if (!discoverables.isEmpty()) {
discoverables = events.poll(20, TimeUnit.SECONDS);
}
Assert.assertTrue(discoverables.isEmpty());
}
@Test
public void testCancelChangeListener() throws InterruptedException {
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
DiscoveryService discoveryService = entry.getKey();
DiscoveryServiceClient discoveryServiceClient = entry.getValue();
String serviceName = "cancel_listener";
ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
// An executor that delay execute a Runnable. It's for testing race because listener cancel and discovery changes.
Executor delayExecutor = new Executor() {
@Override
public void execute(final Runnable command) {
Thread t = new Thread() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
command.run();
} catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
};
t.start();
}
};
final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
Cancellable cancelWatch = serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
@Override
public void onChange(ServiceDiscovered serviceDiscovered) {
events.add(ImmutableList.copyOf(serviceDiscovered));
}
}, delayExecutor);
// Wait for the init event call
Assert.assertNotNull(events.poll(3, TimeUnit.SECONDS));
// Register a new service endpoint, wait a short while and then cancel the listener
register(discoveryService, serviceName, "localhost", 1);
TimeUnit.SECONDS.sleep(1);
cancelWatch.cancel();
// The change listener shouldn't get any event, since the invocation is delayed by the executor.
Assert.assertNull(events.poll(3, TimeUnit.SECONDS));
}
@Test
public void manySameDiscoverable() throws Exception {
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
DiscoveryService discoveryService = entry.getKey();
DiscoveryServiceClient discoveryServiceClient = entry.getValue();
List<Cancellable> cancellables = Lists.newArrayList();
cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("manyDiscoverable");
Assert.assertTrue(waitTillExpected(5, serviceDiscovered));
for (int i = 0; i < 5; i++) {
cancellables.get(i).cancel();
Assert.assertTrue(waitTillExpected(4 - i, serviceDiscovered));
}
}
@Test
public void multiServiceDiscoverable() throws Exception {
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
DiscoveryService discoveryService = entry.getKey();
DiscoveryServiceClient discoveryServiceClient = entry.getValue();
List<Cancellable> cancellables = Lists.newArrayList();
cancellables.add(register(discoveryService, "service1", "localhost", 1));
cancellables.add(register(discoveryService, "service1", "localhost", 2));
cancellables.add(register(discoveryService, "service1", "localhost", 3));
cancellables.add(register(discoveryService, "service1", "localhost", 4));
cancellables.add(register(discoveryService, "service1", "localhost", 5));
cancellables.add(register(discoveryService, "service2", "localhost", 1));
cancellables.add(register(discoveryService, "service2", "localhost", 2));
cancellables.add(register(discoveryService, "service2", "localhost", 3));
cancellables.add(register(discoveryService, "service3", "localhost", 1));
cancellables.add(register(discoveryService, "service3", "localhost", 2));
ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("service1");
Assert.assertTrue(waitTillExpected(5, serviceDiscovered));
serviceDiscovered = discoveryServiceClient.discover("service2");
Assert.assertTrue(waitTillExpected(3, serviceDiscovered));
serviceDiscovered = discoveryServiceClient.discover("service3");
Assert.assertTrue(waitTillExpected(2, serviceDiscovered));
cancellables.add(register(discoveryService, "service3", "localhost", 3));
Assert.assertTrue(waitTillExpected(3, serviceDiscovered)); // Shows live iterator.
for (Cancellable cancellable : cancellables) {
cancellable.cancel();
}
Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service1")));
Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service2")));
Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service3")));
}
@Test
public void testIterator() throws InterruptedException {
// This test is to verify TWILL-75
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
final DiscoveryService service = entry.getKey();
DiscoveryServiceClient client = entry.getValue();
final String serviceName = "iterator";
ServiceDiscovered discovered = client.discover(serviceName);
// Create a thread for performing registration.
Thread t = new Thread() {
@Override
public void run() {
service.register(new Discoverable(serviceName, new InetSocketAddress(12345), new byte[]{}));
}
};
Iterator<Discoverable> iterator = discovered.iterator();
t.start();
t.join();
// This would throw exception if there is race condition.
Assert.assertFalse(iterator.hasNext());
}
protected Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
return register(service, name, host, port, new byte[]{});
}
protected Cancellable register(DiscoveryService service, final String name, final String host, final int port,
final byte[] payload) {
return service.register(new Discoverable(name, new InetSocketAddress(host, port), payload));
}
protected boolean waitTillExpected(final int expected, ServiceDiscovered serviceDiscovered) {
final CountDownLatch latch = new CountDownLatch(1);
serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
@Override
public void onChange(ServiceDiscovered serviceDiscovered) {
if (expected == Iterables.size(serviceDiscovered)) {
latch.countDown();
}
}
}, Threads.SAME_THREAD_EXECUTOR);
try {
return latch.await(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
}