// Source blob: df45930a7c75415b1e2a47dbbecc3c23422ce7c0
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.tcp;
import java.net.InetSocketAddress;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.tracing.NoopTracing;
import org.apache.ignite.internal.processors.tracing.Tracing;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_METRICS_QNT_WARN;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_DISCOVERY_METRICS_QNT_WARN;
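/**
* Abstract base for TCP discovery SPI implementations. Holds the owning {@link TcpDiscoverySpi}, its logger,
* the optional in-memory debug message history and small helpers shared by concrete implementations.
*/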
abstract class TcpDiscoveryImpl {
/** Response OK. */
protected static final int RES_OK = 1;
/** Response CONTINUE JOIN. */
protected static final int RES_CONTINUE_JOIN = 100;
/** Response WAIT. */
protected static final int RES_WAIT = 200;
/** Response join impossible. */
protected static final int RES_JOIN_IMPOSSIBLE = 255;
/**
* How often the warning message should occur in logs to prevent log spam: minimum interval, in milliseconds
* (one hour), between two cache metrics quantity warnings logged by {@code processMsgCacheMetrics}.
*/
public static final long LOG_WARN_MSG_TIMEOUT = 60 * 60 * 1000L;
/** Debug log date formatter: renders timestamps as {@code [HH:mm:ss,SSS]} in the system default time zone. */
private static final DateTimeFormatter DEBUG_FORMATTER =
DateTimeFormatter.ofPattern("[HH:mm:ss,SSS]").withZone(ZoneId.systemDefault());
/** SPI this implementation was created for (the constructor argument). */
protected final TcpDiscoverySpi spi;
/** Logger, taken from {@code spi.log} in the constructor. */
protected final IgniteLogger log;
/** Local node. Not assigned anywhere in this class - presumably set by subclasses (TODO confirm). */
protected volatile TcpDiscoveryNode locNode;
/** Debug mode. */
protected boolean debugMode;
/** Debug messages history: maximum number of entries the debug message queue is trimmed to. */
private int debugMsgHist = 512;
/**
* Queue of formatted debug messages appended by {@code debugLog(TcpDiscoveryAbstractMessage, String)}.
* Not initialized in this class - presumably created by subclasses when debug mode is on (TODO confirm).
*/
protected ConcurrentLinkedDeque<String> debugLogQ;
/**
* Logging a warning message when metrics quantity exceeded a specified number. Read once per instance from the
* {@code IGNITE_DISCOVERY_METRICS_QNT_WARN} system property, with {@code DFLT_DISCOVERY_METRICS_QNT_WARN}
* as the default.
*/
protected int METRICS_QNT_WARN = getInteger(IGNITE_DISCOVERY_METRICS_QNT_WARN, DFLT_DISCOVERY_METRICS_QNT_WARN);
/**
* Time (milliseconds) starting from which the next cache metrics quantity warning may be logged;
* moved forward by {@link #LOG_WARN_MSG_TIMEOUT} each time the warning is printed.
*/
protected long endTimeMetricsSizeProcessWait = System.currentTimeMillis();
/**
 * {@link DebugLogger} adapter that writes at the DEBUG level of {@link #log}.
 * <p>
 * Declared with the nested {@link DebugLogger} type directly (the same type that {@code ServerImpl.DebugLogger}
 * resolves to through inheritance), so the base class does not have to name its own subclass.
 */
protected final DebugLogger debugLog = new DebugLogger() {
    /** {@inheritDoc} */
    @Override public boolean isDebugEnabled() {
        return log.isDebugEnabled();
    }

    /** {@inheritDoc} */
    @Override public void debug(String msg) {
        log.debug(msg);
    }
};
/**
 * {@link DebugLogger} adapter that writes at the TRACE level of {@link #log}: {@code isDebugEnabled()} reports
 * whether trace is enabled and {@code debug(String)} logs via {@code log.trace(String)}.
 * <p>
 * Declared with the nested {@link DebugLogger} type directly (the same type that {@code ServerImpl.DebugLogger}
 * resolves to through inheritance), so the base class does not have to name its own subclass.
 */
protected final DebugLogger traceLog = new DebugLogger() {
    /** {@inheritDoc} */
    @Override public boolean isDebugEnabled() {
        return log.isTraceEnabled();
    }

    /** {@inheritDoc} */
    @Override public void debug(String msg) {
        log.trace(msg);
    }
};
/**
* Tracing. Assigned in the constructor: the kernal context tracing when {@code spi.ignite()} is an
* {@link IgniteEx}, otherwise a {@link NoopTracing} instance.
*/
protected Tracing tracing;
/**
 * Upcasts collection type: returns the very same collection instance, viewed as a collection of the
 * wider element type {@code R}. No copy is made.
 *
 * @param c Initial collection (checked with {@code A.notNull}).
 * @param <T> Element type of the initial collection.
 * @param <R> Super type of {@code T} to view the elements as.
 * @return Resulting collection (same instance as {@code c}).
 */
@SuppressWarnings("unchecked")
protected static <T extends R, R> Collection<R> upcast(Collection<T> c) {
    A.notNull(c, "c");

    // Unchecked by necessity: generics are invariant, so Collection<T> is not a Collection<R>.
    // Reading elements as R is always safe since T extends R; callers should not add
    // arbitrary R instances through the returned view.
    return (Collection<R>)c;
}
/**
 * Binds this implementation to the given SPI, borrows the SPI logger and picks the tracing implementation:
 * the kernal context tracing when the SPI's Ignite instance is an {@link IgniteEx}, a no-op tracing otherwise.
 *
 * @param spi Adapter.
 */
TcpDiscoveryImpl(TcpDiscoverySpi spi) {
    this.spi = spi;
    this.log = spi.log;

    this.tracing = spi.ignite() instanceof IgniteEx
        ? ((IgniteEx)spi.ignite()).context().tracing()
        : new NoopTracing();
}
/**
* Turns debug mode on or off by storing the flag in {@link #debugMode}.
* <p>
* This method is intended for troubleshooting purposes only.
*
* @param debugMode {@code True} to start SPI in debug mode.
*/
public void setDebugMode(boolean debugMode) {
this.debugMode = debugMode;
}
/**
* Sets the maximum number of messages kept in the debug message queue.
* <p>
* This method is intended for troubleshooting purposes only.
* <p>
* The value is stored as is, without validation. With a non-positive value every debug message is
* removed from the queue right after it has been added.
*
* @param debugMsgHist Message history log size.
*/
public void setDebugMessageHistory(int debugMsgHist) {
this.debugMsgHist = debugMsgHist;
}
/**
 * Appends a formatted entry to the debug message queue and trims the queue back to the configured
 * history size. The entry has the form {@code [HH:mm:ss,SSS][thread][nodeId-order] msg}.
 * Must only be called when debug mode is on (asserted).
 *
 * @param discoMsg Discovery message (not used by this implementation).
 * @param msg Message.
 */
protected void debugLog(@Nullable TcpDiscoveryAbstractMessage discoMsg, String msg) {
    assert debugMode;

    StringBuilder entry = new StringBuilder(DEBUG_FORMATTER.format(Instant.now()));

    entry.append('[').append(Thread.currentThread().getName())
        .append("][").append(getLocalNodeId())
        .append("-").append(locNode.internalOrder())
        .append("] ").append(msg);

    debugLogQ.add(entry.toString());

    // Drop the oldest entries, but never more than the excess measured right after the add.
    int excess = debugLogQ.size() - debugMsgHist;

    while (excess-- > 0 && debugLogQ.size() > debugMsgHist)
        debugLogQ.poll();
}
/**
* Returns the ID of the SPI's local node. The value is read from {@code spi.locNode},
* not from this class's own {@link #locNode} field.
*
* @return Local node ID.
*/
public UUID getLocalNodeId() {
return spi.locNode.id();
}
/**
* Returns the node ID stored in {@code spi.cfgNodeId}.
*
* @return Configured node ID (actual node ID can be different if client reconnects).
*/
public UUID getConfiguredNodeId() {
return spi.cfgNodeId;
}
/**
* Reports an exception by forwarding it to the SPI exception registry.
*
* @param msg Error message.
* @param e Exception.
*/
protected void onException(String msg, Exception e) {
spi.getExceptionRegistry().onException(msg, e);
}
/**
* Called when a local node either received from or sent to a remote node a message.
* <p>
* This base implementation does nothing; presumably subclasses override it where they
* need the notification (TODO confirm).
*/
protected void onMessageExchanged() {
// No-op
}
/**
* Dumps debug information to the given logger.
*
* @param log Logger.
*/
public abstract void dumpDebugInfo(IgniteLogger log);
/**
* Returns the current SPI state in textual form.
*
* @return SPI state string.
*/
public abstract String getSpiState();
/**
* Returns the number of messages currently held in the message worker queue.
*
* @return Message worker queue current size.
*/
public abstract int getMessageWorkerQueueSize();
/**
* Returns the ID of the coordinator node.
*
* @return Coordinator ID.
*/
public abstract UUID getCoordinator();
/**
* Returns the remote nodes known to this implementation.
*
* @return Collection of remote nodes.
*/
public abstract Collection<ClusterNode> getRemoteNodes();
/**
* Checks whether the given feature is supported by all nodes.
*
* @param feature Feature to check.
* @return {@code true} if all nodes support the given feature, {@code false} otherwise.
*/
public abstract boolean allNodesSupport(IgniteFeatures feature);
/**
* Looks up a node by its ID.
*
* @param nodeId Node id.
* @return Node with given ID or {@code null} if node is not found.
*/
@Nullable public abstract ClusterNode getNode(UUID nodeId);
/**
* Pings the node with the given ID.
*
* @param nodeId Node id.
* @return {@code true} if node alive, {@code false} otherwise.
*/
public abstract boolean pingNode(UUID nodeId);
/**
* Tells discovery SPI to disconnect from topology.
*
* @throws IgniteSpiException If failed.
*/
public abstract void disconnect() throws IgniteSpiException;
/**
* Sends a custom discovery message.
*
* @param msg Message.
* @throws IgniteException If failed.
*/
public abstract void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException;
/**
* Fails the node with the given ID.
*
* @param nodeId Node id.
* @param warning Warning message to be shown on all nodes.
*/
public abstract void failNode(UUID nodeId, @Nullable String warning);
/**
* Dumps ring structure to logger.
*
* @param log Logger.
*/
public abstract void dumpRingStructure(IgniteLogger log);
/**
* Get current topology version.
*
* @return Current topology version.
*/
public abstract long getCurrentTopologyVersion();
/**
* Starts this SPI implementation.
*
* @param igniteInstanceName Ignite instance name.
* @throws IgniteSpiException If failed.
*/
public abstract void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException;
/**
* Will start TCP server if applicable and not started yet.
* <p>
* This base implementation starts nothing and always returns {@code 0}; presumably implementations
* that open a server socket override it (TODO confirm).
*
* @return Port this instance bound to ({@code 0} in this base implementation).
* @throws IgniteSpiException If failed.
*/
public int boundPort() throws IgniteSpiException {
return 0;
}
/**
* Returns the connection check interval. This base implementation always returns {@code 0};
* the unit is not visible from this class (NOTE(review): presumably milliseconds - confirm).
*
* @return connection check interval.
*/
public long connectionCheckInterval() {
return 0;
}
/**
* Stops this SPI implementation.
*
* @throws IgniteSpiException If failed.
*/
public abstract void spiStop() throws IgniteSpiException;
/**
* Callback invoked with the SPI context once it is available.
*
* @param spiCtx Spi context.
* @throws IgniteSpiException If failed.
*/
public abstract void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException;
/**
 * Describes the liveness of a thread for diagnostic output.
 *
 * @param t Thread, possibly {@code null}.
 * @return {@code "N/A"} for a {@code null} thread, {@code "alive"} if the thread is alive,
 *      {@code "dead"} otherwise.
 */
protected static String threadStatus(Thread t) {
    return t == null ? "N/A" : (t.isAlive() ? "alive" : "dead");
}
/**
* Leave cluster and try to join again.
*
* @throws IgniteSpiException If failed.
*/
public abstract void reconnect() throws IgniteSpiException;
/**
* <strong>FOR TEST ONLY!!!</strong>
* <p>
* Simulates this node failure by stopping service threads. So, node will become
* unresponsive.
* <p>
* This method is intended for test purposes only.
*/
abstract void simulateNodeFailure();
/**
* FOR TEST PURPOSE ONLY!
* <p>
* NOTE(review): the name looks like a misspelling of "breakConnection"; it is a public method,
* so it is kept as is.
*/
public abstract void brakeConnection();
/**
* Starts a ring latency check.
*
* @param maxHops Maximum hops for {@link TcpDiscoveryRingLatencyCheckMessage}.
*/
public abstract void checkRingLatency(int maxHops);
/**
* <strong>FOR TEST ONLY!!!</strong>
*
* @return Worker threads.
*/
protected abstract Collection<IgniteSpiThread> threads();
/**
* Applies received metrics to the node with the given ID. Called by
* {@link #processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage, long)} for every node and client
* entry of a metrics update message.
*
* @param nodeId Node ID.
* @param metrics Metrics.
* @param cacheMetrics Cache metrics.
* @param tsNanos Timestamp as returned by {@link System#nanoTime()}.
*/
public abstract void updateMetrics(UUID nodeId,
ClusterMetrics metrics,
Map<Integer, CacheMetrics> cacheMetrics,
long tsNanos);
/**
 * Registers the local node addresses with the IP finder, retrying until it succeeds.
 * <p>
 * After every failed attempt the method sleeps for the SPI reconnect delay. When a positive join timeout
 * is configured, the retries are abandoned once that timeout has elapsed since the first attempt.
 *
 * @throws IgniteSpiException If the IP finder rejects the addresses with an {@link IllegalStateException},
 *      if the join timeout expires, or if the thread is interrupted while waiting between attempts.
 */
protected final void registerLocalNodeAddress() throws IgniteSpiException {
    final long joinTimeout = spi.getJoinTimeout();

    final boolean timeLimited = joinTimeout > 0;

    // The start timestamp is only needed when the retries are bounded by the join timeout.
    final long startNanos = timeLimited ? System.nanoTime() : 0;

    for (;;) {
        try {
            spi.ipFinder.initializeLocalAddresses(
                U.resolveAddresses(spi.getAddressResolver(), locNode.socketAddresses()));

            // Registered successfully, nothing else to do.
            return;
        }
        catch (IllegalStateException e) {
            // Not retried: rethrown immediately as an SPI exception.
            throw new IgniteSpiException("Failed to register local node address with IP finder: " +
                locNode.socketAddresses(), e);
        }
        catch (IgniteSpiException e) {
            LT.error(log, e, "Failed to register local node address in IP finder on start " +
                "(retrying every " + spi.getReconnectDelay() + " ms; " +
                "change 'reconnectDelay' to configure the frequency of retries).");
        }

        if (timeLimited && U.millisSinceNanos(startNanos) > joinTimeout) {
            throw new IgniteSpiException(
                "Failed to register local addresses with IP finder within join timeout " +
                    "(make sure IP finder configuration is correct, and operating system firewalls are disabled " +
                    "on all host machines, or consider increasing 'joinTimeout' configuration property) " +
                    "[joinTimeout=" + joinTimeout + ']');
        }

        try {
            U.sleep(spi.getReconnectDelay());
        }
        catch (IgniteInterruptedCheckedException e) {
            throw new IgniteSpiException("Thread has been interrupted.", e);
        }
    }
}
/**
 * Validates an acknowledgement timeout against the SPI maximum and logs a warning
 * when the maximum is exceeded.
 *
 * @param ackTimeout Acknowledgement timeout.
 * @return {@code True} if acknowledgement timeout is less or equal to
 *      maximum acknowledgement timeout, {@code false} otherwise.
 */
protected boolean checkAckTimeout(long ackTimeout) {
    if (ackTimeout <= spi.getMaxAckTimeout())
        return true;

    LT.warn(log, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
        "(consider increasing 'maxAckTimeout' configuration property) " +
        "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.getMaxAckTimeout() + ']');

    return false;
}
/**
 * Applies the metrics carried by a metrics update message.
 * <p>
 * For every node entry of the message the node metrics are passed to
 * {@link #updateMetrics(UUID, ClusterMetrics, Map, long)}, followed by the metrics of each client listed in
 * that entry; the clients are updated with the same cache metrics map as the entry's node. If an entry carries
 * at least {@link #METRICS_QNT_WARN} cache metrics, a warning is logged, at most once per
 * {@link #LOG_WARN_MSG_TIMEOUT}.
 *
 * @param msg Metrics update message.
 * @param tsNanos Timestamp handed over to {@code updateMetrics} unchanged.
 */
public void processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) {
    for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> entry : msg.metrics().entrySet()) {
        UUID id = entry.getKey();

        TcpDiscoveryMetricsUpdateMessage.MetricsSet nodeMetrics = entry.getValue();

        Map<Integer, CacheMetrics> nodeCacheMetrics;

        if (msg.hasCacheMetrics(id))
            nodeCacheMetrics = msg.cacheMetrics().get(id);
        else
            nodeCacheMetrics = Collections.emptyMap();

        // Rate limit first, size check second.
        boolean warnAllowed = endTimeMetricsSizeProcessWait <= U.currentTimeMillis();

        if (warnAllowed && nodeCacheMetrics.size() >= METRICS_QNT_WARN) {
            log.warning("The Discovery message has metrics for " + nodeCacheMetrics.size() + " caches.\n" +
                "To prevent Discovery blocking use -DIGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE=true option.");

            endTimeMetricsSizeProcessWait = U.currentTimeMillis() + LOG_WARN_MSG_TIMEOUT;
        }

        updateMetrics(id, nodeMetrics.metrics(), nodeCacheMetrics, tsNanos);

        for (T2<UUID, ClusterMetrics> clientMetrics : nodeMetrics.clientMetrics())
            updateMetrics(clientMetrics.get1(), clientMetrics.get2(), nodeCacheMetrics, tsNanos);
    }
}
/**
 * Converts socket addresses to their string form and sorts the strings in natural order.
 *
 * @param addrs Addresses.
 * @return New mutable list with {@code toString()} of every address, sorted ascending.
 */
protected static List<String> toOrderedList(Collection<InetSocketAddress> addrs) {
    List<String> sorted = new ArrayList<>(addrs.size());

    addrs.forEach(addr -> sorted.add(addr.toString()));

    // A null comparator means natural (lexicographic) String ordering.
    sorted.sort(null);

    return sorted;
}
/**
 * Selects the logger adapter for a discovery message: the trace-level adapter when the message
 * reports {@code traceLogLevel()}, the debug-level adapter otherwise.
 *
 * @param msg Message.
 * @return Message logger.
 */
protected final DebugLogger messageLogger(TcpDiscoveryAbstractMessage msg) {
    if (msg.traceLogLevel())
        return traceLog;

    return debugLog;
}
/**
* Minimal logging facade with a single "debug" channel. The two instances created in this class map that
* channel either to the DEBUG or to the TRACE level of the SPI logger.
*/
interface DebugLogger {
/**
* @return {@code True} if debug logging is enabled.
*/
boolean isDebugEnabled();
/**
* @param msg Message to log.
*/
void debug(String msg);
}
}