blob: fe097e539e7b6a7525404a2ff8727861c985e633 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.statemgr.zookeeper.curator;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Message;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.common.basics.Pair;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.scheduler.Scheduler;
import org.apache.heron.proto.system.ExecutionEnvironment;
import org.apache.heron.proto.system.PackingPlans;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.proto.tmaster.TopologyMaster;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.common.Key;
import org.apache.heron.spi.statemgr.Lock;
import org.apache.heron.spi.statemgr.WatchCallback;
import org.apache.heron.spi.utils.NetworkUtils;
import org.apache.heron.statemgr.FileSystemStateManager;
import org.apache.heron.statemgr.zookeeper.ZkContext;
import org.apache.heron.statemgr.zookeeper.ZkUtils;
import org.apache.heron.statemgr.zookeeper.ZkWatcherCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
/**
 * A {@link FileSystemStateManager} that stores Heron topology state as znodes in ZooKeeper,
 * accessed through an Apache Curator client.
 *
 * <p>If the configuration says a tunnel is needed, the ZooKeeper connection string is replaced by
 * the one returned from {@code ZkUtils.setupZkTunnel}, and the processes it spawned are destroyed
 * in {@link #close()}.
 */
public class CuratorStateManager extends FileSystemStateManager {
  private static final Logger LOG = Logger.getLogger(CuratorStateManager.class.getName());

  // Curator client created and started in initialize(); closed in close().
  private CuratorFramework client;
  // Either the configured connection string or the tunneled one.
  private String connectionString;
  // When true, the scheduler-location znode is ephemeral; otherwise it is persistent.
  private boolean isSchedulerService;
  // Processes spawned for tunneling; destroyed in close().
  private List<Process> tunnelProcesses;
  private Config config;

  /**
   * Initializes the state manager. It reads connection settings from {@code newConfig} and sets
   * up a tunnel when one is required. It then starts the Curator client and blocks until the
   * client connects. Finally, it creates the state directory tree if so configured.
   *
   * @param newConfig the configuration to initialize from
   * @throws IllegalArgumentException if a tunnel is required but yields an empty connection string
   * @throws RuntimeException if the client does not connect within the configured timeout, if the
   *     waiting thread is interrupted, or if the directory tree cannot be created
   */
  @Override
  public void initialize(Config newConfig) {
    super.initialize(newConfig);

    this.config = newConfig;
    this.connectionString = Context.stateManagerConnectionString(newConfig);
    this.isSchedulerService = Context.schedulerService(newConfig);
    this.tunnelProcesses = new ArrayList<>();

    NetworkUtils.TunnelConfig tunnelConfig =
        NetworkUtils.TunnelConfig.build(config, NetworkUtils.HeronSystem.STATE_MANAGER);
    if (tunnelConfig.isTunnelNeeded()) {
      Pair<String, List<Process>> tunneledResults = setupZkTunnel(tunnelConfig);

      // Record spawned processes before validating the result, so close() can still reap them
      // if the tunnel turns out to be unusable.
      if (tunneledResults.second != null) {
        tunnelProcesses.addAll(tunneledResults.second);
      }

      String newConnectionString = tunneledResults.first;
      if (newConnectionString.isEmpty()) {
        throw new IllegalArgumentException("Failed to connect to tunnel host '"
            + tunnelConfig.getTunnelHost() + "'");
      }

      // Use the new connection string
      connectionString = newConnectionString;
    }

    // Start it
    client = getCuratorClient();
    LOG.info("Starting Curator client connecting to: " + connectionString);
    client.start();

    try {
      if (!client.blockUntilConnected(ZkContext.connectionTimeoutMs(newConfig),
          TimeUnit.MILLISECONDS)) {
        throw new RuntimeException("Failed to initialize CuratorClient");
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so code further up the stack can still observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Failed to initialize CuratorClient", e);
    }

    if (ZkContext.isInitializeTree(newConfig)) {
      initTree();
    }
  }

  /**
   * Lock backed by {@code InterProcessSemaphoreMutex}. Guaranteed to atomically get a
   * distributed ephemeral lock backed by zookeeper. The lock should be explicitly released to
   * avoid unnecessary waiting by other threads waiting on it.
   */
  private static final class DistributedLock implements Lock {
    private final String path;
    private final InterProcessSemaphoreMutex lock;

    private DistributedLock(CuratorFramework client, String path) {
      this.path = path;
      this.lock = new InterProcessSemaphoreMutex(client, path);
    }

    /**
     * Tries to acquire the lock, waiting at most {@code timeout}.
     *
     * @return true if the lock was acquired, false if the wait timed out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws RuntimeException wrapping any other failure reported by Curator
     */
    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
      try {
        return this.lock.acquire(timeout, unit);
      } catch (InterruptedException e) {
        throw e;
        // SUPPRESS CHECKSTYLE IllegalCatch
      } catch (Exception e) {
        throw new RuntimeException("Error while trying to acquire distributed lock at " + path, e);
      }
    }

    /**
     * Releases the lock.
     *
     * @throws RuntimeException wrapping any failure reported by Curator
     */
    @Override
    public void unlock() {
      try {
        this.lock.release();
        // SUPPRESS CHECKSTYLE IllegalCatch
      } catch (Exception e) {
        throw new RuntimeException("Error while trying to release distributed lock at " + path, e);
      }
    }
  }

