blob: 6d95009e7d134d016f76fc0d05f6010a4d2c6838 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.internal;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.twill.api.Command;
import org.apache.twill.api.RunId;
import org.apache.twill.api.ServiceController;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.state.Messages;
import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* An abstract base class for implementing a {@link ServiceController} using ZooKeeper as a means for
* communicating with the remote service. This is designed to work in pair with the {@link AbstractTwillService}.
*/
public abstract class AbstractZKServiceController extends AbstractExecutionServiceController {
private static final Logger LOG = LoggerFactory.getLogger(AbstractZKServiceController.class);
protected final ZKClient zkClient;
private final InstanceNodeDataCallback instanceNodeDataCallback;
private final List<ListenableFuture<?>> messageFutures;
private ListenableFuture<State> stopMessageFuture;
protected AbstractZKServiceController(RunId runId, ZKClient zkClient) {
super(runId);
this.zkClient = zkClient;
this.instanceNodeDataCallback = new InstanceNodeDataCallback();
this.messageFutures = Lists.newLinkedList();
}
@Override
public final ListenableFuture<Command> sendCommand(Command command) {
return sendMessage(Messages.createForAll(command), command);
}
@Override
public final ListenableFuture<Command> sendCommand(String runnableName, Command command) {
return sendMessage(Messages.createForRunnable(runnableName, command), command);
}
@Override
protected final void startUp() {
doStartUp();
// Watch for instance node existence.
actOnExists(getInstancePath(), new Runnable() {
@Override
public void run() {
watchInstanceNode();
}
});
}
@Override
protected final synchronized void shutDown() {
if (stopMessageFuture == null) {
stopMessageFuture = ZKMessages.sendMessage(zkClient, getMessagePrefix(),
SystemMessages.stopApplication(), State.TERMINATED);
}
// Cancel all pending message futures.
for (ListenableFuture<?> future : messageFutures) {
future.cancel(true);
}
doShutDown();
}
/**
* Sends a {@link Message} to the remote service. Returns a future that will be completed when the message
* has been processed.
* @param message The message to send.
* @param result Object to set into the future when message is being processed.
* @param <V> Type of the result.
* @return A {@link ListenableFuture} that will be completed when the message has been processed.
*/
protected final synchronized <V> ListenableFuture<V> sendMessage(Message message, V result) {
if (!isRunning()) {
return Futures.immediateFailedFuture(new IllegalStateException("Cannot send message to non-running application"));
}
final ListenableFuture<V> messageFuture = ZKMessages.sendMessage(zkClient, getMessagePrefix(), message, result);
messageFutures.add(messageFuture);
messageFuture.addListener(new Runnable() {
@Override
public void run() {
// If the completion is triggered when stopping, do nothing.
if (state() == State.STOPPING) {
return;
}
synchronized (AbstractZKServiceController.this) {
messageFutures.remove(messageFuture);
}
}
}, Threads.SAME_THREAD_EXECUTOR);
return messageFuture;
}
protected final ListenableFuture<State> getStopMessageFuture() {
return stopMessageFuture;
}
/**
* Called during startup. Executed in the startup thread.
*/
protected abstract void doStartUp();
/**
* Called during shutdown. Executed in the shutdown thread.
*/
protected abstract void doShutDown();
/**
* Called when an update on the live instance node is detected.
* @param nodeData The updated live instance node data or {@code null} if there is an error when fetching
* the node data.
*/
protected abstract void instanceNodeUpdated(NodeData nodeData);
/**
* Called when failed to fetch from live instance node.
* @param cause The cause of failure of {@code null} if cause is unknown.
*/
protected abstract void instanceNodeFailed(Throwable cause);
protected synchronized void forceShutDown() {
if (stopMessageFuture == null) {
// In force shutdown, don't send message.
stopMessageFuture = Futures.immediateFuture(State.TERMINATED);
}
stop();
}
private void actOnExists(final String path, final Runnable action) {
// Watch for node existence.
final AtomicBoolean nodeExists = new AtomicBoolean(false);
Futures.addCallback(zkClient.exists(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (!shouldProcessZKEvent()) {
return;
}
// When node is created, call the action.
// Other event type would be handled by the action.
if (event.getType() == Event.EventType.NodeCreated && nodeExists.compareAndSet(false, true)) {
action.run();
}
}
}), new FutureCallback<Stat>() {
@Override
public void onSuccess(Stat result) {
if (result != null && nodeExists.compareAndSet(false, true)) {
action.run();
}
}
@Override
public void onFailure(Throwable t) {
LOG.error("Failed in exists call to {}. Shutting down service.", path, t);
forceShutDown();
}
}, Threads.SAME_THREAD_EXECUTOR);
}
protected final void watchInstanceNode() {
if (!shouldProcessZKEvent()) {
return;
}
Futures.addCallback(zkClient.getData(getInstancePath(), new Watcher() {
@Override
public void process(WatchedEvent event) {
if (!shouldProcessZKEvent()) {
return;
}
switch (event.getType()) {
case NodeDataChanged:
watchInstanceNode();
break;
case NodeDeleted:
instanceNodeFailed(KeeperException.create(KeeperException.Code.NONODE, getInstancePath()));
break;
default:
LOG.info("Ignore ZK event for instance node: {}", event);
}
}
}), instanceNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
}
/**
* Returns true if ZK events needs to be processed.
*/
private boolean shouldProcessZKEvent() {
State state = state();
return (state == State.NEW || state == State.STARTING || state == State.RUNNING);
}
/**
* Returns the path prefix for creating sequential message node for the remote service.
*/
private String getMessagePrefix() {
return getZKPath("messages/msg");
}
/**
* Returns the zookeeper node path for the ephemeral instance node for this runId.
*/
protected final String getInstancePath() {
return String.format("/instances/%s", getRunId().getId());
}
private String getZKPath(String path) {
return String.format("/%s/%s", getRunId().getId(), path);
}
private final class InstanceNodeDataCallback implements FutureCallback<NodeData> {
@Override
public void onSuccess(NodeData result) {
if (shouldProcessZKEvent()) {
instanceNodeUpdated(result);
}
}
@Override
public void onFailure(Throwable t) {
LOG.error("Failed in fetching instance node data.", t);
if (shouldProcessZKEvent()) {
instanceNodeFailed(t);
}
}
}
}