| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.continuous; |
| |
| import java.io.Externalizable; |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import javax.cache.event.CacheEntryUpdatedListener; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cache.CacheEntryEventSerializableFilter; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.events.DiscoveryEvent; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.failure.FailureType; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; |
| import org.apache.ignite.internal.IgniteDeploymentCheckedException; |
| import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.NodeStoppingException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.managers.communication.GridMessageListener; |
| import org.apache.ignite.internal.managers.deployment.GridDeployment; |
| import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; |
| import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; |
| import org.apache.ignite.internal.managers.discovery.CustomEventListener; |
| import org.apache.ignite.internal.managers.discovery.DiscoCache; |
| import org.apache.ignite.internal.managers.discovery.DiscoveryMessageResultsCollector; |
| import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; |
| import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; |
| import org.apache.ignite.internal.managers.systemview.walker.ContinuousQueryViewWalker; |
| import org.apache.ignite.internal.processors.GridProcessorAdapter; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.GridCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheProcessor; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; |
| import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; |
| import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.lang.GridPlainRunnable; |
| import org.apache.ignite.internal.util.lang.gridfunc.ReadOnlyCollectionView2X; |
| import org.apache.ignite.internal.util.tostring.GridToStringInclude; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.internal.LT; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.internal.util.worker.GridWorker; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.marshaller.Marshaller; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.apache.ignite.spi.discovery.DiscoveryDataBag; |
| import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; |
| import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; |
| import org.apache.ignite.spi.systemview.view.ContinuousQueryView; |
| import org.apache.ignite.thread.IgniteThread; |
| import org.apache.ignite.thread.OomExceptionHandler; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; |
| import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.toCountersMap; |
| import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_ACK; |
| import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_NOTIFICATION; |
| import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; |
| |
| /** |
| * Processor for continuous routines. |
| */ |
| public class GridContinuousProcessor extends GridProcessorAdapter { |
| /** */ |
| public static final String CQ_SYS_VIEW = metricName("continuous", "queries"); |
| |
| /** */ |
| public static final String CQ_SYS_VIEW_DESC = "Continuous queries"; |
| |
| /** Local infos. */ |
| private final ConcurrentMap<UUID, LocalRoutineInfo> locInfos = new ConcurrentHashMap<>(); |
| |
| /** Local infos. */ |
| private final ConcurrentMap<UUID, Map<UUID, LocalRoutineInfo>> clientInfos = new ConcurrentHashMap<>(); |
| |
| /** Remote infos. */ |
| private final ConcurrentMap<UUID, RemoteRoutineInfo> rmtInfos = new ConcurrentHashMap<>(); |
| |
| /** Start futures. */ |
| private final ConcurrentMap<UUID, StartFuture> startFuts = new ConcurrentHashMap<>(); |
| |
| /** Stop futures. */ |
| private final ConcurrentMap<UUID, StopFuture> stopFuts = new ConcurrentHashMap<>(); |
| |
| /** Threads started by this processor. */ |
| private final Map<UUID, IgniteThread> bufCheckThreads = new ConcurrentHashMap<>(); |
| |
| /** */ |
| private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap<>(); |
| |
| /** Stopped IDs. */ |
| private final Collection<UUID> stopped = new HashSet<>(); |
| |
| /** Lock for stop process. */ |
| private final Lock stopLock = new ReentrantLock(); |
| |
| /** Marshaller. */ |
| private Marshaller marsh; |
| |
| /** Delay in milliseconds between retries. */ |
| private long retryDelay = 1000; |
| |
| /** Number of retries using to send messages. */ |
| private int retryCnt = 3; |
| |
| /** */ |
| private final ReentrantReadWriteLock processorStopLock = new ReentrantReadWriteLock(); |
| |
| /** */ |
| private boolean processorStopped; |
| |
| /** Query sequence number for message topic. */ |
| private final AtomicLong seq = new AtomicLong(); |
| |
| /** */ |
| private ContinuousRoutinesInfo routinesInfo; |
| |
| /** */ |
| private int discoProtoVer; |
| |
| /** |
| * @param ctx Kernal context. |
| */ |
| public GridContinuousProcessor(GridKernalContext ctx) { |
| super(ctx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start() throws IgniteCheckedException { |
| ctx.systemView().registerView(CQ_SYS_VIEW, CQ_SYS_VIEW_DESC, |
| new ContinuousQueryViewWalker(), |
| new ReadOnlyCollectionView2X<>(rmtInfos.entrySet(), locInfos.entrySet()), |
| e -> new ContinuousQueryView(e.getKey(), e.getValue())); |
| |
| discoProtoVer = ctx.discovery().mutableCustomMessages() ? 1 : 2; |
| |
| if (discoProtoVer == 2) |
| routinesInfo = new ContinuousRoutinesInfo(); |
| |
| if (ctx.config().isDaemon()) |
| return; |
| |
| retryDelay = ctx.config().getNetworkSendRetryDelay(); |
| retryCnt = ctx.config().getNetworkSendRetryCount(); |
| |
| marsh = ctx.config().getMarshaller(); |
| |
| ctx.event().addLocalEventListener(new DiscoveryListener(), EVT_NODE_LEFT, EVT_NODE_FAILED); |
| |
| ctx.event().addLocalEventListener(new GridLocalEventListener() { |
| @Override public void onEvent(Event evt) { |
| cancelFutures(new IgniteCheckedException("Topology segmented")); |
| } |
| }, EVT_NODE_SEGMENTED); |
| |
| ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessage.class, |
| new CustomEventListener<StartRoutineDiscoveryMessage>() { |
| @Override public void onCustomEvent(AffinityTopologyVersion topVer, |
| ClusterNode snd, |
| StartRoutineDiscoveryMessage msg) { |
| assert discoProtoVer == 1 : discoProtoVer; |
| |
| if (ctx.isStopping()) |
| return; |
| |
| processStartRequest(snd, msg); |
| } |
| }); |
| |
| ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessageV2.class, |
| new CustomEventListener<StartRoutineDiscoveryMessageV2>() { |
| @Override public void onCustomEvent(AffinityTopologyVersion topVer, |
| ClusterNode snd, |
| StartRoutineDiscoveryMessageV2 msg) { |
| assert discoProtoVer == 2 : discoProtoVer; |
| |
| if (ctx.isStopping()) |
| return; |
| |
| processStartRequestV2(topVer, snd, msg); |
| } |
| }); |
| |
| ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class, |
| new CustomEventListener<StartRoutineAckDiscoveryMessage>() { |
| @Override public void onCustomEvent(AffinityTopologyVersion topVer, |
| ClusterNode snd, |
| StartRoutineAckDiscoveryMessage msg) { |
| if (ctx.isStopping()) |
| return; |
| |
| processStartAckRequest(topVer, msg); |
| } |
| }); |
| |
| ctx.discovery().setCustomEventListener(StopRoutineDiscoveryMessage.class, |
| new CustomEventListener<StopRoutineDiscoveryMessage>() { |
| @Override public void onCustomEvent(AffinityTopologyVersion topVer, |
| ClusterNode snd, |
| StopRoutineDiscoveryMessage msg) { |
| if (discoProtoVer == 2) |
| routinesInfo.removeRoutine(msg.routineId); |
| |
| if (ctx.isStopping()) |
| return; |
| |
| processStopRequest(snd, msg); |
| } |
| }); |
| |
| ctx.discovery().setCustomEventListener(StopRoutineAckDiscoveryMessage.class, |
| new CustomEventListener<StopRoutineAckDiscoveryMessage>() { |
| @Override public void onCustomEvent(AffinityTopologyVersion topVer, |
| ClusterNode snd, |
| StopRoutineAckDiscoveryMessage msg) { |
| if (ctx.isStopping()) |
| return; |
| |
| processStopAckRequest(msg); |
| } |
| }); |
| |
| ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() { |
| @Override public void onMessage(UUID nodeId, Object obj, byte plc) { |
| if (obj instanceof ContinuousRoutineStartResultMessage) |
| processRoutineStartResultMessage(nodeId, (ContinuousRoutineStartResultMessage)obj); |
| else { |
| GridContinuousMessage msg = (GridContinuousMessage)obj; |
| |
| if (msg.data() == null && msg.dataBytes() != null) { |
| try { |
| msg.data(U.unmarshal(marsh, msg.dataBytes(), U.resolveClassLoader(ctx.config()))); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to process message (ignoring): " + msg, e); |
| |
| return; |
| } |
| } |
| |
| switch (msg.type()) { |
| case MSG_EVT_NOTIFICATION: |
| processNotification(nodeId, msg); |
| |
| break; |
| |
| case MSG_EVT_ACK: |
| processMessageAck(msg); |
| |
| break; |
| |
| default: |
| assert false : "Unexpected message received: " + msg.type(); |
| } |
| } |
| } |
| }); |
| |
| ctx.cacheObjects().onContinuousProcessorStarted(ctx); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Continuous processor started."); |
| } |
| |
| /** |
| * @param e Error. |
| */ |
| private void cancelFutures(IgniteCheckedException e) { |
| for (Iterator<StartFuture> itr = startFuts.values().iterator(); itr.hasNext(); ) { |
| StartFuture fut = itr.next(); |
| |
| itr.remove(); |
| |
| fut.onDone(e); |
| } |
| |
| for (Iterator<StopFuture> itr = stopFuts.values().iterator(); itr.hasNext(); ) { |
| StopFuture fut = itr.next(); |
| |
| itr.remove(); |
| |
| fut.onDone(e); |
| } |
| } |
| |
| /** |
| * @return {@code true} if lock successful, {@code false} if processor already stopped. |
| */ |
| @SuppressWarnings("LockAcquiredButNotSafelyReleased") |
| public boolean lockStopping() { |
| processorStopLock.readLock().lock(); |
| |
| if (processorStopped) { |
| processorStopLock.readLock().unlock(); |
| |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * |
| */ |
| public void unlockStopping() { |
| processorStopLock.readLock().unlock(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onKernalStop(boolean cancel) { |
| processorStopLock.writeLock().lock(); |
| |
| try { |
| processorStopped = true; |
| } |
| finally { |
| processorStopLock.writeLock().unlock(); |
| } |
| |
| cancelFutures(new NodeStoppingException("Failed to start continuous query (node is stopping)")); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stop(boolean cancel) throws IgniteCheckedException { |
| if (ctx.config().isDaemon()) |
| return; |
| |
| ctx.io().removeMessageListener(TOPIC_CONTINUOUS); |
| |
| for (IgniteThread thread : bufCheckThreads.values()) { |
| U.interrupt(thread); |
| U.join(thread); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Continuous processor stopped."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { |
| return CONTINUOUS_PROC; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { |
| if (ctx.isDaemon()) |
| return; |
| |
| if (discoProtoVer == 2) { |
| routinesInfo.collectJoiningNodeData(dataBag); |
| |
| return; |
| } |
| |
| Serializable data = getDiscoveryData(dataBag.joiningNodeId()); |
| |
| if (data != null) |
| dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), data); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { |
| if (ctx.isDaemon()) |
| return; |
| |
| if (discoProtoVer == 2) { |
| routinesInfo.collectGridNodeData(dataBag); |
| |
| return; |
| } |
| |
| Serializable data = getDiscoveryData(dataBag.joiningNodeId()); |
| |
| if (data != null) |
| dataBag.addNodeSpecificData(CONTINUOUS_PROC.ordinal(), data); |
| } |
| |
| /** |
| * @param joiningNodeId Joining node id. |
| */ |
| private Serializable getDiscoveryData(UUID joiningNodeId) { |
| if (log.isDebugEnabled()) { |
| log.debug("collectDiscoveryData [node=" + joiningNodeId + |
| ", loc=" + ctx.localNodeId() + |
| ", locInfos=" + locInfos + |
| ", clientInfos=" + clientInfos + |
| ']'); |
| } |
| |
| if (!joiningNodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) { |
| Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = copyClientInfos(clientInfos); |
| |
| if (joiningNodeId.equals(ctx.localNodeId()) && ctx.discovery().localNode().isClient()) { |
| Map<UUID, LocalRoutineInfo> infos = copyLocalInfos(locInfos); |
| |
| clientInfos0.put(ctx.localNodeId(), infos); |
| } |
| |
| DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0); |
| |
| // Collect listeners information (will be sent to joining node during discovery process). |
| for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) { |
| UUID routineId = e.getKey(); |
| LocalRoutineInfo info = e.getValue(); |
| |
| assert !ctx.config().isPeerClassLoadingEnabled() || |
| !(info.hnd instanceof CacheContinuousQueryHandler) || |
| ((CacheContinuousQueryHandler)info.hnd).isMarshalled(); |
| |
| data.addItem(new DiscoveryDataItem(routineId, |
| info.prjPred, |
| info.hnd, |
| info.bufSize, |
| info.interval, |
| info.autoUnsubscribe)); |
| } |
| |
| return data; |
| } |
| |
| return null; |
| } |
| |
| /** |
| * @param clientInfos Client infos. |
| */ |
| private Map<UUID, Map<UUID, LocalRoutineInfo>> copyClientInfos(Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) { |
| Map<UUID, Map<UUID, LocalRoutineInfo>> res = U.newHashMap(clientInfos.size()); |
| |
| for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) { |
| Map<UUID, LocalRoutineInfo> cp = U.newHashMap(e.getValue().size()); |
| |
| for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet()) |
| cp.put(e0.getKey(), e0.getValue()); |
| |
| res.put(e.getKey(), cp); |
| } |
| |
| return res; |
| } |
| |
| /** |
| * @param locInfos Locale infos. |
| */ |
| private Map<UUID, LocalRoutineInfo> copyLocalInfos(Map<UUID, LocalRoutineInfo> locInfos) { |
| Map<UUID, LocalRoutineInfo> res = U.newHashMap(locInfos.size()); |
| |
| for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) |
| res.put(e.getKey(), e.getValue()); |
| |
| return res; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { |
| if (log.isDebugEnabled()) { |
| log.debug("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() + |
| ", loc=" + ctx.localNodeId() + |
| ", data=" + data.joiningNodeData() + |
| ']'); |
| } |
| |
| if (discoProtoVer == 2) { |
| if (data.hasJoiningNodeData()) { |
| ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData) |
| data.joiningNodeData(); |
| |
| for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) { |
| routinesInfo.addRoutineInfo(routineInfo); |
| |
| onDiscoveryDataReceivedV2(routineInfo); |
| } |
| } |
| } |
| else { |
| if (data.hasJoiningNodeData()) |
| onDiscoveryDataReceivedV1((DiscoveryData)data.joiningNodeData()); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onGridDataReceived(GridDiscoveryData data) { |
| if (discoProtoVer == 2) { |
| if (ctx.isDaemon()) |
| return; |
| |
| if (data.commonData() != null) { |
| ContinuousRoutinesCommonDiscoveryData commonData = |
| (ContinuousRoutinesCommonDiscoveryData)data.commonData(); |
| |
| for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { |
| if (routinesInfo.routineExists(routineInfo.routineId)) |
| continue; |
| |
| routinesInfo.addRoutineInfo(routineInfo); |
| |
| onDiscoveryDataReceivedV2(routineInfo); |
| } |
| } |
| } |
| else { |
| Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData(); |
| |
| if (nodeSpecData != null) { |
| for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet()) |
| onDiscoveryDataReceivedV1((DiscoveryData)e.getValue()); |
| } |
| } |
| } |
| |
| /** |
| * Processes data received in a discovery message. |
| * Used with protocol version 1. |
| * |
| * @param data received discovery data. |
| */ |
| private void onDiscoveryDataReceivedV1(DiscoveryData data) { |
| if (!ctx.isDaemon() && data != null) { |
| for (DiscoveryDataItem item : data.items) { |
| if (!locInfos.containsKey(item.routineId)) { |
| registerHandlerOnJoin(data.nodeId, item.routineId, item.prjPred, |
| item.hnd, item.bufSize, item.interval, item.autoUnsubscribe); |
| } |
| |
| if (!item.autoUnsubscribe) { |
| locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(data.nodeId, |
| item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe)); |
| } |
| } |
| |
| // Process CQs started on clients. |
| for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) { |
| UUID clientNodeId = entry.getKey(); |
| |
| if (!ctx.localNodeId().equals(clientNodeId)) { |
| Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue(); |
| |
| for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) { |
| UUID routineId = e.getKey(); |
| LocalRoutineInfo info = e.getValue(); |
| |
| registerHandlerOnJoin(clientNodeId, routineId, info.prjPred, |
| info.hnd, info.bufSize, info.interval, info.autoUnsubscribe); |
| } |
| } |
| |
| Map<UUID, LocalRoutineInfo> map = |
| clientInfos.computeIfAbsent(clientNodeId, k -> new HashMap<>()); |
| |
| map.putAll(entry.getValue()); |
| } |
| } |
| } |
| |
| /** |
| * Processes data received in a discovery message. |
| * Used with protocol version 2. |
| * |
| * @param routineInfo Routine info. |
| */ |
| private void onDiscoveryDataReceivedV2(ContinuousRoutineInfo routineInfo) { |
| IgnitePredicate<ClusterNode> nodeFilter; |
| |
| try { |
| if (routineInfo.nodeFilter != null) { |
| nodeFilter = U.unmarshal(marsh, routineInfo.nodeFilter, U.resolveClassLoader(ctx.config())); |
| |
| ctx.resource().injectGeneric(nodeFilter); |
| } |
| else |
| nodeFilter = null; |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to unmarshal continuous routine filter [" + |
| "routineId=" + routineInfo.routineId + |
| ", srcNodeId=" + routineInfo.srcNodeId + ']', e); |
| |
| ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); |
| |
| return; |
| } |
| |
| GridContinuousHandler hnd; |
| |
| try { |
| hnd = U.unmarshal(marsh, routineInfo.hnd, U.resolveClassLoader(ctx.config())); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to unmarshal continuous routine handler [" + |
| "routineId=" + routineInfo.routineId + |
| ", srcNodeId=" + routineInfo.srcNodeId + ']', e); |
| |
| ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); |
| |
| return; |
| } |
| |
| registerHandlerOnJoin(routineInfo.srcNodeId, routineInfo.routineId, nodeFilter, |
| hnd, routineInfo.bufSize, routineInfo.interval, routineInfo.autoUnsubscribe); |
| } |
| |
| /** |
| * Register a continuous query handler on local node join. |
| * |
| * @param srcNodeId Id of the subscriber node. |
| * @param routineId Routine id. |
| * @param nodeFilter Node filter. |
| * @param hnd Continuous query handler. |
| * @param bufSize Buffer size. |
| * @param interval Time interval for buffer checker. |
| * @param autoUnsubscribe Automatic unsubscribe flag. |
| */ |
| private void registerHandlerOnJoin(UUID srcNodeId, UUID routineId, IgnitePredicate<ClusterNode> nodeFilter, |
| GridContinuousHandler hnd, int bufSize, long interval, boolean autoUnsubscribe) { |
| |
| try { |
| if (nodeFilter != null) |
| ctx.resource().injectGeneric(nodeFilter); |
| |
| if (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())) { |
| registerHandler(srcNodeId, |
| routineId, |
| hnd, |
| bufSize, |
| interval, |
| autoUnsubscribe, |
| false); |
| |
| if (ctx.config().isPeerClassLoadingEnabled()) { |
| // Peer class loading cannot be performed before a node joins, so we delay the deployment. |
| // Run the deployment task in the system pool to avoid blocking of the discovery thread. |
| ctx.discovery().localJoinFuture().listen(f -> ctx.closure().runLocalSafe((GridPlainRunnable)() -> { |
| try { |
| hnd.p2pUnmarshal(srcNodeId, ctx); |
| } |
| catch (IgniteCheckedException | IgniteException e) { |
| U.error(log, "Failed to unmarshal continuous routine handler [" + |
| "routineId=" + routineId + |
| ", srcNodeId=" + srcNodeId + ']', e); |
| |
| ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); |
| |
| unregisterHandler(routineId, hnd, false); |
| } |
| })); |
| } |
| } |
| else { |
| if (log.isDebugEnabled()) { |
| log.debug("Do not register continuous routine, rejected by node filter [" + |
| "routineId=" + routineId + |
| ", srcNodeId=" + srcNodeId + ']'); |
| } |
| } |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to register continuous routine handler [" + |
| "routineId=" + routineId + |
| ", srcNodeId=" + srcNodeId + ']', e); |
| |
| ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); |
| } |
| } |
| |
| /** |
| * Callback invoked when cache is started. |
| * |
| * @param ctx Cache context. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void onCacheStart(GridCacheContext ctx) throws IgniteCheckedException { |
| for (Map.Entry<UUID, RemoteRoutineInfo> entry : rmtInfos.entrySet()) { |
| UUID routineId = entry.getKey(); |
| RemoteRoutineInfo rmtInfo = entry.getValue(); |
| |
| GridContinuousHandler hnd = rmtInfo.hnd; |
| |
| if (hnd.isQuery() && F.eq(ctx.name(), hnd.cacheName()) && rmtInfo.clearDelayedRegister()) { |
| GridContinuousHandler.RegisterStatus status = hnd.register(rmtInfo.nodeId, routineId, this.ctx); |
| |
| assert status != GridContinuousHandler.RegisterStatus.DELAYED; |
| } |
| } |
| } |
| |
| /** |
| * @param ctx Callback invoked when cache is stopped. |
| */ |
| public void onCacheStop(GridCacheContext ctx) { |
| Iterator<Map.Entry<UUID, RemoteRoutineInfo>> it = rmtInfos.entrySet().iterator(); |
| |
| while (it.hasNext()) { |
| Map.Entry<UUID, RemoteRoutineInfo> entry = it.next(); |
| |
| GridContinuousHandler hnd = entry.getValue().hnd; |
| |
| if (hnd.isQuery() && F.eq(ctx.name(), hnd.cacheName())) |
| it.remove(); |
| } |
| } |
| |
| /** |
| * Registers routine info to be sent in discovery data during this node join |
| * (to be used for internal queries started from client nodes). |
| * |
| * Peer class loading is not applied to static routines. |
| * |
| * @param cacheName Cache name. |
| * @param locLsnr Local listener. |
| * @param rmtFilter Remote filter. |
| * @param prjPred Projection predicate. |
| * @return Routine ID. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @SuppressWarnings("unchecked") |
| public UUID registerStaticRoutine( |
| String cacheName, |
| CacheEntryUpdatedListener<?, ?> locLsnr, |
| CacheEntryEventSerializableFilter rmtFilter, |
| @Nullable IgnitePredicate<ClusterNode> prjPred) throws IgniteCheckedException { |
| String topicPrefix = "CONTINUOUS_QUERY_STATIC" + "_" + cacheName; |
| |
| CacheContinuousQueryHandler hnd = new CacheContinuousQueryHandler( |
| cacheName, |
| TOPIC_CACHE.topic(topicPrefix, ctx.localNodeId(), seq.incrementAndGet()), |
| locLsnr, |
| rmtFilter, |
| true, |
| false, |
| true, |
| false); |
| |
| hnd.internal(true); |
| |
| final UUID routineId = UUID.randomUUID(); |
| |
| LocalRoutineInfo routineInfo = new LocalRoutineInfo(ctx.localNodeId(), prjPred, hnd, 1, 0, true); |
| |
| if (discoProtoVer == 2) { |
| routinesInfo.addRoutineInfo(createRoutineInfo( |
| ctx.localNodeId(), |
| routineId, |
| hnd, |
| prjPred, |
| routineInfo.bufSize, |
| routineInfo.interval, |
| routineInfo.autoUnsubscribe)); |
| } |
| |
| locInfos.put(routineId, routineInfo); |
| |
| registerMessageListener(hnd); |
| |
| return routineId; |
| } |
| |
| /** |
| * @param srcNodeId Source node ID. |
| * @param routineId Routine ID. |
| * @param hnd Handler. |
| * @param nodeFilter Node filter. |
| * @param bufSize Handler buffer size. |
| * @param interval Time interval. |
| * @param autoUnsubscribe Auto unsubscribe flag. |
| * @return Routine info instance. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private ContinuousRoutineInfo createRoutineInfo( |
| UUID srcNodeId, |
| UUID routineId, |
| GridContinuousHandler hnd, |
| @Nullable IgnitePredicate<ClusterNode> nodeFilter, |
| int bufSize, |
| long interval, |
| boolean autoUnsubscribe) |
| throws IgniteCheckedException { |
| byte[] hndBytes = marsh.marshal(hnd); |
| |
| byte[] filterBytes = nodeFilter != null ? marsh.marshal(nodeFilter) : null; |
| |
| return new ContinuousRoutineInfo( |
| srcNodeId, |
| routineId, |
| hndBytes, |
| filterBytes, |
| bufSize, |
| interval, |
| autoUnsubscribe); |
| } |
| |
| /** |
| * @param hnd Handler. |
| * @param bufSize Buffer size. |
| * @param interval Time interval. |
| * @param autoUnsubscribe Automatic unsubscribe flag. |
| * @param locOnly Local only flag. |
| * @param prjPred Projection predicate. |
| * @return Future. |
| */ |
| public IgniteInternalFuture<UUID> startRoutine(GridContinuousHandler hnd, |
| boolean locOnly, |
| int bufSize, |
| long interval, |
| boolean autoUnsubscribe, |
| @Nullable IgnitePredicate<ClusterNode> prjPred) throws IgniteCheckedException { |
| assert hnd != null; |
| assert bufSize > 0; |
| assert interval >= 0; |
| |
| // Generate ID. |
| final UUID routineId = UUID.randomUUID(); |
| |
| if (ctx.config().isPeerClassLoadingEnabled()) { |
| hnd.p2pMarshal(ctx); |
| |
| assert !(hnd instanceof CacheContinuousQueryHandler) || ((CacheContinuousQueryHandler)hnd).isMarshalled(); |
| } |
| |
| // Register routine locally. |
| locInfos.put(routineId, |
| new LocalRoutineInfo(ctx.localNodeId(), prjPred, hnd, bufSize, interval, autoUnsubscribe)); |
| |
| if (locOnly) { |
| try { |
| registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true); |
| |
| return new GridFinishedFuture<>(routineId); |
| } |
| catch (IgniteCheckedException e) { |
| unregisterHandler(routineId, hnd, true); |
| |
| return new GridFinishedFuture<>(e); |
| } |
| } |
| |
| // Whether local node is included in routine. |
| boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode()); |
| |
| AbstractContinuousMessage msg; |
| |
| try { |
| msg = createStartMessage(routineId, hnd, bufSize, interval, autoUnsubscribe, prjPred); |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture<>(e); |
| } |
| |
| // Register per-routine notifications listener if ordered messaging is used. |
| registerMessageListener(hnd); |
| |
| if (!lockStopping()) |
| return new GridFinishedFuture<>(new NodeStoppingException("Failed to start continuous query (node is stopping)")); |
| |
| try { |
| StartFuture fut = new StartFuture(routineId); |
| |
| startFuts.put(routineId, fut); |
| |
| try { |
| if (locIncluded || hnd.isQuery()) { |
| registerHandler(ctx.localNodeId(), |
| routineId, |
| hnd, |
| bufSize, |
| interval, |
| autoUnsubscribe, |
| true); |
| } |
| |
| ctx.discovery().sendCustomEvent(msg); |
| } |
| catch (IgniteCheckedException e) { |
| startFuts.remove(routineId); |
| locInfos.remove(routineId); |
| |
| unregisterHandler(routineId, hnd, true); |
| |
| fut.onDone(e); |
| |
| return fut; |
| } |
| |
| // Handler is registered locally. |
| fut.onLocalRegistered(); |
| |
| return fut; |
| } |
| finally { |
| unlockStopping(); |
| } |
| } |
| |
| /** |
| * @param routineId Routine ID. |
| * @param hnd Handler. |
| * @param bufSize Buffer size. |
| * @param interval Interval. |
| * @param autoUnsubscribe Auto unsubscribe flag. |
| * @param nodeFilter Node filter. |
| * @return Routine start message. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private AbstractContinuousMessage createStartMessage(UUID routineId, |
| GridContinuousHandler hnd, |
| int bufSize, |
| long interval, |
| boolean autoUnsubscribe, |
| @Nullable IgnitePredicate<ClusterNode> nodeFilter |
| ) throws IgniteCheckedException { |
| hnd = hnd.clone(); |
| |
| String clsName = null; |
| GridDeploymentInfoBean dep = null; |
| |
| if (ctx.config().isPeerClassLoadingEnabled()) { |
| // Handle peer deployment for projection predicate. |
| if (nodeFilter != null && !U.isGrid(nodeFilter.getClass())) { |
| Class cls = U.detectClass(nodeFilter); |
| |
| clsName = cls.getName(); |
| |
| GridDeployment dep0 = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); |
| |
| if (dep0 == null) |
| throw new IgniteDeploymentCheckedException("Failed to deploy projection predicate: " + nodeFilter); |
| |
| dep = new GridDeploymentInfoBean(dep0); |
| } |
| |
| // Handle peer deployment for other handler-specific objects. |
| hnd.p2pMarshal(ctx); |
| } |
| |
| if (discoProtoVer == 1) { |
| StartRequestData reqData = new StartRequestData( |
| nodeFilter, |
| hnd, |
| bufSize, |
| interval, |
| autoUnsubscribe); |
| |
| if (clsName != null) { |
| reqData.className(clsName); |
| reqData.deploymentInfo(dep); |
| |
| reqData.p2pMarshal(marsh); |
| } |
| |
| return new StartRoutineDiscoveryMessage( |
| routineId, |
| reqData, |
| reqData.handler().keepBinary()); |
| } |
| else { |
| assert discoProtoVer == 2 : discoProtoVer; |
| |
| byte[] nodeFilterBytes = nodeFilter != null ? U.marshal(marsh, nodeFilter) : null; |
| byte[] hndBytes = U.marshal(marsh, hnd); |
| |
| StartRequestDataV2 reqData = new StartRequestDataV2(nodeFilterBytes, |
| hndBytes, |
| bufSize, |
| interval, |
| autoUnsubscribe); |
| |
| if (clsName != null) { |
| reqData.className(clsName); |
| reqData.deploymentInfo(dep); |
| } |
| |
| return new StartRoutineDiscoveryMessageV2( |
| routineId, |
| reqData, |
| hnd.keepBinary()); |
| } |
| } |
| |
| /** |
| * @param hnd Handler. |
| */ |
| private void registerMessageListener(GridContinuousHandler hnd) { |
| if (hnd.orderedTopic() != null) { |
| ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() { |
| @Override public void onMessage(UUID nodeId, Object obj, byte plc) { |
| GridContinuousMessage msg = (GridContinuousMessage)obj; |
| |
| // Only notification can be ordered. |
| assert msg.type() == MSG_EVT_NOTIFICATION; |
| |
| if (msg.data() == null && msg.dataBytes() != null) { |
| try { |
| msg.data(U.unmarshal(marsh, msg.dataBytes(), U.resolveClassLoader(ctx.config()))); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to process message (ignoring): " + msg, e); |
| |
| return; |
| } |
| } |
| |
| processNotification(nodeId, msg); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * @param routineId Consume ID. |
| * @return Future. |
| */ |
| public IgniteInternalFuture<?> stopRoutine(UUID routineId) { |
| assert routineId != null; |
| |
| boolean doStop = false; |
| |
| if (!lockStopping()) |
| return new GridFinishedFuture<>(new NodeStoppingException("Failed to stop continuous query (node is stopping)")); |
| |
| try { |
| StopFuture fut = stopFuts.get(routineId); |
| |
| // Only one thread will stop routine with provided ID. |
| if (fut == null) { |
| StopFuture old = stopFuts.putIfAbsent(routineId, fut = new StopFuture(ctx)); |
| |
| if (old != null) |
| fut = old; |
| else |
| doStop = true; |
| } |
| |
| if (doStop) { |
| boolean stop = false; |
| |
| // Unregister routine locally. |
| LocalRoutineInfo routine = locInfos.remove(routineId); |
| |
| if (routine != null) { |
| stop = true; |
| |
| // Unregister handler locally. |
| unregisterHandler(routineId, routine.hnd, true); |
| } |
| |
| if (!stop && discoProtoVer == 2) |
| stop = routinesInfo.routineExists(routineId); |
| |
| // Finish if routine is not found (wrong ID is provided). |
| if (!stop) { |
| stopFuts.remove(routineId); |
| |
| fut.onDone(); |
| |
| return fut; |
| } |
| |
| try { |
| ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId)); |
| } |
| catch (IgniteCheckedException e) { |
| fut.onDone(e); |
| } |
| |
| if (ctx.isStopping()) |
| fut.onDone(); |
| } |
| |
| return fut; |
| } |
| finally { |
| unlockStopping(); |
| } |
| } |
| |
| /** |
| * @param nodeId ID of the node that started routine. |
| * @param routineId Routine ID. |
| * @param objs Notification objects. |
| * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| public void addBackupNotification(UUID nodeId, |
| final UUID routineId, |
| Collection<?> objs, |
| @Nullable Object orderedTopic) |
| throws IgniteCheckedException { |
| if (processorStopped) |
| return; |
| |
| final RemoteRoutineInfo info = rmtInfos.get(routineId); |
| |
| if (info != null) { |
| final GridContinuousBatch batch = info.addAll(objs); |
| |
| Collection<Object> toSnd = batch.collect(); |
| |
| if (!toSnd.isEmpty()) |
| sendNotification(nodeId, routineId, null, toSnd, orderedTopic, true, null); |
| } |
| else { |
| LocalRoutineInfo locRoutineInfo = locInfos.get(routineId); |
| |
| if (locRoutineInfo != null) |
| locRoutineInfo.handler().notifyCallback(nodeId, routineId, objs, ctx); |
| } |
| } |
| |
| /** |
| * @param nodeId ID of the node that started routine. |
| * @param routineId Routine ID. |
| * @param obj Notification object. |
| * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. |
| * @param sync If {@code true} then waits for event acknowledgment. |
| * @param msg If {@code true} then sent data is message. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| public void addNotification(UUID nodeId, |
| final UUID routineId, |
| @Nullable Object obj, |
| @Nullable Object orderedTopic, |
| boolean sync, |
| boolean msg) |
| throws IgniteCheckedException { |
| assert nodeId != null; |
| assert routineId != null; |
| assert !msg || (obj instanceof Message || obj instanceof Collection) : obj; |
| |
| assert !nodeId.equals(ctx.localNodeId()); |
| |
| if (processorStopped) |
| return; |
| |
| final RemoteRoutineInfo info = rmtInfos.get(routineId); |
| |
| if (info != null) { |
| assert info.interval == 0 || !sync; |
| |
| if (sync) { |
| SyncMessageAckFuture fut = new SyncMessageAckFuture(nodeId); |
| |
| IgniteUuid futId = IgniteUuid.randomUuid(); |
| |
| syncMsgFuts.put(futId, fut); |
| |
| try { |
| sendNotification(nodeId, |
| routineId, |
| futId, |
| obj instanceof Collection ? (Collection)obj : F.asList(obj), |
| null, |
| msg, |
| null); |
| |
| info.hnd.onBatchAcknowledged(routineId, info.add(obj), ctx); |
| } |
| catch (IgniteCheckedException e) { |
| syncMsgFuts.remove(futId); |
| |
| throw e; |
| } |
| |
| while (true) { |
| try { |
| fut.get(100, TimeUnit.MILLISECONDS); |
| |
| break; |
| } |
| catch (IgniteFutureTimeoutCheckedException ignored) { |
| // Additional failover to break waiting on node left/fail |
| // in case left/fail event processing failed, hanged or delayed. |
| if (!ctx.discovery().alive(nodeId)) { |
| SyncMessageAckFuture fut0 = syncMsgFuts.remove(futId); |
| |
| if (fut0 != null) { |
| ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( |
| "Node left grid after receiving, but before processing the message [node=" + |
| nodeId + "]"); |
| |
| fut0.onDone(err); |
| } |
| |
| break; |
| } |
| |
| LT.warn(log, "Failed to wait for ack message. [node=" + nodeId + |
| ", routine=" + routineId + "]"); |
| } |
| } |
| |
| assert fut.isDone() : "Future in not finished [fut= " + fut + "]"; |
| } |
| else { |
| final GridContinuousBatch batch = info.add(obj); |
| |
| if (batch != null) { |
| CI1<IgniteException> ackC = new CI1<IgniteException>() { |
| @Override public void apply(IgniteException e) { |
| if (e == null) |
| info.hnd.onBatchAcknowledged(routineId, batch, ctx); |
| } |
| }; |
| |
| sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, msg, ackC); |
| } |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDisconnected(IgniteFuture<?> reconnectFut) { |
| cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected.")); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("onDisconnected [rmtInfos=" + rmtInfos + |
| ", locInfos=" + locInfos + |
| ", clientInfos=" + clientInfos + ']'); |
| } |
| |
| for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) { |
| RemoteRoutineInfo info = e.getValue(); |
| |
| if (!ctx.localNodeId().equals(info.nodeId) || info.autoUnsubscribe) |
| unregisterRemote(e.getKey()); |
| } |
| |
| for (LocalRoutineInfo routine : locInfos.values()) |
| routine.hnd.onClientDisconnected(); |
| |
| rmtInfos.clear(); |
| |
| clientInfos.clear(); |
| |
| if (discoProtoVer == 2) |
| routinesInfo.onClientDisconnected(locInfos.keySet()); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("after onDisconnected [rmtInfos=" + rmtInfos + |
| ", locInfos=" + locInfos + |
| ", clientInfos=" + clientInfos + ']'); |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param routineId Routine ID. |
| * @param futId Future ID. |
| * @param toSnd Notification object to send. |
| * @param orderedTopic Topic for ordered notifications. |
| * If {@code null}, non-ordered message will be sent. |
| * @param msg If {@code true} then sent data is collection of messages. |
| * @param ackC Ack closure. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private void sendNotification(UUID nodeId, |
| UUID routineId, |
| @Nullable IgniteUuid futId, |
| Collection<Object> toSnd, |
| @Nullable Object orderedTopic, |
| boolean msg, |
| IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { |
| assert nodeId != null; |
| assert routineId != null; |
| assert toSnd != null; |
| assert !toSnd.isEmpty(); |
| |
| sendWithRetries(nodeId, |
| new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg), |
| orderedTopic, |
| ackC); |
| } |
| |
| /** |
| * @param msg Message. |
| */ |
| private void processStopAckRequest(StopRoutineAckDiscoveryMessage msg) { |
| StopFuture fut = stopFuts.remove(msg.routineId()); |
| |
| if (fut != null) |
| fut.onDone(); |
| } |
| |
| /** |
| * @param snd Sender node. |
| * @param msg Message/ |
| */ |
| private void processStopRequest(ClusterNode snd, StopRoutineDiscoveryMessage msg) { |
| if (!snd.id().equals(ctx.localNodeId())) { |
| UUID routineId = msg.routineId(); |
| |
| unregisterRemote(routineId); |
| } |
| |
| for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) { |
| if (clientInfo.remove(msg.routineId()) != null) |
| break; |
| } |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @param msg Message. |
| */ |
| private void processStartAckRequest(AffinityTopologyVersion topVer, |
| StartRoutineAckDiscoveryMessage msg) { |
| StartFuture fut = startFuts.remove(msg.routineId()); |
| |
| if (fut != null) { |
| fut.onAllRemoteRegistered( |
| topVer, |
| msg.errs(), |
| msg.updateCountersPerNode(), |
| msg.updateCounters()); |
| } |
| } |
| |
| /** |
| * @param node Sender. |
| * @param req Start request. |
| */ |
| private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage req) { |
| if (node.id().equals(ctx.localNodeId())) |
| return; |
| |
| UUID routineId = req.routineId(); |
| |
| if (req.deserializationException() != null && checkNodeFilter(req)) { |
| IgniteCheckedException err = new IgniteCheckedException(req.deserializationException()); |
| |
| req.addError(node.id(), err); |
| |
| U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', err); |
| |
| return; |
| } |
| |
| StartRequestData data = req.startRequestData(); |
| |
| GridContinuousHandler hnd = data.handler(); |
| |
| if (req.keepBinary()) { |
| assert hnd instanceof CacheContinuousQueryHandler; |
| |
| ((CacheContinuousQueryHandler)hnd).keepBinary(true); |
| } |
| |
| IgniteCheckedException err = null; |
| |
| try { |
| if (ctx.config().isPeerClassLoadingEnabled()) { |
| String clsName = data.className(); |
| |
| if (clsName != null) { |
| GridDeploymentInfo depInfo = data.deploymentInfo(); |
| |
| GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, |
| depInfo.userVersion(), node.id(), depInfo.classLoaderId(), depInfo.participants(), null); |
| |
| if (dep == null) |
| throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); |
| |
| data.p2pUnmarshal(marsh, U.resolveClassLoader(dep.classLoader(), ctx.config())); |
| } |
| } |
| } |
| catch (IgniteCheckedException e) { |
| err = e; |
| |
| U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); |
| } |
| |
| if (node.isClient()) { |
| Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(node.id()); |
| |
| if (clientRoutineMap == null) { |
| clientRoutineMap = new HashMap<>(); |
| |
| Map<UUID, LocalRoutineInfo> old = clientInfos.put(node.id(), clientRoutineMap); |
| |
| assert old == null; |
| } |
| |
| clientRoutineMap.put(routineId, new LocalRoutineInfo(node.id(), |
| data.projectionPredicate(), |
| hnd, |
| data.bufferSize(), |
| data.interval(), |
| data.autoUnsubscribe())); |
| } |
| |
| if (err == null) { |
| try { |
| IgnitePredicate<ClusterNode> prjPred = data.projectionPredicate(); |
| |
| if (prjPred != null) |
| ctx.resource().injectGeneric(prjPred); |
| |
| if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) && |
| !locInfos.containsKey(routineId)) { |
| if (ctx.config().isPeerClassLoadingEnabled()) |
| hnd.p2pUnmarshal(node.id(), ctx); |
| |
| registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(), |
| data.autoUnsubscribe(), false); |
| |
| // Load partition counters. |
| if (err == null && hnd.isQuery()) { |
| GridCacheProcessor proc = ctx.cache(); |
| |
| if (proc != null) { |
| GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName()); |
| |
| if (cache != null && cache.context().userCache()) |
| req.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters()); |
| } |
| } |
| } |
| |
| if (!data.autoUnsubscribe()) |
| // Register routine locally. |
| locInfos.putIfAbsent(routineId, new LocalRoutineInfo( |
| node.id(), prjPred, hnd, data.bufferSize(), data.interval(), data.autoUnsubscribe())); |
| } |
| catch (IgniteCheckedException e) { |
| err = e; |
| |
| U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); |
| } |
| } |
| |
| if (err != null) |
| req.addError(ctx.localNodeId(), err); |
| } |
| |
| /** */ |
| private boolean checkNodeFilter(StartRoutineDiscoveryMessage req) { |
| StartRequestData reqData = req.startRequestData(); |
| IgnitePredicate<ClusterNode> prjPred; |
| |
| return reqData == null || (prjPred = reqData.projectionPredicate()) == null |
| || prjPred.apply(ctx.discovery().localNode()); |
| } |
| |
| /** |
| * @param sndId Sender node ID. |
| * @param msg Message. |
| */ |
| private void processRoutineStartResultMessage(UUID sndId, ContinuousRoutineStartResultMessage msg) { |
| StartFuture fut = startFuts.get(msg.routineId()); |
| |
| if (fut != null) |
| fut.onResult(sndId, msg); |
| } |
| |
| /** |
| * @param topVer Current topology version. |
| * @param snd Sender. |
| * @param msg Start request. |
| */ |
| private void processStartRequestV2(final AffinityTopologyVersion topVer, |
| final ClusterNode snd, |
| final StartRoutineDiscoveryMessageV2 msg) { |
| StartRequestDataV2 reqData = msg.startRequestData(); |
| |
| ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(), |
| msg.routineId(), |
| reqData.handlerBytes(), |
| reqData.nodeFilterBytes(), |
| reqData.bufferSize(), |
| reqData.interval(), |
| reqData.autoUnsubscribe()); |
| |
| routinesInfo.addRoutineInfo(routineInfo); |
| |
| final DiscoCache discoCache = ctx.discovery().discoCache(topVer); |
| |
| // Should not use marshaller and send messages from discovery thread. |
| ctx.pools().getSystemExecutorService().execute(new Runnable() { |
| @Override public void run() { |
| if (snd.id().equals(ctx.localNodeId())) { |
| StartFuture fut = startFuts.get(msg.routineId()); |
| |
| if (fut != null) |
| fut.initRemoteNodes(discoCache); |
| |
| return; |
| } |
| |
| StartRequestDataV2 reqData = msg.startRequestData(); |
| |
| Exception err = null; |
| |
| IgnitePredicate<ClusterNode> nodeFilter = null; |
| |
| byte[] cntrs = null; |
| |
| if (reqData.nodeFilterBytes() != null) { |
| try { |
| if (ctx.config().isPeerClassLoadingEnabled() && reqData.className() != null) { |
| String clsName = reqData.className(); |
| GridDeploymentInfo depInfo = reqData.deploymentInfo(); |
| |
| GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), |
| clsName, |
| clsName, |
| depInfo.userVersion(), |
| snd.id(), |
| depInfo.classLoaderId(), |
| depInfo.participants(), |
| null); |
| |
| if (dep == null) { |
| throw new IgniteDeploymentCheckedException("Failed to obtain deployment " + |
| "for class: " + clsName); |
| } |
| |
| nodeFilter = U.unmarshal(marsh, |
| reqData.nodeFilterBytes(), |
| U.resolveClassLoader(dep.classLoader(), ctx.config())); |
| } |
| else { |
| nodeFilter = U.unmarshal(marsh, |
| reqData.nodeFilterBytes(), |
| U.resolveClassLoader(ctx.config())); |
| } |
| |
| if (nodeFilter != null) |
| ctx.resource().injectGeneric(nodeFilter); |
| } |
| catch (Exception e) { |
| err = e; |
| |
| U.error(log, "Failed to unmarshal continuous routine filter [" + |
| "routineId=" + msg.routineId + |
| ", srcNodeId=" + snd.id() + ']', e); |
| } |
| } |
| |
| boolean register = err == null && |
| (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())); |
| |
| if (register) { |
| try { |
| GridContinuousHandler hnd = U.unmarshal(marsh, |
| reqData.handlerBytes(), |
| U.resolveClassLoader(ctx.config())); |
| |
| if (ctx.config().isPeerClassLoadingEnabled()) |
| hnd.p2pUnmarshal(snd.id(), ctx); |
| |
| if (msg.keepBinary()) { |
| assert hnd instanceof CacheContinuousQueryHandler : hnd; |
| |
| ((CacheContinuousQueryHandler)hnd).keepBinary(true); |
| } |
| |
| registerHandler(snd.id(), |
| msg.routineId, |
| hnd, |
| reqData.bufferSize(), |
| reqData.interval(), |
| reqData.autoUnsubscribe(), |
| false); |
| |
| if (hnd.isQuery()) { |
| GridCacheProcessor proc = ctx.cache(); |
| |
| if (proc != null) { |
| GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName()); |
| |
| if (cache != null && cache.context().userCache()) { |
| CachePartitionPartialCountersMap cntrsMap = |
| cache.context().topology().localUpdateCounters(false); |
| |
| cntrs = U.marshal(marsh, cntrsMap); |
| } |
| } |
| } |
| } |
| catch (Exception e) { |
| err = e; |
| |
| U.error(log, "Failed to register continuous routine handler [" + |
| "routineId=" + msg.routineId + |
| ", srcNodeId=" + snd.id() + ']', e); |
| } |
| } |
| |
| sendMessageStartResult(snd, msg.routineId(), cntrs, err); |
| } |
| }); |
| } |
| |
| /** |
| * @param node Target node. |
| * @param routineId Routine ID. |
| * @param cntrsMapBytes Marshalled {@link CachePartitionPartialCountersMap}. |
| * @param err Start error if any. |
| */ |
| private void sendMessageStartResult(final ClusterNode node, |
| final UUID routineId, |
| byte[] cntrsMapBytes, |
| @Nullable final Exception err |
| ) { |
| byte[] errBytes = null; |
| |
| if (err != null) { |
| try { |
| errBytes = U.marshal(marsh, err); |
| } |
| catch (Exception e) { |
| U.error(log, "Failed to marshal routine start error: " + e, e); |
| } |
| } |
| |
| ContinuousRoutineStartResultMessage msg = new ContinuousRoutineStartResultMessage(routineId, |
| cntrsMapBytes, |
| errBytes, |
| err != null); |
| |
| try { |
| ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL); |
| } |
| catch (ClusterTopologyCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send routine start result, node failed: " + e); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send routine start result: " + e, e); |
| } |
| } |
| |
| /** |
| * @param msg Message. |
| */ |
| private void processMessageAck(GridContinuousMessage msg) { |
| assert msg.futureId() != null; |
| |
| SyncMessageAckFuture fut = syncMsgFuts.remove(msg.futureId()); |
| |
| if (fut != null) |
| fut.onDone(); |
| } |
| |
| /** |
| * @param nodeId Sender ID. |
| * @param msg Message. |
| */ |
| private void processNotification(UUID nodeId, GridContinuousMessage msg) { |
| assert nodeId != null; |
| assert msg != null; |
| |
| UUID routineId = msg.routineId(); |
| |
| try { |
| LocalRoutineInfo routine = locInfos.get(routineId); |
| |
| if (routine != null) |
| routine.hnd.notifyCallback(nodeId, routineId, (Collection<?>)msg.data(), ctx); |
| } |
| finally { |
| if (msg.futureId() != null) { |
| try { |
| sendWithRetries(nodeId, |
| new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null, false), |
| null, |
| null); |
| } |
| catch (IgniteCheckedException e) { |
| log.error("Failed to send event acknowledgment to node: " + nodeId, e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param routineId Consume ID. |
| * @param hnd Handler. |
| * @param bufSize Buffer size. |
| * @param interval Time interval. |
| * @param autoUnsubscribe Automatic unsubscribe flag. |
| * @param loc Local registration flag. |
| * @return Whether listener was actually registered. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private boolean registerHandler(final UUID nodeId, |
| final UUID routineId, |
| final GridContinuousHandler hnd, |
| int bufSize, |
| final long interval, |
| boolean autoUnsubscribe, |
| boolean loc) throws IgniteCheckedException { |
| assert nodeId != null; |
| assert routineId != null; |
| assert hnd != null; |
| assert bufSize > 0; |
| assert interval >= 0; |
| |
| final RemoteRoutineInfo info = new RemoteRoutineInfo(nodeId, hnd, bufSize, interval, autoUnsubscribe); |
| |
| boolean doRegister = loc; |
| |
| if (!doRegister) { |
| stopLock.lock(); |
| |
| try { |
| doRegister = !stopped.remove(routineId) && rmtInfos.putIfAbsent(routineId, info) == null; |
| } |
| finally { |
| stopLock.unlock(); |
| } |
| } |
| |
| if (doRegister) { |
| if (log.isDebugEnabled()) |
| log.debug("Register handler: [nodeId=" + nodeId + ", routineId=" + routineId + ", info=" + info + ']'); |
| |
| if (interval > 0) { |
| IgniteThread checker = new IgniteThread(new GridWorker(ctx.igniteInstanceName(), "continuous-buffer-checker", log) { |
| @Override protected void body() { |
| long interval0 = interval; |
| |
| while (!isCancelled()) { |
| try { |
| U.sleep(interval0); |
| } |
| catch (IgniteInterruptedCheckedException ignored) { |
| break; |
| } |
| |
| IgniteBiTuple<GridContinuousBatch, Long> t = info.checkInterval(); |
| |
| final GridContinuousBatch batch = t.get1(); |
| |
| if (batch != null && batch.size() > 0) { |
| try { |
| Collection<Object> toSnd = batch.collect(); |
| |
| boolean msg = toSnd.iterator().next() instanceof Message; |
| |
| CI1<IgniteException> ackC = new CI1<IgniteException>() { |
| @Override public void apply(IgniteException e) { |
| if (e == null) |
| info.hnd.onBatchAcknowledged(routineId, batch, ctx); |
| } |
| }; |
| |
| sendNotification(nodeId, |
| routineId, |
| null, |
| toSnd, |
| hnd.orderedTopic(), |
| msg, |
| ackC); |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send notification to node (is node alive?): " + nodeId); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send notification to node: " + nodeId, e); |
| } |
| } |
| |
| interval0 = t.get2(); |
| } |
| } |
| }); |
| |
| checker.setUncaughtExceptionHandler(new OomExceptionHandler(ctx)); |
| |
| bufCheckThreads.put(routineId, checker); |
| |
| checker.start(); |
| } |
| |
| GridContinuousHandler.RegisterStatus status = hnd.register(nodeId, routineId, ctx); |
| |
| if (status == GridContinuousHandler.RegisterStatus.DELAYED) { |
| info.markDelayedRegister(); |
| |
| return false; |
| } |
| else |
| return status == GridContinuousHandler.RegisterStatus.REGISTERED; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param routineId Routine ID. |
| * @param hnd Handler |
| * @param loc If Handler unregistered on master node. |
| */ |
| private void unregisterHandler(UUID routineId, GridContinuousHandler hnd, boolean loc) { |
| assert routineId != null; |
| assert hnd != null; |
| |
| if (loc && hnd.orderedTopic() != null) |
| ctx.io().removeMessageListener(hnd.orderedTopic()); |
| |
| hnd.unregister(routineId, ctx); |
| |
| IgniteThread checker = bufCheckThreads.remove(routineId); |
| |
| if (checker != null) |
| checker.interrupt(); |
| } |
| |
| /** |
| * @param routineId Routine ID. |
| */ |
| @SuppressWarnings("TooBroadScope") |
| private void unregisterRemote(UUID routineId) { |
| RemoteRoutineInfo remote; |
| LocalRoutineInfo loc; |
| |
| stopLock.lock(); |
| |
| try { |
| remote = rmtInfos.remove(routineId); |
| |
| loc = locInfos.remove(routineId); |
| |
| if (remote == null) |
| stopped.add(routineId); |
| } |
| finally { |
| stopLock.unlock(); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("unregisterRemote [routineId=" + routineId + ", loc=" + loc + ", rmt=" + remote + ']'); |
| |
| if (remote != null) |
| unregisterHandler(routineId, remote.hnd, false); |
| else if (loc != null) { |
| // Removes routine at node started it when stopRoutine called from another node. |
| unregisterHandler(routineId, loc.hnd, false); |
| } |
| } |
| |
| /** |
| * @param nodeId Destination node ID. |
| * @param msg Message. |
| * @param orderedTopic Topic for ordered notifications. |
| * If {@code null}, non-ordered message will be sent. |
| * @param ackC Ack closure. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic, |
| IgniteInClosure<IgniteException> ackC) |
| throws IgniteCheckedException { |
| assert nodeId != null; |
| assert msg != null; |
| |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| if (node != null) |
| sendWithRetries(node, msg, orderedTopic, ackC); |
| else |
| throw new ClusterTopologyCheckedException("Node for provided ID doesn't exist (did it leave the grid?): " + nodeId); |
| } |
| |
| /** |
| * @param node Destination node. |
| * @param msg Message. |
| * @param orderedTopic Topic for ordered notifications. |
| * If {@code null}, non-ordered message will be sent. |
| * @param ackC Ack closure. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic, |
| IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { |
| assert node != null; |
| assert msg != null; |
| |
| sendWithRetries(F.asList(node), msg, orderedTopic, ackC); |
| } |
| |
| /** |
| * @param nodes Destination nodes. |
| * @param msg Message. |
| * @param orderedTopic Topic for ordered notifications. |
| * If {@code null}, non-ordered message will be sent. |
| * @param ackC Ack closure. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private void sendWithRetries(Collection<? extends ClusterNode> nodes, GridContinuousMessage msg, |
| @Nullable Object orderedTopic, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { |
| assert !F.isEmpty(nodes); |
| assert msg != null; |
| |
| if (!msg.messages() && |
| msg.data() != null && |
| (nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id()))) |
| msg.dataBytes(U.marshal(marsh, msg.data())); |
| |
| for (ClusterNode node : nodes) { |
| int cnt = 0; |
| |
| while (cnt <= retryCnt) { |
| try { |
| cnt++; |
| |
| if (orderedTopic != null) { |
| ctx.io().sendOrderedMessage( |
| node, |
| orderedTopic, |
| msg, |
| SYSTEM_POOL, |
| 0, |
| true, |
| ackC); |
| } |
| else |
| ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC); |
| |
| break; |
| } |
| catch (ClusterTopologyCheckedException | IgniteInterruptedCheckedException e) { |
| throw e; |
| } |
| catch (IgniteCheckedException e) { |
| if (!ctx.discovery().alive(node.id())) |
| throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e); |
| |
| if (cnt == retryCnt) |
| throw e; |
| else if (log.isDebugEnabled()) |
| log.debug("Failed to send message to node (will retry): " + node.id()); |
| } |
| |
| U.sleep(retryDelay); |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener { |
| /** {@inheritDoc} */ |
| @Override public void onEvent(Event evt) { |
| assert evt instanceof DiscoveryEvent; |
| assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; |
| |
| UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); |
| |
| if (discoProtoVer == 2) { |
| routinesInfo.onNodeFail(nodeId); |
| |
| for (StartFuture fut : startFuts.values()) |
| fut.onNodeFail(nodeId); |
| } |
| |
| clientInfos.remove(nodeId); |
| |
| // Unregister handlers created by left node. |
| for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) { |
| UUID routineId = e.getKey(); |
| RemoteRoutineInfo info = e.getValue(); |
| |
| if (nodeId.equals(info.nodeId)) { |
| if (info.autoUnsubscribe) |
| unregisterRemote(routineId); |
| |
| info.hnd.flushOnNodeLeft(); |
| } |
| } |
| |
| for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) { |
| SyncMessageAckFuture fut = e.getValue(); |
| |
| if (fut.nodeId().equals(nodeId)) { |
| SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey()); |
| |
| if (fut0 != null) { |
| ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( |
| "Node left grid while sending message to: " + nodeId); |
| |
| fut0.onDone(err); |
| } |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int order() { |
| return 1; |
| } |
| } |
| |
| /** Routine interface info. */ |
| public static interface RoutineInfo { |
| /** @return Handler. */ |
| GridContinuousHandler handler(); |
| |
| /** @return Master node id. */ |
| UUID nodeId(); |
| |
| /** @return Buffer size. */ |
| int bufferSize(); |
| |
| /** @return Notify interval. */ |
| long interval(); |
| |
| /** @return Auto unsubscribe flag value. */ |
| boolean autoUnsubscribe(); |
| |
| /** @return Last send time. */ |
| long lastSendTime(); |
| |
| /** @return Delayed register flag. */ |
| boolean delayedRegister(); |
| } |
| |
| /** |
| * Local routine info. |
| */ |
| public static class LocalRoutineInfo implements Serializable, RoutineInfo { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Source node id. */ |
| private final UUID nodeId; |
| |
| /** Projection predicate. */ |
| private final IgnitePredicate<ClusterNode> prjPred; |
| |
| /** Continuous routine handler. */ |
| private final GridContinuousHandler hnd; |
| |
| /** Buffer size. */ |
| private final int bufSize; |
| |
| /** Time interval. */ |
| private final long interval; |
| |
| /** Automatic unsubscribe flag. */ |
| private boolean autoUnsubscribe; |
| |
| /** |
| * @param nodeId Node id. |
| * @param prjPred Projection predicate. |
| * @param hnd Continuous routine handler. |
| * @param bufSize Buffer size. |
| * @param interval Interval. |
| * @param autoUnsubscribe Automatic unsubscribe flag. |
| */ |
| LocalRoutineInfo( |
| UUID nodeId, |
| @Nullable IgnitePredicate<ClusterNode> prjPred, |
| GridContinuousHandler hnd, |
| int bufSize, |
| long interval, |
| boolean autoUnsubscribe |
| ) { |
| assert hnd != null; |
| assert bufSize > 0; |
| assert interval >= 0; |
| |
| this.nodeId = nodeId; |
| this.prjPred = prjPred; |
| this.hnd = hnd; |
| this.bufSize = bufSize; |
| this.interval = interval; |
| this.autoUnsubscribe = autoUnsubscribe; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridContinuousHandler handler() { |
| return hnd; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int bufferSize() { |
| return bufSize; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long interval() { |
| return interval; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean autoUnsubscribe() { |
| return autoUnsubscribe; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long lastSendTime() { |
| return -1; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean delayedRegister() { |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public UUID nodeId() { |
| return nodeId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(LocalRoutineInfo.class, this); |
| } |
| } |
| |
| /** |
| * Remote routine info. |
| */ |
| public static class RemoteRoutineInfo implements RoutineInfo { |
| /** Master node ID. */ |
| private final UUID nodeId; |
| |
| /** Continuous routine handler. */ |
| private final GridContinuousHandler hnd; |
| |
| /** Buffer size. */ |
| private final int bufSize; |
| |
| /** Time interval. */ |
| private final long interval; |
| |
| /** Lock. */ |
| private final ReadWriteLock lock = new ReentrantReadWriteLock(); |
| |
| /** Batch. */ |
| private GridContinuousBatch batch; |
| |
| /** Last send time. */ |
| private long lastSndTime = U.currentTimeMillis(); |
| |
| /** Automatic unsubscribe flag. */ |
| private final boolean autoUnsubscribe; |
| |
| /** Delayed register flag. */ |
| private boolean delayedRegister; |
| |
| /** |
| * @param nodeId Master node ID. |
| * @param hnd Continuous routine handler. |
| * @param bufSize Buffer size. |
| * @param interval Interval. |
| * @param autoUnsubscribe Automatic unsubscribe flag. |
| */ |
| RemoteRoutineInfo(UUID nodeId, GridContinuousHandler hnd, int bufSize, long interval, |
| boolean autoUnsubscribe) { |
| assert nodeId != null; |
| assert hnd != null; |
| assert bufSize > 0; |
| assert interval >= 0; |
| |
| this.nodeId = nodeId; |
| this.hnd = hnd; |
| this.bufSize = bufSize; |
| this.interval = interval; |
| this.autoUnsubscribe = autoUnsubscribe; |
| |
| batch = hnd.createBatch(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public UUID nodeId() { |
| return nodeId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridContinuousHandler handler() { |
| return hnd; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int bufferSize() { |
| return bufSize; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long interval() { |
| return interval; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean autoUnsubscribe() { |
| return autoUnsubscribe; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long lastSendTime() { |
| return lastSndTime; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean delayedRegister() { |
| return delayedRegister; |
| } |
| |
| /** |
| * Marks info to be registered when cache is started. |
| */ |
| public void markDelayedRegister() { |
| assert hnd.isQuery(); |
| |
| delayedRegister = true; |
| } |
| |
| /** |
| * Clears delayed register flag if it was set. |
| * |
| * @return {@code True} if flag was cleared. |
| */ |
| public boolean clearDelayedRegister() { |
| if (delayedRegister) { |
| delayedRegister = false; |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param objs Objects to add. |
| * @return Batch to send. |
| */ |
| GridContinuousBatch addAll(Collection<?> objs) { |
| assert objs != null; |
| |
| GridContinuousBatch toSnd; |
| |
| lock.writeLock().lock(); |
| |
| try { |
| for (Object obj : objs) |
| batch.add(obj); |
| |
| toSnd = batch; |
| |
| batch = hnd.createBatch(); |
| |
| if (interval > 0) |
| lastSndTime = U.currentTimeMillis(); |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| |
| return toSnd; |
| } |
| |
| /** |
| * @param obj Object to add. |
| * @return Batch to send or {@code null} if there is nothing to send for now. |
| */ |
| @Nullable GridContinuousBatch add(Object obj) { |
| assert obj != null; |
| |
| GridContinuousBatch toSnd = null; |
| |
| if (batch.size() >= bufSize - 1) { |
| lock.writeLock().lock(); |
| |
| try { |
| batch.add(obj); |
| |
| toSnd = batch; |
| |
| batch = hnd.createBatch(); |
| |
| if (interval > 0) |
| lastSndTime = U.currentTimeMillis(); |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| else { |
| lock.readLock().lock(); |
| |
| try { |
| batch.add(obj); |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| return toSnd; |
| } |
| |
| /** |
| * @return Tuple with batch to send (or {@code null} if there is nothing to |
| * send for now) and time interval after next check is needed. |
| */ |
| @SuppressWarnings("TooBroadScope") |
| IgniteBiTuple<GridContinuousBatch, Long> checkInterval() { |
| assert interval > 0; |
| |
| GridContinuousBatch toSnd = null; |
| long diff; |
| |
| long now = U.currentTimeMillis(); |
| |
| lock.writeLock().lock(); |
| |
| try { |
| diff = now - lastSndTime; |
| |
| if (diff >= interval && batch.size() > 0) { |
| toSnd = batch; |
| |
| batch = hnd.createBatch(); |
| |
| lastSndTime = now; |
| } |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| |
| return F.t(toSnd, diff < interval ? interval - diff : interval); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(RemoteRoutineInfo.class, this); |
| } |
| } |
| |
| /** |
| * Discovery data. |
| */ |
| private static class DiscoveryData implements Externalizable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Node ID. */ |
| private UUID nodeId; |
| |
| /** Items. */ |
| @GridToStringInclude |
| private Collection<DiscoveryDataItem> items; |
| |
| /** */ |
| private Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos; |
| |
| /** |
| * Required by {@link Externalizable}. |
| */ |
| public DiscoveryData() { |
| // No-op. |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param clientInfos Client information. |
| */ |
| DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) { |
| assert nodeId != null; |
| |
| this.nodeId = nodeId; |
| |
| this.clientInfos = clientInfos; |
| |
| items = new ArrayList<>(); |
| } |
| |
| /** |
| * @param item Item. |
| */ |
| public void addItem(DiscoveryDataItem item) { |
| items.add(item); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| U.writeUuid(out, nodeId); |
| U.writeCollection(out, items); |
| U.writeMap(out, clientInfos); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| nodeId = U.readUuid(in); |
| items = U.readCollection(in); |
| clientInfos = U.readMap(in); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(DiscoveryData.class, this); |
| } |
| } |
| |
| /** |
| * Discovery data item. |
| */ |
| private static class DiscoveryDataItem implements Externalizable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Consume ID. */ |
| private UUID routineId; |
| |
| /** Projection predicate. */ |
| private IgnitePredicate<ClusterNode> prjPred; |
| |
| /** Handler. */ |
| private GridContinuousHandler hnd; |
| |
| /** Buffer size. */ |
| private int bufSize; |
| |
| /** Time interval. */ |
| private long interval; |
| |
| /** Automatic unsubscribe flag. */ |
| private boolean autoUnsubscribe; |
| |
| /** |
| * Required by {@link Externalizable}. |
| */ |
| public DiscoveryDataItem() { |
| // No-op. |
| } |
| |
| /** |
| * @param routineId Consume ID. |
| * @param prjPred Projection predicate. |
| * @param hnd Handler. |
| * @param bufSize Buffer size. |
| * @param interval Time interval. |
| * @param autoUnsubscribe Automatic unsubscribe flag. |
| */ |
| DiscoveryDataItem(UUID routineId, |
| @Nullable IgnitePredicate<ClusterNode> prjPred, |
| GridContinuousHandler hnd, |
| int bufSize, |
| long interval, |
| boolean autoUnsubscribe |
| ) { |
| assert routineId != null; |
| assert hnd != null; |
| assert bufSize > 0; |
| assert interval >= 0; |
| |
| this.routineId = routineId; |
| this.prjPred = prjPred; |
| this.hnd = hnd; |
| this.bufSize = bufSize; |
| this.interval = interval; |
| this.autoUnsubscribe = autoUnsubscribe; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| U.writeUuid(out, routineId); |
| out.writeObject(prjPred); |
| out.writeObject(hnd); |
| out.writeInt(bufSize); |
| out.writeLong(interval); |
| out.writeBoolean(autoUnsubscribe); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| routineId = U.readUuid(in); |
| prjPred = (IgnitePredicate<ClusterNode>)in.readObject(); |
| hnd = (GridContinuousHandler)in.readObject(); |
| bufSize = in.readInt(); |
| interval = in.readLong(); |
| autoUnsubscribe = in.readBoolean(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(DiscoveryDataItem.class, this); |
| } |
| } |
| |
| /** |
| * Future for start routine. |
| */ |
| private class StartFuture extends GridFutureAdapter<UUID> { |
| /** Consume ID. */ |
| private UUID routineId; |
| |
| /** Local listener is registered. */ |
| private volatile boolean loc; |
| |
| /** All remote listeners are registered. */ |
| private volatile boolean rmt; |
| |
| /** */ |
| private final DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage, RoutineRegisterResults> |
| resCollect; |
| |
| /** |
| * @param routineId Consume ID. |
| */ |
| StartFuture(UUID routineId) { |
| this.routineId = routineId; |
| |
| resCollect = new DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage, RoutineRegisterResults>(ctx) { |
| @Override protected RoutineRegisterResults createResult(Map<UUID, NodeMessage<ContinuousRoutineStartResultMessage>> rcvd) { |
| Map<UUID, Exception> errs = null; |
| Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = null; |
| |
| for (Map.Entry<UUID, NodeMessage<ContinuousRoutineStartResultMessage>> entry : rcvd.entrySet()) { |
| ContinuousRoutineStartResultMessage msg = entry.getValue().message(); |
| |
| if (msg == null) |
| continue; |
| |
| if (msg.error()) { |
| byte[] errBytes = msg.errorBytes(); |
| |
| Exception err = null; |
| |
| if (errBytes != null) { |
| try { |
| err = U.unmarshal(marsh, errBytes, U.resolveClassLoader(ctx.config())); |
| } |
| catch (Exception e) { |
| U.warn(log, "Failed to unmarhal continuous routine start error: " + e); |
| } |
| } |
| |
| if (err == null) { |
| err = new IgniteCheckedException("Failed to start continuous " + |
| "routine on node: " + entry.getKey()); |
| } |
| |
| if (errs == null) |
| errs = new HashMap<>(); |
| |
| errs.put(entry.getKey(), err); |
| } |
| else { |
| byte[] cntrsMapBytes = msg.countersMapBytes(); |
| |
| if (cntrsMapBytes != null) { |
| try { |
| CachePartitionPartialCountersMap cntrsMap = U.unmarshal( |
| marsh, |
| cntrsMapBytes, |
| U.resolveClassLoader(ctx.config())); |
| |
| if (cntrsPerNode == null) |
| cntrsPerNode = new HashMap<>(); |
| |
| cntrsPerNode.put(entry.getKey(), CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); |
| } |
| catch (Exception e) { |
| U.warn(log, "Failed to unmarhal continuous query update counters: " + e); |
| } |
| } |
| } |
| } |
| |
| return new RoutineRegisterResults(discoCache.version(), errs, cntrsPerNode); |
| } |
| |
| @Override protected void onResultsCollected(RoutineRegisterResults res0) { |
| onAllRemoteRegistered(res0.topVer, res0.errs, res0.cntrsPerNode, null); |
| } |
| |
| @Override protected boolean waitForNode(DiscoCache discoCache, ClusterNode node) { |
| return !ctx.localNodeId().equals(node.id()); |
| } |
| }; |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @param errs Errors. |
| * @param cntrsPerNode Update counters. |
| * @param cntrs Update counters. |
| */ |
| private void onAllRemoteRegistered( |
| AffinityTopologyVersion topVer, |
| @Nullable Map<UUID, ? extends Exception> errs, |
| Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode, |
| Map<Integer, T2<Long, Long>> cntrs) { |
| try { |
| if (errs == null || errs.isEmpty()) { |
| LocalRoutineInfo routine = locInfos.get(routineId); |
| |
| // Update partition counters. |
| if (routine != null && routine.handler().isQuery()) { |
| GridCacheAdapter<Object, Object> interCache = |
| ctx.cache().internalCache(routine.handler().cacheName()); |
| |
| GridCacheContext cctx = interCache != null ? interCache.context() : null; |
| |
| if (cctx != null && cntrsPerNode != null && cctx.affinityNode()) |
| cntrsPerNode.put(ctx.localNodeId(), |
| toCountersMap(cctx.topology().localUpdateCounters(false))); |
| |
| routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); |
| } |
| |
| onRemoteRegistered(); |
| } |
| else { |
| Exception firstEx = F.first(errs.values()); |
| |
| onDone(firstEx); |
| |
| stopRoutine(routineId); |
| } |
| } |
| finally { |
| startFuts.remove(routineId, this); |
| } |
| } |
| |
| /** |
| * @param discoCache Discovery state. |
| */ |
| void initRemoteNodes(DiscoCache discoCache) { |
| resCollect.init(discoCache); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param msg Message. |
| */ |
| void onResult(UUID nodeId, ContinuousRoutineStartResultMessage msg) { |
| resCollect.onMessage(nodeId, msg); |
| } |
| |
| /** |
| * @param nodeId Failed node ID. |
| */ |
| void onNodeFail(UUID nodeId) { |
| resCollect.onNodeFail(nodeId); |
| } |
| |
| /** |
| * Called when local listener is registered. |
| */ |
| void onLocalRegistered() { |
| loc = true; |
| |
| if (rmt && !isDone()) |
| onDone(routineId); |
| } |
| |
| /** |
| * Called when all remote listeners are registered. |
| */ |
| void onRemoteRegistered() { |
| rmt = true; |
| |
| if (loc && !isDone()) |
| onDone(routineId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(StartFuture.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class RoutineRegisterResults { |
| /** */ |
| private final AffinityTopologyVersion topVer; |
| |
| /** */ |
| private final Map<UUID, ? extends Exception> errs; |
| |
| /** */ |
| private final Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode; |
| |
| /** |
| * @param topVer Topology version. |
| * @param errs Errors. |
| * @param cntrsPerNode Update counters. |
| */ |
| RoutineRegisterResults(AffinityTopologyVersion topVer, |
| Map<UUID, ? extends Exception> errs, |
| Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode) { |
| this.topVer = topVer; |
| this.errs = errs; |
| this.cntrsPerNode = cntrsPerNode; |
| } |
| } |
| |
| /** |
| * Future for stop routine. |
| */ |
| private static class StopFuture extends GridFutureAdapter<Object> { |
| /** Timeout object. */ |
| private volatile GridTimeoutObject timeoutObj; |
| |
| /** */ |
| private GridKernalContext ctx; |
| |
| /** |
| * @param ctx Kernal context. |
| */ |
| StopFuture(GridKernalContext ctx) { |
| this.ctx = ctx; |
| } |
| |
| /** |
| * @param timeoutObj Timeout object. |
| */ |
| public void addTimeoutObject(GridTimeoutObject timeoutObj) { |
| assert timeoutObj != null; |
| |
| this.timeoutObj = timeoutObj; |
| |
| ctx.timeout().addTimeoutObject(timeoutObj); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { |
| if (timeoutObj != null) |
| ctx.timeout().removeTimeoutObject(timeoutObj); |
| |
| return super.onDone(res, err); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(StopFuture.class, this); |
| } |
| } |
| |
| /** |
| * Synchronous message acknowledgement future. |
| */ |
| private static class SyncMessageAckFuture extends GridFutureAdapter<Object> { |
| /** */ |
| private UUID nodeId; |
| |
| /** |
| * @param nodeId Master node ID. |
| */ |
| SyncMessageAckFuture(UUID nodeId) { |
| this.nodeId = nodeId; |
| } |
| |
| /** |
| * @return Master node ID. |
| */ |
| UUID nodeId() { |
| return nodeId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(SyncMessageAckFuture.class, this); |
| } |
| } |
| } |