blob: 654ffad79e22a732583fbf58401c849a7eebee47 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.yarn.zk;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.drill.exec.coord.DrillServiceInstanceHelper;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
import org.apache.drill.test.BaseTest;
import org.apache.drill.yarn.appMaster.EventContext;
import org.apache.drill.yarn.appMaster.RegistryHandler;
import org.apache.drill.yarn.appMaster.Task;
import org.apache.drill.yarn.appMaster.TaskLifecycleListener.Event;
import org.apache.drill.yarn.zk.ZKRegistry.DrillbitTracker;
import org.junit.Test;
/**
* Tests for the AM version of the cluster coordinator. The AM version has no
* dependencies on the DoY config system or other systems, making it easy to
* test in isolation using the Curator-provided test server.
*/
public class TestZkRegistry extends BaseTest {
private static final String BARNEY_HOST = "barney";
private static final String WILMA_HOST = "wilma";
private static final String TEST_HOST = "host";
private static final String FRED_HOST = "fred";
public static final int TEST_USER_PORT = 123;
public static final int TEST_CONTROL_PORT = 456;
public static final int TEST_DATA_PORT = 789;
public static final String ZK_ROOT = "test-root";
public static final String CLUSTER_ID = "test-cluster";
/**
 * Validate that the key format used for an endpoint is the same as that
 * generated for hosts coming from YARN: the key built from a
 * {@link DrillbitEndpoint} must equal the key the driver builds from a bare
 * host name, both for explicitly configured ports and for the driver's
 * built-in default ports.
 */
@Test
public void testFormat() {
  String expectedKey = makeKey(TEST_HOST);

  // Key derived from a fully-populated endpoint.
  DrillbitEndpoint dbe = makeEndpoint(TEST_HOST);
  assertEquals(expectedKey, ZKClusterCoordinatorDriver.asString(dbe));

  // Key derived from a host name plus configured ports. The shared port
  // constants are used (rather than repeating the literals) so that this
  // stays in step with makeKey() and makeEndpoint().
  ZKClusterCoordinatorDriver driver = new ZKClusterCoordinatorDriver()
      .setPorts(TEST_USER_PORT, TEST_CONTROL_PORT, TEST_DATA_PORT);
  assertEquals(expectedKey, driver.toKey(TEST_HOST));

  // Internal default ports (used mostly for testing.)
  driver = new ZKClusterCoordinatorDriver();
  assertEquals("fred:31010:31011:31012", driver.toKey(FRED_HOST));
}
/**
 * Builds the key expected for a host that uses the hard-coded test ports:
 * the host name followed by the user, control and data ports, separated by
 * colons.
 *
 * @param host host name placed at the front of the key
 * @return key of the form {@code host:userPort:controlPort:dataPort}
 */
public static String makeKey(String host) {
  StringBuilder key = new StringBuilder();
  key.append(host);
  key.append(":").append(TEST_USER_PORT);
  key.append(":").append(TEST_CONTROL_PORT);
  key.append(":").append(TEST_DATA_PORT);
  return key.toString();
}
/**
 * Basic setup: start a ZK and verify that the initial endpoint list is empty.
 * Also validates the basics of the test setup (mock server, etc.)
 *
 * @throws Exception if the test ZK server or the driver fails
 */
@Test
public void testBasics() throws Exception {
  // try-with-resources closes the test server on every exit path.
  try (TestingServer server = new TestingServer()) {
    server.start();
    ZKClusterCoordinatorDriver driver = new ZKClusterCoordinatorDriver()
        .setConnect(server.getConnectString(), "drill", "drillbits").build();
    try {
      assertTrue(driver.getInitialEndpoints().isEmpty());
    } finally {
      // Close the driver even when the assertion fails, so that a failing
      // test does not leave the driver open behind it.
      driver.close();
    }
  }
}
private class TestDrillbitStatusListener implements DrillbitStatusListener {
protected Set<DrillbitEndpoint> added;
protected Set<DrillbitEndpoint> removed;
@Override
public void drillbitUnregistered(
Set<DrillbitEndpoint> unregisteredDrillbits) {
removed = unregisteredDrillbits;
}
@Override
public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) {
added = registeredDrillbits;
}
public void clear() {
added = null;
removed = null;
}
}
/**
 * Test a typical life cycle: existing Drillbit on AM start, add a Drillbit
 * (simulates a Drillbit starting), and remove a Drillbit (simulates a
 * Drillbit ending).
 * <p>
 * The test server, the probe ZK client and the driver are all released on
 * every exit path, including assertion failures.
 *
 * @throws Exception if the test ZK server, the probe client or the driver
 *           fails
 */