  /**
   * Builds (but does not start) the Curator client for {@link #connectionString}.
   * Overridable so tests can inject a mock client.
   *
   * @return a new, unstarted Curator client
   */
  protected CuratorFramework getCuratorClient() {
    // Base sleep interval and maximum retry count both come from the ZooKeeper config.
    // ExponentialBackoffRetry grows the sleep between successive retries exponentially.
    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(
        ZkContext.retryIntervalMs(config), ZkContext.retryCount(config));

    // CuratorFrameworkFactory.builder() gives fine-grained control over creation options;
    // see the CuratorFrameworkFactory.Builder javadoc for details.
    return CuratorFrameworkFactory.builder()
        .connectString(connectionString)
        .retryPolicy(retryPolicy)
        .connectionTimeoutMs(ZkContext.connectionTimeoutMs(config))
        .sessionTimeoutMs(ZkContext.sessionTimeoutMs(config))
        .build();
  }

  /**
   * Sets up the ZooKeeper tunnel. Overridable for testing.
   *
   * @return the tunneled connection string (empty on failure) and the spawned processes
   */
  protected Pair<String, List<Process>> setupZkTunnel(NetworkUtils.TunnelConfig tunnelConfig) {
    return ZkUtils.setupZkTunnel(config, tunnelConfig);
  }

  /**
   * Creates the directory znode for every {@link StateLocation}, including any missing parents.
   *
   * @throws RuntimeException if any directory cannot be created
   */
  protected void initTree() {
    // Make necessary directories
    for (StateLocation location : StateLocation.values()) {
      LOG.fine(String.format("%s directory: %s", location.getName(), getStateDirectory(location)));
    }

    try {
      for (StateLocation location : StateLocation.values()) {
        client.createContainers(getStateDirectory(location));
      }
      // Suppress it since createContainers() throws Exception
      // SUPPRESS CHECKSTYLE IllegalCatch
    } catch (Exception e) {
      throw new RuntimeException("Failed to initialize tree", e);
    }

    LOG.info("Directory tree initialized.");
  }

  /** Closes the Curator client, if any, and destroys every recorded tunnel process. */
  @Override
  public void close() {
    if (client != null) {
      LOG.info("Closing the CuratorClient to: " + connectionString);
      client.close();
    }

    // Close the tunneling
    LOG.info("Closing the tunnel processes");
    if (tunnelProcesses != null) {
      for (Process process : tunnelProcesses) {
        process.destroy();
      }
    }
  }

  /** Returns the connection string in use; this is the tunneled one if a tunnel was set up. */
  public String getConnectionString() {
    return connectionString;
  }

  /**
   * Checks whether a znode exists at {@code path}. The check runs synchronously; the returned
   * future is already complete.
   *
   * @return a future holding true if the node exists, or failed with a RuntimeException
   */
  // Make utils class protected for easy unit testing
  @Override
  protected ListenableFuture<Boolean> nodeExists(String path) {
    final SettableFuture<Boolean> result = SettableFuture.create();

    try {
      LOG.info("Checking existence of path: " + path);
      safeSetFuture(result, client.checkExists().forPath(path) != null);
      // Suppress it since forPath() throws Exception
      // SUPPRESS CHECKSTYLE IllegalCatch
    } catch (Exception e) {
      safeSetException(result, new RuntimeException("Could not check Exist", e));
    }

    return result;
  }

  /**
   * Creates the znode for {@code location} of {@code topologyName}.
   *
   * @see #createNode(String, byte[], boolean)
   */
  protected ListenableFuture<Boolean> createNode(
      StateLocation location, String topologyName,
      byte[] data,
      boolean isEphemeral) {
    return createNode(getStatePath(location, topologyName), data, isEphemeral);
  }

  /**
   * Synchronously creates a znode at {@code path} holding {@code data}.
   *
   * @param isEphemeral true for an EPHEMERAL node, false for a PERSISTENT one
   * @return an already-complete future: true on success, or failed with a RuntimeException
   */
  @VisibleForTesting
  protected ListenableFuture<Boolean> createNode(
      String path,
      byte[] data,
      boolean isEphemeral) {
    final SettableFuture<Boolean> result = SettableFuture.create();

    try {
      client.create()
          .withMode(isEphemeral ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT)
          .forPath(path, data);
      LOG.info("Created node for path: " + path);
      safeSetFuture(result, true);

      // Suppress it since forPath() throws Exception
      // SUPPRESS CHECKSTYLE IllegalCatch
    } catch (Exception e) {
      safeSetException(result, new RuntimeException("Could not createNode:", e));
    }

    return result;
  }

