blob: 2a5f7cae1224badebe64057005da0091993132e7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.managers.deployment;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashSet;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.GridByteArrayList;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteNotPeerDeployable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_CLASSLOAD;
/**
* Communication helper class. Provides request and response sending methods.
* It uses communication manager as a way of sending and receiving requests.
*/
@GridToStringExclude
class GridDeploymentCommunication {
/** */
private final IgniteLogger log;
/** */
private final GridKernalContext ctx;
/** */
private final GridMessageListener peerLsnr;
/** */
private final ThreadLocal<Collection<UUID>> activeReqNodeIds = new ThreadLocal<>();
/** */
private final GridBusyLock busyLock = new GridBusyLock();
/** */
private final Marshaller marsh;
/**
* Creates new instance of deployment communication.
*
* @param ctx Kernal context.
* @param log Logger.
*/
GridDeploymentCommunication(final GridKernalContext ctx, IgniteLogger log) {
assert log != null;
this.ctx = ctx;
this.log = log.getLogger(getClass());
peerLsnr = new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
processDeploymentRequest(nodeId, msg);
}
};
marsh = ctx.config().getMarshaller();
}
/**
* Starts deployment communication.
*/
void start() {
ctx.io().addMessageListener(TOPIC_CLASSLOAD, peerLsnr);
if (log.isDebugEnabled())
log.debug("Started deployment communication.");
}
/**
* Stops deployment communication.
*/
void stop() {
if (log.isDebugEnabled())
log.debug("Stopping deployment communication.");
busyLock.block();
ctx.io().removeMessageListener(TOPIC_CLASSLOAD, peerLsnr);
}
/**
* @param nodeId Node ID.
* @param msg Request.
*/
private void processDeploymentRequest(UUID nodeId, Object msg) {
assert nodeId != null;
assert msg != null;
if (!busyLock.enterBusy()) {
if (log.isDebugEnabled())
log.debug("Ignoring deployment request since grid is stopping " +
"[nodeId=" + nodeId + ", msg=" + msg + ']');
return;
}
try {
GridDeploymentRequest req = (GridDeploymentRequest)msg;
if (req.isUndeploy())
processUndeployRequest(nodeId, req);
else {
assert activeReqNodeIds.get() == null;
Collection<UUID> nodeIds = req.nodeIds();
nodeIds = nodeIds == null ? new HashSet<UUID>() : new HashSet<>(nodeIds);
boolean b = nodeIds.add(nodeId);
assert b;
activeReqNodeIds.set(nodeIds);
try {
processResourceRequest(nodeId, req);
}
finally {
activeReqNodeIds.set(null);
}
}
}
finally {
busyLock.leaveBusy();
}
}
/**
* @param nodeId Sender node ID.
* @param req Undeploy request.
*/
private void processUndeployRequest(UUID nodeId, GridDeploymentRequest req) {
if (log.isDebugEnabled())
log.debug("Received undeploy request [nodeId=" + nodeId + ", req=" + req + ']');
ctx.deploy().undeployTask(nodeId, req.resourceName());
}
/**
* Handles classes/resources requests.
*
* @param nodeId Originating node id.
* @param req Request.
*/
private void processResourceRequest(UUID nodeId, GridDeploymentRequest req) {
if (log.isDebugEnabled())
log.debug("Received peer class/resource loading request [node=" + nodeId + ", req=" + req + ']');
if (req.responseTopic() == null) {
try {
req.responseTopic(U.unmarshal(marsh, req.responseTopicBytes(), U.resolveClassLoader(ctx.config())));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to process deployment request (will ignore): " + req, e);
return;
}
}
GridDeploymentResponse res = new GridDeploymentResponse();
GridDeployment dep = ctx.deploy().getDeployment(req.classLoaderId());
// Null class loader means failure here.
if (dep != null) {
ClassLoader ldr = dep.classLoader();
// In case the class loader is ours - skip the check
// since it was already performed before (and was successful).
if (!(ldr instanceof GridDeploymentClassLoader)) {
// First check for @GridNotPeerDeployable annotation.
try {
String clsName = req.resourceName().replace('/', '.');
int idx = clsName.indexOf(".class");
if (idx >= 0)
clsName = clsName.substring(0, idx);
Class<?> cls = Class.forName(clsName, true, ldr);
if (U.getAnnotation(cls, IgniteNotPeerDeployable.class) != null) {
String errMsg = "Attempt to peer deploy class that has @GridNotPeerDeployable " +
"annotation: " + clsName;
U.error(log, errMsg);
res.errorMessage(errMsg);
res.success(false);
sendResponse(nodeId, req.responseTopic(), res);
return;
}
}
catch (ClassNotFoundException ignore) {
// Safely ignore it here - resource wasn't a class name.
}
}
InputStream in = ldr.getResourceAsStream(req.resourceName());
if (in == null) {
String errMsg = "Requested resource not found (ignoring locally): " + req.resourceName();
// Java requests the same class with BeanInfo suffix during
// introspection automatically. Usually nobody uses this kind
// of classes. Thus we print it out with DEBUG level.
// Also we print it with DEBUG level because of the
// frameworks which ask some classes just in case - for
// example to identify whether certain framework is available.
// Remote node will throw an exception if needs.
if (log.isDebugEnabled())
log.debug(errMsg);
res.success(false);
res.errorMessage(errMsg);
}
else {
try {
GridByteArrayList bytes = new GridByteArrayList(1024);
bytes.readAll(in);
res.success(true);
res.byteSource(bytes);
}
catch (IOException e) {
String errMsg = "Failed to read resource due to IO failure: " + req.resourceName();
U.error(log, errMsg, e);
res.errorMessage(errMsg);
res.success(false);
}
finally {
U.close(in, log);
}
}
}
else {
String errMsg = "Failed to find local deployment for peer request: " + req;
U.warn(log, errMsg);
res.success(false);
res.errorMessage(errMsg);
}
sendResponse(nodeId, req.responseTopic(), res);
}
/**
* @param nodeId Destination node ID.
* @param topic Response topic.
* @param res Response.
*/
private void sendResponse(UUID nodeId, Object topic, Message res) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node != null) {
try {
ctx.io().sendToCustomTopic(node, topic, res, GridIoPolicy.P2P_POOL);
if (log.isDebugEnabled())
log.debug("Sent peer class loading response [node=" + node.id() + ", res=" + res + ']');
}
catch (ClusterTopologyCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to send peer class loading response to node " +
"(node does not exist): " + nodeId);
}
catch (IgniteCheckedException e) {
if (ctx.discovery().pingNodeNoError(nodeId))
U.error(log, "Failed to send peer class loading response to node: " + nodeId, e);
else if (log.isDebugEnabled())
log.debug("Failed to send peer class loading response to node " +
"(node does not exist): " + nodeId);
}
}
else if (log.isDebugEnabled())
log.debug("Failed to send peer class loading response to node " +
"(node does not exist): " + nodeId);
}
/**
* @param rsrcName Resource to undeploy.
* @param rmtNodes Nodes to send request to.
* @throws IgniteCheckedException If request could not be sent.
*/
void sendUndeployRequest(String rsrcName, Collection<ClusterNode> rmtNodes) throws IgniteCheckedException {
assert !rmtNodes.contains(ctx.discovery().localNode());
Message req = new GridDeploymentRequest(null, null, rsrcName, true);
if (!rmtNodes.isEmpty()) {
ctx.io().sendToGridTopic(
rmtNodes,
TOPIC_CLASSLOAD,
req,
GridIoPolicy.P2P_POOL);
}
}
/**
* Sends request to the remote node and wait for response. If there is
* no response until threshold time, method returns null.
*
*
* @param rsrcName Resource name.
* @param clsLdrId Class loader ID.
* @param dstNode Remote node request should be sent to.
* @param threshold Time in milliseconds when request is decided to
* be obsolete.
* @return Either response value or {@code null} if timeout occurred.
* @throws IgniteCheckedException Thrown if there is no connection with remote node.
*/
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
GridDeploymentResponse sendResourceRequest(final String rsrcName, IgniteUuid clsLdrId,
final ClusterNode dstNode, long threshold) throws IgniteCheckedException {
assert rsrcName != null;
assert dstNode != null;
assert clsLdrId != null;
Collection<UUID> nodeIds = activeReqNodeIds.get();
if (nodeIds != null && nodeIds.contains(dstNode.id())) {
if (log.isDebugEnabled())
log.debug("Node attempts to load resource from one of the requesters " +
"[rsrcName=" + rsrcName + ", dstNodeId=" + dstNode.id() +
", requesters=" + nodeIds + ']');
GridDeploymentResponse fake = new GridDeploymentResponse();
fake.success(false);
fake.errorMessage("Node attempts to load resource from one of the requesters " +
"[rsrcName=" + rsrcName + ", dstNodeId=" + dstNode.id() +
", requesters=" + nodeIds + ']');
return fake;
}
Object resTopic = TOPIC_CLASSLOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
GridDeploymentRequest req = new GridDeploymentRequest(resTopic, clsLdrId, rsrcName, false);
// Send node IDs chain with request.
req.nodeIds(nodeIds);
final Object qryMux = new Object();
final GridTuple<GridDeploymentResponse> res = new GridTuple<>();
GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
assert evt instanceof DiscoveryEvent;
assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
UUID nodeId = discoEvt.eventNode().id();
if (!nodeId.equals(dstNode.id()))
// Not a destination node.
return;
GridDeploymentResponse fake = new GridDeploymentResponse();
String errMsg = "Originating node left grid (resource will not be peer loaded) " +
"[nodeId=" + dstNode.id() + ", rsrc=" + rsrcName + ']';
U.warn(log, errMsg);
fake.success(false);
fake.errorMessage(errMsg);
// We put fake result here to interrupt waiting peer-to-peer thread
// because originating node has left grid.
synchronized (qryMux) {
res.set(fake);
qryMux.notifyAll();
}
}
};
GridMessageListener resLsnr = new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert nodeId != null;
assert msg != null;
synchronized (qryMux) {
if (!(msg instanceof GridDeploymentResponse)) {
U.error(log, "Received unknown peer class loading response [node=" + nodeId + ", msg=" +
msg + ']');
}
else
res.set((GridDeploymentResponse)msg);
qryMux.notifyAll();
}
}
};
try {
ctx.io().addMessageListener(resTopic, resLsnr);
// The destination node has potentially left grid here but in this case
// Communication manager will throw the exception while sending message.
ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
long start = U.currentTimeMillis();
if (req.responseTopic() != null && !ctx.localNodeId().equals(dstNode.id()))
req.responseTopicBytes(U.marshal(marsh, req.responseTopic()));
ctx.io().sendToGridTopic(dstNode, TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL);
if (log.isDebugEnabled())
log.debug("Sent peer class loading request [node=" + dstNode.id() + ", req=" + req + ']');
synchronized (qryMux) {
try {
long timeout = threshold - start;
if (log.isDebugEnabled()) {
log.debug("Waiting for peer response from node [node=" + dstNode.id() +
", timeout=" + timeout + ']');
}
while (res.get() == null && timeout > 0) {
qryMux.wait(timeout);
timeout = threshold - U.currentTimeMillis();
}
}
catch (InterruptedException e) {
// Interrupt again to get it in the users code.
Thread.currentThread().interrupt();
throw new IgniteCheckedException("Got interrupted while waiting for response from node: " +
dstNode.id(), e);
}
}
if (res.get() == null) {
U.warn(log, "Failed to receive peer response from node within duration [node=" + dstNode.id() +
", duration=" + (U.currentTimeMillis() - start) + ']');
}
else if (log.isDebugEnabled())
log.debug("Received peer loading response [node=" + dstNode.id() + ", res=" + res.get() + ']');
return res.get();
}
finally {
ctx.event().removeLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
ctx.io().removeMessageListener(resTopic, resLsnr);
}
}
}