@Test
public void testCycle() throws Exception {
  try (TestingServer server = new TestingServer()) {
    server.start();
    String connStr = server.getConnectString();
    try (CuratorFramework probeZk = connectToZk(connStr)) {
      // Fred registers before the driver is built, so he must appear in the
      // driver's initial endpoint list.
      addDrillbit(probeZk, FRED_HOST);
      ZKClusterCoordinatorDriver driver = new ZKClusterCoordinatorDriver()
          .setConnect(connStr, ZK_ROOT, CLUSTER_ID).build();
      try {
        List<DrillbitEndpoint> bits = driver.getInitialEndpoints();
        assertEquals(1, bits.size());
        assertEquals(makeKey(FRED_HOST),
            ZKClusterCoordinatorDriver.asString(bits.get(0)));

        TestDrillbitStatusListener listener = new TestDrillbitStatusListener();
        driver.addDrillbitListener(listener);

        // Wilma registers: expect exactly one "added" notification.
        addDrillbit(probeZk, WILMA_HOST);
        // Give the notification time to reach the listener.
        Thread.sleep(50);
        assertNull(listener.removed);
        assertNotNull(listener.added);
        assertEquals(1, listener.added.size());
        for (DrillbitEndpoint dbe : listener.added) {
          assertEquals(makeKey(WILMA_HOST),
              ZKClusterCoordinatorDriver.asString(dbe));
        }

        // Fred deregisters: expect exactly one "removed" notification.
        listener.clear();
        removeDrillbit(probeZk, FRED_HOST);
        Thread.sleep(50);
        assertNull(listener.added);
        assertNotNull(listener.removed);
        assertEquals(1, listener.removed.size());
        for (DrillbitEndpoint dbe : listener.removed) {
          assertEquals(makeKey(FRED_HOST),
              ZKClusterCoordinatorDriver.asString(dbe));
        }
      } finally {
        // Close the driver even when an assertion above fails.
        driver.close();
      }
    }
  }
}
/**
 * Make a Drill endpoint using the hard-coded test ports and the given host
 * name.
 *
 * @param host address to store in the endpoint
 * @return endpoint whose address is {@code host} and whose user, control and
 *         data ports are the test constants
 */
private DrillbitEndpoint makeEndpoint(String host) {
  DrillbitEndpoint.Builder endpoint = DrillbitEndpoint.newBuilder();
  endpoint.setAddress(host);
  endpoint.setUserPort(TEST_USER_PORT);
  endpoint.setControlPort(TEST_CONTROL_PORT);
  endpoint.setDataPort(TEST_DATA_PORT);
  return endpoint.build();
}
/**
 * Pretend to be a Drillbit creating its ZK entry. Real Drillbits use a GUID
 * as the key, but we just use the host name, which is good enough for our
 * purposes here. The node is created at {@code "/" + host} and holds a
 * serialized service instance named after the test cluster, with the host's
 * test endpoint as payload.
 *
 * @param zk client used to create the node
 * @param host host name used both as the node name and as the endpoint address
 * @throws Exception if building, serializing or writing the entry fails
 */
