blob: fd54a43c7fe026fdd3e57f983ca7c7bb0d8b8445 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.yarn.zk;
import static org.apache.drill.shaded.guava.com.google.common.collect.Collections2.transform;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.ServiceCacheListener;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.DistributedSemaphore;
import org.apache.drill.exec.coord.DrillServiceInstanceHelper;
import org.apache.drill.exec.coord.store.CachingTransientStoreFactory;
import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.coord.store.TransientStoreConfig;
import org.apache.drill.exec.coord.store.TransientStoreFactory;
import org.apache.drill.exec.coord.zk.ZKRegistrationHandle;
import org.apache.drill.exec.coord.zk.ZkDistributedSemaphore;
import org.apache.drill.exec.coord.zk.ZkEphemeralStore;
import org.apache.drill.exec.coord.zk.ZkTransientStoreFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.base.Throwables;
/**
 * Manages cluster coordination utilizing ZooKeeper.
 * <p>
 * This is a clone of the Drill class
 * org.apache.drill.exec.coord.zk.ZKClusterCoordinator with a number of
 * modifications:
 * <ul>
 * <li>Removed dependency on the Drill config system. That system uses Google's
 * Guava library version 18, which conflicts with the earlier versions used by
 * YARN and Hadoop, resulting in runtime "undefined method" exceptions.</li>
 * <li>Instead of being read from the Drill config, the connection parameters
 * are passed directly to the constructor.</li>
 * <li>Adds support for the "Drillbits registered" event, which Drill neither
 * needed nor implemented.</li>
 * <li>Uses the YARN (Apache Commons Logging) logging system instead of
 * Drill's.</li>
 * </ul>
 * <p>
 * This class should be replaced by the Drill version if/when the Guava
 * conflicts can be resolved (and when registered-Drillbit notifications are
 * added to the Drill version).
 */
public class ZKClusterCoordinator extends ClusterCoordinator {
  protected static final Log logger = LogFactory
      .getLog(ZKClusterCoordinator.class);

  /** Curator client; created and started by the constructor. */
  private final CuratorFramework curator;

  /** Curator service discovery used to register and locate Drillbits. */
  private final ServiceDiscovery<DrillbitEndpoint> discovery;

  /**
   * Latest snapshot of the registered Drillbits. The collection itself is
   * never mutated; {@link #updateEndpoints()} replaces it wholesale, and the
   * field is volatile so readers always see a complete snapshot.
   */
  private volatile Collection<DrillbitEndpoint> endpoints = Collections
      .emptyList();

  /** Discovery service name under which Drillbits register (the cluster ID). */
  private final String serviceName;

  /** Released the first time Curator reports a CONNECTED state. */
  private final CountDownLatch initialConnection = new CountDownLatch(1);

  /** Caching factory for ZooKeeper-backed transient stores. */
  private final TransientStoreFactory factory;

  /** Watches the Drillbit service; created in {@link #start(long)}. */
  private ServiceCache<DrillbitEndpoint> serviceCache;

  /**
   * Creates and starts the Curator client and prepares (but does not start)
   * service discovery. Call {@link #start(long)} before using the coordinator.
   *
   * @param connect ZooKeeper connect string
   * @param zkRoot Curator namespace (root znode) under which all nodes live
   * @param clusterId Drill cluster ID, used as the discovery service name
   * @param retryCount retry count passed to Curator's {@code RetryNTimes}
   * @param retryDelayMs delay between Curator retries, in milliseconds
   * @param connectTimeoutMs Curator connection timeout, in milliseconds
   * @throws IOException as declared by the original signature
   */
  public ZKClusterCoordinator(String connect, String zkRoot, String clusterId,
      int retryCount, int retryDelayMs, int connectTimeoutMs)
      throws IOException {
    if (logger.isDebugEnabled()) {
      logger.debug("ZK connect: " + connect + ", zkRoot: " + zkRoot
          + ", clusterId: " + clusterId);
    }
    this.serviceName = clusterId;
    RetryPolicy rp = new RetryNTimes(retryCount, retryDelayMs);
    curator = CuratorFrameworkFactory.builder().namespace(zkRoot)
        .connectionTimeoutMs(connectTimeoutMs).retryPolicy(rp)
        .connectString(connect).build();
    // Register before start() so the first CONNECTED event cannot be missed.
    curator.getConnectionStateListenable()
        .addListener(new InitialConnectionListener());
    curator.start();
    discovery = newDiscovery();
    factory = CachingTransientStoreFactory
        .of(new ZkTransientStoreFactory(curator));
  }

  /** @return the underlying Curator client */
  public CuratorFramework getCurator() {
    return curator;
  }

