| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.spi.discovery.zk.internal; |
| |
| import java.util.ArrayDeque; |
| import java.util.ArrayList; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteRunnable; |
| import org.apache.zookeeper.AsyncCallback; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.Op; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.data.ACL; |
| import org.apache.zookeeper.data.Stat; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_ZOOKEEPER_DISCOVERY_MAX_RETRY_COUNT; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_ZOOKEEPER_DISCOVERY_RETRY_TIMEOUT; |
| import static org.apache.zookeeper.client.ZKClientConfig.SECURE_CLIENT; |
| |
| /** |
| * Zookeeper Client. |
| */ |
| public class ZookeeperClient implements Watcher { |
| /** */ |
| private static final int DFLT_RETRY_TIMEOUT = 2000; |
| |
| /** */ |
| private static final int DFLT_MAX_RETRY_COUNT = 10; |
| |
| /** */ |
| private final AtomicInteger retryCount = new AtomicInteger(); |
| |
| /** */ |
| private static final int MAX_REQ_SIZE = 1048528; |
| |
| /** */ |
| private static final List<ACL> ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; |
| |
| /** */ |
| private static final byte[] EMPTY_BYTES = {}; |
| |
| /** */ |
| private final ZooKeeper zk; |
| |
| /** */ |
| private final IgniteLogger log; |
| |
| /** */ |
| private ConnectionState state = ConnectionState.Disconnected; |
| |
| /** */ |
| private long connLossTimeout; |
| |
| /** */ |
| private volatile long connStartTime; |
| |
| /** */ |
| private final Object stateMux = new Object(); |
| |
| /** */ |
| private final IgniteRunnable connLostC; |
| |
| /** */ |
| private final Timer connTimer; |
| |
| /** */ |
| private final ArrayDeque<ZkAsyncOperation> retryQ = new ArrayDeque<>(); |
| |
| /** */ |
| private volatile boolean closing; |
| |
| /** |
| * @param log Logger. |
| * @param connectString ZK connection string. |
| * @param sesTimeout ZK session timeout. |
| * @param connLostC Lost connection callback. |
| * @throws Exception If failed. |
| */ |
| ZookeeperClient(IgniteLogger log, String connectString, int sesTimeout, IgniteRunnable connLostC) throws Exception { |
| this(null, log, connectString, sesTimeout, connLostC); |
| } |
| |
| /** |
| * @param igniteInstanceName Ignite instance name. |
| * @param log Logger. |
| * @param connectString ZK connection string. |
| * @param sesTimeout ZK session timeout. |
| * @param connLostC Lost connection callback. |
| * @throws Exception If failed. |
| */ |
| ZookeeperClient(String igniteInstanceName, |
| IgniteLogger log, |
| String connectString, |
| int sesTimeout, |
| IgniteRunnable connLostC) |
| throws Exception |
| { |
| this.log = log.getLogger(getClass()); |
| this.connLostC = connLostC; |
| |
| connLossTimeout = sesTimeout; |
| |
| long connStartTime = this.connStartTime = System.currentTimeMillis(); |
| |
| connTimer = new Timer("zk-client-timer-" + igniteInstanceName); |
| |
| String threadName = Thread.currentThread().getName(); |
| |
| // ZK generates internal threads' names using current thread name. |
| Thread.currentThread().setName("zk-" + igniteInstanceName); |
| |
| try { |
| zk = new ZooKeeper(connectString, sesTimeout, this); |
| } |
| finally { |
| Thread.currentThread().setName(threadName); |
| } |
| |
| synchronized (stateMux) { |
| if (connStartTime == this.connStartTime && state == ConnectionState.Disconnected) |
| scheduleConnectionCheck(); |
| } |
| } |
| |
| /** |
| * @return Zookeeper client. |
| */ |
| ZooKeeper zk() { |
| return zk; |
| } |
| |
| /** |
| * @return {@code True} if connected to ZooKeeper. |
| */ |
| boolean connected() { |
| synchronized (stateMux) { |
| return state == ConnectionState.Connected; |
| } |
| } |
| |
| /** */ |
| String state() { |
| synchronized (stateMux) { |
| return state.toString(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void process(WatchedEvent evt) { |
| if (closing) |
| return; |
| |
| if (evt.getType() == Event.EventType.None) { |
| ConnectionState newState; |
| |
| synchronized (stateMux) { |
| if (state == ConnectionState.Lost) { |
| U.warn(log, "Received event after connection was lost [evtState=" + evt.getState() + "]"); |
| |
| return; |
| } |
| |
| if (!zk.getState().isAlive()) |
| return; |
| |
| Event.KeeperState zkState = evt.getState(); |
| |
| switch (zkState) { |
| case SaslAuthenticated: |
| return; // No-op. |
| |
| case AuthFailed: |
| newState = state; |
| |
| break; |
| |
| case Disconnected: |
| newState = ConnectionState.Disconnected; |
| |
| break; |
| |
| case SyncConnected: |
| newState = ConnectionState.Connected; |
| |
| break; |
| |
| case Expired: |
| U.warn(log, "Session expired, changing state to Lost"); |
| |
| newState = ConnectionState.Lost; |
| |
| break; |
| |
| default: |
| U.error(log, "Unexpected state for ZooKeeper client, close connection: " + zkState); |
| |
| newState = ConnectionState.Lost; |
| } |
| |
| if (newState != state) { |
| if (log.isInfoEnabled()) |
| log.info("ZooKeeper client state changed [prevState=" + state + ", newState=" + newState + ']'); |
| |
| state = newState; |
| |
| if (newState == ConnectionState.Disconnected) { |
| connStartTime = System.currentTimeMillis(); |
| |
| scheduleConnectionCheck(); |
| } |
| else if (newState == ConnectionState.Connected) { |
| retryCount.set(0); |
| |
| stateMux.notifyAll(); |
| } |
| else |
| assert state == ConnectionState.Lost : state; |
| } |
| else |
| return; |
| } |
| |
| if (newState == ConnectionState.Lost) { |
| closeClient(); |
| |
| notifyConnectionLost(); |
| } |
| else if (newState == ConnectionState.Connected) { |
| for (ZkAsyncOperation op : retryQ) |
| op.execute(); |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| private void notifyConnectionLost() { |
| if (!closing && state == ConnectionState.Lost && connLostC != null) |
| connLostC.run(); |
| |
| connTimer.cancel(); |
| } |
| |
| /** |
| * @param path Path. |
| * @return {@code True} if node exists. |
| * @throws ZookeeperClientFailedException If connection to zk was lost. |
| * @throws InterruptedException If interrupted. |
| */ |
| boolean exists(String path) throws ZookeeperClientFailedException, InterruptedException { |
| for (;;) { |
| long connStartTime = this.connStartTime; |
| |
| try { |
| return zk.exists(path, false) != null; |
| } |
| catch (Exception e) { |
| onZookeeperError(connStartTime, e); |
| } |
| } |
| } |
| |
| /** |
| * @param paths Paths to create. |
| * @param createMode Create mode. |
| * @throws ZookeeperClientFailedException If connection to zk was lost. |
| * @throws InterruptedException If interrupted. |
| */ |
| void createAll(List<String> paths, CreateMode createMode) |
| throws ZookeeperClientFailedException, InterruptedException { |
| if (paths.isEmpty()) |
| return; |
| |
| List<List<Op>> batches = new LinkedList<>(); |
| |
| int batchSize = 0; |
| |
| List<Op> batch = new LinkedList<>(); |
| |
| for (String path : paths) { |
| //TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8187 |
| int size = requestOverhead(path) + 48 /* overhead */; |
| |
| assert size <= MAX_REQ_SIZE; |
| |
| if (batchSize + size > MAX_REQ_SIZE) { |
| batches.add(batch); |
| |
| batch = new LinkedList<>(); |
| |
| batchSize = 0; |
| } |
| |
| batch.add(Op.create(path, EMPTY_BYTES, ZK_ACL, createMode)); |
| |
| batchSize += size; |
| } |
| |
| batches.add(batch); |
| |
| for (List<Op> ops : batches) { |
| for (;;) { |
| long connStartTime = this.connStartTime; |
| |
| try { |
| zk.multi(ops); |
| |
| break; |
| } |
| catch (KeeperException.NodeExistsException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to create nodes using bulk operation: " + e); |
| |
| for (Op op : ops) |
| createIfNeeded(op.getPath(), null, createMode); |
| |
| break; |
| } |
| catch (Exception e) { |
| onZookeeperError(connStartTime, e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param path Path. |
| * @param data Data. |
| * @param overhead Extra overhead. |
| * @return {@code True} If data size exceeds max request size and should be splitted into multiple parts. |
| */ |
| boolean needSplitNodeData(String path, byte[] data, int overhead) { |
| return requestOverhead(path) + data.length + overhead > MAX_REQ_SIZE; |
| } |
| |
| /** |
| * @param path Path. |
| * @param data Data. |
| * @param overhead Extra overhead. |
| * @return Splitted data. |
| */ |
| List<byte[]> splitNodeData(String path, byte[] data, int overhead) { |
| int partSize = MAX_REQ_SIZE - requestOverhead(path) - overhead; |
| |
| int partCnt = data.length / partSize; |
| |
| if (data.length % partSize != 0) |
| partCnt++; |
| |
| assert partCnt > 1 : "Do not need split"; |
| |
| List<byte[]> parts = new ArrayList<>(partCnt); |
| |
| int remaining = data.length; |
| |
| for (int i = 0; i < partCnt; i++) { |
| int partSize0 = Math.min(remaining, partSize); |
| |
| byte[] part = new byte[partSize0]; |
| |
| System.arraycopy(data, i * partSize, part, 0, part.length); |
| |
| remaining -= partSize0; |
| |
| parts.add(part); |
| } |
| |
| assert remaining == 0 : remaining; |
| |
| return parts; |
| } |
| |
| /** |
| * TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8187 |
| * @param path Request path. |
| * @return Marshalled request overhead. |
| */ |
| private int requestOverhead(String path) { |
| return path.length(); |
| } |
| |
| /** |
| * @param path Path. |
| * @param data Data. |
| * @param createMode Create mode. |
| * @return Created path. |
| * @throws ZookeeperClientFailedException If connection to zk was lost. |
| * @throws InterruptedException If interrupted. |
| */ |
| String createIfNeeded(String path, byte[] data, CreateMode createMode) |
| throws ZookeeperClientFailedException, InterruptedException |
| { |
| assert !createMode.isSequential() : createMode; |
| |
| if (data == null) |
| data = EMPTY_BYTES; |
| |
| for (;;) { |
| long connStartTime = this.connStartTime; |
| |
| try { |
| return zk.create(path, data, ZK_ACL, createMode); |
| } |
| catch (KeeperException.NodeExistsException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Node already exists: " + path); |
| |
| return path; |
| } |
| catch (Exception e) { |
| onZookeeperError(connStartTime, e); |
| } |
| } |
| } |
| |
| /** |
| * @param path Path. |
| * @param data Data. |
| * @param createMode Create mode. |
| * @return Created path. |
| * @throws KeeperException In case of zookeeper error. |
| * @throws InterruptedException If interrupted. |
| */ |
| String createIfNeededNoRetry(String path, byte[] data, CreateMode createMode) |
| throws KeeperException, InterruptedException { |
| assert !createMode.isSequential() : createMode; |
| |
| if (data == null) |
| data = EMPTY_BYTES; |
| |
| try { |
| return zk.create(path, data, ZK_ACL, createMode); |
| } |
| catch (KeeperException.NodeExistsException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Node already exists: " + path); |
| |
| return path; |
| } |
| } |
| |
| /** |
| * @param checkPrefix Unique prefix to check in case of retry. |
| * @param parentPath Parent node path. |
| * @param path Node to create. |
| * @param data Node data. |
| * @param createMode Create mode. |
| * @return Create path. |
| * @throws ZookeeperClientFailedException If connection to zk was lost. |
| * @throws InterruptedException If interrupted. |
| */ |
| String createSequential(String checkPrefix, String parentPath, String path, byte[] data, CreateMode createMode) |
| throws ZookeeperClientFailedException, InterruptedException |
| { |
| assert createMode.isSequential() : createMode; |
| |
| if (data == null) |
| data = EMPTY_BYTES; |
| |
| boolean first = true; |
| |
| for (;;) { |
| long connStartTime = this.connStartTime; |
| |
| try { |
| if (!first) { |
| List<String> children = zk.getChildren(parentPath, false); |
| |
| for (int i = 0; i < children.size(); i++) { |
| String child = children.get(i); |
| |
| if (children.get(i).startsWith(checkPrefix)) { |
| String resPath = parentPath + "/" + child; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Check before retry, node already created: " + resPath); |
| |
| return resPath; |
| } |
| } |
| } |
| |
| return zk.create(path, data, ZK_ACL, createMode); |
| } |
| catch (KeeperException.NodeExistsException e) { |
| assert !createMode.isSequential() : createMode; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Node already exists: " + path); |
| |
| return path; |
| } |
| catch (Exception e) { |
| onZookeeperError(connStartTime, e); |
| } |
| |
| first = false; |
| } |
| } |
| |
| /** |
| * @param path Path. |
| * @return Children nodes. |
| * @throws ZookeeperClientFailedException If connection to zk was lost. |
| * @throws InterruptedException If interrupted. |
| */ |
| List<String> getChildren(String path) throws ZookeeperClientFailedException, InterruptedException { |
| for (;;) { |
| long connStartTime = this.connStartTime; |
| |
| try { |
| return zk.getChildren(path, false); |
| } |
| catch (Exception e) { |
| onZookeeperError(connStartTime, e); |
| } |
| } |
| } |
| |
| /** |
| * Get children paths. |
| * |
| * @param path Path. |
| * @return Children paths. |
| * @throws ZookeeperClientFailedException If connection to zk was lost. |
| * @throws InterruptedException If interrupted. |
| */ |
| List<String> getChildrenPaths(String path) throws ZookeeperClientFailedException, InterruptedException { |
| List<String> children = getChildren(path); |
| |
| ArrayList<String> paths = new ArrayList(children.size()); |
| |
| for (String child : children) |
| paths.add(path + "/" + child); |
| |
| return paths; |
| } |
| |
| /** |
| * @param path Path. |
| * @throws InterruptedException If interrupted. |
| * @throws KeeperException In case of error. |
| * @return {@code True} if given path exists. |
| */ |
| boolean existsNoRetry(String path) throws InterruptedException, KeeperException { |
| return zk.exists(path, false) != null; |
| } |
| |
| /** |
| * @param path Path. |
| * @param ver Expected version. |
| * @throws InterruptedException If interrupted. |
| * @throws KeeperException In case of error. |
| */ |
| void deleteIfExistsNoRetry(String path, int ver) throws InterruptedException, KeeperException { |
| try { |
| zk.delete(path, ver); |
| } |
| catch (KeeperException.NoNodeException e) { |
| // No-op if znode does not exist. |
| } |
| } |
| |
| /** |
| * @param path Path. |
| * @param ver Version. |
| * @throws ZookeeperClientFailedException If connection to zk was lost. |
| * @throws InterruptedException If interrupted. |
| */ |
| void deleteIfExists(String path, int ver) |
| throws ZookeeperClientFailedException, InterruptedException |
| { |
| try { |
| delete(path, ver); |
| } |
| catch (KeeperException.NoNodeException e) { |
| // No-op if znode does not exist. |
| } |
| } |
| |
| /** |
| * @param paths Children paths. |
| * @param ver Version. |
| * @throws ZookeeperClientFailedException If connection to zk was lost. |
| * @throws InterruptedException If interrupted. |
| */ |
| void deleteAll(List<String> paths, int ver) |
| throws ZookeeperClientFailedException, InterruptedException { |
| if (paths.isEmpty()) |
| return; |
| |
| List<List<Op>> batches = new LinkedList<>(); |
| |
| int batchSize = 0; |
| |
| List<Op> batch = new LinkedList<>(); |
| |
| for (String path : paths) { |
| //TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8187 |
| int size = requestOverhead(path) + 17 /* overhead */; |
| |
| assert size <= MAX_REQ_SIZE; |
| |
| if (batchSize + size > MAX_REQ_SIZE) { |
| batches.add(batch); |
| |
| batch = new LinkedList<>(); |
| |
| batchSize = 0; |
| } |
| |
| batch.add(Op.delete(path, ver)); |
| |
| batchSize += size; |
| } |
| |
| batches.add(batch); |
| |
| for (List<Op> ops : batches) { |
| for (;;) { |
| long connStartTime = this.connStartTime; |
| |
| try { |
| zk.multi(ops); |
| |
| break; |
| } |
| catch (KeeperException.NoNodeException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to delete nodes using bulk operation: " + e); |
| |
| for (Op op : ops) |
| deleteIfExists(op.getPath(), ver); |
| |
| break; |
| } |
| catch (Exception e) { |
| onZookeeperError(connStartTime, e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param path Path. |
| * @param ver Version. |
| * @throws KeeperException.NoNodeException If target node does not exist. |
| * @throws ZookeeperClientFailedException If connection to zk was lost. |
| * @throws InterruptedException If interrupted. |
| */ |
| private void delete(String path, int ver) |
| throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException |
| { |
| for (;;) { |
| long connStartTime = this.connStartTime; |
| |
| try { |
| zk.delete(path, ver); |
| |
| return; |
| } |
| catch (KeeperException.NoNodeException e) { |
| throw e; |
| } |
| catch (Exception e) { |
| onZookeeperError(connStartTime, e); |
| } |
| } |
| } |
| |
| /** |
| * @param path Path. |
| * @param data Data. |
| * @param ver Version. |
| * @throws ZookeeperClientFailedException If connection to zk was lost. |
| * @throws InterruptedException If interrupted. |
| * @throws KeeperException.NoNodeException If node does not exist. |
| * @throws KeeperException.BadVersionException If version does not match. |
| */ |
| void setData(String path, byte[] data, int ver) |
| throws ZookeeperClientFailedException, InterruptedException, KeeperException.NoNodeException, |
| KeeperException.BadVersionException |
| { |
| if (data == null) |
| data = EMPTY_BYTES; |
| |
| for (;;) { |
| long connStartTime = this.connStartTime; |
| |
| try { |
| zk.setData(path, data, ver); |
| |
| return; |
| } |
| catch (KeeperException.BadVersionException | KeeperException.NoNodeException e) { |
| throw e; |
| } |
| catch (Exception e) { |
| onZookeeperError(connStartTime, e); |
| } |
| } |
| } |
| |
| /** |
| * @param path Path. |
| * @param stat Optional {@link Stat} instance to return znode state. |
| * @return Data. |
| * @throws KeeperException.NoNodeException If target node does not exist. |
| * @throws ZookeeperClientFailedException If connection to zk was lost. |
| * @throws InterruptedException If interrupted. |
| */ |
| byte[] getData(String path, @Nullable Stat stat) |
| throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException { |
| for (;;) { |
| long connStartTime = this.connStartTime; |
| |
| try { |
| return zk.getData(path, false, stat); |
| } |
| catch (KeeperException.NoNodeException e) { |
| throw e; |
| } |
| catch (Exception e) { |
| onZookeeperError(connStartTime, e); |
| } |
| } |
| } |
| |
| /** |
| * @param path Path. |
| * @return Data. |
| * @throws KeeperException.NoNodeException If target node does not exist. |
| * @throws ZookeeperClientFailedException If connection to zk was lost. |
| * @throws InterruptedException If interrupted. |
| */ |
| byte[] getData(String path) |
| throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException |
| { |
| return getData(path, null); |
| } |
| |
| /** |
| * @param path Path. |
| */ |
| void deleteIfExistsAsync(String path) { |
| new DeleteIfExistsOperation(path).execute(); |
| } |
| |
| /** |
| * @param path Path. |
| * @param watcher Watcher. |
| * @param cb Callback. |
| */ |
| void existsAsync(String path, Watcher watcher, AsyncCallback.StatCallback cb) { |
| ExistsOperation op = new ExistsOperation(path, watcher, cb); |
| |
| zk.exists(path, watcher, new StatCallbackWrapper(op), null); |
| } |
| |
| /** |
| * @param path Path. |
| * @param watcher Watcher. |
| * @param cb Callback. |
| */ |
| void getChildrenAsync(String path, Watcher watcher, AsyncCallback.Children2Callback cb) { |
| GetChildrenOperation op = new GetChildrenOperation(path, watcher, cb); |
| |
| zk.getChildren(path, watcher, new ChildrenCallbackWrapper(op), null); |
| } |
| |
| /** |
| * @param path Path. |
| * @param watcher Watcher. |
| * @param cb Callback. |
| */ |
| void getDataAsync(String path, Watcher watcher, AsyncCallback.DataCallback cb) { |
| GetDataOperation op = new GetDataOperation(path, watcher, cb); |
| |
| zk.getData(path, watcher, new DataCallbackWrapper(op), null); |
| } |
| |
| /** |
| * @param path Path. |
| * @param data Data. |
| * @param createMode Create mode. |
| * @param cb Callback. |
| */ |
| private void createAsync(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) { |
| if (data == null) |
| data = EMPTY_BYTES; |
| |
| CreateOperation op = new CreateOperation(path, data, createMode, cb); |
| |
| zk.create(path, data, ZK_ACL, createMode, new CreateCallbackWrapper(op), null); |
| } |
| |
| /** |
| * |
| */ |
| void onCloseStart() { |
| closing = true; |
| |
| synchronized (stateMux) { |
| stateMux.notifyAll(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| public void close() { |
| closeClient(); |
| } |
| |
| /** |
| * @param prevConnStartTime Time when connection was established. |
| * @param e Error. |
| * @throws ZookeeperClientFailedException If connection to zk was lost. |
| * @throws InterruptedException If interrupted. |
| */ |
| private void onZookeeperError(long prevConnStartTime, Exception e) |
| throws ZookeeperClientFailedException, InterruptedException |
| { |
| ZookeeperClientFailedException err = null; |
| |
| synchronized (stateMux) { |
| if (closing) |
| throw new ZookeeperClientFailedException("ZooKeeper client is closed."); |
| |
| U.warn(log, "Failed to execute ZooKeeper operation [err=" + e + ", state=" + state + ']'); |
| |
| if (state == ConnectionState.Lost) { |
| U.error(log, "Operation failed with unexpected error, connection lost: " + e, e); |
| |
| Boolean sslEnabled = Boolean.valueOf(zk().getClientConfig().getProperty(SECURE_CLIENT)); |
| |
| String msg = "Connection lost, check" + (sslEnabled ? " SSL " : ' ') + "connection configuration."; |
| |
| throw new ZookeeperClientFailedException(msg, e); |
| } |
| |
| boolean retry = (e instanceof KeeperException) && needRetry(((KeeperException)e).code().intValue()); |
| |
| if (retry) { |
| long remainingTime; |
| |
| if (state == ConnectionState.Connected && connStartTime == prevConnStartTime) { |
| state = ConnectionState.Disconnected; |
| |
| connStartTime = System.currentTimeMillis(); |
| |
| remainingTime = connLossTimeout; |
| } |
| else { |
| assert connStartTime != 0; |
| |
| assert state == ConnectionState.Disconnected : state; |
| |
| remainingTime = connLossTimeout - (System.currentTimeMillis() - connStartTime); |
| |
| if (remainingTime <= 0) { |
| state = ConnectionState.Lost; |
| |
| U.warn(log, "Failed to establish ZooKeeper connection, close client " + |
| "[timeout=" + connLossTimeout + ']'); |
| |
| err = new ZookeeperClientFailedException(e); |
| } |
| } |
| |
| if (err == null) { |
| long retryTimeout = IgniteSystemProperties.getLong(IGNITE_ZOOKEEPER_DISCOVERY_RETRY_TIMEOUT, |
| DFLT_RETRY_TIMEOUT); |
| |
| U.warn(log, "ZooKeeper operation failed, will retry [err=" + e + |
| ", retryTimeout=" + retryTimeout + |
| ", connLossTimeout=" + connLossTimeout + |
| ", path=" + ((KeeperException)e).getPath() + |
| ", remainingWaitTime=" + remainingTime + ']'); |
| |
| stateMux.wait(retryTimeout); |
| |
| if (closing) |
| throw new ZookeeperClientFailedException("ZooKeeper client is closed."); |
| } |
| } |
| else { |
| U.error(log, "Operation failed with unexpected error, close ZooKeeper client: " + e, e); |
| |
| state = ConnectionState.Lost; |
| |
| err = new ZookeeperClientFailedException(e); |
| } |
| } |
| |
| if (err != null) { |
| closeClient(); |
| |
| notifyConnectionLost(); |
| |
| throw err; |
| } |
| } |
| |
| /** |
| * @param code Zookeeper error code. |
| * @return {@code True} if can retry operation. |
| */ |
| private boolean needRetry(int code) { |
| boolean retryByErrorCode = code == KeeperException.Code.CONNECTIONLOSS.intValue() || |
| code == KeeperException.Code.SESSIONMOVED.intValue() || |
| code == KeeperException.Code.OPERATIONTIMEOUT.intValue(); |
| |
| if (retryByErrorCode) { |
| int maxRetryCount = IgniteSystemProperties.getInteger(IGNITE_ZOOKEEPER_DISCOVERY_MAX_RETRY_COUNT, |
| DFLT_MAX_RETRY_COUNT); |
| |
| if (maxRetryCount <= 0 || retryCount.incrementAndGet() < maxRetryCount) |
| return true; |
| else |
| return false; |
| } |
| else |
| return false; |
| } |
| |
| /** |
| * |
| */ |
| private void closeClient() { |
| try { |
| zk.close(); |
| } |
| catch (Exception closeErr) { |
| U.warn(log, "Failed to close ZooKeeper client: " + closeErr, closeErr); |
| } |
| |
| connTimer.cancel(); |
| } |
| |
| /** |
| * |
| */ |
| private void scheduleConnectionCheck() { |
| assert state == ConnectionState.Disconnected : state; |
| |
| connTimer.schedule(new ConnectionTimeoutTask(connStartTime), connLossTimeout); |
| } |
| |
| /** |
| * |
| */ |
| interface ZkAsyncOperation { |
| /** |
| * |
| */ |
| void execute(); |
| } |
| |
| /** |
| * |
| */ |
| class GetChildrenOperation implements ZkAsyncOperation { |
| /** */ |
| private final String path; |
| |
| /** */ |
| private final Watcher watcher; |
| |
| /** */ |
| private final AsyncCallback.Children2Callback cb; |
| |
| /** |
| * @param path Path. |
| * @param watcher Watcher. |
| * @param cb Callback. |
| */ |
| GetChildrenOperation(String path, Watcher watcher, AsyncCallback.Children2Callback cb) { |
| this.path = path; |
| this.watcher = watcher; |
| this.cb = cb; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void execute() { |
| getChildrenAsync(path, watcher, cb); |
| } |
| } |
| |
| /** |
| * |
| */ |
| class GetDataOperation implements ZkAsyncOperation { |
| /** */ |
| private final String path; |
| |
| /** */ |
| private final Watcher watcher; |
| |
| /** */ |
| private final AsyncCallback.DataCallback cb; |
| |
| /** |
| * @param path Path. |
| * @param watcher Watcher. |
| * @param cb Callback. |
| */ |
| GetDataOperation(String path, Watcher watcher, AsyncCallback.DataCallback cb) { |
| this.path = path; |
| this.watcher = watcher; |
| this.cb = cb; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void execute() { |
| getDataAsync(path, watcher, cb); |
| } |
| } |
| |
| /** |
| * |
| */ |
| class ExistsOperation implements ZkAsyncOperation { |
| /** */ |
| private final String path; |
| |
| /** */ |
| private final Watcher watcher; |
| |
| /** */ |
| private final AsyncCallback.StatCallback cb; |
| |
| /** |
| * @param path Path. |
| * @param watcher Watcher. |
| * @param cb Callback. |
| */ |
| ExistsOperation(String path, Watcher watcher, AsyncCallback.StatCallback cb) { |
| this.path = path; |
| this.watcher = watcher; |
| this.cb = cb; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void execute() { |
| existsAsync(path, watcher, cb); |
| } |
| } |
| |
| /** |
| * |
| */ |
| class CreateOperation implements ZkAsyncOperation { |
| /** */ |
| private final String path; |
| |
| /** */ |
| private final byte[] data; |
| |
| /** */ |
| private final CreateMode createMode; |
| |
| /** */ |
| private final AsyncCallback.StringCallback cb; |
| |
| /** |
| * @param path path. |
| * @param data Data. |
| * @param createMode Create mode. |
| * @param cb Callback. |
| */ |
| CreateOperation(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) { |
| this.path = path; |
| this.data = data; |
| this.createMode = createMode; |
| this.cb = cb; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void execute() { |
| createAsync(path, data, createMode, cb); |
| } |
| } |
| |
| /** |
| * |
| */ |
| class DeleteIfExistsOperation implements AsyncCallback.VoidCallback, ZkAsyncOperation { |
| /** */ |
| private final String path; |
| |
| /** |
| * @param path Path. |
| */ |
| DeleteIfExistsOperation(String path) { |
| this.path = path; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void execute() { |
| zk.delete(path, -1, this, null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void processResult(int rc, String path, Object ctx) { |
| if (closing) |
| return; |
| |
| if (rc == KeeperException.Code.NONODE.intValue()) |
| return; |
| |
| if (needRetry(rc)) { |
| U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [" + |
| "path=" + path + ']'); |
| |
| retryQ.add(this); |
| } |
| else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) |
| U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']'); |
| else |
| assert rc == 0 : KeeperException.Code.get(rc); |
| } |
| } |
| |
| /** |
| * |
| */ |
| class CreateCallbackWrapper implements AsyncCallback.StringCallback { |
| /** */ |
| final CreateOperation op; |
| |
| /** |
| * @param op Operation. |
| */ |
| CreateCallbackWrapper(CreateOperation op) { |
| this.op = op; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void processResult(int rc, String path, Object ctx, String name) { |
| if (closing) |
| return; |
| |
| if (rc == KeeperException.Code.NODEEXISTS.intValue()) |
| return; |
| |
| if (needRetry(rc)) { |
| U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']'); |
| |
| retryQ.add(op); |
| } |
| else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) |
| U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']'); |
| else { |
| if (op.cb != null) |
| op.cb.processResult(rc, path, ctx, name); |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| class ChildrenCallbackWrapper implements AsyncCallback.Children2Callback { |
| /** */ |
| private final GetChildrenOperation op; |
| |
| /** |
| * @param op Operation. |
| */ |
| private ChildrenCallbackWrapper(GetChildrenOperation op) { |
| this.op = op; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { |
| if (closing) |
| return; |
| |
| if (needRetry(rc)) { |
| U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']'); |
| |
| retryQ.add(op); |
| } |
| else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) |
| U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']'); |
| else |
| op.cb.processResult(rc, path, ctx, children, stat); |
| } |
| } |
| |
| /** |
| * |
| */ |
| class DataCallbackWrapper implements AsyncCallback.DataCallback { |
| /** */ |
| private final GetDataOperation op; |
| |
| /** |
| * @param op Operation. |
| */ |
| private DataCallbackWrapper(GetDataOperation op) { |
| this.op = op; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { |
| if (closing) |
| return; |
| |
| if (needRetry(rc)) { |
| U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']'); |
| |
| retryQ.add(op); |
| } |
| else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) |
| U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']'); |
| else |
| op.cb.processResult(rc, path, ctx, data, stat); |
| } |
| } |
| |
| /** |
| * |
| */ |
| class StatCallbackWrapper implements AsyncCallback.StatCallback { |
| /** */ |
| private final ExistsOperation op; |
| |
| /** |
| * @param op Operation. |
| */ |
| private StatCallbackWrapper(ExistsOperation op) { |
| this.op = op; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void processResult(int rc, String path, Object ctx, Stat stat) { |
| if (closing) |
| return; |
| |
| if (needRetry(rc)) { |
| U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']'); |
| |
| retryQ.add(op); |
| } |
| else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) |
| U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']'); |
| else |
| op.cb.processResult(rc, path, ctx, stat); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private class ConnectionTimeoutTask extends TimerTask { |
| /** */ |
| private final long connectStartTime; |
| |
| /** |
| * @param connectStartTime Time was connection started. |
| */ |
| ConnectionTimeoutTask(long connectStartTime) { |
| this.connectStartTime = connectStartTime; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| boolean connLoss = false; |
| |
| synchronized (stateMux) { |
| if (closing) |
| return; |
| |
| if (state == ConnectionState.Disconnected && |
| ZookeeperClient.this.connStartTime == connectStartTime) { |
| |
| state = ConnectionState.Lost; |
| |
| U.warn(log, "Failed to establish ZooKeeper connection, close client " + |
| "[timeout=" + connLossTimeout + ']'); |
| |
| connLoss = true; |
| } |
| } |
| |
| if (connLoss) { |
| closeClient(); |
| |
| notifyConnectionLost(); |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| private enum ConnectionState { |
| /** */ |
| Connected, |
| /** */ |
| Disconnected, |
| /** */ |
| Lost |
| } |
| } |