Merge pull request #115 from caskdata/fix/copy-zk
TEPHRA-171 Added ZkClientService in Tephra which does not extend from…
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java
index c8c047e..b23b6d6 100644
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java
+++ b/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java
@@ -17,6 +17,8 @@
package co.cask.tephra.runtime;
import co.cask.tephra.TxConstants;
+import co.cask.tephra.zookeeper.TephraZKClientService;
+import com.google.common.collect.ArrayListMultimap;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
@@ -52,14 +54,12 @@
zkStr = conf.get(TxConstants.HBase.ZOOKEEPER_QUORUM);
}
+ int timeOut = conf.getInt(TxConstants.HBase.ZK_SESSION_TIMEOUT, TxConstants.HBase.DEFAULT_ZK_SESSION_TIMEOUT);
+ ZKClientService zkClientService = new TephraZKClientService(zkStr, timeOut, null,
+ ArrayListMultimap.<String, byte[]>create());
return ZKClientServices.delegate(
ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(
- ZKClientService.Builder.of(zkStr)
- .setSessionTimeout(conf.getInt(TxConstants.HBase.ZK_SESSION_TIMEOUT,
- TxConstants.HBase.DEFAULT_ZK_SESSION_TIMEOUT))
- .build(),
- RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
+ ZKClients.retryOnFailure(zkClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
)
)
);
diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java
new file mode 100644
index 0000000..21b2b75
--- /dev/null
+++ b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright © 2016 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.tephra.zookeeper;
+
+import org.apache.twill.zookeeper.ACLData;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * A straightforward implementation of {@link ACLData}.
+ */
+final class BasicACLData implements ACLData {
+
+ private final List<ACL> acl;
+ private final Stat stat;
+
+ BasicACLData(List<ACL> acl, Stat stat) {
+ this.acl = acl;
+ this.stat = stat;
+ }
+
+ @Override
+ public List<ACL> getACL() {
+ return acl;
+ }
+
+ @Override
+ public Stat getStat() {
+ return stat;
+ }
+}
diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java
new file mode 100644
index 0000000..226d2c4
--- /dev/null
+++ b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright © 2016 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.tephra.zookeeper;
+
+import com.google.common.base.Objects;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * Implementation of the {@link NodeChildren}.
+ */
+final class BasicNodeChildren implements NodeChildren {
+
+ private final Stat stat;
+ private final List<String> children;
+
+ BasicNodeChildren(List<String> children, Stat stat) {
+ this.stat = stat;
+ this.children = children;
+ }
+
+ @Override
+ public Stat getStat() {
+ return stat;
+ }
+
+ @Override
+ public List<String> getChildren() {
+ return children;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || !(o instanceof NodeChildren)) {
+ return false;
+ }
+
+ NodeChildren that = (NodeChildren) o;
+ return stat.equals(that.getStat()) && children.equals(that.getChildren());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(children, stat);
+ }
+}
diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java
new file mode 100644
index 0000000..65ebf6d
--- /dev/null
+++ b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright © 2016 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.tephra.zookeeper;
+
+import com.google.common.base.Objects;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.Arrays;
+
+/**
+ * A straightforward implementation for {@link NodeData}.
+ */
+final class BasicNodeData implements NodeData {
+
+ private final byte[] data;
+ private final Stat stat;
+
+ BasicNodeData(byte[] data, Stat stat) {
+ this.data = data;
+ this.stat = stat;
+ }
+
+ @Override
+ public Stat getStat() {
+ return stat;
+ }
+
+ @Override
+ public byte[] getData() {
+ return data;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || !(o instanceof NodeData)) {
+ return false;
+ }
+
+ BasicNodeData that = (BasicNodeData) o;
+
+ return stat.equals(that.getStat()) && Arrays.equals(data, that.getData());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(data, stat);
+ }
+}
diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java
new file mode 100644
index 0000000..c6d9a98
--- /dev/null
+++ b/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java
@@ -0,0 +1,625 @@
+/*
+ * Copyright © 2016 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.tephra.zookeeper;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.zookeeper.SettableOperationFuture;
+import org.apache.twill.zookeeper.ACLData;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+
+/**
+ * The implementation of {@link ZKClientService}.
+ */
+public class TephraZKClientService extends AbstractService implements ZKClientService, Watcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TephraZKClientService.class);
+
+ private final String zkStr;
+ private final int sessionTimeout;
+ private final List<Watcher> connectionWatchers;
+ private final Multimap<String, byte[]> authInfos;
+ private final AtomicReference<ZooKeeper> zooKeeper;
+ private final Runnable stopTask;
+ private ExecutorService eventExecutor;
+
+ /**
+ * Create a new instance.
+ * @param zkStr zookeper connection string
+ * @param sessionTimeout timeout in milliseconds
+ * @param connectionWatcher watcher to set
+ * @param authInfos authorization bytes
+ */
+ public TephraZKClientService(String zkStr, int sessionTimeout,
+ Watcher connectionWatcher, Multimap<String, byte[]> authInfos) {
+ this.zkStr = zkStr;
+ this.sessionTimeout = sessionTimeout;
+ this.connectionWatchers = new CopyOnWriteArrayList<>();
+ this.authInfos = copyAuthInfo(authInfos);
+ addConnectionWatcher(connectionWatcher);
+
+ this.zooKeeper = new AtomicReference<>();
+ this.stopTask = createStopTask();
+ }
+
+ @Override
+ public Long getSessionId() {
+ ZooKeeper zk = zooKeeper.get();
+ return zk == null ? null : zk.getSessionId();
+ }
+
+ @Override
+ public String getConnectString() {
+ return zkStr;
+ }
+
+ @Override
+ public Cancellable addConnectionWatcher(final Watcher watcher) {
+ if (watcher == null) {
+ return new Cancellable() {
+ @Override
+ public void cancel() {
+ // No-op
+ }
+ };
+ }
+
+ // Invocation of connection watchers are already done inside the event thread,
+ // hence no need to wrap the watcher again.
+ connectionWatchers.add(watcher);
+ return new Cancellable() {
+ @Override
+ public void cancel() {
+ connectionWatchers.remove(watcher);
+ }
+ };
+ }
+
+ @Override
+ public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
+ return create(path, data, createMode, true);
+ }
+
+ @Override
+ public OperationFuture<String> create(String path, @Nullable byte[] data,
+ CreateMode createMode, boolean createParent) {
+ return create(path, data, createMode, createParent, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ @Override
+ public OperationFuture<String> create(String path, @Nullable byte[] data,
+ CreateMode createMode, Iterable<ACL> acl) {
+ return create(path, data, createMode, true, acl);
+ }
+
+ @Override
+ public OperationFuture<Stat> exists(String path) {
+ return exists(path, null);
+ }
+
+ @Override
+ public OperationFuture<NodeChildren> getChildren(String path) {
+ return getChildren(path, null);
+ }
+
+ @Override
+ public OperationFuture<NodeData> getData(String path) {
+ return getData(path, null);
+ }
+
+ @Override
+ public OperationFuture<Stat> setData(String path, byte[] data) {
+ return setData(path, data, -1);
+ }
+
+ @Override
+ public OperationFuture<String> delete(String path) {
+ return delete(path, -1);
+ }
+
+ @Override
+ public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl) {
+ return setACL(path, acl, -1);
+ }
+
+ @Override
+ public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
+ boolean createParent, Iterable<ACL> acl) {
+ return doCreate(path, data, createMode, createParent, ImmutableList.copyOf(acl), false);
+ }
+
+ private OperationFuture<String> doCreate(final String path,
+ @Nullable final byte[] data,
+ final CreateMode createMode,
+ final boolean createParent,
+ final List<ACL> acl,
+ final boolean ignoreNodeExists) {
+ final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().create(path, data, acl, createMode, Callbacks.STRING, createFuture);
+ if (!createParent) {
+ return createFuture;
+ }
+
+ // If create parent is request, return a different future
+ final SettableOperationFuture<String> result = SettableOperationFuture.create(path, eventExecutor);
+ // Watch for changes in the original future
+ Futures.addCallback(createFuture, new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String path) {
+ // Propagate if creation was successful
+ result.set(path);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // See if the failure can be handled
+ if (updateFailureResult(t, result, path, ignoreNodeExists)) {
+ return;
+ }
+ // Create the parent node
+ String parentPath = getParent(path);
+ if (parentPath.isEmpty()) {
+ result.setException(t);
+ return;
+ }
+ // Watch for parent creation complete. Parent is created with the unsafe ACL.
+ Futures.addCallback(doCreate(parentPath, null, CreateMode.PERSISTENT,
+ true, ZooDefs.Ids.OPEN_ACL_UNSAFE, true), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String parentPath) {
+ // Create the requested path again
+ Futures.addCallback(
+ doCreate(path, data, createMode, false, acl, ignoreNodeExists), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String pathResult) {
+ result.set(pathResult);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // handle the failure
+ updateFailureResult(t, result, path, ignoreNodeExists);
+ }
+ });
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ result.setException(t);
+ }
+ });
+ }
+
+ /**
+ * Updates the result future based on the given {@link Throwable}.
+ * @param t Cause of the failure
+ * @param result Future to be updated
+ * @param path Request path for the operation
+ * @return {@code true} if it is a failure, {@code false} otherwise.
+ */
+ private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> result,
+ String path, boolean ignoreNodeExists) {
+ // Propagate if there is error
+ if (!(t instanceof KeeperException)) {
+ result.setException(t);
+ return true;
+ }
+ KeeperException.Code code = ((KeeperException) t).code();
+ // Node already exists, simply return success if it allows for ignoring node exists (for parent node creation).
+ if (ignoreNodeExists && code == KeeperException.Code.NODEEXISTS) {
+ // The requested path could be used because it only applies to non-sequential node
+ result.set(path);
+ return false;
+ }
+ if (code != KeeperException.Code.NONODE) {
+ result.setException(t);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Gets the parent of the given path.
+ * @param path Path for computing its parent
+ * @return Parent of the given path, or empty string if the given path is the root path already.
+ */
+ private String getParent(String path) {
+ String parentPath = path.substring(0, path.lastIndexOf('/'));
+ return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath;
+ }
+ });
+
+ return result;
+ }
+
+ @Override
+ public OperationFuture<Stat> exists(String path, Watcher watcher) {
+ SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
+ SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<NodeData> getData(String path, Watcher watcher) {
+ SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().getData(path, wrapWatcher(watcher), Callbacks.DATA, result);
+
+ return result;
+ }
+
+ @Override
+ public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
+ SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, eventExecutor);
+ getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<String> delete(String deletePath, int version) {
+ SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath, eventExecutor);
+ getZooKeeper().delete(deletePath, version, Callbacks.VOID, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<ACLData> getACL(String path) {
+ SettableOperationFuture<ACLData> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().getACL(path, new Stat(), Callbacks.ACL, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
+ SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().setACL(path, ImmutableList.copyOf(acl), version, Callbacks.STAT, result);
+ return result;
+ }
+
+ @Override
+ public Supplier<ZooKeeper> getZooKeeperSupplier() {
+ return new Supplier<ZooKeeper>() {
+ @Override
+ public ZooKeeper get() {
+ return getZooKeeper();
+ }
+ };
+ }
+
+ @Override
+ protected void doStart() {
+ // A single thread executor for all events
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ Threads.createDaemonThreadFactory("zk-client-EventThread"));
+ // Just discard the execution if the executor is closed
+ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+ eventExecutor = executor;
+
+ try {
+ zooKeeper.set(createZooKeeper());
+ } catch (IOException e) {
+ notifyFailed(e);
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ // Submit a task to the executor to make sure all pending events in the executor are fired before
+ // transiting this Service into STOPPED state
+ eventExecutor.submit(stopTask);
+ eventExecutor.shutdown();
+ }
+
+ /**
+ * @return Current {@link ZooKeeper} client.
+ */
+ private ZooKeeper getZooKeeper() {
+ ZooKeeper zk = zooKeeper.get();
+ Preconditions.checkArgument(zk != null, "Not connected to zooKeeper.");
+ return zk;
+ }
+
+ /**
+ * Wraps the given watcher to be called from the event executor.
+ * @param watcher Watcher to be wrapped
+ * @return The wrapped Watcher
+ */
+ private Watcher wrapWatcher(final Watcher watcher) {
+ if (watcher == null) {
+ return null;
+ }
+ return new Watcher() {
+ @Override
+ public void process(final WatchedEvent event) {
+ if (eventExecutor.isShutdown()) {
+ LOG.debug("Already shutdown. Discarding event: {}", event);
+ return;
+ }
+ eventExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ watcher.process(event);
+ } catch (Throwable t) {
+ LOG.error("Watcher throws exception.", t);
+ }
+ }
+ });
+ }
+ };
+ }
+
+ /**
+ * Creates a deep copy of the given authInfos multimap.
+ */
+ private Multimap<String, byte[]> copyAuthInfo(Multimap<String, byte[]> authInfos) {
+ Multimap<String, byte[]> result = ArrayListMultimap.create();
+
+ for (Map.Entry<String, byte[]> entry : authInfos.entries()) {
+ byte[] info = entry.getValue();
+ result.put(entry.getKey(), info == null ? null : Arrays.copyOf(info, info.length));
+ }
+
+ return result;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ State state = state();
+ if (state == State.TERMINATED || state == State.FAILED) {
+ return;
+ }
+
+ try {
+ if (event.getState() == Event.KeeperState.SyncConnected && state == State.STARTING) {
+ LOG.debug("Connected to ZooKeeper: {}", zkStr);
+ notifyStarted();
+ return;
+ }
+ if (event.getState() == Event.KeeperState.Expired) {
+ LOG.info("ZooKeeper session expired: {}", zkStr);
+
+ // When connection expired, simply reconnect again
+ if (state != State.RUNNING) {
+ return;
+ }
+ eventExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ // Only reconnect if the current state is running
+ if (state() != State.RUNNING) {
+ return;
+ }
+ try {
+ LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
+ closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
+ } catch (IOException e) {
+ notifyFailed(e);
+ }
+ }
+ });
+ }
+ } finally {
+ if (event.getType() == Event.EventType.None) {
+ for (Watcher connectionWatcher : connectionWatchers) {
+ connectionWatcher.process(event);
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates a new ZooKeeper connection.
+ */
+ private ZooKeeper createZooKeeper() throws IOException {
+ ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, wrapWatcher(this));
+ for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) {
+ zk.addAuthInfo(authInfo.getKey(), authInfo.getValue());
+ }
+ return zk;
+ }
+
+ /**
+ * Closes the given {@link ZooKeeper} if it is not null. If there is InterruptedException,
+ * it will get logged.
+ */
+ private void closeZooKeeper(@Nullable ZooKeeper zk) {
+ try {
+ if (zk != null) {
+ zk.close();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted when closing ZooKeeper", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Creates a {@link Runnable} task that will get executed in the event executor for transiting this
+ * Service into STOPPED state.
+ */
+ private Runnable createStopTask() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Close the ZK connection in this task will make sure if there is ZK connection created
+ // after doStop() was called but before this task has been executed is also closed.
+ // It is possible to happen when the following sequence happens:
+ //
+ // 1. session expired, hence the expired event is triggered
+ // 2. The reconnect task executed. With Service.state() == RUNNING, it creates a new ZK client
+ // 3. Service.stop() gets called, Service.state() changed to STOPPING
+ // 4. The new ZK client created from the reconnect thread update the zooKeeper with the new one
+ closeZooKeeper(zooKeeper.getAndSet(null));
+ notifyStopped();
+ } catch (Exception e) {
+ notifyFailed(e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Collection of generic callbacks that simply reflect results into OperationFuture.
+ */
+ private static final class Callbacks {
+ static final AsyncCallback.StringCallback STRING = new AsyncCallback.StringCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, String name) {
+ SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set((name == null || name.isEmpty()) ? path : name);
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.StatCallback STAT = new AsyncCallback.StatCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(stat);
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ /**
+ * A stat callback that treats NONODE as success.
+ */
+ static final AsyncCallback.StatCallback STAT_NONODE = new AsyncCallback.StatCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) {
+ result.set(stat);
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.Children2Callback CHILDREN = new AsyncCallback.Children2Callback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+ SettableOperationFuture<NodeChildren> result = (SettableOperationFuture<NodeChildren>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(new BasicNodeChildren(children, stat));
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.DataCallback DATA = new AsyncCallback.DataCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ SettableOperationFuture<NodeData> result = (SettableOperationFuture<NodeData>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(new BasicNodeData(data, stat));
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.VoidCallback VOID = new AsyncCallback.VoidCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx) {
+ SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(result.getRequestPath());
+ return;
+ }
+ // Otherwise, it is an error
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.ACLCallback ACL = new AsyncCallback.ACLCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
+ SettableOperationFuture<ACLData> result = (SettableOperationFuture<ACLData>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(new BasicACLData(acl, stat));
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+ }
+}