private void addDrillbit(CuratorFramework zk, String host) throws Exception {
  DrillbitEndpoint endpoint = makeEndpoint(host);
  ServiceInstance<DrillbitEndpoint> instance = ServiceInstance
      .<DrillbitEndpoint> builder()
      .name(CLUSTER_ID)
      .payload(endpoint)
      .build();
  byte[] serialized = DrillServiceInstanceHelper.SERIALIZER.serialize(instance);
  String path = "/" + host;
  zk.create().forPath(path, serialized);
}
/**
 * Pretend to be a Drillbit going away: deletes the ZK node at
 * {@code "/" + host}.
 *
 * @param zk client used to delete the node
 * @param host host name that was used as the node name
 * @throws Exception if the delete fails
 */
private void removeDrillbit(CuratorFramework zk, String host)
    throws Exception {
  String path = "/" + host;
  zk.delete().forPath(path);
}
/**
 * Connect to the test ZK for the simulated Drillbit side of the test. (The AM
 * side of the test uses the actual AM code, which is what we're testing
 * here...) The client is namespaced to {@code ZK_ROOT/CLUSTER_ID} and is
 * already started when returned; the caller is responsible for closing it.
 *
 * @param connectString ZK connect string of the test server
 * @return a started Curator client
 */
public static CuratorFramework connectToZk(String connectString) {
  String namespace = ZK_ROOT + "/" + CLUSTER_ID;
  CuratorFramework zk = CuratorFrameworkFactory.builder()
      .namespace(namespace)
      .connectString(connectString)
      .retryPolicy(new RetryNTimes(3, 1000))
      .build();
  zk.start();
  return zk;
}
/**
 * Registry handler that records the most recent argument passed to each
 * callback so a test can inspect it afterwards. {@link #clear()} resets all
 * recorded values.
 */
private static class TestRegistryHandler implements RegistryHandler {
  // volatile: the test sleeps and then reads these fields from the test
  // thread, which implies that at least some callbacks are delivered on
  // another thread (presumably a ZK event thread -- confirm against
  // ZKRegistry). Without volatile the test thread is not guaranteed to see
  // the write.
  volatile String reserved;
  volatile String released;
  volatile Task start;
  volatile Task end;

  /**
   * Forgets all recorded values so the next callback can be detected.
   */
  public void clear() {
    reserved = null;
    released = null;
    start = null;
    end = null;
  }

  /**
   * Records the reserved host. Asserts that no host has been recorded since
   * the last {@link #clear()}, i.e. at most one reservation per step.
   */
  @Override
  public void reserveHost(String hostName) {
    assertNull(reserved);
    reserved = hostName;
  }

  /**
   * Records the released host. Asserts that no host has been recorded since
   * the last {@link #clear()}, i.e. at most one release per step.
   */
  @Override
  public void releaseHost(String hostName) {
    assertNull(released);
    released = hostName;
  }

  /**
   * Records the task whose start was acknowledged; the property key and
   * value are ignored.
   */
  @Override
  public void startAck(Task task, String propertyKey, Object value) {
    start = task;
  }

  /**
   * Records the task whose completion was acknowledged; the endpoint
   * property is ignored.
   */
  @Override
  public void completionAck(Task task, String endpointProperty) {
    end = task;
  }

  @Override
  public void registryDown() {
    // Intentionally empty: this handler records nothing for this callback.
  }
}
/**
 * Minimal {@link Task} for registry tests: it is constructed with null for
 * both superclass constructor arguments and simply reports a fixed host name.
 */
public static class TestTask extends Task {
  /** Host name returned by {@link #getHostName()}. */
  private final String hostName;

  /**
   * @param host host name this task reports
   */
  public TestTask(String host) {
    super(null, null);
    hostName = host;
  }

  /**
   * @return the host name given at construction
   */
  @Override
  public String getHostName() {
    return hostName;
  }