  /**
   * Synchronously deletes the znode at {@code path}, regardless of its version. A node that does
   * not exist is treated as successfully deleted.
   *
   * @param deleteChildrenIfNecessary if true, child nodes are deleted as well
   * @return an already-complete future: true on success, or failed with a RuntimeException
   */
  @Override
  protected ListenableFuture<Boolean> deleteNode(String path, boolean deleteChildrenIfNecessary) {
    final SettableFuture<Boolean> result = SettableFuture.create();

    try {
      DeleteBuilder deleteBuilder = client.delete();
      if (deleteChildrenIfNecessary) {
        deleteBuilder = (DeleteBuilder) deleteBuilder.deletingChildrenIfNeeded();
      }
      // Version -1 matches any version of the node.
      deleteBuilder.withVersion(-1).forPath(path);
      LOG.info("Deleted node for path: " + path);
      safeSetFuture(result, true);
    } catch (KeeperException e) {
      if (KeeperException.Code.NONODE.equals(e.code())) {
        safeSetFuture(result, true);
      } else {
        safeSetException(result, new RuntimeException("Could not deleteNode", e));
      }
      // Suppress it since forPath() throws Exception
      // SUPPRESS CHECKSTYLE IllegalCatch
    } catch (Exception e) {
      safeSetException(result, new RuntimeException("Could not deleteNode", e));
    }

    return result;
  }

  /**
   * Asynchronously reads the data at {@code path} and parses it with {@code builder}. The given
   * {@code watcher} is registered on the node.
   *
   * <p>The returned future always completes. On success it holds the parsed message. Otherwise it
   * fails with a {@link RuntimeException}: when the read cannot be issued, when the callback
   * reports no event or no data, or when the bytes cannot be merged into or built by
   * {@code builder}.
   *
   * @param <M> the expected message type; the result of {@code builder.build()} is cast to it
   * @param watcher callback notified on node changes, adapted to a ZooKeeper {@link Watcher}
   * @param path the znode path to read
   * @param builder builder that the raw node bytes are merged into
   * @return a future for the parsed message
   */
  @Override
  protected <M extends Message> ListenableFuture<M> getNodeData(
      WatchCallback watcher,
      final String path,
      final Message.Builder builder) {
    final SettableFuture<M> future = SettableFuture.create();

    Watcher wc = ZkWatcherCallback.makeZkWatcher(watcher);

    BackgroundCallback cb = new BackgroundCallback() {
      @Override
      @SuppressWarnings("unchecked") // we don't know what M is until runtime
      public void processResult(CuratorFramework aClient, CuratorEvent event) throws Exception {
        // Check for a null event before dereferencing it.
        byte[] data = event == null ? null : event.getData();
        if (data == null) {
          String failedPath = event == null ? path : event.getPath();
          safeSetException(future, new RuntimeException("Failed to fetch data from path: "
              + failedPath));
          return;
        }

        try {
          builder.mergeFrom(data);
          safeSetFuture(future, (M) builder.build());
          // Catch everything so that a parse/build failure completes the future, instead of
          // escaping to Curator and leaving the caller waiting forever.
          // SUPPRESS CHECKSTYLE IllegalCatch
        } catch (Exception e) {
          safeSetException(future, new RuntimeException("Failed to parse data from path: "
              + path, e));
        }
      }
    };

    try {
      client.getData().usingWatcher(wc).inBackground(cb).forPath(path);

      // Suppress it since forPath() throws Exception
      // SUPPRESS CHECKSTYLE IllegalCatch
    } catch (Exception e) {
      safeSetException(future, new RuntimeException("Could not getNodeData", e));
    }

    return future;
  }

  /** Returns a ZooKeeper-backed distributed lock rooted at {@code path}. */
  @Override
  protected Lock getLock(String path) {
    return new DistributedLock(this.client, path);
  }

  /** Stores the TMaster location as an ephemeral znode. */
  @Override
  public ListenableFuture<Boolean> setTMasterLocation(
      TopologyMaster.TMasterLocation location,
      String topologyName) {
    return createNode(StateLocation.TMASTER_LOCATION, topologyName, location.toByteArray(), true);
  }

