blob: 97f7b86d0c3a7e2b9ce3fc5e4593835fe4e411ac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.zk.internal;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.ShutdownPolicy;
import org.apache.ignite.SystemProperty;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CommunicationFailureResolver;
import org.apache.ignite.events.EventType;
import org.apache.ignite.events.NodeValidationFailedEvent;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryNotification;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2;
import static org.apache.ignite.internal.processors.security.SecurityUtils.authenticateLocalNode;
import static org.apache.ignite.internal.processors.security.SecurityUtils.withSecurityContext;
import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
import static org.apache.zookeeper.CreateMode.PERSISTENT;
/**
* Zookeeper Discovery Impl.
*/
public class ZookeeperDiscoveryImpl {
/** @see #IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD */
public static final int DFLT_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = 5;
/** @see #IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT */
public static final int DFLT_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT = 60_000;
/** @see #IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS */
public static final int DFLT_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS = 100;
/** @see #IGNITE_ZOOKEEPER_DISCOVERY_SPI_EVTS_THROTTLE */
public static final int DFLT_ZOOKEEPER_DISCOVERY_SPI_EVTS_THROTTLE = 0;
/** */
@SystemProperty(value = "Maximum count of unprocessed events", type = Long.class,
defaults = "" + DFLT_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD)
public static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD";
/** */
@SystemProperty(value = "Timeout to update processed events", type = Long.class,
defaults = "" + DFLT_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT)
public static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT";
/** */
@SystemProperty(value = "Maximum count of unrpocessed events", type = Long.class,
defaults = "" + DFLT_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS)
public static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS";
/** */
@SystemProperty(value = "Maximum count of events to delay alive nodes change process", type = Long.class,
defaults = "" + DFLT_ZOOKEEPER_DISCOVERY_SPI_EVTS_THROTTLE)
public static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_EVTS_THROTTLE =
"IGNITE_ZOOKEEPER_DISCOVERY_SPI_EVTS_THROTTLE";
/** */
final ZookeeperDiscoverySpi spi;
/** */
private final String igniteInstanceName;
/** */
private final String connectString;
/** */
private final int sesTimeout;
/** */
private final JdkMarshaller marsh = new JdkMarshaller();
/** */
private final ZkIgnitePaths zkPaths;
/** */
private final IgniteLogger log;
/** */
final GridSpinBusyLock busyLock = new GridSpinBusyLock();
/** */
private final ZookeeperClusterNode locNode;
/** */
private final DiscoverySpiListener lsnr;
/** */
private final DiscoverySpiDataExchange exchange;
/** */
private final boolean clientReconnectEnabled;
/** */
private final GridFutureAdapter<Void> joinFut = new GridFutureAdapter<>();
/** */
private final int evtsAckThreshold;
/** */
private IgniteThreadPoolExecutor utilityPool;
/** */
private volatile ZkRuntimeState rtState;
/** */
private volatile ConnectionState connState = ConnectionState.STARTED;
/** */
private final AtomicBoolean stop = new AtomicBoolean();
/** */
private final Object stateMux = new Object();
/** */
public volatile IgniteDiscoverySpiInternalListener internalLsnr;
/** */
private final ConcurrentHashMap<Long, PingFuture> pingFuts = new ConcurrentHashMap<>();
/** */
private final AtomicReference<ZkCommunicationErrorProcessFuture> commErrProcFut = new AtomicReference<>();
/** */
private long prevSavedEvtsTopVer;
/** */
private final ZookeeperDiscoveryStatistics stats;
/**
* @param spi Discovery SPI.
* @param igniteInstanceName Instance name.
* @param log Logger.
* @param zkRootPath Zookeeper base path node all nodes.
* @param locNode Local node instance.
* @param lsnr Discovery events listener.
* @param exchange Discovery data exchange.
* @param internalLsnr Internal listener (used for testing only).
* @param stats Zookeeper DiscoverySpi statistics collector.
*/
public ZookeeperDiscoveryImpl(
ZookeeperDiscoverySpi spi,
String igniteInstanceName,
IgniteLogger log,
String zkRootPath,
ZookeeperClusterNode locNode,
DiscoverySpiListener lsnr,
DiscoverySpiDataExchange exchange,
IgniteDiscoverySpiInternalListener internalLsnr,
ZookeeperDiscoveryStatistics stats) {
assert locNode.id() != null && locNode.isLocal() : locNode;
MarshallerUtils.setNodeName(marsh, igniteInstanceName);
zkPaths = new ZkIgnitePaths(zkRootPath);
this.spi = spi;
this.igniteInstanceName = igniteInstanceName;
this.connectString = spi.getZkConnectionString();
this.sesTimeout = (int)spi.getSessionTimeout();
this.log = log.getLogger(getClass());
this.locNode = locNode;
this.lsnr = lsnr;
this.exchange = exchange;
this.clientReconnectEnabled = locNode.isClient() && !spi.isClientReconnectDisabled();
int evtsAckThreshold = IgniteSystemProperties.getInteger(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD,
DFLT_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD);
if (evtsAckThreshold <= 0)
evtsAckThreshold = 1;
this.evtsAckThreshold = evtsAckThreshold;
if (internalLsnr != null)
this.internalLsnr = internalLsnr;
this.stats = stats;
}
/**
* @return Exception.
*/
private static IgniteClientDisconnectedCheckedException disconnectError() {
return new IgniteClientDisconnectedCheckedException(null, "Client node disconnected.");
}
/**
* @return Logger.
*/
IgniteLogger log() {
return log;
}
/**
* @return Local node instance.
*/
public ClusterNode localNode() {
return locNode;
}
/**
* @param nodeId Node ID.
* @return Node instance.
*/
@Nullable public ZookeeperClusterNode node(UUID nodeId) {
assert nodeId != null;
ZkRuntimeState rtState0 = rtState;
return rtState0 != null ? rtState0.top.nodesById.get(nodeId) : null;
}
/**
* @param nodeOrder Node order.
* @return Node instance.
*/
@Nullable public ZookeeperClusterNode node(long nodeOrder) {
assert nodeOrder > 0 : nodeOrder;
ZkRuntimeState rtState0 = rtState;
return rtState0 != null ? rtState.top.nodesByOrder.get(nodeOrder) : null;
}
/**
* @param fut Future to remove.
*/
void clearCommunicationErrorProcessFuture(ZkCommunicationErrorProcessFuture fut) {
assert fut.isDone() : fut;
commErrProcFut.compareAndSet(fut, null);
}
/**
* @param node0 Problem node ID
* @param err Connect error.
*/
public void resolveCommunicationError(ClusterNode node0, Exception err) {
if (node0.isClient())
return;
ZookeeperClusterNode node = node(node0.id());
if (node == null)
throw new IgniteSpiException(new ClusterTopologyCheckedException("Node failed: " + node0.id()));
IgniteInternalFuture<Boolean> nodeStatusFut;
for (;;) {
checkState();
ZkCommunicationErrorProcessFuture fut = commErrProcFut.get();
if (fut == null || fut.isDone()) {
ZkCommunicationErrorProcessFuture newFut = ZkCommunicationErrorProcessFuture.createOnCommunicationError(
this,
node.sessionTimeout() + 1000);
stats.onCommunicationError();
if (commErrProcFut.compareAndSet(fut, newFut)) {
fut = newFut;
if (log.isInfoEnabled()) {
log.info("Created new communication error process future [errNode=" + node0.id() +
", err=" + err + ']');
}
try {
checkState();
}
catch (Exception e) {
fut.onError(e);
throw e;
}
fut.scheduleCheckOnTimeout();
}
else {
fut = commErrProcFut.get();
if (fut == null)
continue;
}
}
nodeStatusFut = fut.nodeStatusFuture(node);
if (nodeStatusFut != null)
break;
else {
try {
fut.get();
}
catch (IgniteCheckedException e) {
U.warn(log, "Previous communication error process future failed: " + e);
}
}
}
try {
if (!nodeStatusFut.get())
throw new IgniteSpiException(new ClusterTopologyCheckedException("Node failed: " + node0.id()));
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException(e);
}
}
/**
* @param nodeId Node ID.
* @return Ping result.
*/
public boolean pingNode(UUID nodeId) {
checkState();
ZkRuntimeState rtState = this.rtState;
ZookeeperClusterNode node = rtState.top.nodesById.get(nodeId);
if (node == null)
return false;
if (node.isLocal())
return true;
PingFuture fut = pingFuts.get(node.order());
if (fut == null) {
fut = new PingFuture(rtState, node);
PingFuture old = pingFuts.putIfAbsent(node.order(), fut);
if (old == null) {
if (fut.checkNodeAndState())
spi.getSpiContext().addTimeoutObject(fut);
else
assert fut.isDone();
}
else
fut = old;
}
try {
return fut.get();
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException(e);
}
}
/**
* @param nodeId Node ID.
* @param warning Warning.
*/
public void failNode(UUID nodeId, @Nullable String warning) {
ZookeeperClusterNode node = rtState.top.nodesById.get(nodeId);
if (node == null) {
if (log.isDebugEnabled())
log.debug("Ignore forcible node fail request, node does not exist: " + nodeId);
return;
}
if (!node.isClient()) {
U.warn(log, "Ignore forcible node fail request for non-client node: " + node);
return;
}
sendCustomMessage(new ZkForceNodeFailMessage(node.internalId(), warning));
}
/**
*
*/
public void reconnect() {
assert clientReconnectEnabled;
synchronized (stateMux) {
if (connState == ConnectionState.STARTED) {
connState = ConnectionState.DISCONNECTED;
rtState.onCloseStart(disconnectError());
}
else
return;
}
busyLock.block();
busyLock.unblock();
rtState.zkClient.close();
UUID newId = UUID.randomUUID();
U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due to network problems [" +
"newId=" + newId +
", prevId=" + locNode.id() +
", locNode=" + locNode + ']');
runInWorkerThread(new ReconnectClosure(newId));
}
/**
* @param newId New ID.
*/
private void doReconnect(UUID newId) {
if (rtState.joined) {
assert rtState.evtsData != null;
lsnr.onDiscovery(
new DiscoveryNotification(
EVT_CLIENT_NODE_DISCONNECTED,
rtState.evtsData.topVer,
locNode,
rtState.top.topologySnapshot(),
Collections.emptyMap(),
null,
null
)
).get();
}
try {
locNode.onClientDisconnected(newId);
joinTopology(rtState);
}
catch (Exception e) {
if (stopping()) {
if (log.isDebugEnabled())
log.debug("Reconnect failed, node is stopping [err=" + e + ']');
return;
}
U.error(log, "Failed to reconnect: " + e, e);
onSegmented(e);
}
}
/**
* @return {@code True} if started to stop.
*/
private boolean stopping() {
if (stop.get())
return true;
synchronized (stateMux) {
if (connState == ConnectionState.STOPPED)
return true;
}
return false;
}
/**
* @param e Error.
*/
private void onSegmented(Exception e) {
rtState.errForClose = e;
if (rtState.joined || joinFut.isDone()) {
synchronized (stateMux) {
connState = ConnectionState.STOPPED;
}
notifySegmented();
}
else
joinFut.onDone(e);
}
/**
*
*/
private void notifySegmented() {
List<ClusterNode> nodes = rtState.top.topologySnapshot();
if (nodes.isEmpty())
nodes = Collections.singletonList(locNode);
lsnr.onDiscovery(
new DiscoveryNotification(
EVT_NODE_SEGMENTED,
rtState.evtsData != null ? rtState.evtsData.topVer : 1L,
locNode,
nodes,
Collections.emptyMap(),
null,
null
)
).get();
}
/**
* @return Remote nodes.
*/
public Collection<ClusterNode> remoteNodes() {
checkState();
return rtState.top.remoteNodes();
}
/**
* @param feature Feature to check.
* @return {@code true} if all nodes support the given feature, {@code false} otherwise.
*/
public boolean allNodesSupport(IgniteFeatures feature) {
checkState();
return rtState != null && rtState.top.isAllNodes(n -> IgniteFeatures.nodeSupports(n, feature));
}
/**
*
*/
private void checkState() {
switch (connState) {
case STARTED:
break;
case STOPPED:
throw new IgniteSpiException("Node stopped.");
case DISCONNECTED:
throw new IgniteClientDisconnectedException(null, "Client is disconnected.");
}
}
/**
* @param nodeId Node ID.
* @return {@code True} if node joined or joining topology.
*/
public boolean knownNode(UUID nodeId) {
while (!busyLock.enterBusy())
checkState();
try {
List<String> children = rtState.zkClient.getChildren(zkPaths.aliveNodesDir);
for (int i = 0; i < children.size(); i++) {
UUID id = ZkIgnitePaths.aliveNodeId(children.get(i));
if (nodeId.equals(id))
return true;
}
return false;
}
catch (ZookeeperClientFailedException e) {
if (clientReconnectEnabled)
throw new IgniteClientDisconnectedException(null, "Client is disconnected.");
throw new IgniteException(e);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IgniteInterruptedException(e);
}
finally {
busyLock.leaveBusy();
}
}
/**
* @param msg Message.
*/
public void sendCustomMessage(DiscoverySpiCustomMessage msg) {
assert msg != null;
List<ClusterNode> nodes = rtState.top.topologySnapshot();
boolean hasServerNode = false;
for (int i = 0, size = nodes.size(); i < size; i++) {
ClusterNode node = nodes.get(i);
if (!node.isClient())
hasServerNode = true;
}
if (!hasServerNode)
throw new IgniteException("Failed to send custom message: no server nodes in topology.");
byte[] msgBytes;
try {
msgBytes = marshalZip(msg);
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom message: " + msg, e);
}
while (!busyLock.enterBusy())
checkState();
try {
ZookeeperClient zkClient = rtState.zkClient;
saveCustomMessage(zkClient, msgBytes);
}
catch (ZookeeperClientFailedException e) {
if (clientReconnectEnabled)
throw new IgniteClientDisconnectedException(null, "Client is disconnected.");
throw new IgniteException(e);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IgniteInterruptedException(e);
}
finally {
busyLock.leaveBusy();
}
}
/**
* @param zkClient Client.
* @param msgBytes Marshalled message.
* @throws ZookeeperClientFailedException If connection to zk was lost.
* @throws InterruptedException If interrupted.
*/
private void saveCustomMessage(ZookeeperClient zkClient, byte[] msgBytes)
throws ZookeeperClientFailedException, InterruptedException {
String prefix = UUID.randomUUID().toString();
int partCnt = 1;
int overhead = 10;
UUID locId = locNode.id();
String path = zkPaths.createCustomEventPath(prefix, locId, partCnt);
if (zkClient.needSplitNodeData(path, msgBytes, overhead)) {
List<byte[]> parts = zkClient.splitNodeData(path, msgBytes, overhead);
String partsBasePath = zkPaths.customEventPartsBasePath(prefix, locId);
saveMultipleParts(zkClient, partsBasePath, parts);
msgBytes = null;
partCnt = parts.size();
}
zkClient.createSequential(prefix,
zkPaths.customEvtsDir,
zkPaths.createCustomEventPath(prefix, locId, partCnt),
msgBytes,
CreateMode.PERSISTENT_SEQUENTIAL);
}
/**
* @return Cluster start time.
*/
public long gridStartTime() {
return rtState.gridStartTime;
}
/**
* Starts join procedure and waits for {@link EventType#EVT_NODE_JOINED} event for local node.
*
* @throws InterruptedException If interrupted.
*/
public void startJoinAndWait() throws InterruptedException {
joinTopology(null);
for (;;) {
try {
joinFut.get(10_000);
break;
}
catch (IgniteFutureTimeoutCheckedException e) {
U.warn(log, "Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']');
}
catch (Exception e) {
IgniteSpiException spiErr = X.cause(e, IgniteSpiException.class);
if (spiErr != null)
throw spiErr;
throw new IgniteSpiException("Failed to join cluster", e);
}
}
}
/**
* @param prevState Previous state in case of connect retry.
* @throws InterruptedException If interrupted.
*/
private void joinTopology(@Nullable ZkRuntimeState prevState) throws InterruptedException {
if (!busyLock.enterBusy())
return;
try {
// Need fire EVT_CLIENT_NODE_RECONNECTED event if reconnect after already joined.
boolean reconnect = locNode.isClient() && prevState != null && (prevState.joined || prevState.reconnect);
IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr;
if (internalLsnr != null)
internalLsnr.beforeJoin(locNode, log);
if (reconnect)
locNode.setAttributes(spi.getLocNodeAttrs());
marshalCredentialsOnJoin(locNode);
synchronized (stateMux) {
if (connState == ConnectionState.STOPPED)
return;
connState = ConnectionState.STARTED;
}
ZkRuntimeState rtState = this.rtState = new ZkRuntimeState(reconnect);
DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id(), locNode.isClient());
exchange.collect(discoDataBag);
ZkJoiningNodeData joinData = new ZkJoiningNodeData(locNode, discoDataBag.joiningNodeData());
byte[] joinDataBytes;
try {
joinDataBytes = marshalZip(joinData);
}
catch (Exception e) {
throw new IgniteSpiException("Failed to marshal joining node data", e);
}
try {
rtState.zkClient = new ZookeeperClient(
igniteInstanceName,
log,
connectString,
sesTimeout,
new ConnectionLossListener());
}
catch (Exception e) {
throw new IgniteSpiException("Failed to create Zookeeper client", e);
}
startJoin(rtState, prevState, joinDataBytes);
}
finally {
busyLock.leaveBusy();
}
}
/**
* @throws InterruptedException If interrupted.
*/
private void initZkNodes() throws InterruptedException {
try {
ZookeeperClient client = rtState.zkClient;
if (!client.exists(zkPaths.clusterDir)) {
createRootPathParents(zkPaths.clusterDir, client);
client.createIfNeeded(zkPaths.clusterDir, null, PERSISTENT);
}
List<String> createdDirs = client.getChildren(zkPaths.clusterDir);
String[] requiredDirs = {
zkPaths.evtsPath,
zkPaths.joinDataDir,
zkPaths.customEvtsDir,
zkPaths.customEvtsPartsDir,
zkPaths.customEvtsAcksDir,
zkPaths.aliveNodesDir,
zkPaths.stoppedNodesFlagsDir};
List<String> dirs = new ArrayList<>();
for (String dir : requiredDirs) {
String dir0 = dir.substring(zkPaths.clusterDir.length() + 1);
if (!createdDirs.contains(dir0))
dirs.add(dir);
}
if (!dirs.isEmpty())
client.createAll(dirs, PERSISTENT);
}
catch (ZookeeperClientFailedException e) {
throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e);
}
}
/**
* @param rootDir Root directory.
* @param client Client.
* @throws ZookeeperClientFailedException If connection to zk was lost.
* @throws InterruptedException If interrupted.
*/
private void createRootPathParents(String rootDir, ZookeeperClient client)
throws ZookeeperClientFailedException, InterruptedException {
int startIdx = 0;
for (;;) {
int separatorIdx = rootDir.indexOf(ZkIgnitePaths.PATH_SEPARATOR, startIdx);
if (separatorIdx == -1)
break;
if (separatorIdx > 0) {
String path = rootDir.substring(0, separatorIdx);
client.createIfNeeded(path, null, CreateMode.PERSISTENT);
}
startIdx = separatorIdx + 1;
}
}
/**
* @param zkClient Client.
* @param basePath Base path.
* @param partCnt Parts count.
*/
private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String basePath, int partCnt) {
for (int i = 0; i < partCnt; i++) {
String path = multipartPathName(basePath, i);
zkClient.deleteIfExistsAsync(path);
}
}
/**
* @param zkClient Client.
* @param basePath Base path.
* @param partCnt Parts count.
* @return Read parts.
* @throws Exception If failed.
*/
private byte[] readMultipleParts(ZookeeperClient zkClient, String basePath, int partCnt)
throws Exception {
assert partCnt >= 1;
if (partCnt > 1) {
List<byte[]> parts = new ArrayList<>(partCnt);
int totSize = 0;
for (int i = 0; i < partCnt; i++) {
byte[] part = zkClient.getData(multipartPathName(basePath, i));
parts.add(part);
totSize += part.length;
}
byte[] res = new byte[totSize];
int pos = 0;
for (int i = 0; i < partCnt; i++) {
byte[] part = parts.get(i);
System.arraycopy(part, 0, res, pos, part.length);
pos += part.length;
}
return res;
}
else
return zkClient.getData(multipartPathName(basePath, 0));
}
/**
* @param zkClient Client.
* @param basePath Base path.
* @param parts Data parts.
* @return Number of parts.
* @throws ZookeeperClientFailedException If client failed.
* @throws InterruptedException If interrupted.
*/
private int saveMultipleParts(ZookeeperClient zkClient, String basePath, List<byte[]> parts)
throws ZookeeperClientFailedException, InterruptedException {
assert parts.size() > 1;
for (int i = 0; i < parts.size(); i++) {
byte[] part = parts.get(i);
String path = multipartPathName(basePath, i);
zkClient.createIfNeeded(path, part, PERSISTENT);
}
return parts.size();
}
/**
* @param basePath Base path.
* @param part Part number.
* @return Path.
*/
private static String multipartPathName(String basePath, int part) {
return basePath + String.format("%04d", part);
}
/**
* @param rtState Runtime state.
* @param joinDataBytes Joining node data.
* @param prevState Previous state in case of connect retry.
* @throws InterruptedException If interrupted.
*/
private void startJoin(ZkRuntimeState rtState, @Nullable ZkRuntimeState prevState, final byte[] joinDataBytes)
throws InterruptedException {
try {
long startTime = System.currentTimeMillis();
initZkNodes();
String prefix = UUID.randomUUID().toString();
rtState.init(new ZkWatcher(rtState), new AliveNodeDataWatcher(rtState));
ZookeeperClient zkClient = rtState.zkClient;
final int OVERHEAD = 5;
// TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8193
String joinDataPath = ZkIgnitePaths.join(zkPaths.joinDataDir, prefix + ":" + locNode.id());
if (zkClient.needSplitNodeData(joinDataPath, joinDataBytes, OVERHEAD)) {
List<byte[]> parts = zkClient.splitNodeData(joinDataPath, joinDataBytes, OVERHEAD);
rtState.joinDataPartCnt = parts.size();
saveMultipleParts(zkClient, joinDataPath + ":", parts);
joinDataPath = zkClient.createIfNeeded(
joinDataPath,
marshalZip(new ZkJoiningNodeData(parts.size())),
PERSISTENT);
}
else {
joinDataPath = zkClient.createIfNeeded(
joinDataPath,
joinDataBytes,
PERSISTENT);
}
rtState.locNodeZkPath = zkClient.createSequential(
prefix,
zkPaths.aliveNodesDir,
zkPaths.aliveNodePathForCreate(prefix, locNode),
null,
EPHEMERAL_SEQUENTIAL);
rtState.internalOrder = ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath);
if (log.isInfoEnabled()) {
log.info("Node started join [nodeId=" + locNode.id() +
", instanceName=" + locNode.attribute(ATTR_IGNITE_INSTANCE_NAME) +
", zkSessionId=0x" + Long.toHexString(rtState.zkClient.zk().getSessionId()) +
", joinDataSize=" + joinDataBytes.length +
(rtState.joinDataPartCnt > 1 ? (", joinDataPartCnt=" + rtState.joinDataPartCnt) : "") +
", consistentId=" + locNode.consistentId() +
", initTime=" + (System.currentTimeMillis() - startTime) +
", nodePath=" + rtState.locNodeZkPath + ']');
}
/*
If node can not join due to validation error this error is reported in join data,
As a minor optimization do not start watch join data immediately, but only if do not receive
join event after some timeout.
*/
CheckJoinErrorWatcher joinErrorWatcher = new CheckJoinErrorWatcher(5000, joinDataPath, rtState);
rtState.joinErrTo = joinErrorWatcher.timeoutObj;
if (locNode.isClient() && spi.getJoinTimeout() > 0) {
ZkTimeoutObject joinTimeoutObj = prevState != null ? prevState.joinTo : null;
if (joinTimeoutObj == null) {
joinTimeoutObj = new JoinTimeoutObject(spi.getJoinTimeout());
spi.getSpiContext().addTimeoutObject(joinTimeoutObj);
}
rtState.joinTo = joinTimeoutObj;
}
if (!locNode.isClient())
zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback(rtState));
zkClient.getDataAsync(zkPaths.evtsPath, rtState.watcher, rtState.watcher);
spi.getSpiContext().addTimeoutObject(rtState.joinErrTo);
}
catch (IgniteCheckedException | ZookeeperClientFailedException e) {
throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e);
}
}
/**
* Authenticate local node.
*
* @param nodeAuth Authenticator.
* @param locCred Local security credentials for authentication.
* @throws IgniteSpiException If any error occurs.
*/
private void localAuthentication(DiscoverySpiNodeAuthenticator nodeAuth, SecurityCredentials locCred) {
try {
locNode.setAttributes(withSecurityContext(
authenticateLocalNode(locNode, locCred, nodeAuth),
locNode.attributes(),
marsh
));
}
catch (Exception e) {
throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e);
}
}
/**
* @param node Node.
* @param zipBytes Zip-compressed marshalled security subject.
* @throws Exception If failed.
*/
private void setNodeSecuritySubject(ZookeeperClusterNode node, byte[] zipBytes) throws Exception {
assert zipBytes != null;
Map<String, Object> attrs = new HashMap<>(node.getAttributes());
attrs.put(ATTR_SECURITY_SUBJECT_V2, unzip(zipBytes));
node.setAttributes(attrs);
}
/**
* @param node Node.
* @return Credentials.
* @throws IgniteCheckedException If failed to unmarshal.
*/
private SecurityCredentials unmarshalCredentials(ZookeeperClusterNode node) throws Exception {
byte[] credBytes = (byte[])node.getAttributes().get(ATTR_SECURITY_CREDENTIALS);
if (credBytes == null)
return null;
return unmarshalZip(credBytes);
}
/**
* Marshalls credentials with discovery SPI marshaller (will replace attribute value).
*
* @param node Node to marshall credentials for.
* @throws IgniteSpiException If marshalling failed.
*/
private void marshalCredentialsOnJoin(ZookeeperClusterNode node) throws IgniteSpiException {
try {
// Use security-unsafe getter.
Map<String, Object> attrs0 = node.getAttributes();
Object creds = attrs0.get(ATTR_SECURITY_CREDENTIALS);
if (creds != null) {
Map<String, Object> attrs = new HashMap<>(attrs0);
assert !(creds instanceof byte[]);
attrs.put(ATTR_SECURITY_CREDENTIALS, marshalZip(creds));
node.setAttributes(attrs);
}
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e);
}
}
/**
*
*/
private class UpdateProcessedEventsTimeoutObject extends ZkTimeoutObject {
/** */
private final ZkRuntimeState rtState;
/**
* @param rtState Runtime state.
* @param timeout Timeout.
*/
UpdateProcessedEventsTimeoutObject(ZkRuntimeState rtState, long timeout) {
super(timeout);
this.rtState = rtState;
}
/** {@inheritDoc} */
@Override public void onTimeout() {
runInWorkerThread(new ZkRunnable(rtState, ZookeeperDiscoveryImpl.this) {
@Override protected void run0() throws Exception {
updateProcessedEventsOnTimeout(rtState, UpdateProcessedEventsTimeoutObject.this);
}
});
}
}
/**
*
*/
private class JoinTimeoutObject extends ZkTimeoutObject {
/**
* @param timeout Timeout.
*/
JoinTimeoutObject(long timeout) {
super(timeout);
}
/** {@inheritDoc} */
@Override public void onTimeout() {
if (cancelled || rtState.joined)
return;
runInWorkerThread(new Runnable() {
@Override public void run() {
synchronized (stateMux) {
if (cancelled || rtState.joined)
return;
if (connState == ConnectionState.STOPPED)
return;
connState = ConnectionState.STOPPED;
}
U.warn(log, "Failed to connect to cluster, either connection to ZooKeeper can not be established or there " +
"are no alive server nodes (consider increasing 'joinTimeout' configuration property) [" +
"joinTimeout=" + spi.getJoinTimeout() + ']');
// Note: exception message is checked in tests.
onSegmented(new IgniteSpiException("Failed to connect to cluster within configured timeout"));
}
});
}
}
/**
*
*/
private class CheckJoinErrorWatcher extends ZkAbstractWatcher implements AsyncCallback.DataCallback {
/** */
private final String joinDataPath;
/** */
private ZkTimeoutObject timeoutObj;
/**
* @param timeout Timeout.
* @param joinDataPath0 Node joined data path.
* @param rtState0 State.
*/
CheckJoinErrorWatcher(long timeout, String joinDataPath0, ZkRuntimeState rtState0) {
super(rtState0, ZookeeperDiscoveryImpl.this);
this.joinDataPath = joinDataPath0;
timeoutObj = new ZkTimeoutObject(timeout) {
@Override public void onTimeout() {
if (rtState.errForClose != null || rtState.joined)
return;
synchronized (stateMux) {
if (connState != ConnectionState.STARTED)
return;
}
rtState.zkClient.getDataAsync(joinDataPath,
CheckJoinErrorWatcher.this,
CheckJoinErrorWatcher.this);
// Client alive node can be deleted if join request wasn't processed between cluster restarts.
if (locNode.isClient()) {
ClientLocalNodeWatcher watcher = new ClientLocalNodeWatcher(rtState,
CheckJoinErrorWatcher.this);
rtState.zkClient.existsAsync(rtState.locNodeZkPath, watcher, watcher);
}
}
};
}
/** {@inheritDoc} */
@Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if (rc != 0)
return;
processData(data);
}
/**
* Checks for join error synchronously.
*/
void checkJoinError() {
try {
byte[] data = rtState.zkClient.getData(joinDataPath);
processData(data);
}
catch (Exception ignored) {
// No-op.
}
}
/**
* @param data Data.
*/
private void processData(byte[] data) {
if (!onProcessStart())
return;
try {
Object obj = unmarshalZip(data);
if (obj instanceof ZkInternalJoinErrorMessage) {
ZkInternalJoinErrorMessage joinErr = (ZkInternalJoinErrorMessage)obj;
onSegmented(new IgniteSpiException(joinErr.err));
}
onProcessEnd();
}
catch (Throwable e) {
onProcessError(e);
}
}
/** {@inheritDoc} */
@Override public void process0(WatchedEvent evt) {
if (rtState.errForClose != null || rtState.joined)
return;
if (evt.getType() == Event.EventType.NodeDataChanged)
rtState.zkClient.getDataAsync(evt.getPath(), this, this);
}
}
/**
* @param aliveNodes Alive nodes.
* @throws Exception If failed.
*/
private void checkIsCoordinator(final List<String> aliveNodes) throws Exception {
assert !locNode.isClient();
TreeMap<Long, String> aliveSrvs = new TreeMap<>();
long locInternalOrder = rtState.internalOrder;
for (String aliveNodePath : aliveNodes) {
if (ZkIgnitePaths.aliveNodeClientFlag(aliveNodePath))
continue;
Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
aliveSrvs.put(internalId, aliveNodePath);
}
assert !aliveSrvs.isEmpty();
Map.Entry<Long, String> crdE = aliveSrvs.firstEntry();
if (locInternalOrder == crdE.getKey())
onBecomeCoordinator(aliveNodes);
else {
assert aliveSrvs.size() > 1 : aliveSrvs;
Map.Entry<Long, String> prevE = aliveSrvs.floorEntry(locInternalOrder - 1);
assert prevE != null;
if (log.isInfoEnabled()) {
log.info("Discovery coordinator already exists, watch for previous server node [" +
"locId=" + locNode.id() +
", watchPath=" + prevE.getValue() + ']');
}
PreviousNodeWatcher watcher = new ServerPreviousNodeWatcher(rtState);
rtState.zkClient.existsAsync(ZkIgnitePaths.join(zkPaths.aliveNodesDir, prevE.getValue()), watcher, watcher);
}
}
/**
* @param aliveNodes Alive nodes.
* @throws Exception If failed.
*/
private void checkClientsStatus(final List<String> aliveNodes) throws Exception {
assert locNode.isClient() : locNode;
assert rtState.joined;
assert rtState.evtsData != null;
TreeMap<Long, String> aliveClients = new TreeMap<>();
String srvPath = null;
Long srvInternalOrder = null;
long locInternalOrder = rtState.internalOrder;
for (String aliveNodePath : aliveNodes) {
Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
if (ZkIgnitePaths.aliveNodeClientFlag(aliveNodePath))
aliveClients.put(internalId, aliveNodePath);
else {
if (srvInternalOrder == null || internalId < srvInternalOrder) {
srvPath = aliveNodePath;
srvInternalOrder = internalId;
}
}
}
// This situation may appear while reconnection and this callback can be skipped.
if (!aliveClients.containsKey(locInternalOrder))
return;
Map.Entry<Long, String> oldest = aliveClients.firstEntry();
boolean oldestClient = locInternalOrder == oldest.getKey();
if (srvPath == null) {
if (oldestClient) {
Stat stat = new Stat();
ZkDiscoveryEventsData prevEvts = rtState.evtsData;
byte[] evtsBytes = rtState.zkClient.getData(zkPaths.evtsPath, stat);
assert evtsBytes.length > 0;
ZkDiscoveryEventsData newEvts = unmarshalZip(evtsBytes);
if (prevEvts.clusterId.equals(newEvts.clusterId)) {
U.warn(log, "All server nodes failed, notify all clients [locId=" + locNode.id() + ']');
try {
generateNoServersEvent(newEvts, stat);
}
catch (KeeperException.BadVersionException ignored) {
if (log.isDebugEnabled())
log.debug("Failed to save no servers message. Path version changed.");
rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null,
new CheckClientsStatusCallback(rtState));
}
}
else
U.warn(log, "All server nodes failed (received events from new cluster).");
}
}
else {
String watchPath;
if (oldestClient) {
watchPath = srvPath;
if (log.isInfoEnabled()) {
log.info("Servers exists, watch for server node [locId=" + locNode.id() +
", watchPath=" + watchPath + ']');
}
}
else {
assert aliveClients.size() > 1 : aliveClients;
Map.Entry<Long, String> prevE = aliveClients.floorEntry(locInternalOrder - 1);
assert prevE != null;
watchPath = prevE.getValue();
if (log.isInfoEnabled()) {
log.info("Servers exists, watch for previous node [locId=" + locNode.id() +
", watchPath=" + watchPath + ']');
}
}
PreviousNodeWatcher watcher = new ClientPreviousNodeWatcher(rtState);
rtState.zkClient.existsAsync(ZkIgnitePaths.join(zkPaths.aliveNodesDir, watchPath), watcher, watcher);
}
}
/**
* @param evtsData Events data.
* @param evtsStat Events zookeeper state.
* @throws Exception If failed.
*/
private void generateNoServersEvent(ZkDiscoveryEventsData evtsData, Stat evtsStat) throws Exception {
evtsData.evtIdGen++;
ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
evtsData.evtIdGen,
0L,
evtsData.topVer,
locNode.id(),
new ZkNoServersMessage(),
null);
Collection<ZookeeperClusterNode> nodesToAck = Collections.emptyList();
evtsData.addEvent(nodesToAck, evtData);
byte[] newEvtsBytes = marshalZip(evtsData);
rtState.zkClient.setData(zkPaths.evtsPath, newEvtsBytes, evtsStat.getVersion());
}
/**
* @param lastEvts Last events from previous coordinator.
* @throws Exception If failed.
*/
private void previousCoordinatorCleanup(ZkDiscoveryEventsData lastEvts) throws Exception {
for (String stoppedFlagPath : rtState.zkClient.getChildren(zkPaths.stoppedNodesFlagsDir)) {
long leftIntId = ZkIgnitePaths.stoppedFlagNodeInternalId(stoppedFlagPath);
if (!rtState.top.nodesByInternalId.containsKey(leftIntId)) {
rtState.zkClient.deleteIfExistsAsync(
ZkIgnitePaths.join(zkPaths.stoppedNodesFlagsDir, stoppedFlagPath)
);
}
}
for (ZkDiscoveryEventData evtData : lastEvts.evts.values()) {
if (evtData instanceof ZkDiscoveryCustomEventData) {
ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData;
// It is possible previous coordinator failed before finished cleanup.
if (evtData0.msg instanceof ZkCommunicationErrorResolveFinishMessage) {
try {
ZkCommunicationErrorResolveFinishMessage msg =
(ZkCommunicationErrorResolveFinishMessage)evtData0.msg;
ZkCommunicationErrorResolveResult res = unmarshalZip(
ZkDistributedCollectDataFuture.readResult(rtState.zkClient, zkPaths, msg.futId));
deleteAliveNodes(res.killedNodes);
}
catch (KeeperException.NoNodeException ignore) {
// No-op.
}
}
else if (evtData0.resolvedMsg instanceof ZkForceNodeFailMessage)
deleteAliveNode(((ZkForceNodeFailMessage)evtData0.resolvedMsg).nodeInternalId);
}
}
}
/**
* @param aliveNodes Alive nodes paths.
* @throws Exception If failed.
*/
private void onBecomeCoordinator(List<String> aliveNodes) throws Exception {
ZkDiscoveryEventsData prevEvts = processNewEvents(rtState.zkClient.getData(zkPaths.evtsPath));
rtState.crd = true;
if (rtState.joined) {
if (log.isInfoEnabled())
log.info("Node is new discovery coordinator [locId=" + locNode.id() + ']');
assert locNode.order() > 0 : locNode;
assert rtState.evtsData != null;
previousCoordinatorCleanup(rtState.evtsData);
UUID futId = rtState.evtsData.communicationErrorResolveFutureId();
if (futId != null) {
if (log.isInfoEnabled()) {
log.info("New discovery coordinator will handle already started cluster-wide communication " +
"error resolve [reqId=" + futId + ']');
}
ZkCommunicationErrorProcessFuture fut = commErrProcFut.get();
ZkDistributedCollectDataFuture collectResFut = collectCommunicationStatusFuture(futId);
if (fut != null)
fut.nodeResultCollectFuture(collectResFut);
}
for (ZkDiscoveryEventData evtData : rtState.evtsData.evts.values())
evtData.initRemainingAcks(rtState.top.nodesByOrder.values());
handleProcessedEvents("crd");
}
else {
String locAlivePath = rtState.locNodeZkPath.substring(rtState.locNodeZkPath.lastIndexOf('/') + 1);
deleteJoiningNodeData(locNode.id(),
ZkIgnitePaths.aliveNodePrefixId(locAlivePath),
rtState.joinDataPartCnt);
DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator();
if (nodeAuth != null) {
try {
if (log.isInfoEnabled()) {
log.info("Node is first server node in cluster, try authenticate local node " +
"[locId=" + locNode.id() + ']');
}
localAuthentication(nodeAuth, unmarshalCredentials(locNode));
}
catch (Exception e) {
U.warn(log, "Local node authentication failed: " + e, e);
onSegmented(e);
// Stop any further processing.
throw new ZookeeperClientFailedException("Local node authentication failed: " + e);
}
}
newClusterStarted(prevEvts);
}
rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher);
rtState.zkClient.getChildrenAsync(zkPaths.customEvtsDir, rtState.watcher, rtState.watcher);
for (String alivePath : aliveNodes)
watchAliveNodeData(alivePath);
}
/**
* @param alivePath Node path.
*/
private void watchAliveNodeData(String alivePath) {
assert rtState.locNodeZkPath != null;
String path = ZkIgnitePaths.join(zkPaths.aliveNodesDir, alivePath);
if (!path.equals(rtState.locNodeZkPath))
rtState.zkClient.getDataAsync(path, rtState.aliveNodeDataWatcher, rtState.aliveNodeDataWatcher);
}
/**
* @param aliveNodes ZK nodes representing alive cluster nodes.
* @throws Exception If failed.
*/
private void generateTopologyEvents(List<String> aliveNodes) throws Exception {
assert rtState.crd;
if (log.isInfoEnabled())
log.info("Process alive nodes change [alives=" + aliveNodes.size() + "]");
if (rtState.updateAlives) {
aliveNodes = rtState.zkClient.getChildren(zkPaths.aliveNodesDir);
rtState.updateAlives = false;
}
Set<Long> stoppedNodes = new HashSet<>();
for (String stoppedFlagPath : rtState.zkClient.getChildren(zkPaths.stoppedNodesFlagsDir))
stoppedNodes.add(ZkIgnitePaths.stoppedFlagNodeInternalId(stoppedFlagPath));
TreeMap<Long, String> alives = new TreeMap<>();
for (String child : aliveNodes) {
Long internalId = ZkIgnitePaths.aliveInternalId(child);
Object old = alives.put(internalId, child);
assert old == null;
}
TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>(rtState.top.nodesByOrder);
int newEvts = 0;
final int MAX_NEW_EVTS = IgniteSystemProperties.getInteger(IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS,
DFLT_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS);
List<ZookeeperClusterNode> failedNodes = null;
for (Map.Entry<Long, ZookeeperClusterNode> e : rtState.top.nodesByInternalId.entrySet()) {
if (!alives.containsKey(e.getKey())) {
ZookeeperClusterNode failedNode = e.getValue();
if (failedNodes == null)
failedNodes = new ArrayList<>();
failedNodes.add(failedNode);
generateNodeLeave(curTop, failedNode, !stoppedNodes.contains(failedNode.internalId()));
newEvts++;
if (newEvts == MAX_NEW_EVTS) {
saveAndProcessNewEvents();
if (log.isInfoEnabled()) {
log.info("Delay alive nodes change process, max event threshold reached [newEvts=" + newEvts +
", totalEvts=" + rtState.evtsData.evts.size() + ']');
}
handleProcessedEventsOnNodesFail(failedNodes);
throttleNewEventsGeneration();
rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher);
return;
}
}
}
// Process failures before processing join, otherwise conflicts are possible in case of fast node stop/re-start.
if (newEvts > 0) {
saveAndProcessNewEvents();
handleProcessedEventsOnNodesFail(failedNodes);
rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher);
return;
}
generateJoinEvents(curTop, alives, MAX_NEW_EVTS);
if (failedNodes != null)
handleProcessedEventsOnNodesFail(failedNodes);
}
/**
* @param curTop Current topology.
* @param alives Alive znodes.
* @param MAX_NEW_EVTS Max event to process.
* @throws Exception If failed.
*/
private void generateJoinEvents(TreeMap<Long, ZookeeperClusterNode> curTop,
TreeMap<Long, String> alives,
final int MAX_NEW_EVTS) throws Exception {
ZkBulkJoinContext joinCtx = new ZkBulkJoinContext();
for (Map.Entry<Long, String> e : alives.entrySet()) {
Long internalId = e.getKey();
if (!rtState.top.nodesByInternalId.containsKey(internalId)) {
UUID rslvFutId = rtState.evtsData.communicationErrorResolveFutureId();
if (rslvFutId != null) {
if (log.isInfoEnabled()) {
log.info("Delay alive nodes change process while communication error resolve " +
"is in progress [reqId=" + rslvFutId + ']');
}
break;
}
processJoinOnCoordinator(joinCtx, curTop, internalId, e.getValue());
if (joinCtx.nodes() == MAX_NEW_EVTS) {
generateBulkJoinEvent(curTop, joinCtx);
if (log.isInfoEnabled()) {
log.info("Delay alive nodes change process, max event threshold reached [" +
"newEvts=" + joinCtx.nodes() +
", totalEvts=" + rtState.evtsData.evts.size() + ']');
}
throttleNewEventsGeneration();
rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher);
return;
}
}
}
if (joinCtx.nodes() > 0)
generateBulkJoinEvent(curTop, joinCtx);
}
/**
* @param curTop Current topology.
* @param joinCtx Joined nodes context.
* @throws Exception If failed.
*/
private void generateBulkJoinEvent(TreeMap<Long, ZookeeperClusterNode> curTop, ZkBulkJoinContext joinCtx)
throws Exception {
rtState.evtsData.evtIdGen++;
long evtId = rtState.evtsData.evtIdGen;
List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes = joinCtx.nodes;
assert nodes != null && !nodes.isEmpty();
int nodeCnt = nodes.size();
List<ZkJoinedNodeEvtData> joinedNodes = new ArrayList<>(nodeCnt);
Map<Long, byte[]> discoDataMap = U.newHashMap(nodeCnt);
Map<Long, Long> dupDiscoData = null;
for (int i = 0; i < nodeCnt; i++) {
T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>> nodeEvtData = nodes.get(i);
Map<Integer, Serializable> discoData = nodeEvtData.get2();
byte[] discoDataBytes = U.marshal(marsh, discoData);
Long dupDataNode = null;
for (Map.Entry<Long, byte[]> e : discoDataMap.entrySet()) {
if (Arrays.equals(discoDataBytes, e.getValue())) {
dupDataNode = e.getKey();
break;
}
}
long nodeTopVer = nodeEvtData.get1().topVer;
if (dupDataNode != null) {
if (dupDiscoData == null)
dupDiscoData = new HashMap<>();
Long old = dupDiscoData.put(nodeTopVer, dupDataNode);
assert old == null : old;
}
else
discoDataMap.put(nodeTopVer, discoDataBytes);
joinedNodes.add(nodeEvtData.get1());
}
int overhead = 5;
ZkJoinEventDataForJoined dataForJoined = new ZkJoinEventDataForJoined(
new ArrayList<>(curTop.values()),
discoDataMap,
dupDiscoData);
byte[] dataForJoinedBytes = marshalZip(dataForJoined);
long addDataStart = System.currentTimeMillis();
int dataForJoinedPartCnt = saveData(zkPaths.joinEventDataPathForJoined(evtId),
dataForJoinedBytes,
overhead);
long addDataTime = System.currentTimeMillis() - addDataStart;
ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData(
evtId,
rtState.evtsData.topVer,
joinedNodes,
dataForJoinedPartCnt);
rtState.evtsData.addEvent(curTop.values(), evtData);
if (log.isInfoEnabled()) {
if (nodeCnt > 1) {
log.info("Generated NODE_JOINED bulk event [" +
"nodeCnt=" + nodeCnt +
", dataForJoinedSize=" + dataForJoinedBytes.length +
", dataForJoinedPartCnt=" + dataForJoinedPartCnt +
", addDataTime=" + addDataTime +
", evt=" + evtData + ']');
}
else {
log.info("Generated NODE_JOINED event [" +
"dataForJoinedSize=" + dataForJoinedBytes.length +
", dataForJoinedPartCnt=" + dataForJoinedPartCnt +
", addDataTime=" + addDataTime +
", evt=" + evtData + ']');
}
}
saveAndProcessNewEvents();
}
/**
*
*/
private void throttleNewEventsGeneration() {
long delay = IgniteSystemProperties.getLong(IGNITE_ZOOKEEPER_DISCOVERY_SPI_EVTS_THROTTLE,
DFLT_ZOOKEEPER_DISCOVERY_SPI_EVTS_THROTTLE);
if (delay > 0) {
if (log.isInfoEnabled())
log.info("Sleep delay before generate new events [delay=" + delay + ']');
try {
Thread.sleep(delay);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
/**
* @param nodeId Node ID.
* @param prefixId Path prefix.
* @return Join data.
* @throws Exception If failed.
*/
private ZkJoiningNodeData unmarshalJoinData(UUID nodeId, UUID prefixId) throws Exception {
String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId);
byte[] joinData = rtState.zkClient.getData(joinDataPath);
Object dataObj = unmarshalZip(joinData);
if (!(dataObj instanceof ZkJoiningNodeData))
throw new Exception("Invalid joined node data: " + dataObj);
ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj;
if (joiningNodeData.partCount() > 1) {
joinData = readMultipleParts(rtState.zkClient, joinDataPath + ":", joiningNodeData.partCount());
joiningNodeData = unmarshalZip(joinData);
}
return joiningNodeData;
}
/**
* @param nodeId Node ID.
* @param prefixId Path prefix.
* @param aliveNodePath Node path.
* @return Join data.
* @throws Exception If failed.
*/
private Object unmarshalJoinDataOnCoordinator(UUID nodeId, UUID prefixId, String aliveNodePath) throws Exception {
String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId);
byte[] joinData = rtState.zkClient.getData(joinDataPath);
Object dataObj;
try {
dataObj = unmarshalZip(joinData);
if (dataObj instanceof ZkInternalJoinErrorMessage)
return dataObj;
}
catch (Exception e) {
U.error(log, "Failed to unmarshal joining node data [nodePath=" + aliveNodePath + "']", e);
return new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath),
"Failed to unmarshal join data: " + e);
}
assert dataObj instanceof ZkJoiningNodeData : dataObj;
ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj;
if (joiningNodeData.partCount() > 1) {
joinData = readMultipleParts(rtState.zkClient, joinDataPath + ":", joiningNodeData.partCount());
try {
joiningNodeData = unmarshalZip(joinData);
}
catch (Exception e) {
U.error(log, "Failed to unmarshal joining node data [nodePath=" + aliveNodePath + "']", e);
return new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath),
"Failed to unmarshal join data: " + e);
}
}
assert joiningNodeData.node() != null : joiningNodeData;
return joiningNodeData;
}
/**
* @param joinCtx Joined nodes context.
* @param curTop Current nodes.
* @param internalId Joined node internal ID.
* @param aliveNodePath Joined node path.
* @throws Exception If failed.
*/
private void processJoinOnCoordinator(
ZkBulkJoinContext joinCtx,
TreeMap<Long, ZookeeperClusterNode> curTop,
long internalId,
String aliveNodePath
) throws Exception {
UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
UUID prefixId = ZkIgnitePaths.aliveNodePrefixId(aliveNodePath);
Object data = unmarshalJoinDataOnCoordinator(nodeId, prefixId, aliveNodePath);
if (data instanceof ZkJoiningNodeData) {
ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)data;
ZkNodeValidateResult validateRes = validateJoiningNode(joiningNodeData);
if (validateRes.err == null) {
ZookeeperClusterNode joinedNode = joiningNodeData.node();
assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
addJoinedNode(
joinCtx,
curTop,
joiningNodeData,
internalId,
prefixId,
validateRes.secSubjZipBytes);
watchAliveNodeData(aliveNodePath);
}
else {
ZkInternalJoinErrorMessage joinErr = new ZkInternalJoinErrorMessage(
ZkIgnitePaths.aliveInternalId(aliveNodePath),
validateRes.err);
processJoinError(aliveNodePath, nodeId, prefixId, joinErr);
}
}
else {
assert data instanceof ZkInternalJoinErrorMessage : data;
processJoinError(aliveNodePath, nodeId, prefixId, (ZkInternalJoinErrorMessage)data);
}
}
/**
* @param aliveNodePath Joined node path.
* @param nodeId Node ID.
* @param prefixId Path prefix ID.
* @param joinErr Join error message.
* @throws Exception If failed.
*/
private void processJoinError(String aliveNodePath,
UUID nodeId,
UUID prefixId,
ZkInternalJoinErrorMessage joinErr) throws Exception {
ZookeeperClient client = rtState.zkClient;
if (joinErr.notifyNode) {
String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId);
client.setData(joinDataPath, marshalZip(joinErr), -1);
}
else if (log.isInfoEnabled())
log.info("Ignore join data, node was failed by previous coordinator: " + aliveNodePath);
client.deleteIfExists(ZkIgnitePaths.join(zkPaths.aliveNodesDir, aliveNodePath), -1);
}
/**
* @param joiningNodeData Joining node data.
* @return Validation result.
*/
private ZkNodeValidateResult validateJoiningNode(ZkJoiningNodeData joiningNodeData) {
ZookeeperClusterNode node = joiningNodeData.node();
ZookeeperClusterNode node0 = rtState.top.nodesById.get(node.id());
if (node0 != null) {
U.error(log, "Failed to include node in cluster, node with the same ID already exists [joiningNode=" + node +
", existingNode=" + node0 + ']');
// Note: exception message is checked in tests.
return new ZkNodeValidateResult("Node with the same ID already exists: " + node0);
}
ZkNodeValidateResult res = authenticateNode(node);
if (res.err != null)
return res;
IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node);
if (err == null) {
DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(node.id(), joiningNodeData.node().isClient());
joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData());
err = spi.getSpiContext().validateNode(node, joiningNodeBag);
}
if (err != null) {
spi.getSpiContext().recordEvent(new NodeValidationFailedEvent(locNode, node, err));
LT.warn(log, err.message());
res.err = err.sendMessage();
}
return res;
}
/**
* @param node Node.
* @return Validation result.
*/
private ZkNodeValidateResult authenticateNode(ZookeeperClusterNode node) {
DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator();
if (nodeAuth == null)
return new ZkNodeValidateResult((byte[])null);
SecurityCredentials cred;
try {
cred = unmarshalCredentials(node);
}
catch (Exception e) {
U.error(log, "Failed to unmarshal node credentials: " + e, e);
return new ZkNodeValidateResult("Failed to unmarshal node credentials");
}
SecurityContext subj = nodeAuth.authenticateNode(node, cred);
if (subj == null) {
U.warn(log, "Authentication failed [nodeId=" + node.id() +
", addrs=" + U.addressesAsString(node) + ']');
// Note: exception message test is checked in tests.
return new ZkNodeValidateResult("Authentication failed");
}
if (!(subj instanceof Serializable)) {
U.warn(log, "Authentication subject is not Serializable [nodeId=" + node.id() +
", addrs=" + U.addressesAsString(node) + ']');
return new ZkNodeValidateResult("Authentication subject is not serializable");
}
byte[] secSubjZipBytes;
try {
secSubjZipBytes = marshalZip(subj);
node.setAttributes(withSecurityContext(subj, node.getAttributes(), marsh));
}
catch (Exception e) {
U.error(log, "Failed to marshal node security subject: " + e, e);
return new ZkNodeValidateResult("Failed to marshal node security subject");
}
return new ZkNodeValidateResult(secSubjZipBytes);
}
/**
* @throws Exception If failed.
*/
private void saveAndProcessNewEvents() throws Exception {
if (stopping())
return;
long start = System.currentTimeMillis();
byte[] evtsBytes = marshalZip(rtState.evtsData);
rtState.zkClient.setData(zkPaths.evtsPath, evtsBytes, -1);
long time = System.currentTimeMillis() - start;
if (prevSavedEvtsTopVer != rtState.evtsData.topVer) {
if (log.isInfoEnabled()) {
log.info("Discovery coordinator saved new topology events [topVer=" + rtState.evtsData.topVer +
", size=" + evtsBytes.length +
", evts=" + rtState.evtsData.evts.size() +
", lastEvt=" + rtState.evtsData.evtIdGen +
", saveTime=" + time + ']');
}
prevSavedEvtsTopVer = rtState.evtsData.topVer;
}
else if (log.isDebugEnabled()) {
log.debug("Discovery coordinator saved new topology events [topVer=" + rtState.evtsData.topVer +
", size=" + evtsBytes.length +
", evts=" + rtState.evtsData.evts.size() +
", lastEvt=" + rtState.evtsData.evtIdGen +
", saveTime=" + time + ']');
}
processNewEvents(rtState.evtsData);
}
/**
* @param curTop Current topology.
* @param leftNode Failed node.
* @param failed Whether node failed or stopped.
*/
private void generateNodeLeave(
TreeMap<Long, ZookeeperClusterNode> curTop,
ZookeeperClusterNode leftNode,
boolean failed
) {
Object rmvd = curTop.remove(leftNode.order());
assert rmvd != null;
rtState.evtsData.topVer++;
rtState.evtsData.evtIdGen++;
ZkDiscoveryNodeLeaveEventData evtData = new ZkDiscoveryNodeLeaveEventData(
rtState.evtsData.evtIdGen,
rtState.evtsData.topVer,
leftNode.internalId(),
failed
);
rtState.evtsData.addEvent(curTop.values(), evtData);
if (log.isInfoEnabled()) {
String evtName = failed ? "NODE_FAILED" : "NODE_LEFT";
log.info("Generated " + evtName + " event [evt=" + evtData + ']');
}
}
/**
* @param curTop Current nodes.
* @param joiningNodeData Join data.
* @param internalId Joined node internal ID.
* @param prefixId Unique path prefix.
* @param secSubjZipBytes Marshalled security subject.
* @throws Exception If failed.
*/
private void addJoinedNode(
ZkBulkJoinContext joinCtx,
TreeMap<Long, ZookeeperClusterNode> curTop,
ZkJoiningNodeData joiningNodeData,
long internalId,
UUID prefixId,
@Nullable byte[] secSubjZipBytes
) throws Exception {
ZookeeperClusterNode joinedNode = joiningNodeData.node();
UUID nodeId = joinedNode.id();
rtState.evtsData.topVer++;
joinedNode.order(rtState.evtsData.topVer);
joinedNode.internalId(internalId);
DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId, joiningNodeData.node().isClient());
joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData());
exchange.onExchange(joiningNodeBag);
DiscoveryDataBag collectBag = new DiscoveryDataBag(nodeId,
new HashSet<Integer>(),
joiningNodeData.node().isClient());
collectBag.joiningNodeData(joiningNodeBag.joiningNodeData());
exchange.collect(collectBag);
Map<Integer, Serializable> commonData = collectBag.commonData();
Object old = curTop.put(joinedNode.order(), joinedNode);
assert old == null;
int overhead = 5;
int secSubjPartCnt = 0;
if (secSubjZipBytes != null) {
secSubjPartCnt = saveData(zkPaths.joinEventSecuritySubjectPath(joinedNode.order()),
secSubjZipBytes,
overhead);
assert secSubjPartCnt > 0 : secSubjPartCnt;
if (spi.getAuthenticator() == null)
setNodeSecuritySubject(joinedNode, secSubjZipBytes);
}
ZkJoinedNodeEvtData nodeEvtData = new ZkJoinedNodeEvtData(
rtState.evtsData.topVer,
joinedNode.id(),
joinedNode.internalId(),
prefixId,
joiningNodeData.partCount(),
secSubjPartCnt);
nodeEvtData.joiningNodeData = joiningNodeData;
joinCtx.addJoinedNode(nodeEvtData, commonData);
rtState.evtsData.onNodeJoin(joinedNode);
}
/**
* @param path Path to save.
* @param bytes Bytes to save.
* @param overhead Extra overhead.
* @return Parts count.
* @throws Exception If failed.
*/
private int saveData(String path, byte[] bytes, int overhead) throws Exception {
int dataForJoinedPartCnt = 1;
if (rtState.zkClient.needSplitNodeData(path, bytes, overhead)) {
dataForJoinedPartCnt = saveMultipleParts(rtState.zkClient,
path,
rtState.zkClient.splitNodeData(path, bytes, overhead));
}
else {
rtState.zkClient.createIfNeeded(multipartPathName(path, 0),
bytes,
PERSISTENT);
}
return dataForJoinedPartCnt;
}
/**
* @param prevEvts Events from previous cluster.
* @throws Exception If failed.
*/
private void newClusterStarted(@Nullable ZkDiscoveryEventsData prevEvts) throws Exception {
assert !locNode.isClient() : locNode;
long locInternalId = rtState.internalOrder;
assert prevEvts == null || prevEvts.maxInternalOrder < locInternalId;
spi.getSpiContext().removeTimeoutObject(rtState.joinErrTo);
cleanupPreviousClusterData(prevEvts != null ? prevEvts.maxInternalOrder + 1 : -1L);
rtState.joined = true;
rtState.gridStartTime = System.currentTimeMillis();
rtState.evtsData = ZkDiscoveryEventsData.createForNewCluster(rtState.gridStartTime);
if (log.isInfoEnabled()) {
log.info("New cluster started [locId=" + locNode.id() +
", clusterId=" + rtState.evtsData.clusterId +
", startTime=" + rtState.evtsData.clusterStartTime + ']');
}
locNode.internalId(locInternalId);
locNode.order(1);
rtState.evtsData.onNodeJoin(locNode);
rtState.top.addNode(locNode);
final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode);
try {
lsnr.onDiscovery(
new DiscoveryNotification(
EVT_NODE_JOINED,
1L,
locNode,
topSnapshot,
Collections.emptyMap(),
null,
null
)
).get();
}
catch (IgniteException e) {
joinFut.onDone(e);
throw new IgniteException("Failed to wait for discovery listener notification on node join", e);
}
// Reset events (this is also notification for clients left from previous cluster).
rtState.zkClient.setData(zkPaths.evtsPath, marshalZip(rtState.evtsData), -1);
joinFut.onDone();
}
/**
* @param startInternalOrder Starting internal order for cluster (znodes having lower order belong
* to clients from previous cluster and should be removed).
* @throws Exception If failed.
*/
private void cleanupPreviousClusterData(long startInternalOrder) throws Exception {
long start = System.currentTimeMillis();
ZookeeperClient client = rtState.zkClient;
List<String> batch = new LinkedList<>();
List<String> evtChildren = client.getChildrenPaths(zkPaths.evtsPath);
for (String evtPath : evtChildren)
batch.addAll(client.getChildrenPaths(evtPath));
batch.addAll(evtChildren);
batch.addAll(client.getChildrenPaths(zkPaths.customEvtsDir));
batch.addAll(client.getChildrenPaths(zkPaths.customEvtsPartsDir));
batch.addAll(client.getChildrenPaths(zkPaths.customEvtsAcksDir));
batch.addAll(client.getChildrenPaths(zkPaths.stoppedNodesFlagsDir));
client.deleteAll(batch, -1);
if (startInternalOrder > 0) {
for (String alive : client.getChildren(zkPaths.aliveNodesDir)) {
if (ZkIgnitePaths.aliveInternalId(alive) < startInternalOrder)
client.deleteIfExists(ZkIgnitePaths.join(zkPaths.aliveNodesDir, alive), -1);
}
}
long time = System.currentTimeMillis() - start;
if (time > 0) {
if (log.isInfoEnabled())
log.info("Previous cluster data cleanup time: " + time);
}
}
/**
* @param zkClient Client.
* @param evtPath Event path.
* @param sndNodeId Sender node ID.
* @return Event data.
* @throws Exception If failed.
*/
private byte[] readCustomEventData(ZookeeperClient zkClient, String evtPath, UUID sndNodeId) throws Exception {
int partCnt = ZkIgnitePaths.customEventPartsCount(evtPath);
if (partCnt > 1) {
String partsBasePath = zkPaths.customEventPartsBasePath(
ZkIgnitePaths.customEventPrefix(evtPath), sndNodeId);
return readMultipleParts(zkClient, partsBasePath, partCnt);
}
else
return zkClient.getData(ZkIgnitePaths.join(zkPaths.customEvtsDir, evtPath));
}
/**
* @param customEvtNodes ZK nodes representing custom events to process.
* @throws Exception If failed.
*/
private void generateCustomEvents(List<String> customEvtNodes) throws Exception {
assert rtState.crd;
ZookeeperClient zkClient = rtState.zkClient;
ZkDiscoveryEventsData evtsData = rtState.evtsData;
TreeMap<Integer, String> unprocessedEvts = null;
for (int i = 0; i < customEvtNodes.size(); i++) {
String evtPath = customEvtNodes.get(i);
int evtSeq = ZkIgnitePaths.customEventSequence(evtPath);
if (evtSeq > evtsData.procCustEvt) {
if (unprocessedEvts == null)
unprocessedEvts = new TreeMap<>();
unprocessedEvts.put(evtSeq, evtPath);
}
}
if (unprocessedEvts == null)
return;
for (Map.Entry<Integer, String> evtE : unprocessedEvts.entrySet()) {
evtsData.procCustEvt = evtE.getKey();
String evtPath = evtE.getValue();
UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath);
ZookeeperClusterNode sndNode = rtState.top.nodesById.get(sndNodeId);
if (sndNode != null) {
byte[] evtBytes = readCustomEventData(zkClient, evtPath, sndNodeId);
DiscoverySpiCustomMessage msg;
try {
msg = unmarshalZip(evtBytes);
}
catch (Exception e) {
U.error(log, "Failed to unmarshal custom discovery message: " + e, e);
deleteCustomEventDataAsync(rtState.zkClient, evtPath);
continue;
}
generateAndProcessCustomEventOnCoordinator(evtPath, sndNode, msg);
}
else {
U.warn(log, "Ignore custom event from unknown node: " + sndNodeId);
deleteCustomEventDataAsync(rtState.zkClient, evtPath);
}
}
}
/**
* @param evtPath Event data path.
* @param sndNode Sender node.
* @param msg Message instance.
* @throws Exception If failed.
*/
private void generateAndProcessCustomEventOnCoordinator(String evtPath,
ZookeeperClusterNode sndNode,
DiscoverySpiCustomMessage msg
) throws Exception {
ZookeeperClient zkClient = rtState.zkClient;
ZkDiscoveryEventsData evtsData = rtState.evtsData;
ZookeeperClusterNode failedNode = null;
if (msg instanceof ZkForceNodeFailMessage) {
ZkForceNodeFailMessage msg0 = (ZkForceNodeFailMessage)msg;
failedNode = rtState.top.nodesByInternalId.get(msg0.nodeInternalId);
if (failedNode != null)
evtsData.topVer++;
else {
if (log.isDebugEnabled())
log.debug("Ignore forcible node fail request for unknown node: " + msg0.nodeInternalId);
deleteCustomEventDataAsync(zkClient, evtPath);
return;
}
}
else if (msg instanceof ZkCommunicationErrorResolveStartMessage) {
ZkCommunicationErrorResolveStartMessage msg0 =
(ZkCommunicationErrorResolveStartMessage)msg;
if (evtsData.communicationErrorResolveFutureId() != null) {
if (log.isInfoEnabled()) {
log.info("Ignore communication error resolve message, resolve process " +
"already started [sndNode=" + sndNode + ']');
}
deleteCustomEventDataAsync(zkClient, evtPath);
return;
}
else {
if (log.isInfoEnabled()) {
log.info("Start cluster-wide communication error resolve [sndNode=" + sndNode +
", reqId=" + msg0.id +
", topVer=" + evtsData.topVer + ']');
}
zkClient.createIfNeeded(zkPaths.distributedFutureBasePath(msg0.id),
null,
PERSISTENT);
evtsData.communicationErrorResolveFutureId(msg0.id);
}
}
evtsData.evtIdGen++;
ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
evtsData.evtIdGen,
0L,
evtsData.topVer,
sndNode.id(),
null,
evtPath);
evtData.resolvedMsg = msg;
if (log.isDebugEnabled())
log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
if (msg instanceof ZkInternalMessage)
processInternalMessage(evtData, (ZkInternalMessage)msg);
else
notifyCustomEvent(evtData, msg);
if (evtData != null) {
evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
rtState.locNodeInfo.lastProcEvt = evtData.eventId();
saveAndProcessNewEvents();
if (failedNode != null) {
deleteAliveNode(failedNode.internalId());
handleProcessedEventsOnNodesFail(Collections.singletonList(failedNode));
rtState.updateAlives = true;
}
}
}
/**
* @param internalId Node internal ID.
* @throws Exception If failed.
*/
private void deleteAliveNode(long internalId) throws Exception {
for (String child : rtState.zkClient.getChildren(zkPaths.aliveNodesDir)) {
if (ZkIgnitePaths.aliveInternalId(child) == internalId) {
// Need use sync delete to do not process again join of this node again.
rtState.zkClient.deleteIfExists(ZkIgnitePaths.join(zkPaths.aliveNodesDir, child), -1);
return;
}
}
}
/**
* @param zkClient Client.
* @param evtPath Event path.
*/
private void deleteCustomEventDataAsync(ZookeeperClient zkClient, String evtPath) {
if (log.isDebugEnabled())
log.debug("Delete custom event data: " + evtPath);
String prefix = ZkIgnitePaths.customEventPrefix(evtPath);
UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath);
int partCnt = ZkIgnitePaths.customEventPartsCount(evtPath);
assert partCnt >= 1 : partCnt;
if (partCnt > 1) {
for (int i = 0; i < partCnt; i++) {
String path0 = zkPaths.customEventPartPath(prefix, sndNodeId, i);
zkClient.deleteIfExistsAsync(path0);
}
}
zkClient.deleteIfExistsAsync(ZkIgnitePaths.join(zkPaths.customEvtsDir, evtPath));
}
/**
* @param data Marshalled events.
* @throws Exception If failed.
* @return Events.
*/
@Nullable private ZkDiscoveryEventsData processNewEvents(byte[] data) throws Exception {
ZkDiscoveryEventsData newEvts = data.length > 0 ? (ZkDiscoveryEventsData)unmarshalZip(data) : null;
if (rtState.joined && (newEvts == null || !rtState.evtsData.clusterId.equals(newEvts.clusterId))) {
assert locNode.isClient() : locNode;
throw localNodeFail("All server nodes failed, client node disconnected (received events from new custer) " +
"[locId=" + locNode.id() + ']', true);
}
if (newEvts == null)
return null;
assert !rtState.crd;
// Need keep processed custom events since they contain message object which is needed to create ack.
if (!locNode.isClient() && rtState.evtsData != null) {
for (Map.Entry<Long, ZkDiscoveryEventData> e : rtState.evtsData.evts.entrySet()) {
ZkDiscoveryEventData evtData = e.getValue();
if (evtData.eventType() == ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT) {
ZkDiscoveryCustomEventData evtData0 =
(ZkDiscoveryCustomEventData)newEvts.evts.get(evtData.eventId());
if (evtData0 != null)
evtData0.resolvedMsg = ((ZkDiscoveryCustomEventData)evtData).resolvedMsg;
}
}
}
processNewEvents(newEvts);
return newEvts;
}
/**
* @param evtsData Events.
* @throws Exception If failed.
*/
private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Exception {
TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;
ZookeeperClient zkClient = rtState.zkClient;
boolean evtProcessed = false;
boolean updateNodeInfo = false;
try {
for (ZkDiscoveryEventData evtData : evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) {
if (log.isDebugEnabled())
log.debug("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']');
switch (evtData.eventType()) {
case ZkDiscoveryEventData.ZK_EVT_NODE_JOIN: {
evtProcessed = processBulkJoin(evtsData, (ZkDiscoveryNodeJoinEventData)evtData);
break;
}
case ZkDiscoveryEventData.ZK_EVT_NODE_LEFT: {
if (!rtState.joined)
break;
evtProcessed = true;
notifyNodeLeave((ZkDiscoveryNodeLeaveEventData)evtData);
break;
}
case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: {
if (!rtState.joined)
break;
evtProcessed = true;
ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData;
if (evtData0.ackEvent() && evtData0.topologyVersion() < locNode.order())
break;
DiscoverySpiCustomMessage msg;
if (rtState.crd) {
assert evtData0.resolvedMsg != null : evtData0;
msg = evtData0.resolvedMsg;
}
else {
if (evtData0.msg == null) {
if (evtData0.ackEvent()) {
String path = zkPaths.ackEventDataPath(evtData0.origEvtId);
msg = unmarshalZip(zkClient.getData(path));
}
else {
assert evtData0.evtPath != null : evtData0;
byte[] msgBytes = readCustomEventData(zkClient,
evtData0.evtPath,
evtData0.sndNodeId);
msg = unmarshalZip(msgBytes);
}
}
else
msg = evtData0.msg;
evtData0.resolvedMsg = msg;
}
if (msg instanceof ZkInternalMessage)
processInternalMessage(evtData0, (ZkInternalMessage)msg);
else {
notifyCustomEvent(evtData0, msg);
if (!evtData0.ackEvent())
updateNodeInfo = true;
}
break;
}
default:
assert false : "Invalid event: " + evtData;
}
if (rtState.joined) {
rtState.locNodeInfo.lastProcEvt = evtData.eventId();
rtState.procEvtCnt++;
if (rtState.procEvtCnt % evtsAckThreshold == 0)
updateNodeInfo = true;
}
}
}
catch (KeeperException.NoNodeException e) {
// Can get NoNodeException if local node was forcible failed,
// in this case coordinator does not wait when this node process all events.
boolean exists;
try {
exists = rtState.zkClient.exists(rtState.locNodeZkPath);
}
catch (Exception e0) {
if (log.isDebugEnabled())
log.debug("Failed to check is local node is alive:" + e0);
exists = true;
}
if (!exists) {
U.warn(log, "Failed to process discovery event, local node was forced to stop.", e);
throw localNodeFail("Local node was forced to stop.", true);
}
throw e;
}
if (rtState.joined) {
rtState.evtsData = evtsData;
stats.onTopologyChanged(rtState.evtsData.topVer);
}
if (rtState.crd)
handleProcessedEvents("procEvt");
else
onEventProcessed(rtState, updateNodeInfo, evtProcessed);
ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get();
if (commErrFut != null)
commErrFut.onTopologyChange(rtState.top); // This can add new event, notify out of event process loop.
}
/** */
private boolean processBulkJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJoinEventData evtData)
throws Exception {
boolean evtProcessed = false;
for (int i = 0; i < evtData.joinedNodes.size(); i++) {
ZkJoinedNodeEvtData joinedEvtData = evtData.joinedNodes.get(i);
if (!rtState.joined) {
UUID joinedId = joinedEvtData.nodeId;
boolean locJoin = joinedEvtData.joinedInternalId == rtState.internalOrder;
if (locJoin) {
assert locNode.id().equals(joinedId);
processLocalJoin(evtsData, joinedEvtData, evtData);
evtProcessed = true;
}
}
else {
ZkJoiningNodeData joiningData;
if (rtState.crd) {
assert joinedEvtData.joiningNodeData != null;
joiningData = joinedEvtData.joiningNodeData;
}
else {
joiningData = unmarshalJoinData(joinedEvtData.nodeId, joinedEvtData.joinDataPrefixId);
DiscoveryDataBag dataBag = new DiscoveryDataBag(joinedEvtData.nodeId, joiningData.node().isClient());
dataBag.joiningNodeData(joiningData.discoveryData());
exchange.onExchange(dataBag);
}
if (joinedEvtData.secSubjPartCnt > 0 && joiningData.node().attribute(ATTR_SECURITY_SUBJECT_V2) == null)
readAndInitSecuritySubject(joiningData.node(), joinedEvtData);
notifyNodeJoin(joinedEvtData, joiningData);
}
}
return evtProcessed;
}
/**
* @param rtState Runtime state.
* @param updateNodeInfo {@code True} if need update processed events without delay.
* @param evtProcessed {@code True} if new event was processed.
* @throws Exception If failed.
*/
private void onEventProcessed(ZkRuntimeState rtState,
boolean updateNodeInfo,
boolean evtProcessed
) throws Exception {
synchronized (stateMux) {
if (updateNodeInfo) {
assert rtState.locNodeZkPath != null;
if (log.isDebugEnabled())
log.debug("Update processed events: " + rtState.locNodeInfo.lastProcEvt);
updateProcessedEvents(rtState);
if (rtState.procEvtsUpdateTo != null) {
spi.getSpiContext().removeTimeoutObject(rtState.procEvtsUpdateTo);
rtState.procEvtsUpdateTo = null;
}
}
else if (evtProcessed) {
rtState.locNodeInfo.needUpdate = true;
if (rtState.procEvtsUpdateTo == null) {
long updateTimeout = IgniteSystemProperties.getLong(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT,
DFLT_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT);
if (updateTimeout > 0) {
rtState.procEvtsUpdateTo = new UpdateProcessedEventsTimeoutObject(rtState, updateTimeout);
spi.getSpiContext().addTimeoutObject(rtState.procEvtsUpdateTo);
}
}
}
}
}
/**
* @param rtState Runtime state.
* @param procEvtsUpdateTo Timeout object.
* @throws Exception If failed.
*/
private void updateProcessedEventsOnTimeout(ZkRuntimeState rtState, ZkTimeoutObject procEvtsUpdateTo)
throws Exception {
synchronized (stateMux) {
if (rtState.procEvtsUpdateTo == procEvtsUpdateTo && rtState.locNodeInfo.needUpdate) {
if (log.isDebugEnabled())
log.debug("Update processed events on timeout: " + rtState.locNodeInfo.lastProcEvt);
updateProcessedEvents(rtState);
}
}
}
/**
* @param rtState Runtime state.
* @throws Exception If failed.
*/
private void updateProcessedEvents(ZkRuntimeState rtState) throws Exception {
try {
rtState.zkClient.setData(rtState.locNodeZkPath, marshalZip(rtState.locNodeInfo), -1);
rtState.locNodeInfo.needUpdate = false;
}
catch (KeeperException.NoNodeException e) {
// Possible if node is forcible failed.
if (log.isDebugEnabled())
log.debug("Failed to update processed events, no node: " + rtState.locNodeInfo.lastProcEvt);
}
}
/**
* @param node Node.
* @param joinedEvtData Joined node information.
* @throws Exception If failed.
*/
private void readAndInitSecuritySubject(ZookeeperClusterNode node, ZkJoinedNodeEvtData joinedEvtData) throws Exception {
if (joinedEvtData.secSubjPartCnt > 0) {
byte[] zipBytes = readMultipleParts(rtState.zkClient,
zkPaths.joinEventSecuritySubjectPath(joinedEvtData.topVer),
joinedEvtData.secSubjPartCnt);
setNodeSecuritySubject(node, zipBytes);
}
}
/**
* @param evtsData Events data.
* @param evtData Local join event data.
* @throws Exception If failed.
*/
private void processLocalJoin(ZkDiscoveryEventsData evtsData,
ZkJoinedNodeEvtData joinedEvtData,
ZkDiscoveryNodeJoinEventData evtData
) throws Exception {
synchronized (stateMux) {
if (connState == ConnectionState.STOPPED)
return;
if (rtState.joinTo != null) {
spi.getSpiContext().removeTimeoutObject(rtState.joinTo);
rtState.joinTo.cancelled = true;
rtState.joinTo = null;
}
spi.getSpiContext().removeTimeoutObject(rtState.joinErrTo);
if (log.isInfoEnabled())
log.info("Local join event data: " + joinedEvtData + ']');
String path = zkPaths.joinEventDataPathForJoined(evtData.eventId());
byte[] dataForJoinedBytes = readMultipleParts(rtState.zkClient, path, evtData.dataForJoinedPartCnt);
ZkJoinEventDataForJoined dataForJoined = unmarshalZip(dataForJoinedBytes);
rtState.gridStartTime = evtsData.clusterStartTime;
locNode.internalId(joinedEvtData.joinedInternalId);
locNode.order(joinedEvtData.topVer);
readAndInitSecuritySubject(locNode, joinedEvtData);
byte[] discoDataBytes = dataForJoined.discoveryDataForNode(locNode.order());
Map<Integer, Serializable> commonDiscoData =
marsh.unmarshal(discoDataBytes, U.resolveClassLoader(spi.ignite().configuration()));
DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id(), locNode.isClient());
dataBag.commonData(commonDiscoData);
exchange.onExchange(dataBag);
List<ZookeeperClusterNode> allNodes = dataForJoined.topology();
for (int i = 0; i < allNodes.size(); i++) {
ZookeeperClusterNode node = allNodes.get(i);
// Need filter since ZkJoinEventDataForJoined contains single topology snapshot for all joined nodes.
if (node.order() >= locNode.order())
break;
rtState.top.addNode(node);
}
rtState.top.addNode(locNode);
final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
lsnr.onDiscovery(
new DiscoveryNotification(
EVT_NODE_JOINED,
joinedEvtData.topVer,
locNode,
topSnapshot,
Collections.emptyMap(),
null,
null
)
).get();
if (rtState.reconnect) {
lsnr.onDiscovery(
new DiscoveryNotification(
EVT_CLIENT_NODE_RECONNECTED,
joinedEvtData.topVer,
locNode,
topSnapshot,
Collections.emptyMap(),
null,
null
)
).get();
U.quietAndWarn(log, "Client node was reconnected after it was already considered failed [locId=" + locNode.id() + ']');
}
rtState.joined = true;
}
joinFut.onDone();
if (locNode.isClient())
rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckClientsStatusCallback(rtState));
}
/**
* @param evtData Event daa.
* @param msg Message.
* @throws Exception If failed.
*/
private void processInternalMessage(ZkDiscoveryCustomEventData evtData, ZkInternalMessage msg) throws Exception {
if (msg instanceof ZkForceNodeFailMessage)
processForceNodeFailMessage((ZkForceNodeFailMessage)msg, evtData);
else if (msg instanceof ZkCommunicationErrorResolveStartMessage) {
processCommunicationErrorResolveStartMessage(
(ZkCommunicationErrorResolveStartMessage)msg,
evtData);
}
else if (msg instanceof ZkCommunicationErrorResolveFinishMessage) {
processCommunicationErrorResolveFinishMessage(
(ZkCommunicationErrorResolveFinishMessage)msg);
}
else if (msg instanceof ZkNoServersMessage)
processNoServersMessage((ZkNoServersMessage)msg);
}
/**
* @param msg Message.
* @throws Exception If failed.
*/
private void processNoServersMessage(ZkNoServersMessage msg) throws Exception {
assert locNode.isClient() : locNode;
throw localNodeFail("All server nodes failed, client node disconnected " +
"(received 'no-servers' message) [locId=" + locNode.id() + ']', true);
}
/**
* @param msg Message.
* @param evtData Event data.
* @throws Exception If failed.
*/
private void processForceNodeFailMessage(ZkForceNodeFailMessage msg, ZkDiscoveryCustomEventData evtData)
throws Exception {
ClusterNode creatorNode = rtState.top.nodesById.get(evtData.sndNodeId);
ZookeeperClusterNode node = rtState.top.nodesByInternalId.get(msg.nodeInternalId);
assert node != null : msg.nodeInternalId;
if (msg.warning != null) {
U.warn(log, "Received force EVT_NODE_FAILED event with warning [" +
"nodeId=" + node.id() +
", msg=" + msg.warning +
", nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : evtData.sndNodeId) + ']');
}
else {
U.warn(log, "Received force EVT_NODE_FAILED event [" +
"nodeId=" + node.id() +
", nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : evtData.sndNodeId) + ']');
}
if (node.isLocal())
throw localNodeFail("Received force EVT_NODE_FAILED event for local node.", true);
else
notifyNodeFail(node.internalId(), evtData.topologyVersion());
}
/**
* @param msg Message.
* @throws Exception If failed.
*/
private void processCommunicationErrorResolveFinishMessage(ZkCommunicationErrorResolveFinishMessage msg)
throws Exception {
UUID futId = msg.futId;
assert futId != null;
if (log.isInfoEnabled())
log.info("Received communication error resolve finish message [reqId=" + futId + ']');
rtState.commErrProcNodes = null;
ZkCommunicationErrorResolveResult res = msg.res;
if (res == null)
res = unmarshalZip(ZkDistributedCollectDataFuture.readResult(rtState.zkClient, zkPaths, futId));
ZkCommunicationErrorProcessFuture fut = commErrProcFut.get();
assert fut != null;
Set<Long> failedNodes = null;
if (res.err != null)
U.error(log, "Communication error resolve failed: " + res.err, res.err);
else {
if (res.killedNodes != null) {
failedNodes = U.newHashSet(res.killedNodes.size());
for (int i = 0; i < res.killedNodes.size(); i++) {
long internalId = res.killedNodes.get(i);
if (internalId == locNode.internalId()) {
fut.onError(new IgniteCheckedException("Local node is forced to stop " +
"by communication error resolver"));
if (rtState.crd)
deleteAliveNodes(res.killedNodes);
throw localNodeFail("Local node is forced to stop by communication error resolver " +
"[nodeId=" + locNode.id() + ']', false);
}
ZookeeperClusterNode node = rtState.top.nodesByInternalId.get(internalId);
assert node != null : internalId;
failedNodes.add(node.order());
}
long topVer = msg.topVer;
for (int i = 0; i < res.killedNodes.size(); i++) {
long nodeInternalId = res.killedNodes.get(i);
ClusterNode node = rtState.top.nodesByInternalId.get(nodeInternalId);
assert node != null : nodeInternalId;
if (log.isInfoEnabled())
log.info("Node stop is forced by communication error resolver [nodeId=" + node.id() + ']');
notifyNodeFail(nodeInternalId, ++topVer);
}
}
}
fut.onFinishResolve(failedNodes);
if (rtState.crd)
deleteAliveNodes(res.killedNodes);
}
/**
* @param internalIds Nodes internal IDs.
* @throws Exception If failed.
*/
private void deleteAliveNodes(@Nullable GridLongList internalIds) throws Exception {
if (internalIds == null)
return;
List<String> alives = rtState.zkClient.getChildren(zkPaths.aliveNodesDir);
for (int i = 0; i < alives.size(); i++) {
String alive = alives.get(i);
if (internalIds.contains(ZkIgnitePaths.aliveInternalId(alive)))
rtState.zkClient.deleteIfExistsAsync(ZkIgnitePaths.join(zkPaths.aliveNodesDir, alive));
}
}
/**
* @param msg Message.
* @param evtData Event data.
* @throws Exception If failed.
*/
private void processCommunicationErrorResolveStartMessage(ZkCommunicationErrorResolveStartMessage msg,
ZkDiscoveryCustomEventData evtData) throws Exception {
ZkCommunicationErrorProcessFuture fut;
for (;;) {
fut = commErrProcFut.get();
if (fut == null || fut.isDone()) {
ZkCommunicationErrorProcessFuture newFut =
ZkCommunicationErrorProcessFuture.createOnStartResolveRequest(this);
if (commErrProcFut.compareAndSet(fut, newFut))
fut = newFut;
else
fut = commErrProcFut.get();
}
if (fut.onStartResolveRequest(evtData.topologyVersion()))
break;
else {
try {
fut.get();
}
catch (Exception e) {
U.warn(log, "Previous communication error process future failed: " + e);
}
}
}
if (log.isInfoEnabled()) {
log.info("Received communication error resolve request [reqId=" + msg.id +
", topVer=" + rtState.top.topologySnapshot() + ']');
}
assert !fut.isDone() : fut;
final String futPath = zkPaths.distributedFutureBasePath(msg.id);
final ZkCommunicationErrorProcessFuture fut0 = fut;
rtState.commErrProcNodes = rtState.top.topologySnapshot();
if (rtState.crd) {
ZkDistributedCollectDataFuture nodeResFut = collectCommunicationStatusFuture(msg.id);
fut.nodeResultCollectFuture(nodeResFut);
}
runInWorkerThread(new ZkRunnable(rtState, this) {
@Override protected void run0() throws Exception {
fut0.checkConnection(rtState, futPath, rtState.commErrProcNodes);
}
});
}
/**
* @param futId Future ID.
* @return Future.
* @throws Exception If failed.
*/
private ZkDistributedCollectDataFuture collectCommunicationStatusFuture(UUID futId) throws Exception {
return new ZkDistributedCollectDataFuture(this, rtState, zkPaths.distributedFutureBasePath(futId),
new Callable<Void>() {
@Override public Void call() throws Exception {
// Future is completed from ZK event thread.
onCommunicationErrorResolveStatusReceived(rtState);
return null;
}
}
);
}
/**
* @param rtState Runtime state.
* @throws Exception If failed.
*/
private void onCommunicationErrorResolveStatusReceived(final ZkRuntimeState rtState) throws Exception {
ZkDiscoveryEventsData evtsData = rtState.evtsData;
UUID futId = evtsData.communicationErrorResolveFutureId();
if (log.isInfoEnabled())
log.info("Received communication status from all nodes [reqId=" + futId + ']');
assert futId != null;
String futPath = zkPaths.distributedFutureBasePath(futId);
List<ClusterNode> initialNodes = rtState.commErrProcNodes;
assert initialNodes != null;
rtState.commErrProcNodes = null;
List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
Map<UUID, BitSet> nodesRes = U.newHashMap(topSnapshot.size());
Exception err = null;
for (ClusterNode node : topSnapshot) {
byte[] stateBytes = ZkDistributedCollectDataFuture.readNodeResult(futPath,
rtState.zkClient,
node.order());
ZkCommunicationErrorNodeState nodeState = unmarshalZip(stateBytes);
if (nodeState.err != null) {
if (err == null)
err = new Exception("Failed to resolve communication error.");
err.addSuppressed(nodeState.err);
}
else {
assert nodeState.commState != null;
nodesRes.put(node.id(), nodeState.commState);
}
}
long topVer = evtsData.topVer;
GridLongList killedNodesList = null;
if (err == null) {
boolean fullyConnected = true;
for (Map.Entry<UUID, BitSet> e : nodesRes.entrySet()) {
if (!checkFullyConnected(e.getValue(), initialNodes, rtState.top)) {
fullyConnected = false;
break;
}
}
if (fullyConnected) {
if (log.isInfoEnabled()) {
log.info("Finish communication error resolve process automatically, there are no " +
"communication errors [reqId=" + futId + ']');
}
}
else {
CommunicationFailureResolver rslvr = spi.ignite().configuration().getCommunicationFailureResolver();
if (rslvr != null) {
if (log.isInfoEnabled()) {
log.info("Call communication error resolver [reqId=" + futId +
", rslvr=" + rslvr.getClass().getSimpleName() + ']');
}
ZkCommunicationFailureContext ctx = new ZkCommunicationFailureContext(
((IgniteKernal)spi.ignite()).context().cache().context(),
topSnapshot,
initialNodes,
nodesRes);
try {
rslvr.resolve(ctx);
Set<ClusterNode> killedNodes = ctx.killedNodes();
if (killedNodes != null) {
if (log.isInfoEnabled()) {
log.info("Communication error resolver forced nodes stop [reqId=" + futId +
", killNodeCnt=" + killedNodes.size() +
", nodeIds=" + U.nodeIds(killedNodes) + ']');
}
killedNodesList = new GridLongList(killedNodes.size());
for (ClusterNode killedNode : killedNodes) {
killedNodesList.add(((ZookeeperClusterNode)killedNode).internalId());
evtsData.topVer++;
}
}
}
catch (Exception e) {
err = e;
U.error(log, "Failed to resolve communication error with configured resolver [reqId=" + futId + ']', e);
}
}
}
}
evtsData.communicationErrorResolveFutureId(null);
ZkCommunicationErrorResolveResult res = new ZkCommunicationErrorResolveResult(killedNodesList, err);
ZkCommunicationErrorResolveFinishMessage msg = new ZkCommunicationErrorResolveFinishMessage(futId, topVer);
msg.res = res;
ZkDistributedCollectDataFuture.saveResult(zkPaths.distributedFutureResultPath(futId),
rtState.zkClient,
marshalZip(res));
evtsData.evtIdGen++;
ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
evtsData.evtIdGen,
0L,
topVer,
locNode.id(),
msg,
null);
evtData.resolvedMsg = msg;
evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
saveAndProcessNewEvents();
// Need re-check alive nodes in case join was delayed.
rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher);
}
/**
* @param commState Node communication state.
* @param initialNodes Topology snapshot when communication error resolve started.
* @param top Current topology.
* @return {@code True} if node has connection to all alive nodes.
*/
private boolean checkFullyConnected(BitSet commState, List<ClusterNode> initialNodes, ZkClusterNodes top) {
int startIdx = 0;
for (;;) {
int idx = commState.nextClearBit(startIdx);
if (idx >= initialNodes.size())
return true;
ClusterNode node = initialNodes.get(idx);
if (top.nodesById.containsKey(node.id()))
return false;
startIdx = idx + 1;
}
}
/**
*
*/
public void simulateNodeFailure() {
ZkRuntimeState rtState = this.rtState;
ZookeeperClient client = rtState.zkClient;
client.deleteIfExistsAsync(zkPaths.aliveNodesDir);
rtState.onCloseStart(new IgniteCheckedException("Simulate node failure error."));
rtState.zkClient.close();
}
/**
* @param evtData Event data.
* @param msg Custom message.
*/
private void notifyCustomEvent(final ZkDiscoveryCustomEventData evtData, final DiscoverySpiCustomMessage msg) {
assert !(msg instanceof ZkInternalMessage) : msg;
if (log.isDebugEnabled())
log.debug(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']');
final ZookeeperClusterNode sndNode = rtState.top.nodesById.get(evtData.sndNodeId);
assert sndNode != null : evtData;
final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
IgniteFuture<?> fut = lsnr.onDiscovery(
new DiscoveryNotification(
DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
evtData.topologyVersion(),
sndNode,
topSnapshot,
Collections.emptyMap(),
msg,
null
)
);
if (msg != null && msg.isMutable())
fut.get();
}
/**
* @param joinedEvtData Event data.
* @param joiningData Joining node data.
*/
private void notifyNodeJoin(ZkJoinedNodeEvtData joinedEvtData, ZkJoiningNodeData joiningData) {
final ZookeeperClusterNode joinedNode = joiningData.node();
joinedNode.order(joinedEvtData.topVer);
joinedNode.internalId(joinedEvtData.joinedInternalId);
rtState.top.addNode(joinedNode);
final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
lsnr.onDiscovery(
new DiscoveryNotification(
EVT_NODE_JOINED,
joinedEvtData.topVer,
joinedNode,
topSnapshot,
Collections.emptyMap(),
null,
null
)
).get();
stats.onNodeJoined();
}
/**
* @param evtData Event data.
*/
private void notifyNodeLeave(final ZkDiscoveryNodeLeaveEventData evtData) {
notifyNodeLeave(evtData.leftNodeInternalId(), evtData.topologyVersion(), evtData.failed());
}
/**
* @param nodeInternalOrder Node order.
* @param topVer Topology version.
*/
private void notifyNodeFail(long nodeInternalOrder, long topVer) {
notifyNodeLeave(nodeInternalOrder, topVer, true);
}
/**
* @param nodeInternalOrder Node order.
* @param topVer Topology version.
* @param failed {@code true} if node failed, {@code false} otherwise.
*/
private void notifyNodeLeave(long nodeInternalOrder, long topVer, boolean failed) {
final ZookeeperClusterNode leftNode = rtState.top.removeNode(nodeInternalOrder);
assert leftNode != null && !leftNode.isLocal() : leftNode;
if (!failed && rtState.crd)
rtState.zkClient.deleteIfExistsAsync(zkPaths.nodeStoppedFlag(leftNode));
PingFuture pingFut = pingFuts.get(leftNode.order());
if (pingFut != null)
pingFut.onDone(false);
final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
lsnr.onDiscovery(
new DiscoveryNotification(
failed ? EVT_NODE_FAILED : EVT_NODE_LEFT,
topVer,
leftNode,
topSnapshot,
Collections.emptyMap(),
null,
null
)
).get();
if (failed)
stats.onNodeFailed();
else
stats.onNodeLeft();
}
/**
* @param msg Message to log.
* @param clientReconnect {@code True} if allow client reconnect.
* @return Exception to be thrown.
*/
private ZookeeperClientFailedException localNodeFail(String msg, boolean clientReconnect) {
U.warn(log, msg);
// if (locNode.isClient() && rtState.zkClient.connected()) {
// String path = rtState.locNodeZkPath.substring(rtState.locNodeZkPath.lastIndexOf('/') + 1);
//
// String joinDataPath = zkPaths.joiningNodeDataPath(locNode.id(), ZkIgnitePaths.aliveNodePrefixId(path));
//
// try {
// if (rtState.zkClient.existsNoRetry(joinDataPath))
// rtState.zkClient.deleteIfExistsNoRetry(joinDataPath, -1);
// }
// catch (Exception e) {
// if (log.isDebugEnabled())
// log.debug("Failed to clean local node's join data on stop: " + e);
// }
// }
if (rtState.zkClient.connected())
rtState.zkClient.close();
if (clientReconnect && clientReconnectEnabled) {
assert locNode.isClient() : locNode;
boolean reconnect = false;
synchronized (stateMux) {
if (connState == ConnectionState.STARTED) {
reconnect = true;
connState = ConnectionState.DISCONNECTED;
rtState.onCloseStart(disconnectError());
}
}
if (reconnect) {
UUID newId = UUID.randomUUID();
U.quietAndWarn(log, "Client node will try to reconnect with new id [" +
"newId=" + newId +
", prevId=" + locNode.id() +
", locNode=" + locNode + ']');
runInWorkerThread(new ReconnectClosure(newId));
}
}
else {
rtState.errForClose = new IgniteCheckedException(msg);
notifySegmented();
}
// Stop any further processing.
return new ZookeeperClientFailedException(msg);
}
/**
* @param ctx Context for logging.
* @throws Exception If failed.
*/
private void handleProcessedEvents(String ctx) throws Exception {
Iterator<ZkDiscoveryEventData> it = rtState.evtsData.evts.values().iterator();
List<ZkDiscoveryCustomEventData> newEvts = null;
ZkDiscoveryEventData prevEvtData = null;
while (it.hasNext()) {
ZkDiscoveryEventData evtData = it.next();
if (evtData.allAcksReceived()) {
if (prevEvtData != null) {
if (log.isInfoEnabled()) {
log.info("Previous event is not acked [" +
"evtId=" + evtData.eventId() +
", prevEvtData=" + prevEvtData +
", remaining=" + prevEvtData.remainingAcks() + ']');
}
}
prevEvtData = null;
switch (evtData.eventType()) {
case ZkDiscoveryEventData.ZK_EVT_NODE_JOIN: {
handleProcessedJoinEventAsync((ZkDiscoveryNodeJoinEventData)evtData);
break;
}
case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: {
DiscoverySpiCustomMessage ack = handleProcessedCustomEvent(ctx,
(ZkDiscoveryCustomEventData)evtData);
if (ack != null) {
ZkDiscoveryCustomEventData ackEvtData = createAckEvent(
ack,
(ZkDiscoveryCustomEventData)evtData);
if (newEvts == null)
newEvts = new ArrayList<>();
newEvts.add(ackEvtData);
}
break;
}
case ZkDiscoveryEventData.ZK_EVT_NODE_LEFT: {
if (log.isDebugEnabled())
log.debug("All nodes processed node left [evtData=" + evtData + ']');
break;
}
}
it.remove();
}
else
prevEvtData = evtData;
}
if (newEvts != null) {
Collection<ZookeeperClusterNode> nodes = rtState.top.nodesByOrder.values();
for (int i = 0; i < newEvts.size(); i++)
rtState.evtsData.addEvent(nodes, newEvts.get(i));
saveAndProcessNewEvents();
}
}
/**
* @param ack Ack message.
* @param origEvt Original custom event.
* @return Event data.
* @throws Exception If failed.
*/
private ZkDiscoveryCustomEventData createAckEvent(
DiscoverySpiCustomMessage ack,
ZkDiscoveryCustomEventData origEvt) throws Exception {
assert ack != null;
rtState.evtsData.evtIdGen++;
long evtId = rtState.evtsData.evtIdGen;
byte[] ackBytes = marshalZip(ack);
String path = zkPaths.ackEventDataPath(origEvt.eventId());
if (log.isDebugEnabled())
log.debug("Create ack event: " + path);
// TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8194
rtState.zkClient.createIfNeeded(
path,
ackBytes,
CreateMode.PERSISTENT);
ZkDiscoveryCustomEventData ackEvtData = new ZkDiscoveryCustomEventData(
evtId,
origEvt.eventId(),
rtState.evtsData.topVer, // Use actual topology version because topology version must be growing.
locNode.id(),
null,
null);
ackEvtData.resolvedMsg = ack;
if (log.isDebugEnabled()) {
log.debug("Generated CUSTOM event ack [origEvtId=" + origEvt.eventId() +
", evt=" + ackEvtData +
", evtSize=" + ackBytes.length +
", msg=" + ack + ']');
}
return ackEvtData;
}
/**
* @param failedNodes Failed nodes.
* @throws Exception If failed.
*/
private void handleProcessedEventsOnNodesFail(List<ZookeeperClusterNode> failedNodes) throws Exception {
boolean processed = false;
for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = rtState.evtsData.evts.entrySet().iterator(); it.hasNext();) {
Map.Entry<Long, ZkDiscoveryEventData> e = it.next();
ZkDiscoveryEventData evtData = e.getValue();
for (int i = 0; i < failedNodes.size(); i++) {
ZookeeperClusterNode failedNode = failedNodes.get(i);
if (evtData.onNodeFail(failedNode))
processed = true;
}
}
if (processed)
handleProcessedEvents("fail-" + U.nodeIds(failedNodes));
}
/**
* @param evtData Event data.
* @throws Exception If failed.
*/
private void handleProcessedJoinEventAsync(ZkDiscoveryNodeJoinEventData evtData) throws Exception {
if (log.isDebugEnabled())
log.debug("All nodes processed node join [evtData=" + evtData + ']');
for (int i = 0; i < evtData.joinedNodes.size(); i++) {
ZkJoinedNodeEvtData joinedEvtData = evtData.joinedNodes.get(i);
deleteJoiningNodeData(joinedEvtData.nodeId, joinedEvtData.joinDataPrefixId, joinedEvtData.joinDataPartCnt);
if (joinedEvtData.secSubjPartCnt > 0) {
deleteMultiplePartsAsync(rtState.zkClient,
zkPaths.joinEventSecuritySubjectPath(evtData.eventId()),
joinedEvtData.secSubjPartCnt);
}
}
deleteDataForJoinedAsync(evtData);
}
/**
* @param nodeId Node ID.
* @param joinDataPrefixId Path prefix.
* @param partCnt Parts count.
*/
private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int partCnt) {
String evtDataPath = zkPaths.joiningNodeDataPath(nodeId, joinDataPrefixId);
if (log.isDebugEnabled())
log.debug("Delete joining node data [path=" + evtDataPath + ']');
rtState.zkClient.deleteIfExistsAsync(evtDataPath);
if (partCnt > 1)
deleteMultiplePartsAsync(rtState.zkClient, evtDataPath + ":", partCnt);
}
/**
* @param evtData Event data.
*/
private void deleteDataForJoinedAsync(ZkDiscoveryNodeJoinEventData evtData) {
String dataForJoinedPath = zkPaths.joinEventDataPathForJoined(evtData.eventId());
if (log.isDebugEnabled())
log.debug("Delete data for joined node [path=" + dataForJoinedPath + ']');
deleteMultiplePartsAsync(rtState.zkClient, dataForJoinedPath, evtData.dataForJoinedPartCnt);
}
/**
* @param ctx Context for log.
* @param evtData Event data.
* @return Ack message.
* @throws Exception If failed.
*/
@Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData)
throws Exception {
if (log.isDebugEnabled())
log.debug("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData + ']');
if (!evtData.ackEvent()) {
if (evtData.evtPath != null)
deleteCustomEventDataAsync(rtState.zkClient, evtData.evtPath);
else {
if (evtData.resolvedMsg instanceof ZkCommunicationErrorResolveFinishMessage) {
UUID futId = ((ZkCommunicationErrorResolveFinishMessage)evtData.resolvedMsg).futId;
ZkDistributedCollectDataFuture.deleteFutureData(rtState.zkClient, zkPaths, futId, log);
}
}
assert evtData.resolvedMsg != null || locNode.order() > evtData.topologyVersion() : evtData;
if (evtData.resolvedMsg != null)
return evtData.resolvedMsg.ackMessage();
}
else {
String path = zkPaths.ackEventDataPath(evtData.origEvtId);
if (log.isDebugEnabled())
log.debug("Delete path: " + path);
rtState.zkClient.deleteIfExistsAsync(path);
}
return null;
}
/**
* @param c Closure to run.
*/
void runInWorkerThread(Runnable c) {
IgniteThreadPoolExecutor pool;
synchronized (stateMux) {
if (connState == ConnectionState.STOPPED) {
LT.warn(log, "Do not run closure, node is stopped.");
return;
}
if (utilityPool == null) {
utilityPool = new IgniteThreadPoolExecutor("zk-discovery-pool",
igniteInstanceName,
0,
1,
2000,
new LinkedBlockingQueue<Runnable>());
}
pool = utilityPool;
}
pool.submit(c);
}
/**
*
*/
public void stop() {
stop0(null);
}
/**
* @param e Error.
*/
private void stop0(Throwable e) {
if (!stop.compareAndSet(false, true))
return;
ZkRuntimeState rtState = this.rtState;
if (rtState.zkClient != null && rtState.locNodeZkPath != null && rtState.zkClient.connected()) {
try {
if (e == null && rtState.joined) {
rtState.zkClient.createIfNeededNoRetry(
zkPaths.nodeStoppedFlag(locNode),
null,
PERSISTENT
);
}
rtState.zkClient.deleteIfExistsNoRetry(rtState.locNodeZkPath, -1);
}
catch (Exception err) {
if (log.isDebugEnabled())
log.debug("Failed to delete local node's znode on stop: " + err);
}
}
IgniteCheckedException err = new IgniteCheckedException("Node stopped.");
synchronized (stateMux) {
connState = ConnectionState.STOPPED;
rtState.onCloseStart(err);
}
IgniteUtils.shutdownNow(ZookeeperDiscoveryImpl.class, utilityPool, log);
busyLock.block();
busyLock.unblock();
joinFut.onDone(e);
ZookeeperClient zkClient = rtState.zkClient;
if (zkClient != null)
zkClient.close();
finishFutures(err);
}
/**
* @param err Error.
*/
private void finishFutures(IgniteCheckedException err) {
ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get();
if (commErrFut != null)
commErrFut.onError(err);
for (PingFuture fut : pingFuts.values())
fut.onDone(err);
}
/**
* @param busyLock Busy lock.
* @param err Error.
*/
void onFatalError(GridSpinBusyLock busyLock, Throwable err) {
busyLock.leaveBusy();
if (err instanceof ZookeeperClientFailedException)
return; // Processed by ZookeeperClient listener.
Ignite ignite = spi.ignite();
if (stopping() || ignite == null)
return;
U.error(log, "Fatal error in ZookeeperDiscovery. " +
"Stopping the node in order to prevent cluster wide instability.", err);
stop0(err);
new Thread(new Runnable() {
@Override public void run() {
try {
IgnitionEx.stop(igniteInstanceName, true, ShutdownPolicy.IMMEDIATE, true);
U.log(log, "Stopped the node successfully in response to fatal error in ZookeeperDiscoverySpi.");
}
catch (Throwable e) {
U.error(log, "Failed to stop the node successfully in response to fatal error in " +
"ZookeeperDiscoverySpi.", e);
}
}
}, "node-stop-thread").start();
if (err instanceof Error)
throw (Error)err;
}
/**
* @param zipBytes Zip-compressed bytes.
* @return Unmarshalled object.
* @throws IgniteCheckedException If failed.
*/
private <T> T unmarshalZip(byte[] zipBytes) throws Exception {
assert zipBytes != null && zipBytes.length > 0;
InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(zipBytes));
return marsh.unmarshal(in, U.resolveClassLoader(spi.ignite().configuration()));
}
/**
* @param obj Object.
* @return Bytes.
* @throws IgniteCheckedException If failed.
*/
byte[] marshalZip(Object obj) throws IgniteCheckedException {
assert obj != null;
return zip(U.marshal(marsh, obj));
}
/**
* @param bytes Bytes to compress.
* @return Zip-compressed bytes.
*/
private static byte[] zip(byte[] bytes) {
Deflater deflater = new Deflater();
deflater.setInput(bytes);
deflater.finish();
GridByteArrayOutputStream out = new GridByteArrayOutputStream(bytes.length);
final byte[] buf = new byte[bytes.length];
while (!deflater.finished()) {
int cnt = deflater.deflate(buf);
out.write(buf, 0, cnt);
}
return out.toByteArray();
}
/**
* @param zipBytes Zip-compressed bytes.
* @return Uncompressed bytes.
* @throws DataFormatException If compressed data format is invalid.
*/
public static byte[] unzip(byte[] zipBytes) throws DataFormatException {
Inflater inflater = new Inflater();
inflater.setInput(zipBytes);
GridByteArrayOutputStream out = new GridByteArrayOutputStream(zipBytes.length * 2);
final byte[] buf = new byte[zipBytes.length];
while (!inflater.finished()) {
int cnt = inflater.inflate(buf);
out.write(buf, 0, cnt);
}
return out.toByteArray();
}
/**
*
*/
private class ReconnectClosure implements Runnable {
/** */
private final UUID newId;
/**
* @param newId New ID.
*/
ReconnectClosure(UUID newId) {
assert newId != null;
this.newId = newId;
}
/** {@inheritDoc} */
@Override public void run() {
finishFutures(disconnectError());
busyLock.block();
busyLock.unblock();
doReconnect(newId);
}
}
/**
*
*/
private class ConnectionLossListener implements IgniteRunnable {
/** */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
@Override public void run() {
if (clientReconnectEnabled) {
synchronized (stateMux) {
if (connState == ConnectionState.STARTED) {
connState = ConnectionState.DISCONNECTED;
rtState.onCloseStart(disconnectError());
}
else
return;
}
UUID newId = UUID.randomUUID();
U.quietAndWarn(log, "Connection to Zookeeper server is lost, local node will try to reconnect with new id [" +
"newId=" + newId +
", prevId=" + locNode.id() +
", locNode=" + locNode + ']');
runInWorkerThread(new ReconnectClosure(newId));
}
else {
U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED.");
onSegmented(new IgniteSpiException("Zookeeper connection loss."));
}
}
}
/**
*
*/
private class ZkWatcher extends ZkAbstractWatcher implements ZkRuntimeState.ZkWatcher {
/**
* @param rtState Runtime state.
*/
ZkWatcher(ZkRuntimeState rtState) {
super(rtState, ZookeeperDiscoveryImpl.this);
}
/** {@inheritDoc} */
@Override public void process0(WatchedEvent evt) {
if (evt.getType() == Event.EventType.NodeDataChanged) {
if (evt.getPath().equals(zkPaths.evtsPath)) {
if (!rtState.crd)
rtState.zkClient.getDataAsync(evt.getPath(), this, this);
}
else
U.warn(log, "Received NodeDataChanged for unexpected path: " + evt.getPath());
}
else if (evt.getType() == Event.EventType.NodeChildrenChanged) {
if (evt.getPath().equals(zkPaths.aliveNodesDir))
rtState.zkClient.getChildrenAsync(evt.getPath(), this, this);
else if (evt.getPath().equals(zkPaths.customEvtsDir))
rtState.zkClient.getChildrenAsync(evt.getPath(), this, this);
else
U.warn(log, "Received NodeChildrenChanged for unexpected path: " + evt.getPath());
}
}
/** {@inheritDoc} */
@Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
if (!onProcessStart())
return;
try {
assert rc == 0 : KeeperException.Code.get(rc);
if (path.equals(zkPaths.aliveNodesDir))
generateTopologyEvents(children);
else if (path.equals(zkPaths.customEvtsDir))
generateCustomEvents(children);
else
U.warn(log, "Children callback for unexpected path: " + path);
onProcessEnd();
}
catch (Throwable e) {
onProcessError(e);
}
}
/** {@inheritDoc} */
@Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if (!onProcessStart())
return;
try {
assert rc == 0 : KeeperException.Code.get(rc);
if (path.equals(zkPaths.evtsPath)) {
if (!rtState.crd)
processNewEvents(data);
}
else
U.warn(log, "Data callback for unknown path: " + path);
onProcessEnd();
}
catch (Throwable e) {
onProcessError(e);
}
}
}
/**
*
*/
private class AliveNodeDataWatcher extends ZkAbstractWatcher implements ZkRuntimeState.ZkAliveNodeDataWatcher {
/**
* @param rtState Runtime state.
*/
AliveNodeDataWatcher(ZkRuntimeState rtState) {
super(rtState, ZookeeperDiscoveryImpl.this);
}
/** {@inheritDoc} */
@Override public void process0(WatchedEvent evt) {
if (evt.getType() == Event.EventType.NodeDataChanged)
rtState.zkClient.getDataAsync(evt.getPath(), this, this);
}
/** {@inheritDoc} */
@Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if (!onProcessStart())
return;
try {
assert rtState.crd;
processResult0(rc, path, data);
onProcessEnd();
}
catch (Throwable e) {
onProcessError(e);
}
}
/**
* @param rc Result code.
* @param path Path.
* @param data Data.
* @throws Exception If failed.
*/
private void processResult0(int rc, String path, byte[] data) throws Exception {
if (rc == KeeperException.Code.NONODE.intValue()) {
if (log.isDebugEnabled())
log.debug("Alive node callaback, no node: " + path);
return;
}
assert rc == 0 : KeeperException.Code.get(rc);
if (data.length > 0) {
ZkAliveNodeData nodeData = unmarshalZip(data);
Long nodeInternalId = ZkIgnitePaths.aliveInternalId(path);
Iterator<ZkDiscoveryEventData> it = rtState.evtsData.evts.values().iterator();
boolean processed = false;
while (it.hasNext()) {
ZkDiscoveryEventData evtData = it.next();
if (evtData.onAckReceived(nodeInternalId, nodeData.lastProcEvt))
processed = true;
}
if (processed)
handleProcessedEvents("ack-" + nodeInternalId);
}
}
}
/**
*
*/
private abstract class PreviousNodeWatcher extends ZkAbstractWatcher implements AsyncCallback.StatCallback {
/**
* @param rtState Runtime state.
*/
PreviousNodeWatcher(ZkRuntimeState rtState) {
super(rtState, ZookeeperDiscoveryImpl.this);
}
/** {@inheritDoc} */
@Override public void process0(WatchedEvent evt) {
if (evt.getType() == Event.EventType.NodeDeleted)
onPreviousNodeFail();
else {
if (evt.getType() != Event.EventType.None)
rtState.zkClient.existsAsync(evt.getPath(), this, this);
}
}
/** {@inheritDoc} */
@Override public void processResult(int rc, String path, Object ctx, Stat stat) {
if (!onProcessStart())
return;
try {
assert rc == 0 || rc == KeeperException.Code.NONODE.intValue() : KeeperException.Code.get(rc);
if (rc == KeeperException.Code.NONODE.intValue() || stat == null)
onPreviousNodeFail();
onProcessEnd();
}
catch (Throwable e) {
onProcessError(e);
}
}
/**
*
*/
abstract void onPreviousNodeFail();
}
/**
*
*/
private class ServerPreviousNodeWatcher extends PreviousNodeWatcher {
/**
* @param rtState Runtime state.
*/
ServerPreviousNodeWatcher(ZkRuntimeState rtState) {
super(rtState);
assert !locNode.isClient() : locNode;
}
/** {@inheritDoc} */
@Override void onPreviousNodeFail() {
if (log.isInfoEnabled())
log.info("Previous server node failed, check is node new coordinator [locId=" + locNode.id() + ']');
rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback(rtState));
}
}
/**
*
*/
private class ClientPreviousNodeWatcher extends PreviousNodeWatcher {
/**
* @param rtState Runtime state.
*/
ClientPreviousNodeWatcher(ZkRuntimeState rtState) {
super(rtState);
assert locNode.isClient() : locNode;
}
/** {@inheritDoc} */
@Override void onPreviousNodeFail() {
if (log.isInfoEnabled())
log.info("Watched node failed, check if there are alive servers [locId=" + locNode.id() + ']');
rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckClientsStatusCallback(rtState));
}
}
/**
* Watcher for the local node. The local node can be deleted in case of cluster restarts.
* See {@link #cleanupPreviousClusterData}.
*/
private class ClientLocalNodeWatcher extends PreviousNodeWatcher {
/** */
final CheckJoinErrorWatcher joinErrorWatcher;
/**
* @param rtState Runtime state.
*/
ClientLocalNodeWatcher(ZkRuntimeState rtState, CheckJoinErrorWatcher joinErrorWatcher) {
super(rtState);
assert locNode.isClient() : locNode;
this.joinErrorWatcher = joinErrorWatcher;
}
/** {@inheritDoc} */
@Override void onPreviousNodeFail() {
// Check that there are no errors in join data.
joinErrorWatcher.checkJoinError();
if (rtState.errForClose != null || rtState.joined)
return;
synchronized (stateMux) {
if (connState != ConnectionState.STARTED)
return;
}
if (log.isInfoEnabled())
log.info("Watched local node failed [locId=" + locNode.id() + ']');
localNodeFail("Local node was forced to stop.", true);
}
}
/**
*
*/
class CheckCoordinatorCallback extends ZkAbstractChildrenCallback {
/**
* @param rtState Runtime state.
*/
CheckCoordinatorCallback(ZkRuntimeState rtState) {
super(rtState, ZookeeperDiscoveryImpl.this);
}
/** {@inheritDoc} */
@Override public void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat)
throws Exception {
assert rc == 0 : KeeperException.Code.get(rc);
checkIsCoordinator(children);
}
}
/**
*
*/
class CheckClientsStatusCallback extends ZkAbstractChildrenCallback {
/**
* @param rtState Runtime state.
*/
CheckClientsStatusCallback(ZkRuntimeState rtState) {
super(rtState, ZookeeperDiscoveryImpl.this);
}
/** {@inheritDoc} */
@Override void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat)
throws Exception {
assert rc == 0 : KeeperException.Code.get(rc);
checkClientsStatus(children);
}
}
/**
*
*/
private class PingFuture extends GridFutureAdapter<Boolean> implements IgniteSpiTimeoutObject {
/** */
private final ZookeeperClusterNode node;
/** */
private final long endTime;
/** */
private final IgniteUuid id;
/** */
private final ZkRuntimeState rtState;
/**
* @param rtState Runtime state.
* @param node Node.
*/
PingFuture(ZkRuntimeState rtState, ZookeeperClusterNode node) {
this.rtState = rtState;
this.node = node;
id = IgniteUuid.fromUuid(node.id());
endTime = System.currentTimeMillis() + node.sessionTimeout() + 1000;
}
/** {@inheritDoc} */
@Override public IgniteUuid id() {
return id;
}
/** {@inheritDoc} */
@Override public long endTime() {
return endTime;
}
/** {@inheritDoc} */
@Override public void onTimeout() {
if (checkNodeAndState()) {
runInWorkerThread(new ZkRunnable(rtState, ZookeeperDiscoveryImpl.this) {
@Override protected void run0() throws Exception {
if (checkNodeAndState()) {
try {
for (String path : rtState.zkClient.getChildren(zkPaths.aliveNodesDir)) {
if (node.internalId() == ZkIgnitePaths.aliveInternalId(path)) {
onDone(true);
return;
}
}
onDone(false);
}
catch (Exception e) {
onDone(e);
throw e;
}
}
}
@Override void onStartFailed() {
onDone(rtState.errForClose);
}
});
}
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
if (super.onDone(res, err)) {
pingFuts.remove(node.order(), this);
return true;
}
return false;
}
/**
* @return {@code False} if future was completed.
*/
boolean checkNodeAndState() {
if (isDone())
return false;
Exception err = rtState.errForClose;
if (err != null) {
onDone(err);
return false;
}
ConnectionState connState = ZookeeperDiscoveryImpl.this.connState;
if (connState == ConnectionState.DISCONNECTED) {
onDone(new IgniteClientDisconnectedException(null, "Client is disconnected."));
return false;
}
else if (connState == ConnectionState.STOPPED) {
onDone(new IgniteException("Node stopped."));
return false;
}
if (node(node.id()) == null) {
onDone(false);
return false;
}
return true;
}
}
/**
*
*/
enum ConnectionState {
/** */
STARTED,
/** */
DISCONNECTED,
/** */
STOPPED
}
/** */
public UUID getCoordinator() {
ZkRuntimeState rtState0 = rtState;
if (rtState0 == null)
return null;
return rtState0.top.nodesByOrder.values().stream()
.filter(n -> !n.isClient() && !n.isDaemon())
.map(ZookeeperClusterNode::id)
.findFirst()
.orElse(null);
}
/** */
public String getSpiState() {
return connState.toString();
}
/** */
public String getZkSessionId() {
ZkRuntimeState rtState0 = rtState;
if (rtState0 != null && rtState0.zkClient != null)
return Long.toHexString(rtState.zkClient.zk().getSessionId());
else
return null;
}
}