  /**
   * Puts the tracking state back to {@code TrackingState.NEW}.
   */
  @Override
  public void resetTrackingState() {
    this.trackingState = TrackingState.NEW;
  }
}
/**
 * Drives {@link ZKRegistry} through a full scenario against a test ZK
 * server: a stray Drillbit present at start-up, a task that is allocated
 * before its Drillbit registers, a Drillbit that registers before its task
 * is allocated, a Drillbit that drops out of ZK and comes back, and finally
 * the removal of every tracker.
 * <p>
 * The test server, the probe ZK client and the driver are all released on
 * every exit path, including assertion failures.
 *
 * @throws Exception if the test ZK server, the probe client or the driver
 *           fails
 */
@Test
public void testZKRegistry() throws Exception {
  try (TestingServer server = new TestingServer()) {
    server.start();
    String connStr = server.getConnectString();
    try (CuratorFramework probeZk = connectToZk(connStr)) {
      // Fred registers before the driver is built, so the registry sees him
      // as a Drillbit it did not start.
      addDrillbit(probeZk, FRED_HOST);
      ZKClusterCoordinatorDriver driver = new ZKClusterCoordinatorDriver()
          .setConnect(connStr, ZK_ROOT, CLUSTER_ID)
          .setPorts(TEST_USER_PORT, TEST_CONTROL_PORT, TEST_DATA_PORT).build();
      try {
        exerciseRegistryLifecycle(probeZk, driver);
      } finally {
        // Close the driver even when an assertion in the scenario fails.
        driver.close();
      }
    }
  }
}

/**
 * Runs the registry scenario for {@link #testZKRegistry()}. Expects that a
 * Drillbit for {@code FRED_HOST} is already registered in ZK.
 *
 * @param probeZk client used to simulate Drillbits adding and removing their
 *          ZK entries
 * @param driver built driver that the registry under test is created from
 * @throws Exception if a ZK operation fails or the thread is interrupted
 *           while waiting for a notification
 */