  /**
   * Stores the MetricsCache location as an ephemeral znode. It also registers a connection-state
   * listener that throws on any state other than {@code CONNECTED}.
   */
  @Override
  public ListenableFuture<Boolean> setMetricsCacheLocation(
      TopologyMaster.MetricsCacheLocation location,
      String topologyName) {
    // NOTE(review): a new listener is added on every call and never removed. Also, Curator
    // appears to catch and log exceptions thrown by listeners, so this throw may not actually
    // terminate anything. Confirm the intended fail-fast behavior.
    client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
      @Override
      public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        if (arg1 != ConnectionState.CONNECTED) {
          // if not the first time successful connection, fail fast
          throw new RuntimeException("Unexpected state change to: " + arg1.name());
        }
      }
    });

    return createNode(
        StateLocation.METRICSCACHE_LOCATION, topologyName, location.toByteArray(), true);
  }

  /** Stores the execution state as a persistent znode. */
  @Override
  public ListenableFuture<Boolean> setExecutionState(
      ExecutionEnvironment.ExecutionState executionState,
      String topologyName) {
    return createNode(
        StateLocation.EXECUTION_STATE, topologyName, executionState.toByteArray(), false);
  }

  /** Stores the topology definition as a persistent znode. */
  @Override
  public ListenableFuture<Boolean> setTopology(
      TopologyAPI.Topology topology,
      String topologyName) {
    return createNode(StateLocation.TOPOLOGY, topologyName, topology.toByteArray(), false);
  }

  /** Stores the physical plan as a persistent znode. */
  @Override
  public ListenableFuture<Boolean> setPhysicalPlan(
      PhysicalPlans.PhysicalPlan physicalPlan,
      String topologyName) {
    return createNode(StateLocation.PHYSICAL_PLAN, topologyName, physicalPlan.toByteArray(), false);
  }

  /** Stores the packing plan as a persistent znode. */
  @Override
  public ListenableFuture<Boolean> setPackingPlan(
      PackingPlans.PackingPlan packingPlan,
      String topologyName) {
    return createNode(StateLocation.PACKING_PLAN, topologyName, packingPlan.toByteArray(), false);
  }

  /** Stores the stateful checkpoints as a persistent znode. */
  @Override
  public ListenableFuture<Boolean> setStatefulCheckpoints(
      CheckpointManager.StatefulConsistentCheckpoints checkpoint,
      String topologyName) {
    return createNode(StateLocation.STATEFUL_CHECKPOINT, topologyName,
        checkpoint.toByteArray(), false);
  }

  /** Stores the scheduler location; the znode is ephemeral only when the scheduler is a service. */
  @Override
  public ListenableFuture<Boolean> setSchedulerLocation(
      Scheduler.SchedulerLocation location,
      String topologyName) {
    // if isService, set the node as ephemeral node; set as persistent node otherwise
    return createNode(StateLocation.SCHEDULER_LOCATION, topologyName,
        location.toByteArray(),
        isSchedulerService);
  }

  /** No-op that reports success: the TMaster location is an ephemeral znode. */
  @Override
  public ListenableFuture<Boolean> deleteTMasterLocation(String topologyName) {
    // It is a EPHEMERAL node and would be removed automatically
    return succeededFuture();
  }

  /** No-op that reports success: the MetricsCache location is an ephemeral znode. */
  @Override
  public ListenableFuture<Boolean> deleteMetricsCacheLocation(String topologyName) {
    // It is a EPHEMERAL node and would be removed automatically
    return succeededFuture();
  }

  /**
   * Deletes the scheduler-location znode. This is a no-op that reports success when the
   * scheduler is a service, because the znode is then ephemeral.
   */
  @Override
  public ListenableFuture<Boolean> deleteSchedulerLocation(String topologyName) {
    // if scheduler is service, the znode is ephemeral and it's deleted automatically
    if (isSchedulerService) {
      return succeededFuture();
    }
    return deleteNode(getStatePath(StateLocation.SCHEDULER_LOCATION, topologyName), false);
  }

  /** Returns an already-completed future holding {@code true}. */
  private ListenableFuture<Boolean> succeededFuture() {
    final SettableFuture<Boolean> result = SettableFuture.create();
    safeSetFuture(result, true);
    return result;
  }

  /**
   * Command-line entry point.
   *
   * @param args {@code <topology_name> <zookeeper_hostname>}, forwarded to {@code doMain}
   */
  public static void main(String[] args) throws ExecutionException, InterruptedException,
      IllegalAccessException, ClassNotFoundException, InstantiationException {
    if (args.length < 2) {
      throw new RuntimeException("Expects arguments: <topology_name> <zookeeper_hostname>");
    }

    String zookeeperHostname = args[1];
    Config config = Config.newBuilder()
        .put(Key.STATEMGR_ROOT_PATH, "/storm/heron/states")
        .put(Key.STATEMGR_CONNECTION_STRING, zookeeperHostname)
        .put(Key.SCHEDULER_IS_SERVICE, false)
        .build();
    CuratorStateManager stateManager = new CuratorStateManager();
    stateManager.doMain(args, config);
  }
}