blob: 5f2c66ce7fe59df5a87abaf573ba2be3534c5133 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cluster;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteDiagnosticInfo;
import org.apache.ignite.internal.IgniteDiagnosticMessage;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.IgniteClusterImpl;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.util.GridTimerTask;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CLUSTER_PROC;
import static org.apache.ignite.internal.GridTopic.TOPIC_INTERNAL_DIAGNOSTIC;
import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
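/**
 * Cluster processor: holds the {@link IgniteClusterImpl} instance, runs the periodic update notifier
 * and exchanges internal diagnostic messages with other nodes.
 */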
public class ClusterProcessor extends GridProcessorAdapter {
/** */
private static final String ATTR_UPDATE_NOTIFIER_STATUS = "UPDATE_NOTIFIER_STATUS";
/** Periodic version check delay. */
private static final long PERIODIC_VER_CHECK_DELAY = 1000 * 60 * 60; // Every hour.
/** Periodic version check connection timeout. */
private static final long PERIODIC_VER_CHECK_CONN_TIMEOUT = 10 * 1000; // 10 seconds.
/** */
private IgniteClusterImpl cluster;
/** */
private final AtomicBoolean notifyEnabled = new AtomicBoolean();
/** */
@GridToStringExclude
private Timer updateNtfTimer;
/** Version checker. */
@GridToStringExclude
private GridUpdateNotifier verChecker;
/** */
private final AtomicReference<ConcurrentHashMap<Long, InternalDiagnosticFuture>> diagnosticFutMap =
new AtomicReference<>();
/** */
private final AtomicLong diagFutId = new AtomicLong();
/**
* @param ctx Kernal context.
*/
public ClusterProcessor(GridKernalContext ctx) {
super(ctx);
notifyEnabled.set(IgniteSystemProperties.getBoolean(IGNITE_UPDATE_NOTIFIER, true));
cluster = new IgniteClusterImpl(ctx);
}
/**
 * Reads the {@link IgniteSystemProperties#IGNITE_DIAGNOSTIC_ENABLED} property on every call.
 *
 * @return Diagnostic flag ({@code true} when the property is not set).
 */
public boolean diagnosticEnabled() {
    boolean enabled = IgniteSystemProperties.getBoolean(IGNITE_DIAGNOSTIC_ENABLED, true);

    return enabled;
}
/** */
private final JdkMarshaller marsh = new JdkMarshaller();
/**
 * Registers the listeners that serve internal diagnostic messages:
 * <ul>
 * <li>a local discovery listener which completes pending diagnostic futures addressed to a node
 * that failed or left the topology;</li>
 * <li>a message listener on {@code TOPIC_INTERNAL_DIAGNOSTIC} which executes incoming diagnostic
 * requests and completes futures for incoming responses.</li>
 * </ul>
 *
 * @throws IgniteCheckedException If failed.
 */
public void initDiagnosticListeners() throws IgniteCheckedException {
    ctx.event().addLocalEventListener(new GridLocalEventListener() {
        @Override public void onEvent(Event evt) {
            assert evt instanceof DiscoveryEvent;
            assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;

            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;

            UUID nodeId = discoEvt.eventNode().id();

            // Read the reference directly: the map is created lazily and may not exist yet.
            ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = diagnosticFutMap.get();

            if (futs != null) {
                for (InternalDiagnosticFuture fut : futs.values()) {
                    if (fut.nodeId.equals(nodeId))
                        fut.onDone(new IgniteDiagnosticInfo("Target node failed: " + nodeId));
                }
            }
        }
    },
    EVT_NODE_FAILED, EVT_NODE_LEFT);

    ctx.io().addMessageListener(TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() {
        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
            if (!(msg instanceof IgniteDiagnosticMessage)) {
                U.warn(diagnosticLog, "Received unexpected message: " + msg);

                return;
            }

            IgniteDiagnosticMessage msg0 = (IgniteDiagnosticMessage)msg;

            if (msg0.request())
                processDiagnosticRequest(nodeId, msg0);
            else
                processDiagnosticResponse(msg0);
        }
    });
}

/**
 * Runs the closure carried by a diagnostic request and sends the marshalled result back to the sender.
 *
 * @param nodeId Sender node ID.
 * @param msg Diagnostic request.
 */
private void processDiagnosticRequest(UUID nodeId, IgniteDiagnosticMessage msg) {
    ClusterNode node = ctx.discovery().node(nodeId);

    if (node == null) {
        if (diagnosticLog.isDebugEnabled()) {
            diagnosticLog.debug("Skip diagnostic request, sender node left " +
                "[node=" + nodeId + ", msg=" + msg + ']');
        }

        return;
    }

    byte[] diagRes;

    IgniteClosure<GridKernalContext, IgniteDiagnosticInfo> c;

    try {
        c = msg.unmarshal(marsh);

        diagRes = marsh.marshal(c.apply(ctx));
    }
    catch (Exception e) {
        U.error(diagnosticLog, "Failed to run diagnostic closure: " + e, e);

        try {
            IgniteDiagnosticInfo errInfo =
                new IgniteDiagnosticInfo("Failed to run diagnostic closure: " + e);

            diagRes = marsh.marshal(errInfo);
        }
        catch (Exception e0) {
            // Report the marshalling failure itself, not the closure failure already logged above.
            U.error(diagnosticLog, "Failed to marshal diagnostic closure result: " + e0, e0);

            // A null payload is still sent; the receiver substitutes a placeholder message for it.
            diagRes = null;
        }
    }

    IgniteDiagnosticMessage res = IgniteDiagnosticMessage.createResponse(diagRes, msg.futureId());

    try {
        ctx.io().sendToGridTopic(node, TOPIC_INTERNAL_DIAGNOSTIC, res, GridIoPolicy.SYSTEM_POOL);
    }
    catch (ClusterTopologyCheckedException ignored) {
        // The requester left the topology, nobody is waiting for the response.
        if (diagnosticLog.isDebugEnabled()) {
            diagnosticLog.debug("Failed to send diagnostic response, node left " +
                "[node=" + nodeId + ", msg=" + msg + ']');
        }
    }
    catch (IgniteCheckedException e) {
        U.error(diagnosticLog, "Failed to send diagnostic response [msg=" + msg + "]", e);
    }
}

/**
 * Completes the pending diagnostic future matching the given response, if it is still registered.
 *
 * @param msg Diagnostic response.
 */
private void processDiagnosticResponse(IgniteDiagnosticMessage msg) {
    InternalDiagnosticFuture fut = diagnosticFuturesMap().get(msg.futureId());

    if (fut == null) {
        U.warn(diagnosticLog, "Failed to find diagnostic message future [msg=" + msg + ']');

        return;
    }

    IgniteDiagnosticInfo res;

    try {
        res = msg.unmarshal(marsh);

        if (res == null)
            res = new IgniteDiagnosticInfo("Remote node failed to marshal response.");
    }
    catch (Exception e) {
        U.error(diagnosticLog, "Failed to unmarshal diagnostic response: " + e, e);

        res = new IgniteDiagnosticInfo("Failed to unmarshal diagnostic response: " + e);
    }

    fut.onResponse(res);
}
/**
 * Exposes the logger this processor uses for diagnostic output.
 *
 * @return Logger for diagnostic category.
 */
public IgniteLogger diagnosticLog() {
    return this.diagnosticLog;
}
/**
 * Gives access to the cluster instance created in the constructor.
 *
 * @return Cluster.
 */
public IgniteClusterImpl get() {
    return this.cluster;
}
/**
 * Returns the reconnect future reported by the cluster, or an already finished future
 * when the cluster reports none.
 *
 * @return Client reconnect future, never {@code null}.
 */
public IgniteFuture<?> clientReconnectFuture() {
    IgniteFuture<?> reconnectFut = cluster.clientReconnectFuture();

    if (reconnectFut == null)
        return new IgniteFinishedFutureImpl<>();

    return reconnectFut;
}
/** {@inheritDoc} */
@Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
    // This processor exchanges its discovery data under the CLUSTER_PROC component type.
    DiscoveryDataExchangeType type = CLUSTER_PROC;

    return type;
}
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
    // The joining node publishes its own update notifier status.
    Serializable joiningData = getDiscoveryData();

    dataBag.addJoiningNodeData(CLUSTER_PROC.ordinal(), joiningData);
}
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
    // The local update notifier status is published as node-specific data.
    Serializable nodeData = getDiscoveryData();

    dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), nodeData);
}
/**
 * Builds the data this processor publishes through discovery: a map with a single entry holding
 * the current update notifier status under the {@code ATTR_UPDATE_NOTIFIER_STATUS} key.
 *
 * @return Discovery data.
 */
private Serializable getDiscoveryData() {
    HashMap<String, Object> discoData = new HashMap<>();

    boolean ntfEnabled = notifyEnabled.get();

    discoData.put(ATTR_UPDATE_NOTIFIER_STATUS, ntfEnabled);

    return discoData;
}
/** {@inheritDoc} */
@Override public void onGridDataReceived(GridDiscoveryData data) {
    Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();

    if (nodeSpecData == null)
        return;

    // Adopt the notifier status published by the cluster, if any node reported one.
    Boolean rmtFlag = findLastFlag(nodeSpecData.values());

    if (rmtFlag == null)
        return;

    notifyEnabled.set(rmtFlag);
}
/**
 * Scans the given per-node data and picks the update notifier status from the last element
 * (in iteration order) that contains the {@code ATTR_UPDATE_NOTIFIER_STATUS} key.
 *
 * @param vals Collection to seek through; {@code null} elements are skipped.
 * @return Value stored under the key in the last matching map, or {@code null} if no map contains the key.
 */
@SuppressWarnings("unchecked")
private Boolean findLastFlag(Collection<Serializable> vals) {
    Boolean lastFlag = null;

    for (Serializable val : vals) {
        if (val == null)
            continue;

        Map<String, Object> attrs = (Map<String, Object>)val;

        if (!attrs.containsKey(ATTR_UPDATE_NOTIFIER_STATUS))
            continue;

        lastFlag = (Boolean)attrs.get(ATTR_UPDATE_NOTIFIER_STATUS);
    }

    return lastFlag;
}
/** {@inheritDoc} */
@Override public void onKernalStart(boolean active) throws IgniteCheckedException {
    // Nothing to start when the update notifier is disabled.
    if (!notifyEnabled.get())
        return;

    try {
        verChecker = new GridUpdateNotifier(ctx.igniteInstanceName(), VER_STR, false);

        // Daemon timer thread.
        updateNtfTimer = new Timer("ignite-update-notifier-timer", true);

        IgniteKernal kernal = (IgniteKernal)ctx.grid();

        UpdateNotifierTimerTask checkTask = new UpdateNotifierTimerTask(kernal, verChecker, notifyEnabled);

        // Setup periodic version check: first run immediately, then once per delay period.
        updateNtfTimer.scheduleAtFixedRate(checkTask, 0, PERIODIC_VER_CHECK_DELAY);
    }
    catch (IgniteCheckedException e) {
        // Failure to create the notifier is only reported at debug level.
        if (log.isDebugEnabled())
            log.debug("Failed to create GridUpdateNotifier: " + e);
    }
}
/** {@inheritDoc} */
@Override public void stop(boolean cancel) throws IgniteCheckedException {
    // The timer and the version checker exist only if the update notifier was started.
    boolean timerStarted = updateNtfTimer != null;

    if (timerStarted) {
        // Cancel update notification timer.
        updateNtfTimer.cancel();
    }

    boolean checkerCreated = verChecker != null;

    if (checkerCreated)
        verChecker.stop();

    // Io manager can be null if stop is invoked before the io manager is created,
    // for example on exception during start.
    if (ctx.io() == null)
        return;

    ctx.io().removeMessageListener(TOPIC_INTERNAL_DIAGNOSTIC);
}
/**
 * Disables update notifier by clearing the shared status flag. The periodic timer task
 * checks this flag at the beginning of every run.
 */
public void disableUpdateNotifier() {
    this.notifyEnabled.set(false);
}
/**
 * Reports the current value of the update notifier status flag.
 *
 * @return Update notifier status.
 */
public boolean updateNotifierEnabled() {
    boolean enabled = notifyEnabled.get();

    return enabled;
}
/**
 * Asks the version checker for the latest known version.
 *
 * @return Latest version string, or {@code null} if the version checker has not been created.
 */
public String latestVersion() {
    if (verChecker == null)
        return null;

    return verChecker.latestVersion();
}
/**
 * Sends diagnostic message closure to remote node. Once the remote answer is available (or has failed),
 * local communication info about connection(s) with the remote node is dumped, and the returned future
 * is completed with a text combining the base message, the remote information and the local statistics.
 * Failures on either side are embedded into the resulting text instead of failing the future.
 *
 * @param nodeId Target node ID.
 * @param c Closure to send.
 * @param baseMsg Local message to log.
 * @return Message future.
 */
public IgniteInternalFuture<String> requestDiagnosticInfo(final UUID nodeId,
    IgniteClosure<GridKernalContext, IgniteDiagnosticInfo> c,
    final String baseMsg) {
    final GridFutureAdapter<String> resFut = new GridFutureAdapter<>();

    sendDiagnosticMessage(nodeId, c).listen(new CI1<IgniteInternalFuture<IgniteDiagnosticInfo>>() {
        @Override public void apply(IgniteInternalFuture<IgniteDiagnosticInfo> rmtFut) {
            String rmtInfo0;

            try {
                rmtInfo0 = rmtFut.get().message();
            }
            catch (Exception e) {
                rmtInfo0 = "Diagnostic processing error: " + e;
            }

            // Effectively final copy for the nested listener.
            final String rmtInfo = rmtInfo0;

            IgniteDiagnosticMessage.dumpCommunicationInfo(ctx, nodeId).listen(
                new CI1<IgniteInternalFuture<String>>() {
                    @Override public void apply(IgniteInternalFuture<String> commFut) {
                        String locInfo;

                        try {
                            locInfo = commFut.get();
                        }
                        catch (Exception e) {
                            locInfo = "Failed to get info for local node: " + e;
                        }

                        StringBuilder sb = new StringBuilder();

                        sb.append(baseMsg).append(U.nl());
                        sb.append("Remote node information:").append(U.nl()).append(rmtInfo);
                        sb.append(U.nl()).append("Local communication statistics:").append(U.nl());
                        sb.append(locInfo);

                        resFut.onDone(sb.toString());
                    }
                });
        }
    });

    return resFut;
}
/**
 * Creates a diagnostic request for the given closure, registers a future for the response and sends
 * the request to the target node.
 * <p>
 * If anything fails, the error is logged and an already finished future carrying the error description
 * is returned. A future that was already registered by that point is completed with the same
 * description, which also removes it from the futures map, so a failed send leaves no stale entry.
 *
 * @param nodeId Target node ID.
 * @param c Message closure.
 * @return Message future.
 */
private IgniteInternalFuture<IgniteDiagnosticInfo> sendDiagnosticMessage(UUID nodeId,
    IgniteClosure<GridKernalContext, IgniteDiagnosticInfo> c) {
    // Declared outside of try so the catch block can clean up a registered future.
    InternalDiagnosticFuture fut = null;

    try {
        IgniteDiagnosticMessage msg = IgniteDiagnosticMessage.createRequest(marsh,
            c,
            diagFutId.getAndIncrement());

        fut = new InternalDiagnosticFuture(nodeId, msg.futureId());

        // Register before sending so that the response always finds its future.
        diagnosticFuturesMap().put(msg.futureId(), fut);

        ctx.io().sendToGridTopic(nodeId, TOPIC_INTERNAL_DIAGNOSTIC, msg, GridIoPolicy.SYSTEM_POOL);

        return fut;
    }
    catch (Exception e) {
        U.error(diagnosticLog, "Failed to send diagnostic message: " + e, e);

        IgniteDiagnosticInfo errInfo = new IgniteDiagnosticInfo("Failed to send diagnostic message: " + e);

        // No response will ever arrive: completing the future unregisters it from the map.
        if (fut != null)
            fut.onDone(errInfo);

        return new GridFinishedFuture<>(errInfo);
    }
}
/**
 * Returns the map of pending diagnostic futures, creating it on first use. If several threads
 * try to create the map at once, the instance installed by the successful compare-and-set is
 * the one every caller gets.
 *
 * @return Diagnostic messages futures map.
 */
private ConcurrentHashMap<Long, InternalDiagnosticFuture> diagnosticFuturesMap() {
    ConcurrentHashMap<Long, InternalDiagnosticFuture> cur = diagnosticFutMap.get();

    if (cur != null)
        return cur;

    ConcurrentHashMap<Long, InternalDiagnosticFuture> created = new ConcurrentHashMap<>();

    // Lost the race: use the map installed by the other thread.
    return diagnosticFutMap.compareAndSet(null, created) ? created : diagnosticFutMap.get();
}
/**
 * Update notifier timer task. On each run it starts a version check, waits for the result and
 * reports the status, unless the update notifier has been disabled in the meantime.
 */
private static class UpdateNotifierTimerTask extends GridTimerTask {
    /** Logger. */
    private final IgniteLogger log;

    /** Version checker. */
    private final GridUpdateNotifier verChecker;

    /** Whether this is the first run. */
    private boolean first = true;

    /** Shared update notifier status flag. */
    private final AtomicBoolean notifyEnabled;

    /**
     * Constructor.
     *
     * @param kernal Kernal.
     * @param verChecker Version checker.
     * @param notifyEnabled Update notifier status flag, checked at the start of every run.
     */
    private UpdateNotifierTimerTask(
        IgniteKernal kernal,
        GridUpdateNotifier verChecker,
        AtomicBoolean notifyEnabled
    ) {
        this.log = kernal.context().log(UpdateNotifierTimerTask.class);
        this.verChecker = verChecker;
        this.notifyEnabled = notifyEnabled;
    }

    /** {@inheritDoc} */
    @Override public void safeRun() throws InterruptedException {
        boolean enabled = notifyEnabled.get();

        if (!enabled)
            return;

        verChecker.checkForNewVersion(log, first);

        // Give the check 10 seconds to complete.
        Thread.sleep(PERIODIC_VER_CHECK_CONN_TIMEOUT);

        // Then poll once per second, for up to 60 more seconds, in order to get
        // version info even on slow connection.
        int polls = 0;

        while (polls < 60 && verChecker.latestVersion() == null) {
            Thread.sleep(1000);

            polls++;
        }

        // Report status if one is available.
        // No-op if status is NOT available.
        verChecker.reportStatus(log);

        // After the first run that finished without an error, switch to reporting only new versions.
        if (first) {
            if (verChecker.error() == null) {
                first = false;

                verChecker.reportOnlyNew(true);
            }
        }
    }
}
/**
 * Future for a single diagnostic request sent to a remote node. When the future completes
 * it removes itself from the map of pending diagnostic futures.
 */
class InternalDiagnosticFuture extends GridFutureAdapter<IgniteDiagnosticInfo> {
    /** Future ID, the key under which the future is removed from the futures map. */
    private final long id;

    /** Target node ID. */
    private final UUID nodeId;

    /**
     * @param nodeId Target node ID.
     * @param id Future ID.
     */
    InternalDiagnosticFuture(UUID nodeId, long id) {
        this.nodeId = nodeId;
        this.id = id;
    }

    /**
     * Completes the future with the response received from the remote node.
     *
     * @param res Response.
     */
    public void onResponse(IgniteDiagnosticInfo res) {
        onDone(res);
    }

    /** {@inheritDoc} */
    @Override public boolean onDone(@Nullable IgniteDiagnosticInfo res, @Nullable Throwable err) {
        boolean completed = super.onDone(res, err);

        // Unregister only when this call actually completed the future.
        if (completed)
            diagnosticFuturesMap().remove(id);

        return completed;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(InternalDiagnosticFuture.class, this);
    }
}
}