blob: d05c5628b47f46a1ae07a4401459ac5ff7194ab2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.yarn.zk;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
import org.apache.drill.yarn.appMaster.AMWrapperException;
import org.apache.drill.yarn.appMaster.EventContext;
import org.apache.drill.yarn.appMaster.Pollable;
import org.apache.drill.yarn.appMaster.RegistryHandler;
import org.apache.drill.yarn.appMaster.Task;
import org.apache.drill.yarn.appMaster.TaskLifecycleListener;
/**
* AM-specific implementation of a Drillbit registry backed by ZooKeeper.
* Listens to ZK events for registering a Drillbit and deregistering. Alerts the
* Cluster Controller of these events.
* <p>
* Locking strategy: Receives events from both ZK and the cluster controller,
* both of which must be synchronized. To prevent deadlocks, this class NEVER
* calls into the cluster controller while holding a lock. This prevents the
* following:
* <p>
* ClusterController --> ZKRegistry (OK) <br>
* ZK --> ZKRegistry (OK) <br>
* ZK --> ZKRegistry --> Cluster Controller (bad)
* <p>
* In the case of registration, ZK calls the registry which must alert the
* cluster controller. Cluster controller alerting is handled outside the ZK
* update critical section.
* <p>
* Because ZK events are occur relatively infrequently, any deadlock will occur
* once in a blue moon, which will make it very hard to reproduce. So, extreme
* caution is needed at design time to prevent the problem.
*/
public class ZKRegistry
implements TaskLifecycleListener, DrillbitStatusListener, Pollable {
/**
* State of each Drillbit that we've discovered through ZK or launched via the
* AM. The tracker is where we combine the ZK information with AM to correlate
* overall Drillbit health.
*/
protected static class DrillbitTracker {
/**
* A Drillbit can be in one of four states.
*/
public enum State {
/**
* An unmanaged Drillbit is one that has announced itself via ZK, but
* which the AM didn't launch (or has not yet received confirmation from
* YARN that it was launched.) In the normal state, this state either does
* not occur (YARN reports the task launch before the Drillbit registers
* in ZK) or is transient (if the Drillbit registers in ZK before YARN
* gets around to telling the AM that the Drillbit was launched.) A
* Drillbit that stays in the unregistered state is likely one launched
* outside the AM: either launched manually or (possibly), one left from a
* previous, failed AM run (though YARN is supposed to kill left-over
* child processes in that case.)
*/
UNMANAGED,
/**
* A new Drillbit is one that the AM has launched, but that has not yet
* registered itself with ZK. This is normally a transient state that
* occurs as ZK registration catches up with the YARN launch notification.
* If a Drillbit says in this state, then something is seriously wrong
* (perhaps a mis-configuration). The cluster controller will patiently
* wait a while, then decide bad things are happening and will ask YARN to
* kill the Drillbit, then will retry a time or two, after which it will
* throw up its hands, blacklist the node, and wait for the admin to sort
* things out.
*/
NEW,
/**
* Normal operating state: the AM launched the Drillbit, which then
* dutifully registered itself in ZK. Nothing to see here, move along.
*/
REGISTERED,
/**
* The Drillbit was working just fine, but its registration has dropped
* out of ZK for a reason best left to the cluster controller to
* determine. Perhaps the controller has decided to kill the Drillbit.
* Perhaps the Drillbit became unresponsive (in which case the controller
* will kill it and retry) or has died (in which case YARN will alert the
* AM that the process exited.)
*/
DEREGISTERED
}
/**
* The common key used between tasks and ZK registrations. The key is of the
* form:<br>
*
* <pre>
* host:port:port:port
* </pre>
*/
protected final String key;
/**
* ZK tracking state.
*
* See {@link org.apache.drill.yarn.zk.ZKRegistry.DrillbitTracker.State}
*/
protected State state;
/**
* For Drillbits started by the AM, the task object for this Drillbit.
*/
protected Task task;
/**
* For Drillbits discovered through ZK, the Drill endpoint for the Drillbit.
*/
protected DrillbitEndpoint endpoint;
public DrillbitTracker(String key, DrillbitEndpoint endpoint) {
this.key = key;
this.state = DrillbitTracker.State.UNMANAGED;
this.endpoint = endpoint;
}
public DrillbitTracker(String key, Task task) {
this.key = key;
this.task = task;
state = DrillbitTracker.State.NEW;
}
/**
* Mark that a YARN-managed task has become registered in ZK. This indicates
* that the task has come online. Tell the task to update its state to
* record that the task is, in fact, registered in ZK. This indicates a
* normal, healthy task.
*/
private void becomeRegistered() {
state = DrillbitTracker.State.REGISTERED;
}
/**
* Mark that a YARN-managed Drillbit has dropped out of ZK.
*/
public void becomeUnregistered() {
assert state == DrillbitTracker.State.REGISTERED;
state = DrillbitTracker.State.DEREGISTERED;
endpoint = null;
}
}
public static final String CONTROLLER_PROPERTY = "zk";
public static final int UPDATE_PERIOD_MS = 20_000;
public static final String ENDPOINT_PROPERTY = "endpoint";
private static final Log LOG = LogFactory.getLog(ZKRegistry.class);
/**
* Map of host:port:port:port keys to tracking objects. Identifies the
* Drillbits discovered from ZK, started by the controller, or (ideally) both.
*/
private Map<String, DrillbitTracker> registry = new HashMap<>();
/**
* Interface to Drill's cluster coordinator.
*/
private ZKClusterCoordinatorDriver zkDriver;
/**
* Drill's cluster coordinator (or, at least, Drill-on-YARN's version of it.
*/
private RegistryHandler registryHandler;
/**
* Last check of ZK status.
*/
private long lastUpdateTime;
public ZKRegistry(ZKClusterCoordinatorDriver zkDriver) {
this.zkDriver = zkDriver;
}
/**
* Called during AM startup to initialize ZK. Checks if any Drillbits are
* already running. These are "unmanaged" because the AM could not have
* started them (since they predate the AM.)
*/
public void start(RegistryHandler controller) {
this.registryHandler = controller;
try {
zkDriver.build();
} catch (ZKRuntimeException e) {
LOG.error("Failed to start ZK monitoring", e);
throw new AMWrapperException("Failed to start ZK monitoring", e);
}
for (DrillbitEndpoint dbe : zkDriver.getInitialEndpoints()) {
String key = toKey(dbe);
registry.put(key, new DrillbitTracker(key, dbe));
// Blacklist the host for each unmanaged drillbit.
controller.reserveHost(dbe.getAddress());
LOG.warn("Host " + dbe.getAddress()
+ " already running a Drillbit outside of YARN.");
}
zkDriver.addDrillbitListener(this);
}
/**
* Convert a Drillbit endpoint to a string key used in the (zk-->task) map.
* Note that the string format here must match the one used in
* {@link #toKey(Task)} to map a task to string key.
*
* @param dbe
* the Drillbit endpoint from ZK
* @return a string key for this object
*/
private String toKey(DrillbitEndpoint dbe) {
return ZKClusterCoordinatorDriver.asString(dbe);
}
/**
* Convert a task to a string key used in the (zk-->task) map. Note that the
* string format here must match the one used in
* {@link #toKey(DrillbitEndpoint)} to map a drillbit endpoint to string key.
*
* @param task
* the task tracked by the cluster controller
* @return a string key for this object
*/
private String toKey(Task task) {
return zkDriver.toKey(task.getHostName());
}
// private String toKey(Container container) {
// return zkDriver.toKey(container.getNodeId().getHost());
// }
public static class AckEvent {
Task task;
DrillbitEndpoint endpoint;
public AckEvent(Task task, DrillbitEndpoint endpoint) {
this.task = task;
this.endpoint = endpoint;
}
}
/**
* Callback from ZK to indicate that one or more drillbits have become
* registered. We handle registrations in a critical section, then alert the
* cluster controller outside the critical section.
*/
@Override
public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) {
List<AckEvent> updates = registerDrillbits(registeredDrillbits);
for (AckEvent event : updates) {
if (event.task == null) {
registryHandler.reserveHost(event.endpoint.getAddress());
} else {
registryHandler.startAck(event.task, ENDPOINT_PROPERTY, event.endpoint);
}
}
}
private synchronized List<AckEvent> registerDrillbits(
Set<DrillbitEndpoint> registeredDrillbits) {
List<AckEvent> events = new ArrayList<>();
for (DrillbitEndpoint dbe : registeredDrillbits) {
AckEvent event = drillbitRegistered(dbe);
if (event != null) {
events.add(event);
}
}
return events;
}
/**
* Called when a drillbit has become registered. There are two cases. Either
* this is a normal registration of a previously-started task, or this is an
* unmanaged drillbit for which we have no matching task.
*/
private AckEvent drillbitRegistered(DrillbitEndpoint dbe) {
String key = toKey(dbe);
DrillbitTracker tracker = registry.get(key);
if (tracker == null) {
// Unmanaged drillbit case
LOG.info("Registration of unmanaged drillbit: " + key);
tracker = new DrillbitTracker(key, dbe);
registry.put(key, tracker);
return new AckEvent(null, dbe);
}
// Managed drillbit case. Might be we lost, then regained
// ZK connection.
if (tracker.state == DrillbitTracker.State.REGISTERED) {
LOG.info("Re-registration of known drillbit: " + key);
return null;
}
// Otherwise, the Drillbit has just registered with ZK.
// Or, if the ZK connection was lost and regained, the
// state changes from DEREGISTERED --> REGISTERED
LOG.info("Drillbit registered: " + key + ", task: " + tracker.task.toString() );
tracker.endpoint = dbe;
tracker.becomeRegistered();
return new AckEvent(tracker.task, dbe);
}
/**
* Callback from ZK to indicate that one or more drillbits have become
* deregistered from ZK. We handle the deregistrations in a critical section,
* but updates to the cluster controller outside of a critical section.
*/
@Override
public void drillbitUnregistered(
Set<DrillbitEndpoint> unregisteredDrillbits) {
List<AckEvent> updates = unregisterDrillbits(unregisteredDrillbits);
for (AckEvent event : updates) {
registryHandler.completionAck(event.task, ENDPOINT_PROPERTY);
}
}
private synchronized List<AckEvent> unregisterDrillbits(
Set<DrillbitEndpoint> unregisteredDrillbits) {
List<AckEvent> events = new ArrayList<>();
for (DrillbitEndpoint dbe : unregisteredDrillbits) {
AckEvent event = drillbitUnregistered(dbe);
if (event != null) {
events.add(event);
}
}
return events;
}
/**
* Handle the case that a drillbit becomes unregistered. There are three
* cases.
* <ol>
* <li>The deregistration is for a drillbit that is not in the registry table.
* Indicates a code error.</li>
* <li>The drillbit is unmanaged. This occurs for drillbits started and
* stopped outside of YARN.</li>
* <li>Normal case of deregistration of a YARN-managed drillbit. Inform the
* controller of this event.</li>
* </ol>
*
* @param dbe
*/
private AckEvent drillbitUnregistered(DrillbitEndpoint dbe) {
String key = toKey(dbe);
DrillbitTracker tracker = registry.get(key);
assert tracker != null;
if (tracker == null) {
// Something is terribly wrong.
// Have seen this when a user kills the Drillbit just after it starts. Evidently, the
// Drillbit registers with ZK just before it is killed, but before DoY hears about
// the registration.
LOG.error("Internal error - Unexpected drillbit unregistration: " + key);
return null;
}
if (tracker.state == DrillbitTracker.State.UNMANAGED) {
// Unmanaged drillbit
assert tracker.task == null;
LOG.info("Unmanaged drillbit unregistered: " + key);
registry.remove(key);
registryHandler.releaseHost(dbe.getAddress());
return null;
}
LOG.info("Drillbit unregistered: " + key + ", task: " + tracker.task.toString() );
tracker.becomeUnregistered();
return new AckEvent(tracker.task, dbe);
}
/**
* Listen for selected YARN task state changes. Called from within the cluster
* controller's critical section.
*/
@Override
public synchronized void stateChange(Event event, EventContext context) {
switch (event) {
case ALLOCATED:
taskCreated(context.task);
break;
case ENDED:
taskEnded(context.task);
break;
default:
break;
}
}
/**
* Indicates that the cluster controller has created a task that we expect to
* be monitored by ZK. We handle two cases: the normal case in which we later
* receive a ZK notification. And, the unusual case in which we've already
* received the ZK notification and we now match that notification with this
* task. (The second case could occur if latency causes us to receive the ZK
* notification before we learn from the NM that the task is alive.)
*
* @param task
*/
private void taskCreated(Task task) {
String key = toKey(task);
DrillbitTracker tracker = registry.get(key);
if (tracker == null) {
// Normal case: no ZK registration yet.
registry.put(key, new DrillbitTracker(key, task));
} else if (tracker.state == DrillbitTracker.State.UNMANAGED) {
// Unusual case: ZK registration came first.
LOG.info("Unmanaged drillbit became managed: " + key);
tracker.task = task;
tracker.becomeRegistered();
// Note: safe to call this here as we are already in the controller
// critical section.
registryHandler.startAck(task, ENDPOINT_PROPERTY, tracker.endpoint);
} else {
LOG.error(task.getLabel() + " - Drillbit registry in wrong state "
+ tracker.state + " for new task: " + key);
}
}
/**
* Report whether the given task is still registered in ZK. Called while
* waiting for a deregistration event to catch possible cases where the
* messages is lost. The message should never be lost, but we've seen
* cases where tasks hang in this state. This is a potential work-around.
*
* @param task
* @return True if the given task is regestered. False otherwise.
*/
public synchronized boolean isRegistered(Task task) {
String key = toKey(task);
DrillbitTracker tracker = registry.get(key);
if (tracker==null) {
return false;
}
return tracker.state == DrillbitTracker.State.REGISTERED;
}
/**
* Mark that a task (YARN container) has ended. Updates the (zk --> task)
* registry by removing the task. The cluster controller state machine
* monitors ZK and does not end the task until the ZK registration for that
* task drops. As a result, the entry here should be in the deregistered state
* or something is seriously wrong.
*
* @param task
*/
private void taskEnded(Task task) {
// If the task has no host name then the task is being cancelled before
// a YARN container was allocated. Just ignore such a case.
if (task.getHostName() == null) {
return;
}
String key = toKey(task);
DrillbitTracker tracker = registry.get(key);
assert tracker != null;
assert tracker.state == DrillbitTracker.State.DEREGISTERED;
registry.remove(key);
}
/**
* Periodically check ZK status. If the ZK connection has timed out, something
* is very seriously wrong. Shut the whole Drill cluster down since Drill
* cannot operate without ZooKeeper.
* <p>
* This method should not be synchronized. It checks only the ZK state, not
* internal state. Further, if we do reconnect to ZK, then a ZK thread may
* attempt to update this registry, which will acquire a synchronization lock.
*/
@Override
public void tick(long curTime) {
if (lastUpdateTime + UPDATE_PERIOD_MS < curTime) {
return;
}
lastUpdateTime = curTime;
if (zkDriver.hasFailed()) {
int secs = (int) ((zkDriver.getLostConnectionDurationMs() + 500) / 1000);
LOG.error(
"ZooKeeper connection lost, failing after " + secs + " seconds.");
registryHandler.registryDown();
}
}
public void finish(RegistryHandler handler) {
zkDriver.removeDrillbitListener(this);
zkDriver.close();
}
public synchronized List<String> listUnmanagedDrillits() {
List<String> drillbits = new ArrayList<>();
for (DrillbitTracker item : registry.values()) {
if (item.state == DrillbitTracker.State.UNMANAGED) {
drillbits.add(item.key);
}
}
return drillbits;
}
/**
* Get the current registry for testing. Why for testing? Because this is
* unsynchronized. In production code, the map may change out from under you.
*
* @return The current registry.
*/
@VisibleForTesting
protected Map<String, DrillbitTracker> getRegistryForTesting() {
return registry;
}
}