blob: 617b4d5c4e6e20981b6241737f9ed5bfa9c20b90 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.tso.client;
import com.google.common.base.Charsets;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.zk.ZKUtils;
import org.apache.statemachine.StateMachine;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Describes the abstract methods to communicate to the TSO server
*/
public class TSOClient implements TSOProtocol, NodeCacheListener {
private static final Logger LOG = LoggerFactory.getLogger(TSOClient.class);
// Basic configuration constants & defaults TODO: Move DEFAULT_ZK_CLUSTER to a conf class???
public static final String DEFAULT_ZK_CLUSTER = "localhost:2181";
private static final long DEFAULT_EPOCH = -1L;
private volatile long epoch = DEFAULT_EPOCH;
// Attributes
private CuratorFramework zkClient;
private NodeCache currentTSOZNode;
private ChannelFactory factory;
private ClientBootstrap bootstrap;
private Channel currentChannel;
private final ScheduledExecutorService fsmExecutor;
StateMachine.Fsm fsm;
private final int requestTimeoutInMs;
private final int requestMaxRetries;
private final int tsoReconnectionDelayInSecs;
private InetSocketAddress tsoAddr;
private String zkCurrentTsoPath;
private boolean lowLatency;
// ----------------------------------------------------------------------------------------------------------------
// Construction
// ----------------------------------------------------------------------------------------------------------------
public static TSOClient newInstance(OmidClientConfiguration tsoClientConf) throws IOException {
return new TSOClient(tsoClientConf);
}
// Avoid instantiation
private TSOClient(OmidClientConfiguration omidConf) throws IOException {
// Start client with Nb of active threads = 3 as maximum.
int tsoExecutorThreads = omidConf.getExecutorThreads();
factory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), tsoExecutorThreads);
// Create the bootstrap
bootstrap = new ClientBootstrap(factory);
requestTimeoutInMs = omidConf.getRequestTimeoutInMs();
requestMaxRetries = omidConf.getRequestMaxRetries();
tsoReconnectionDelayInSecs = omidConf.getReconnectionDelayInSecs();
LOG.info("Connecting to TSO...");
HostAndPort hp;
switch (omidConf.getConnectionType()) {
case HA:
zkClient = ZKUtils.initZKClient(omidConf.getConnectionString(),
omidConf.getZkNamespace(),
omidConf.getZkConnectionTimeoutInSecs());
zkCurrentTsoPath = omidConf.getZkCurrentTsoPath();
configureCurrentTSOServerZNodeCache(zkCurrentTsoPath);
String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
// TSO info includes the new TSO host:port address and epoch
String[] currentTSOAndEpochArray = tsoInfo.split("#");
hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
setTSOAddress(hp.getHostText(), hp.getPort());
epoch = Long.parseLong(currentTSOAndEpochArray[1]);
LOG.info("\t* Current TSO host:port found in ZK: {} Epoch {}", hp, getEpoch());
break;
case DIRECT:
default:
hp = HostAndPort.fromString(omidConf.getConnectionString());
setTSOAddress(hp.getHostText(), hp.getPort());
LOG.info("\t* TSO host:port {} will be connected directly", hp);
break;
}
fsmExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("tsofsm-%d").build());
fsm = new StateMachine.FsmImpl(fsmExecutor);
fsm.setInitState(new DisconnectedState(fsm));
ChannelPipeline pipeline = bootstrap.getPipeline();
pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
pipeline.addLast("protobufencoder", new ProtobufEncoder());
pipeline.addLast("handler", new Handler(fsm));
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("connectTimeoutMillis", 100);
lowLatency = false;
}
// ----------------------------------------------------------------------------------------------------------------
// TSOProtocol interface
// ----------------------------------------------------------------------------------------------------------------
/**
* @see TSOProtocol#getNewStartTimestamp()
*/
@Override
public TSOFuture<Long> getNewStartTimestamp() {
TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
TSOProto.TimestampRequest.Builder tsreqBuilder = TSOProto.TimestampRequest.newBuilder();
builder.setTimestampRequest(tsreqBuilder.build());
RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
fsm.sendEvent(request);
return new ForwardingTSOFuture<>(request);
}
/**
* @see TSOProtocol#commit(long, Set)
*/
@Override
public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
TSOProto.CommitRequest.Builder commitbuilder = TSOProto.CommitRequest.newBuilder();
commitbuilder.setStartTimestamp(transactionId);
for (CellId cell : cells) {
commitbuilder.addCellId(cell.getCellId());
}
builder.setCommitRequest(commitbuilder.build());
RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
fsm.sendEvent(request);
return new ForwardingTSOFuture<>(request);
}
/**
* @see TSOProtocol#close()
*/
@Override
public TSOFuture<Void> close() {
final CloseEvent closeEvent = new CloseEvent();
fsm.sendEvent(closeEvent);
closeEvent.addListener(new Runnable() {
@Override
public void run() {
try {
closeEvent.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
fsmExecutor.shutdown();
if (currentTSOZNode != null) {
try {
currentTSOZNode.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (zkClient != null) {
zkClient.close();
}
}
}
}, fsmExecutor);
return new ForwardingTSOFuture<>(closeEvent);
}
// ----------------------------------------------------------------------------------------------------------------
// High availability related interface
// ----------------------------------------------------------------------------------------------------------------
/**
* @see TSOProtocol#getEpoch()
*/
@Override
public long getEpoch() {
return epoch;
}
// ----------------------------------------------------------------------------------------------------------------
// NodeCacheListener interface
// ----------------------------------------------------------------------------------------------------------------
@Override
public void nodeChanged() throws Exception {
String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
// TSO info includes the new TSO host:port address and epoch
String[] currentTSOAndEpochArray = tsoInfo.split("#");
HostAndPort hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
setTSOAddress(hp.getHostText(), hp.getPort());
epoch = Long.parseLong(currentTSOAndEpochArray[1]);
LOG.info("CurrentTSO ZNode changed. New TSO Host & Port {}/Epoch {}", hp, getEpoch());
if (currentChannel != null && currentChannel.isConnected()) {
LOG.info("\tClosing channel with previous TSO {}", currentChannel);
currentChannel.close();
}
}
@Override
public boolean isLowLatency() {
return lowLatency;
}
// ****************************************** Finite State Machine ************************************************
// ----------------------------------------------------------------------------------------------------------------
// FSM: Events
// ----------------------------------------------------------------------------------------------------------------
private static class ParamEvent<T> implements StateMachine.Event {
final T param;
ParamEvent(T param) {
this.param = param;
}
T getParam() {
return param;
}
}
private static class ErrorEvent extends ParamEvent<Throwable> {
ErrorEvent(Throwable t) {
super(t);
}
}
private static class ConnectedEvent extends ParamEvent<Channel> {
ConnectedEvent(Channel c) {
super(c);
}
}
private static class UserEvent<T> extends AbstractFuture<T>
implements StateMachine.DeferrableEvent {
void success(T value) {
set(value);
}
@Override
public void error(Throwable t) {
setException(t);
}
}
private static class CloseEvent extends UserEvent<Void> {
}
private static class ChannelClosedEvent extends ParamEvent<Throwable> {
ChannelClosedEvent(Throwable t) {
super(t);
}
}
private static class ReconnectEvent implements StateMachine.Event {
}
private static class HandshakeTimeoutEvent implements StateMachine.Event {
}
private static class TimestampRequestTimeoutEvent implements StateMachine.Event {
}
private static class CommitRequestTimeoutEvent implements StateMachine.Event {
final long startTimestamp;
CommitRequestTimeoutEvent(long startTimestamp) {
this.startTimestamp = startTimestamp;
}
public long getStartTimestamp() {
return startTimestamp;
}
}
private static class RequestEvent extends UserEvent<Long> {
TSOProto.Request req;
int retriesLeft;
RequestEvent(TSOProto.Request req, int retriesLeft) {
this.req = req;
this.retriesLeft = retriesLeft;
}
TSOProto.Request getRequest() {
return req;
}
void setRequest(TSOProto.Request request) {
this.req = request;
}
int getRetriesLeft() {
return retriesLeft;
}
void decrementRetries() {
retriesLeft--;
}
}
private static class ResponseEvent extends ParamEvent<TSOProto.Response> {
ResponseEvent(TSOProto.Response r) {
super(r);
}
}
// ----------------------------------------------------------------------------------------------------------------
// FSM: States
// ----------------------------------------------------------------------------------------------------------------
class BaseState extends StateMachine.State {
BaseState(StateMachine.Fsm fsm) {
super(fsm);
}
public StateMachine.State handleEvent(StateMachine.Event e) {
LOG.error("Unhandled event {} while in state {}", e, this.getClass().getName());
return this;
}
}
class DisconnectedState extends BaseState {
DisconnectedState(StateMachine.Fsm fsm) {
super(fsm);
LOG.debug("NEW STATE: DISCONNECTED");
}
public StateMachine.State handleEvent(RequestEvent e) {
fsm.deferEvent(e);
return tryToConnectToTSOServer();
}
public StateMachine.State handleEvent(CloseEvent e) {
factory.releaseExternalResources();
e.success(null);
return this;
}
private StateMachine.State tryToConnectToTSOServer() {
final InetSocketAddress tsoAddress = getAddress();
LOG.info("Trying to connect to TSO [{}]", tsoAddress);
ChannelFuture channelFuture = bootstrap.connect(tsoAddress);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
LOG.info("Connection to TSO [{}] established. Channel {}",
tsoAddress, channelFuture.getChannel());
} else {
LOG.error("Failed connection attempt to TSO [{}] failed. Channel {}",
tsoAddress, channelFuture.getChannel());
}
}
});
return new ConnectingState(fsm);
}
}
private class ConnectingState extends BaseState {
ConnectingState(StateMachine.Fsm fsm) {
super(fsm);
LOG.debug("NEW STATE: CONNECTING");
}
public StateMachine.State handleEvent(UserEvent e) {
fsm.deferEvent(e);
return this;
}
public StateMachine.State handleEvent(ConnectedEvent e) {
return new HandshakingState(fsm, e.getParam());
}
public StateMachine.State handleEvent(ChannelClosedEvent e) {
return new ConnectionFailedState(fsm, e.getParam());
}
public StateMachine.State handleEvent(ErrorEvent e) {
return new ConnectionFailedState(fsm, e.getParam());
}
}
private static class RequestAndTimeout {
final RequestEvent event;
final Timeout timeout;
RequestAndTimeout(RequestEvent event, Timeout timeout) {
this.event = event;
this.timeout = timeout;
}
RequestEvent getRequest() {
return event;
}
Timeout getTimeout() {
return timeout;
}
public String toString() {
String info = "Request type ";
if (event.getRequest().hasTimestampRequest()) {
info += "[Timestamp]";
} else if (event.getRequest().hasCommitRequest()) {
info += "[Commit] Start TS ->" + event.getRequest().getCommitRequest().getStartTimestamp();
} else {
info += "NONE";
}
return info;
}
}
private class HandshakingState extends BaseState {
final Channel channel;
final HashedWheelTimer timeoutExecutor = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("tso-client-timeout").build());
final Timeout timeout;
HandshakingState(StateMachine.Fsm fsm, Channel channel) {
super(fsm);
LOG.debug("NEW STATE: HANDSHAKING");
this.channel = channel;
TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
// Add the required handshake capabilities when necessary
handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
channel.write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
timeout = newTimeout();
}
private Timeout newTimeout() {
if (requestTimeoutInMs > 0) {
return timeoutExecutor.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
fsm.sendEvent(new HandshakeTimeoutEvent());
}
}, 30, TimeUnit.SECONDS);
} else {
return null;
}
}
public StateMachine.State handleEvent(UserEvent e) {
fsm.deferEvent(e);
return this;
}
public StateMachine.State handleEvent(ResponseEvent e) {
lowLatency = e.getParam().getHandshakeResponse().getLowLatency();
if (e.getParam().hasHandshakeResponse() && e.getParam().getHandshakeResponse().getClientCompatible()) {
if (timeout != null) {
timeout.cancel();
}
return new ConnectedState(fsm, channel, timeoutExecutor);
} else {
cleanupState();
LOG.error("Client incompatible with server");
return new HandshakeFailedState(fsm, new HandshakeFailedException());
}
}
public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
cleanupState();
return new ClosingState(fsm);
}
public StateMachine.State handleEvent(ErrorEvent e) {
cleanupState();
Throwable exception = e.getParam();
LOG.error("Error during handshake", exception);
return new HandshakeFailedState(fsm, exception);
}
private void cleanupState() {
timeoutExecutor.stop();
channel.close();
if (timeout != null) {
timeout.cancel();
}
}
}
class ConnectionFailedState extends BaseState {
final HashedWheelTimer reconnectionTimeoutExecutor = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("tso-client-backoff-timeout").build());
Throwable exception;
ConnectionFailedState(final StateMachine.Fsm fsm, final Throwable exception) {
super(fsm);
LOG.debug("NEW STATE: CONNECTION FAILED [RE-CONNECTION BACKOFF]");
this.exception = exception;
reconnectionTimeoutExecutor.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
fsm.sendEvent(new ReconnectEvent());
}
}, tsoReconnectionDelayInSecs, TimeUnit.SECONDS);
}
public StateMachine.State handleEvent(UserEvent e) {
e.error(exception);
return this;
}
public StateMachine.State handleEvent(ErrorEvent e) {
return this;
}
public StateMachine.State handleEvent(ChannelClosedEvent e) {
return new DisconnectedState(fsm);
}
public StateMachine.State handleEvent(ReconnectEvent e) {
return new DisconnectedState(fsm);
}
}
private class HandshakeFailedState extends ConnectionFailedState {
HandshakeFailedState(StateMachine.Fsm fsm, Throwable exception) {
super(fsm, exception);
LOG.debug("STATE: HANDSHAKING FAILED");
}
}
class ConnectedState extends BaseState {
final Queue<RequestAndTimeout> timestampRequests;
final Map<Long, RequestAndTimeout> commitRequests;
final Channel channel;
final HashedWheelTimer timeoutExecutor;
ConnectedState(StateMachine.Fsm fsm, Channel channel, HashedWheelTimer timeoutExecutor) {
super(fsm);
LOG.debug("NEW STATE: CONNECTED");
this.channel = channel;
this.timeoutExecutor = timeoutExecutor;
timestampRequests = new ArrayDeque<>();
commitRequests = new HashMap<>();
}
private Timeout newTimeout(final StateMachine.Event timeoutEvent) {
if (requestTimeoutInMs > 0) {
return timeoutExecutor.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
fsm.sendEvent(timeoutEvent);
}
}, requestTimeoutInMs, TimeUnit.MILLISECONDS);
} else {
return null;
}
}
private void sendRequest(final StateMachine.Fsm fsm, RequestEvent request) {
TSOProto.Request req = request.getRequest();
if (req.hasTimestampRequest()) {
timestampRequests.add(new RequestAndTimeout(request, newTimeout(new TimestampRequestTimeoutEvent())));
} else if (req.hasCommitRequest()) {
TSOProto.CommitRequest commitReq = req.getCommitRequest();
commitRequests.put(commitReq.getStartTimestamp(), new RequestAndTimeout(
request, newTimeout(new CommitRequestTimeoutEvent(commitReq.getStartTimestamp()))));
} else {
request.error(new IllegalArgumentException("Unknown request type"));
return;
}
ChannelFuture f = channel.write(req);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
fsm.sendEvent(new ErrorEvent(future.getCause()));
}
}
});
}
private void handleResponse(ResponseEvent response) {
TSOProto.Response resp = response.getParam();
if (resp.hasTimestampResponse()) {
if (timestampRequests.size() == 0) {
LOG.debug("Received timestamp response when no requests outstanding");
return;
}
RequestAndTimeout e = timestampRequests.remove();
e.getRequest().success(resp.getTimestampResponse().getStartTimestamp());
if (e.getTimeout() != null) {
e.getTimeout().cancel();
}
} else if (resp.hasCommitResponse()) {
long startTimestamp = resp.getCommitResponse().getStartTimestamp();
RequestAndTimeout e = commitRequests.remove(startTimestamp);
if (e == null) {
LOG.debug("Received commit response for request that doesn't exist. Start TS: {}", startTimestamp);
return;
}
if (e.getTimeout() != null) {
e.getTimeout().cancel();
}
if (resp.getCommitResponse().getAborted()) {
e.getRequest().error(new AbortException());
} else {
e.getRequest().success(resp.getCommitResponse().getCommitTimestamp());
}
}
}
public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
if (!timestampRequests.isEmpty()) {
RequestAndTimeout r = timestampRequests.remove();
if (r.getTimeout() != null) {
r.getTimeout().cancel();
}
queueRetryOrError(fsm, r.getRequest());
}
return this;
}
public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
long startTimestamp = e.getStartTimestamp();
if (commitRequests.containsKey(startTimestamp)) {
RequestAndTimeout r = commitRequests.remove(startTimestamp);
if (r.getTimeout() != null) {
r.getTimeout().cancel();
}
queueRetryOrError(fsm, r.getRequest());
}
return this;
}
public StateMachine.State handleEvent(CloseEvent e) {
LOG.debug("CONNECTED STATE: CloseEvent");
timeoutExecutor.stop();
closeChannelAndErrorRequests();
fsm.deferEvent(e);
return new ClosingState(fsm);
}
public StateMachine.State handleEvent(RequestEvent e) {
sendRequest(fsm, e);
return this;
}
public StateMachine.State handleEvent(ResponseEvent e) {
handleResponse(e);
return this;
}
public StateMachine.State handleEvent(ErrorEvent e) {
LOG.debug("CONNECTED STATE: ErrorEvent");
timeoutExecutor.stop();
handleError(fsm);
return new ClosingState(fsm);
}
private void handleError(StateMachine.Fsm fsm) {
LOG.debug("CONNECTED STATE: Cancelling Timeouts in handleError");
while (timestampRequests.size() > 0) {
RequestAndTimeout r = timestampRequests.remove();
if (r.getTimeout() != null) {
r.getTimeout().cancel();
}
queueRetryOrError(fsm, r.getRequest());
}
Iterator<Map.Entry<Long, RequestAndTimeout>> iter = commitRequests.entrySet().iterator();
while (iter.hasNext()) {
RequestAndTimeout r = iter.next().getValue();
if (r.getTimeout() != null) {
r.getTimeout().cancel();
}
queueRetryOrError(fsm, r.getRequest());
iter.remove();
}
channel.close();
}
private void queueRetryOrError(StateMachine.Fsm fsm, RequestEvent e) {
if (e.getRetriesLeft() > 0) {
e.decrementRetries();
if (e.getRequest().hasCommitRequest()) {
TSOProto.CommitRequest commitRequest = e.getRequest().getCommitRequest();
if (!commitRequest.getIsRetry()) { // Create a new retry for the commit request
TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
commitBuilder.mergeFrom(commitRequest);
commitBuilder.setIsRetry(true);
builder.setCommitRequest(commitBuilder.build());
e.setRequest(builder.build());
}
}
fsm.sendEvent(e);
} else {
e.error(
new ServiceUnavailableException("Number of retries exceeded. This API request failed permanently"));
}
}
private void closeChannelAndErrorRequests() {
channel.close();
for (RequestAndTimeout r : timestampRequests) {
if (r.getTimeout() != null) {
r.getTimeout().cancel();
}
r.getRequest().error(new ClosingException());
}
for (RequestAndTimeout r : commitRequests.values()) {
if (r.getTimeout() != null) {
r.getTimeout().cancel();
}
r.getRequest().error(new ClosingException());
}
}
}
private class ClosingState extends BaseState {
ClosingState(StateMachine.Fsm fsm) {
super(fsm);
LOG.debug("NEW STATE: CLOSING");
}
public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
// Ignored. They will be retried or errored
return this;
}
public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
// Ignored. They will be retried or errored
return this;
}
public StateMachine.State handleEvent(ErrorEvent e) {
// Ignored. They will be retried or errored
return this;
}
public StateMachine.State handleEvent(ResponseEvent e) {
// Ignored. They will be retried or errored
return this;
}
public StateMachine.State handleEvent(UserEvent e) {
fsm.deferEvent(e);
return this;
}
public StateMachine.State handleEvent(ChannelClosedEvent e) {
return new DisconnectedState(fsm);
}
public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
return this;
}
}
// ----------------------------------------------------------------------------------------------------------------
// Helper classes & methods
// ----------------------------------------------------------------------------------------------------------------
private class Handler extends SimpleChannelHandler {
private StateMachine.Fsm fsm;
Handler(StateMachine.Fsm fsm) {
this.fsm = fsm;
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
currentChannel = e.getChannel();
LOG.debug("HANDLER (CHANNEL CONNECTED): Connection {}. Sending connected event to FSM", e);
fsm.sendEvent(new ConnectedEvent(e.getChannel()));
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
LOG.debug("HANDLER (CHANNEL DISCONNECTED): Connection {}. Sending error event to FSM", e);
fsm.sendEvent(new ErrorEvent(new ConnectionException()));
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
LOG.debug("HANDLER (CHANNEL CLOSED): Connection {}. Sending channel closed event to FSM", e);
fsm.sendEvent(new ChannelClosedEvent(new ConnectionException()));
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
if (e.getMessage() instanceof TSOProto.Response) {
fsm.sendEvent(new ResponseEvent((TSOProto.Response) e.getMessage()));
} else {
LOG.warn("Received unknown message", e.getMessage());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
fsm.sendEvent(new ErrorEvent(e.getCause()));
}
}
private synchronized void setTSOAddress(String host, int port) {
tsoAddr = new InetSocketAddress(host, port);
}
private synchronized InetSocketAddress getAddress() {
return tsoAddr;
}
private void configureCurrentTSOServerZNodeCache(String currentTsoPath) {
try {
currentTSOZNode = new NodeCache(zkClient, currentTsoPath);
currentTSOZNode.getListenable().addListener(this);
currentTSOZNode.start(true);
} catch (Exception e) {
throw new IllegalStateException("Cannot start watcher on current TSO Server ZNode: " + e.getMessage());
}
}
private String getCurrentTSOInfoFoundInZK(String currentTsoPath) {
ChildData currentTSOData = currentTSOZNode.getCurrentData();
if (currentTSOData == null) {
throw new IllegalStateException("No data found in ZKNode " + currentTsoPath);
}
byte[] currentTSOAndEpochAsBytes = currentTSOData.getData();
if (currentTSOAndEpochAsBytes == null) {
throw new IllegalStateException("No data found for current TSO in ZKNode " + currentTsoPath);
}
return new String(currentTSOAndEpochAsBytes, Charsets.UTF_8);
}
}