private void exerciseRegistryLifecycle(CuratorFramework probeZk,
    ZKClusterCoordinatorDriver driver) throws Exception {
  ZKRegistry registry = new ZKRegistry(driver);
  TestRegistryHandler handler = new TestRegistryHandler();
  registry.start(handler);

  // We started with one "stray" drillbit that will be reported as unmanaged.
  assertEquals(FRED_HOST, handler.reserved);
  List<String> unmanaged = registry.listUnmanagedDrillits();
  assertEquals(1, unmanaged.size());
  String fredsKey = makeKey(FRED_HOST);
  assertEquals(fredsKey, unmanaged.get(0));
  Map<String, DrillbitTracker> trackers = registry.getRegistryForTesting();
  assertEquals(1, trackers.size());
  assertTrue(trackers.containsKey(fredsKey));
  DrillbitTracker fredsTracker = trackers.get(fredsKey);
  assertEquals(fredsKey, fredsTracker.key);
  assertEquals(DrillbitTracker.State.UNMANAGED, fredsTracker.state);
  assertNull(fredsTracker.task);
  assertEquals(fredsKey,
      ZKClusterCoordinatorDriver.asString(fredsTracker.endpoint));

  // The handler should have been told about the initial stray.
  assertEquals(FRED_HOST, handler.reserved);

  // Pretend to start a new Drillbit.
  Task wilmasTask = new TestTask(WILMA_HOST);
  EventContext context = new EventContext(wilmasTask);

  // Registry ignores the created event.
  registry.stateChange(Event.CREATED, context);
  assertEquals(1, registry.getRegistryForTesting().size());

  // But, does care about the allocated event.
  registry.stateChange(Event.ALLOCATED, context);
  assertEquals(2, registry.getRegistryForTesting().size());
  String wilmasKey = makeKey(WILMA_HOST);
  DrillbitTracker wilmasTracker = registry.getRegistryForTesting()
      .get(wilmasKey);
  assertNotNull(wilmasTracker);
  assertEquals(wilmasTask, wilmasTracker.task);
  assertNull(wilmasTracker.endpoint);
  assertEquals(wilmasKey, wilmasTracker.key);
  assertEquals(DrillbitTracker.State.NEW, wilmasTracker.state);
  handler.clear();

  // Time goes on. The Drillbit starts and registers itself.
  addDrillbit(probeZk, WILMA_HOST);
  Thread.sleep(100);
  assertEquals(wilmasTask, handler.start);
  assertEquals(DrillbitTracker.State.REGISTERED, wilmasTracker.state);

  // Create another task: Barney
  Task barneysTask = new TestTask(BARNEY_HOST);
  context = new EventContext(barneysTask);
  registry.stateChange(Event.CREATED, context);

  // Start Barney, but assume a latency in Yarn, but not ZK.
  // We get the ZK registration before the YARN launch confirmation.
  handler.clear();
  addDrillbit(probeZk, BARNEY_HOST);
  Thread.sleep(100);
  assertEquals(BARNEY_HOST, handler.reserved);
  String barneysKey = makeKey(BARNEY_HOST);
  DrillbitTracker barneysTracker = registry.getRegistryForTesting()
      .get(barneysKey);
  assertNotNull(barneysTracker);
  assertEquals(DrillbitTracker.State.UNMANAGED, barneysTracker.state);
  assertNull(barneysTracker.task);
  assertEquals(2, registry.listUnmanagedDrillits().size());

  // The allocation arrives: Barney's tracker is now tied to his task.
  handler.clear();
  registry.stateChange(Event.ALLOCATED, context);
  assertEquals(DrillbitTracker.State.REGISTERED, barneysTracker.state);
  assertEquals(barneysTask, handler.start);
  assertEquals(barneysTask, barneysTracker.task);
  assertEquals(1, registry.listUnmanagedDrillits().size());

  // Barney is having problems; it drops out of ZK.
  handler.clear();
  removeDrillbit(probeZk, BARNEY_HOST);
  Thread.sleep(100);
  assertEquals(barneysTask, handler.end);
  assertEquals(DrillbitTracker.State.DEREGISTERED, barneysTracker.state);

  // Barney comes alive (presumably before the controller gives up and kills
  // the Drillbit.)
  handler.clear();
  addDrillbit(probeZk, BARNEY_HOST);
  Thread.sleep(100);
  assertEquals(barneysTask, handler.start);
  assertEquals(DrillbitTracker.State.REGISTERED, barneysTracker.state);

  // Barney is killed by the controller.
  // ZK entry drops. The controller is notified, but the tracker stays in
  // the registry until the controller reports that the task has ended.
  handler.clear();
  removeDrillbit(probeZk, BARNEY_HOST);
  Thread.sleep(100);
  assertNotNull(registry.getRegistryForTesting().get(barneysKey));
  assertEquals(barneysTask, handler.end);

  // The controller tells the registry to stop tracking the Drillbit.
  handler.clear();
  registry.stateChange(Event.ENDED, context);
  assertNull(handler.end);
  assertNull(registry.getRegistryForTesting().get(barneysKey));

  // The stray drillbit deregisters from ZK. The tracker is removed.
  handler.clear();
  removeDrillbit(probeZk, FRED_HOST);
  Thread.sleep(100);
  assertNull(registry.getRegistryForTesting().get(fredsKey));
  assertNull(handler.end);
  assertEquals(FRED_HOST, handler.released);

  // Wilma is killed by the controller.
  handler.clear();
  removeDrillbit(probeZk, WILMA_HOST);
  Thread.sleep(100);
  assertEquals(wilmasTask, handler.end);
  assertNull(handler.released);
  assertEquals(DrillbitTracker.State.DEREGISTERED, wilmasTracker.state);
  assertNotNull(registry.getRegistryForTesting().get(wilmasKey));

  handler.clear();
  context = new EventContext(wilmasTask);
  registry.stateChange(Event.ENDED, context);
  assertNull(registry.getRegistryForTesting().get(wilmasKey));
  assertNull(handler.released);
  assertNull(handler.end);

  // All drillbits should be gone.
  assertTrue(registry.getRegistryForTesting().isEmpty());
}
}