  /**
   * Starts service discovery, waits for the initial ZooKeeper connection, then
   * starts watching the Drillbit service and loads the initial endpoint set.
   *
   * @param millisToWait maximum time to wait for the initial connection, in
   *          milliseconds; 0 means wait without a time limit
   * @throws IOException if the connection is not established within
   *           {@code millisToWait}
   * @throws Exception if discovery or the service cache fails to start, or if
   *           the wait is interrupted
   */
  @Override
  public void start(long millisToWait) throws Exception {
    logger.debug("Starting ZKClusterCoordination.");
    discovery.start();
    if (millisToWait != 0) {
      boolean success = this.initialConnection.await(millisToWait,
          TimeUnit.MILLISECONDS);
      if (!success) {
        throw new IOException(String.format(
            "Failure to connect to the zookeeper cluster service within the allotted time of %d milliseconds.",
            millisToWait));
      }
    } else {
      this.initialConnection.await();
    }
    serviceCache = discovery.serviceCacheBuilder().name(serviceName).build();
    serviceCache.addListener(new EndpointListener());
    serviceCache.start();
    updateEndpoints();
  }

  /**
   * Releases {@link #initialConnection} on the first CONNECTED event, then
   * unregisters itself because only the initial connection matters.
   */
  private class InitialConnectionListener implements ConnectionStateListener {
    @Override
    public void stateChanged(CuratorFramework client,
        ConnectionState newState) {
      if (newState == ConnectionState.CONNECTED) {
        initialConnection.countDown();
        client.getConnectionStateListenable().removeListener(this);
      }
    }
  }

  /** Refreshes the endpoint snapshot whenever the service cache changes. */
  private class EndpointListener implements ServiceCacheListener {
    @Override
    public void stateChanged(CuratorFramework client,
        ConnectionState newState) {
      // Intentionally ignored: only cache content changes are of interest.
    }

    @Override
    public void cacheChanged() {
      logger.debug("Got cache changed --> updating endpoints");
      updateEndpoints();
    }
  }

  /**
   * Closes the service cache, discovery, store factory and Curator client, in
   * that order.
   */
  @Override
  public void close() throws Exception {
    // ServiceDiscovery attempts to close its own caches (including
    // serviceCache) when it is closed. We still close serviceCache explicitly,
    // and do so before discovery, so that it is not released twice and the JVM
    // does not log spurious warnings about the double release.
    AutoCloseables.close(serviceCache, discovery, factory, curator);
  }

