blob: e63d7d4d28bc1e43a8c1a85b069c6fdf1588bce9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.datastreamer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.OomExceptionHandler;
import org.jetbrains.annotations.Nullable;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.DelayQueue;
import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.DATA_STREAMER_POOL;
/**
* Data stream processor.
*/
public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
/**
* Data streamers created by this processor that have not completed yet. Backed by a concurrent set
* (access is not supposed to be highly concurrent). A streamer is added in {@code dataStreamer(String)}
* and removed by the listener attached to its internal future.
*/
private Collection<DataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
/**
* Busy lock. Entered by streamer creation, request processing and the flusher thread;
* blocked in {@code onKernalStop(boolean)}.
*/
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
/** Flushing thread. Created in {@code start()}; stays {@code null} on daemon nodes, where {@code start()} returns early. */
private Thread flusher;
/**
* Queue of streamers awaiting a background flush. It is handed to every streamer created by this processor
* and drained by the flusher thread, which re-offers a streamer after flushing it unless it is closed.
*/
private final DelayQueue<DataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
/** Marshaller taken from the node configuration. */
private final Marshaller marsh;
/**
* Pre-marshalled generic error, prepared in {@code start()}. It is sent in a response instead of the real
* error when the real error itself cannot be marshalled.
*/
private byte[] marshErrBytes;
/**
 * Creates the processor and, on non-client nodes, subscribes it to incoming data streamer requests.
 *
 * @param ctx Kernal context.
 */
public DataStreamProcessor(GridKernalContext ctx) {
    super(ctx);

    // The listener registered below captures 'this'. Assign the final marshaller field first so that
    // a request delivered through the listener can never reach processRequest() while it is still unset.
    marsh = ctx.config().getMarshaller();

    if (!ctx.clientNode()) {
        ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                assert msg instanceof DataStreamerRequest;

                processRequest(nodeId, (DataStreamerRequest)msg);
            }
        });
    }
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
    if (ctx.config().isDaemon())
        return;

    // Marshal a generic error once up front; it is the fallback payload for responses
    // whose real error cannot be marshalled.
    IgniteCheckedException fallbackErr = new IgniteCheckedException("Failed to marshal response error, " +
        "see node log for details.");

    marshErrBytes = U.marshal(marsh, fallbackErr);

    GridWorker flushWorker = new GridWorker(ctx.igniteInstanceName(), "grid-data-loader-flusher", log) {
        @Override protected void body() throws InterruptedException {
            while (!isCancelled()) {
                DataStreamerImpl<K, V> next = flushQ.take();

                // The busy lock can no longer be entered: stop flushing.
                if (!busyLock.enterBusy())
                    return;

                try {
                    // A closed streamer is dropped from the queue; a live one is flushed and re-queued.
                    if (!next.isClosed()) {
                        next.tryFlush();

                        flushQ.offer(next);
                    }
                }
                finally {
                    busyLock.leaveBusy();
                }
            }
        }
    };

    flusher = new IgniteThread(flushWorker);

    flusher.setUncaughtExceptionHandler(new OomExceptionHandler(ctx));

    flusher.start();

    if (log.isDebugEnabled())
        log.debug("Started data streamer processor.");
}
/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
    if (ctx.config().isDaemon())
        return;

    if (!ctx.clientNode())
        ctx.io().removeMessageListener(TOPIC_DATASTREAM);

    // Block the busy lock first, then stop the background flusher.
    busyLock.block();

    U.interrupt(flusher);
    U.join(flusher, log);

    for (DataStreamerImpl<?, ?> streamer : ldrs)
        closeOnStop(streamer, cancel);

    if (log.isDebugEnabled())
        log.debug("Stopped data streamer processor.");
}

/**
 * Closes one still-active streamer during node stop. Failures are logged and not propagated,
 * so the remaining streamers still get closed.
 *
 * @param streamer Streamer to close.
 * @param cancel Cancel flag passed through to the streamer.
 */