  /**
   * Registers a Drillbit with service discovery.
   *
   * @param data endpoint to register
   * @return handle to pass to {@link #unregister(RegistrationHandle)}
   * @throws RuntimeException wrapping any checked exception from Curator
   */
  @Override
  public RegistrationHandle register(DrillbitEndpoint data) {
    try {
      ServiceInstance<DrillbitEndpoint> serviceInstance = newServiceInstance(
          data);
      discovery.registerService(serviceInstance);
      return new ZKRegistrationHandle(serviceInstance.getId(), data);
    } catch (Exception e) {
      Throwables.throwIfUnchecked(e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Unregisters a Drillbit previously registered via
   * {@link #register(DrillbitEndpoint)} and clears all registered listeners.
   *
   * @param handle handle returned by {@code register}
   * @throws UnsupportedOperationException if {@code handle} is null or not a
   *           {@link ZKRegistrationHandle}
   * @throws RuntimeException wrapping any checked exception from Curator
   */
  @Override
  public void unregister(RegistrationHandle handle) {
    if (!(handle instanceof ZKRegistrationHandle)) {
      // Guard against null so the caller gets a meaningful message, not an NPE.
      String type = (handle == null) ? "null" : handle.getClass().getName();
      throw new UnsupportedOperationException("Unknown handle type: " + type);
    }
    // When the Drillbit is unregistered, clean up all listeners registered
    // with this coordinator.
    this.listeners.clear();
    ZKRegistrationHandle h = (ZKRegistrationHandle) handle;
    try {
      // Only the ID and name are needed to identify the instance to remove.
      ServiceInstance<DrillbitEndpoint> serviceInstance = ServiceInstance
          .<DrillbitEndpoint> builder().address("").port(0).id(h.id)
          .name(serviceName).build();
      discovery.unregisterService(serviceInstance);
    } catch (Exception e) {
      Throwables.throwIfUnchecked(e);
      throw new RuntimeException(e);
    }
  }

  /** @return the latest unmodifiable snapshot of registered Drillbits */
  @Override
  public Collection<DrillbitEndpoint> getAvailableEndpoints() {
    return this.endpoints;
  }

  /**
   * Returns a ZooKeeper-backed semaphore stored under
   * {@code /semaphore/<name>} within the coordinator's namespace.
   */
  @Override
  public DistributedSemaphore getSemaphore(String name, int maximumLeases) {
    return new ZkDistributedSemaphore(curator, "/semaphore/" + name,
        maximumLeases);
  }

  /**
   * Returns the (cached) transient store for {@code config}, creating it if
   * needed. The factory already returns a {@code TransientStore<V>}, so no
   * downcast to a concrete store type is required.
   */
  @Override
  public <V> TransientStore<V> getOrCreateTransientStore(
      final TransientStoreConfig<V> config) {
    return factory.getOrCreateStore(config);
  }

  /**
   * Re-reads the registered Drillbits from service discovery, publishes a new
   * snapshot, and notifies listeners of Drillbits that appeared or
   * disappeared. Failures are logged, not thrown, because this runs from the
   * service cache's change callback.
   */
  private synchronized void updateEndpoints() {
    try {
      // Materialize the payloads into a stable snapshot instead of publishing a
      // lazy Collections2.transform() view, which would re-apply the function
      // on every iteration and has linear-time contains().
      List<DrillbitEndpoint> current = new ArrayList<>(transform(
          discovery.queryForInstances(serviceName),
          new Function<ServiceInstance<DrillbitEndpoint>, DrillbitEndpoint>() {
            @Override
            public DrillbitEndpoint apply(
                ServiceInstance<DrillbitEndpoint> input) {
              return input.getPayload();
            }
          }));
      Collection<DrillbitEndpoint> newDrillbitSet = Collections
          .unmodifiableList(current);

      // Diff through hash sets so both set differences are linear time.
      Set<DrillbitEndpoint> previousBits = new HashSet<>(endpoints);
      Set<DrillbitEndpoint> currentBits = new HashSet<>(newDrillbitSet);

      // Newly dead bits: original bits - new set of active bits.
      Set<DrillbitEndpoint> unregisteredBits = new HashSet<>(previousBits);
      unregisteredBits.removeAll(currentBits);
      // Newly live bits: new set of active bits - original bits.
      Set<DrillbitEndpoint> registeredBits = new HashSet<>(currentBits);
      registeredBits.removeAll(previousBits);

      endpoints = newDrillbitSet;
      logActiveDrillbits(newDrillbitSet);

      // Notify the drillbit listeners of newly unregistered bits.
      if (!unregisteredBits.isEmpty()) {
        drillbitUnregistered(unregisteredBits);
      }
      // Notify the drillbit listeners of newly registered bits.
      if (!registeredBits.isEmpty()) {
        drillbitRegistered(registeredBits);
      }
    } catch (Exception e) {
      logger.error("Failure while update Drillbit service location cache.", e);
    }
  }

  /** Logs, at debug level only, the full set of currently active Drillbits. */
  private void logActiveDrillbits(Collection<DrillbitEndpoint> activeBits) {
    if (!logger.isDebugEnabled()) {
      return;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("Active drillbit set changed. Now includes ");
    builder.append(activeBits.size());
    builder.append(" total bits.");
    if (!activeBits.isEmpty()) {
      builder.append(" New active drillbits: \n");
    }
    for (DrillbitEndpoint bit : activeBits) {
      builder.append('\t');
      builder.append(bit.getAddress());
      builder.append(':');
      builder.append(bit.getUserPort());
      builder.append(':');
      builder.append(bit.getControlPort());
      builder.append(':');
      builder.append(bit.getDataPort());
      builder.append('\n');
    }
    logger.debug(builder.toString());
  }

  /**
   * Builds the discovery instance that represents {@code endpoint}.
   *
   * @throws Exception as declared by Curator's instance builder
   */
  protected ServiceInstance<DrillbitEndpoint> newServiceInstance(
      DrillbitEndpoint endpoint) throws Exception {
    return ServiceInstance.<DrillbitEndpoint> builder().name(serviceName)
        .payload(endpoint).build();
  }

  /**
   * Builds (but does not start) service discovery rooted at "/" within the
   * Curator namespace, using Drill's endpoint serializer.
   */
  protected ServiceDiscovery<DrillbitEndpoint> newDiscovery() {
    return ServiceDiscoveryBuilder.builder(DrillbitEndpoint.class).basePath("/")
        .client(curator).serializer(DrillServiceInstanceHelper.SERIALIZER)
        .build();
  }

  /** Not used in Drill-on-YARN (DoY); always throws. */
  @Override
  public Collection<DrillbitEndpoint> getOnlineEndPoints() {
    throw new UnsupportedOperationException();
  }

  /** Not used in Drill-on-YARN (DoY); always throws. */
  @Override
  public RegistrationHandle update(RegistrationHandle handle, State state) {
    throw new UnsupportedOperationException();
  }
}