private void closeOnStop(DataStreamerImpl<?, ?> streamer, boolean cancel) {
    if (log.isDebugEnabled())
        log.debug("Closing active data streamer on grid stop [ldr=" + streamer + ", cancel=" + cancel + ']');

    try {
        streamer.closeEx(cancel);
    }
    catch (IgniteInterruptedCheckedException e) {
        U.warn(log, "Interrupted while waiting for completion of the data streamer: " + streamer, e);
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed to close data streamer: " + streamer, e);
    }
}
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
    // Forward the disconnect notification to every streamer that is still registered.
    for (DataStreamerImpl<?, ?> streamer : ldrs) {
        streamer.onDisconnected(reconnectFut);
    }
}
/**
 * Creates a new data streamer for the given cache and tracks it until its internal future completes.
 *
 * @param cacheName Cache name ({@code null} for default cache).
 * @return Data loader.
 * @throws IllegalStateException If the busy lock cannot be entered because the grid is stopping.
 */
public DataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName) {
    if (!busyLock.enterBusy())
        throw new IllegalStateException("Failed to create data streamer (grid is stopping).");

    try {
        final DataStreamerImpl<K, V> streamer = new DataStreamerImpl<>(ctx, cacheName, flushQ);

        ldrs.add(streamer);

        // Unregister the streamer once its internal future completes.
        CI1<IgniteInternalFuture<?>> onCompleted = new CI1<IgniteInternalFuture<?>>() {
            @Override public void apply(IgniteInternalFuture<?> done) {
                boolean removed = ldrs.remove(streamer);

                assert removed : "Loader has not been added to set: " + streamer;

                if (log.isDebugEnabled())
                    log.debug("Loader has been completed: " + streamer);
            }
        };

        streamer.internalFuture().listen(onCompleted);

        return streamer;
    }
    finally {
        busyLock.leaveBusy();
    }
}
/**
* Handles a data streamer request received from another node. The steps are: wait until the local affinity
* version has caught up with the one in the request, unmarshal the response topic, resolve the class loader,
* unmarshal the stream receiver, and finally hand the request to
* {@code localUpdate(UUID, DataStreamerRequest, StreamReceiver, Object)}.
* Deployment and receiver unmarshalling failures are reported back to the sender.
*
* @param nodeId Sender ID.
* @param req Request.
*/
private void processRequest(final UUID nodeId, final DataStreamerRequest req) {
// The request is dropped (debug log only) when the busy lock can no longer be entered.
if (!busyLock.enterBusy()) {
if (log.isDebugEnabled())
log.debug("Ignoring data load request (node is stopping): " + req);
return;
}
try {
if (log.isDebugEnabled())
log.debug("Processing data load request: " + req);
AffinityTopologyVersion locAffVer = ctx.cache().context().exchange().readyAffinityVersion();
AffinityTopologyVersion rmtAffVer = req.topologyVersion();
// The request carries a higher affinity version than the one ready locally.
if (locAffVer.compareTo(rmtAffVer) < 0) {
if (log.isDebugEnabled())
log.debug("Received request has higher affinity topology version [request=" + req +
", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']');
IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer);
// If the ready future is absent or already done, processing continues right away below.
if (fut != null && !fut.isDone()) {
// Capture the IO policy of the current thread; it is passed to runLocalSafe() for the retry.
final byte plc = threadIoPolicy();
// Re-run this method from scratch once the requested affinity version is ready.
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> t) {
ctx.closure().runLocalSafe(new Runnable() {
@Override public void run() {
processRequest(nodeId, req);
}
}, plc);
}
});
return;
}
}
Object topic;
try {
topic = U.unmarshal(marsh, req.responseTopicBytes(), U.resolveClassLoader(null, ctx.config()));
}
catch (IgniteCheckedException e) {
// Without the response topic there is nowhere to send a reply, so the failure is only logged.
U.error(log, "Failed to unmarshal topic from request: " + req, e);
return;
}
// Choose the class loader for the receiver: the grid class loader when local deployment is forced,
// otherwise the class loader of the deployment resolved from the request.
ClassLoader clsLdr;
if (req.forceLocalDeployment())
clsLdr = U.gridClassLoader();
else {
// NOTE(review): req.sampleClassName() is passed for two consecutive arguments - presumably the
// resource name and the class name; confirm against the getGlobalDeployment() signature.
GridDeployment dep = ctx.deploy().getGlobalDeployment(
req.deploymentMode(),
req.sampleClassName(),
req.sampleClassName(),
req.userVersion(),
nodeId,
req.classLoaderId(),
req.participants(),
null);
if (dep == null) {
// No deployment could be obtained: report the failure to the sender and stop.
sendResponse(nodeId,
topic,
req.requestId(),
new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
", req=" + req + ']'),
false);
return;
}
clsLdr = dep.classLoader();
}
StreamReceiver<K, V> updater;
try {
updater = U.unmarshal(marsh, req.updaterBytes(), U.resolveClassLoader(clsLdr, ctx.config()));
// Resources are injected only when the request actually carried a receiver.
if (updater != null)
ctx.resource().injectGeneric(updater);
}
catch (IgniteCheckedException e) {
// Unmarshalling or injection failed: log it and send the error back to the sender.
U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
sendResponse(nodeId, topic, req.requestId(), e, false);
return;
}
localUpdate(nodeId, req, updater, topic);
}
finally {
busyLock.leaveBusy();
}
}
/**
* Applies the entries of a request to the local cache and sends a response to the sender.
* When the receiver is a {@code DataStreamerImpl.IsolatedUpdater}, the topology is checked under the
* topology read lock first, which may lead to a remap error response or to postponing the update until
* the current topology future completes.
*
* @param nodeId Node id.
* @param req Request.
* @param updater Updater.
* @param topic Topic.
*/
private void localUpdate(final UUID nodeId,
final DataStreamerRequest req,
final StreamReceiver<K, V> updater,
final Object topic) {
// Any receiver other than the isolated updater is treated as "allow overwrite" and skips the
// topology checks below.
final boolean allowOverwrite = !(updater instanceof DataStreamerImpl.IsolatedUpdater);
try {
GridCacheAdapter cache = ctx.cache().internalCache(req.cacheName());
if (cache == null) {
// Thrown here, caught by the outer catch block and sent to the requester as the response error.
throw new IgniteCheckedException("Cache not created or already destroyed: " +
req.cacheName());
}
GridCacheContext cctx = cache.context();
DataStreamerUpdateJob job = null;
GridFutureAdapter waitFut = null;
// The topology read lock is held while the topology version is examined and the data streamer
// future is registered; it is released in the finally block below.
if (!allowOverwrite)
cctx.topology().readLock();
GridDhtTopologyFuture topWaitFut = null;
try {
Exception remapErr = null;
AffinityTopologyVersion streamerFutTopVer = null;
if (!allowOverwrite) {
GridDhtTopologyFuture topFut = cctx.topologyVersionFuture();
AffinityTopologyVersion topVer = topFut.isDone() ? topFut.topologyVersion() :
topFut.initialVersion();
// Three outcomes: the local version is newer than the request (remap error), the topology
// future is still in progress (wait for it), or the topology is stable (proceed).
if (topVer.compareTo(req.topologyVersion()) > 0) {
// NOTE(review): the message reports topFut.initialVersion() while the comparison above used
// topVer, which is topFut.topologyVersion() when the future is done - confirm this is intended.
remapErr = new ClusterTopologyCheckedException("DataStreamer will retry " +
"data transfer at stable topology [reqTop=" + req.topologyVersion() +
", topVer=" + topFut.initialVersion() + ", node=remote]");
}
else if (!topFut.isDone())
topWaitFut = topFut;
else
streamerFutTopVer = topFut.topologyVersion();
}
if (remapErr != null) {
sendResponse(nodeId, topic, req.requestId(), remapErr, req.forceLocalDeployment());
return;
}
else if (topWaitFut == null) {
// No need to wait: build the update job. For the isolated updater a data streamer future is
// also registered for the observed topology version; it is completed after the job has run.
job = new DataStreamerUpdateJob(ctx,
log,
req.cacheName(),
req.entries(),
req.ignoreDeploymentOwnership(),
req.skipStore(),
req.keepBinary(),
updater);
waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(streamerFutTopVer);
}
}
finally {
if (!allowOverwrite)
cctx.topology().readUnlock();
}
if (topWaitFut != null) {
// Need call 'listen' after topology read lock is released.
// The listener re-runs this method from the start once the topology future completes.
topWaitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
localUpdate(nodeId, req, updater, topic);
}
});
return;
}
try {
job.call();
// A null error marks a successful response.
sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment());
}
finally {
// Complete the registered data streamer future whether or not the job succeeded.
if (waitFut != null)
waitFut.onDone();
}
}
catch (Throwable e) {
// Every failure is reported to the requester; only Errors are rethrown after that,
// other throwables are not propagated to the caller.
sendResponse(nodeId, topic, req.requestId(), e, req.forceLocalDeployment());
if (e instanceof Error)
throw (Error)e;
}
}
/**
 * Builds a response for the given request and sends it to the requesting node on its response topic.
 * If the error cannot be marshalled, the pre-marshalled generic error is sent instead.
 *
 * @param nodeId Node ID.
 * @param resTopic Response topic.
 * @param reqId Request ID.
 * @param err Error.
 * @param forceLocDep Force local deployment.
 */
private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
    boolean forceLocDep) {
    // A null payload means the request succeeded.
    byte[] payload = null;

    if (err != null) {
        try {
            payload = U.marshal(marsh, err);
        }
        catch (Exception e) {
            U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e);

            payload = marshErrBytes;
        }
    }

    DataStreamerResponse resp = new DataStreamerResponse(reqId, payload, forceLocDep);

    try {
        ctx.io().sendToCustomTopic(nodeId, resTopic, resp, threadIoPolicy());
    }
    catch (IgniteCheckedException e) {
        // A send failure is an error only while the target is still alive;
        // otherwise the node has simply left.
        if (ctx.discovery().alive(nodeId))
            U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + resp + ']', e);
        else if (log.isDebugEnabled())
            log.debug("Node has left the grid: " + nodeId);
    }
}
/**
 * Resolves the IO policy of the current thread, falling back to the data streamer pool
 * when the thread has no policy set.
 *
 * @return IO policy.
 */
private static byte threadIoPolicy() {
    Byte cur = GridIoManager.currentPolicy();

    return cur != null ? cur : DATA_STREAMER_POOL;
}
/**
 * Resolves the IO policy for a node. The resolver's answer is used when a resolver is given and it
 * returns a non-null value; otherwise the data streamer pool policy is returned.
 *
 * @param rslvr Resolver.
 * @param node Node.
 * @return IO policy.
 */
public static byte ioPolicy(@Nullable IgniteClosure<ClusterNode, Byte> rslvr, ClusterNode node) {
    assert node != null;

    if (rslvr != null) {
        Byte resolved = rslvr.apply(node);

        if (resolved != null)
            return resolved;
    }

    return DATA_STREAMER_POOL;
}
/** {@inheritDoc} */
@Override public void printMemoryStats() {
    X.println(">>>");
    X.println(">>> Data streamer processor memory stats [igniteInstanceName=" + ctx.igniteInstanceName() + ']');

    // Number of streamers currently tracked by this processor.
    int trackedStreamers = ldrs.size();

    X.println(">>> ldrsSize: " + trackedStreamers);
}
}