| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.continuous; |
| |
| import java.io.Externalizable; |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.events.DiscoveryEvent; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.GridMessageListenHandler; |
| import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; |
| import org.apache.ignite.internal.IgniteDeploymentCheckedException; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.managers.communication.GridMessageListener; |
| import org.apache.ignite.internal.managers.deployment.GridDeployment; |
| import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; |
| import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; |
| import org.apache.ignite.internal.managers.discovery.CustomEventListener; |
| import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; |
| import org.apache.ignite.internal.processors.GridProcessorAdapter; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.GridCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheProcessor; |
| import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; |
| import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.tostring.GridToStringInclude; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.internal.util.worker.GridWorker; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteProductVersion; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.marshaller.Marshaller; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.apache.ignite.thread.IgniteThread; |
| import org.jetbrains.annotations.Nullable; |
| import org.jsr166.ConcurrentHashMap8; |
| |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; |
| import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_ACK; |
| import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_NOTIFICATION; |
| |
| /** |
| * Processor for continuous routines. |
| */ |
| public class GridContinuousProcessor extends GridProcessorAdapter { |
| /** Local infos. */ |
| private final ConcurrentMap<UUID, LocalRoutineInfo> locInfos = new ConcurrentHashMap8<>(); |
| |
| /** Local infos. */ |
| private final ConcurrentMap<UUID, Map<UUID, LocalRoutineInfo>> clientInfos = new ConcurrentHashMap8<>(); |
| |
| /** Remote infos. */ |
| private final ConcurrentMap<UUID, RemoteRoutineInfo> rmtInfos = new ConcurrentHashMap8<>(); |
| |
| /** Start futures. */ |
| private final ConcurrentMap<UUID, StartFuture> startFuts = new ConcurrentHashMap8<>(); |
| |
| /** Stop futures. */ |
| private final ConcurrentMap<UUID, StopFuture> stopFuts = new ConcurrentHashMap8<>(); |
| |
| /** Threads started by this processor. */ |
| private final Map<UUID, IgniteThread> bufCheckThreads = new ConcurrentHashMap8<>(); |
| |
| /** */ |
| public static final IgniteProductVersion QUERY_MSG_VER_2_SINCE = IgniteProductVersion.fromString("1.5.9"); |
| |
| /** */ |
| private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<>(); |
| |
| /** Stopped IDs. */ |
| private final Collection<UUID> stopped = new HashSet<>(); |
| |
| /** Lock for stop process. */ |
| private final Lock stopLock = new ReentrantLock(); |
| |
| /** Marshaller. */ |
| private Marshaller marsh; |
| |
| /** Delay in milliseconds between retries. */ |
| private long retryDelay = 1000; |
| |
| /** Number of retries using to send messages. */ |
| private int retryCnt = 3; |
| |
| /** */ |
| private final ReentrantReadWriteLock processorStopLock = new ReentrantReadWriteLock(); |
| |
| /** */ |
| private boolean processorStopped; |
| |
| /** |
| * @param ctx Kernal context. |
| */ |
| public GridContinuousProcessor(GridKernalContext ctx) { |
| super(ctx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start() throws IgniteCheckedException { |
| if (ctx.config().isDaemon()) |
| return; |
| |
| retryDelay = ctx.config().getNetworkSendRetryDelay(); |
| retryCnt = ctx.config().getNetworkSendRetryCount(); |
| |
| marsh = ctx.config().getMarshaller(); |
| |
| ctx.event().addLocalEventListener(new GridLocalEventListener() { |
| @SuppressWarnings({"fallthrough", "TooBroadScope"}) |
| @Override public void onEvent(Event evt) { |
| assert evt instanceof DiscoveryEvent; |
| assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; |
| |
| UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); |
| |
| clientInfos.remove(nodeId); |
| |
| // Unregister handlers created by left node. |
| for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) { |
| UUID routineId = e.getKey(); |
| RemoteRoutineInfo info = e.getValue(); |
| |
| if (info.autoUnsubscribe && nodeId.equals(info.nodeId)) |
| unregisterRemote(routineId); |
| } |
| |
| for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) { |
| SyncMessageAckFuture fut = e.getValue(); |
| |
| if (fut.nodeId().equals(nodeId)) { |
| SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey()); |
| |
| if (fut0 != null) { |
| ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( |
| "Node left grid while sending message to: " + nodeId); |
| |
| fut0.onDone(err); |
| } |
| } |
| } |
| } |
| }, EVT_NODE_LEFT, EVT_NODE_FAILED); |
| |
| ctx.event().addLocalEventListener(new GridLocalEventListener() { |
| @Override public void onEvent(Event evt) { |
| cancelFutures(new IgniteCheckedException("Topology segmented")); |
| } |
| }, EVT_NODE_SEGMENTED); |
| |
| ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessage.class, |
| new CustomEventListener<StartRoutineDiscoveryMessage>() { |
| @Override public void onCustomEvent(AffinityTopologyVersion topVer, |
| ClusterNode snd, |
| StartRoutineDiscoveryMessage msg) { |
| if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping()) |
| processStartRequest(snd, msg); |
| } |
| }); |
| |
| ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class, |
| new CustomEventListener<StartRoutineAckDiscoveryMessage>() { |
| @Override public void onCustomEvent(AffinityTopologyVersion topVer, |
| ClusterNode snd, |
| StartRoutineAckDiscoveryMessage msg) { |
| StartFuture fut = startFuts.remove(msg.routineId()); |
| |
| if (fut != null) { |
| if (msg.errs().isEmpty()) { |
| LocalRoutineInfo routine = locInfos.get(msg.routineId()); |
| |
| // Update partition counters. |
| if (routine != null && routine.handler().isQuery()) { |
| Map<UUID, Map<Integer, Long>> cntrsPerNode = msg.updateCountersPerNode(); |
| Map<Integer, Long> cntrs = msg.updateCounters(); |
| |
| GridCacheAdapter<Object, Object> interCache = |
| ctx.cache().internalCache(routine.handler().cacheName()); |
| |
| GridCacheContext cctx = interCache != null ? interCache.context() : null; |
| |
| if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) |
| cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters()); |
| |
| routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); |
| } |
| |
| fut.onRemoteRegistered(); |
| } |
| else { |
| IgniteCheckedException firstEx = F.first(msg.errs().values()); |
| |
| fut.onDone(firstEx); |
| |
| stopRoutine(msg.routineId()); |
| } |
| } |
| } |
| }); |
| |
| ctx.discovery().setCustomEventListener(StopRoutineDiscoveryMessage.class, |
| new CustomEventListener<StopRoutineDiscoveryMessage>() { |
| @Override public void onCustomEvent(AffinityTopologyVersion topVer, |
| ClusterNode snd, |
| StopRoutineDiscoveryMessage msg) { |
| if (!snd.id().equals(ctx.localNodeId())) { |
| UUID routineId = msg.routineId(); |
| |
| unregisterRemote(routineId); |
| |
| if (snd.isClient()) { |
| Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(snd.id()); |
| |
| if (clientRoutineMap != null) |
| clientRoutineMap.remove(msg.routineId()); |
| } |
| } |
| } |
| }); |
| |
| ctx.discovery().setCustomEventListener(StopRoutineAckDiscoveryMessage.class, |
| new CustomEventListener<StopRoutineAckDiscoveryMessage>() { |
| @Override public void onCustomEvent(AffinityTopologyVersion topVer, |
| ClusterNode snd, |
| StopRoutineAckDiscoveryMessage msg) { |
| StopFuture fut = stopFuts.remove(msg.routineId()); |
| |
| if (fut != null) |
| fut.onDone(); |
| } |
| }); |
| |
| ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() { |
| @Override public void onMessage(UUID nodeId, Object obj) { |
| GridContinuousMessage msg = (GridContinuousMessage)obj; |
| |
| if (msg.data() == null && msg.dataBytes() != null) { |
| try { |
| msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config()))); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to process message (ignoring): " + msg, e); |
| |
| return; |
| } |
| } |
| |
| switch (msg.type()) { |
| case MSG_EVT_NOTIFICATION: |
| processNotification(nodeId, msg); |
| |
| break; |
| |
| case MSG_EVT_ACK: |
| processMessageAck(msg); |
| |
| break; |
| |
| default: |
| assert false : "Unexpected message received: " + msg.type(); |
| } |
| } |
| }); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Continuous processor started."); |
| } |
| |
| /** |
| * @param e Error. |
| */ |
| private void cancelFutures(IgniteCheckedException e) { |
| for (Iterator<StartFuture> itr = startFuts.values().iterator(); itr.hasNext(); ) { |
| StartFuture fut = itr.next(); |
| |
| itr.remove(); |
| |
| fut.onDone(e); |
| } |
| |
| for (Iterator<StopFuture> itr = stopFuts.values().iterator(); itr.hasNext(); ) { |
| StopFuture fut = itr.next(); |
| |
| itr.remove(); |
| |
| fut.onDone(e); |
| } |
| } |
| |
| /** |
| * @return {@code true} if lock successful, {@code false} if processor already stopped. |
| */ |
| @SuppressWarnings("LockAcquiredButNotSafelyReleased") |
| public boolean lockStopping() { |
| processorStopLock.readLock().lock(); |
| |
| if (processorStopped) { |
| processorStopLock.readLock().unlock(); |
| |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * |
| */ |
| public void unlockStopping() { |
| processorStopLock.readLock().unlock(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onKernalStop(boolean cancel) { |
| processorStopLock.writeLock().lock(); |
| |
| try { |
| processorStopped = true; |
| } |
| finally { |
| processorStopLock.writeLock().unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stop(boolean cancel) throws IgniteCheckedException { |
| if (ctx.config().isDaemon()) |
| return; |
| |
| ctx.io().removeMessageListener(TOPIC_CONTINUOUS); |
| |
| for (IgniteThread thread : bufCheckThreads.values()) { |
| U.interrupt(thread); |
| U.join(thread); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Continuous processor stopped."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { |
| return DiscoveryDataExchangeType.CONTINUOUS_PROC; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) { |
| if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) { |
| Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size()); |
| |
| for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) { |
| Map<UUID, LocalRoutineInfo> copy = U.newHashMap(e.getValue().size()); |
| |
| for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet()) |
| copy.put(e0.getKey(), e0.getValue()); |
| |
| clientInfos0.put(e.getKey(), copy); |
| } |
| |
| DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0); |
| |
| // Collect listeners information (will be sent to joining node during discovery process). |
| for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) { |
| UUID routineId = e.getKey(); |
| LocalRoutineInfo info = e.getValue(); |
| |
| data.addItem(new DiscoveryDataItem(routineId, |
| info.prjPred, |
| info.hnd, |
| info.bufSize, |
| info.interval, |
| info.autoUnsubscribe)); |
| } |
| |
| return data; |
| } |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) { |
| DiscoveryData data = (DiscoveryData)obj; |
| |
| if (!ctx.isDaemon() && data != null) { |
| for (DiscoveryDataItem item : data.items) { |
| try { |
| if (item.prjPred != null) |
| ctx.resource().injectGeneric(item.prjPred); |
| |
| // Register handler only if local node passes projection predicate. |
| if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) && |
| !locInfos.containsKey(item.routineId)) { |
| if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval, |
| item.autoUnsubscribe, false)) |
| item.hnd.onListenerRegistered(item.routineId, ctx); |
| } |
| |
| if (!item.autoUnsubscribe) |
| // Register routine locally. |
| locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo( |
| item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe)); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to register continuous handler.", e); |
| } |
| } |
| |
| for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) { |
| UUID clientNodeId = entry.getKey(); |
| |
| Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue(); |
| |
| for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) { |
| UUID routineId = e.getKey(); |
| LocalRoutineInfo info = e.getValue(); |
| |
| try { |
| if (info.prjPred != null) |
| ctx.resource().injectGeneric(info.prjPred); |
| |
| if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) { |
| if (registerHandler(clientNodeId, |
| routineId, |
| info.hnd, |
| info.bufSize, |
| info.interval, |
| info.autoUnsubscribe, |
| false)) |
| info.hnd.onListenerRegistered(routineId, ctx); |
| } |
| } |
| catch (IgniteCheckedException err) { |
| U.error(log, "Failed to register continuous handler.", err); |
| } |
| } |
| |
| Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey()); |
| |
| if (map == null) { |
| map = new HashMap<>(); |
| |
| clientInfos.put(entry.getKey(), map); |
| } |
| |
| map.putAll(entry.getValue()); |
| } |
| } |
| } |
| |
| /** |
| * Callback invoked when cache is started. |
| * |
| * @param ctx Cache context. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void onCacheStart(GridCacheContext ctx) throws IgniteCheckedException { |
| for (Map.Entry<UUID, RemoteRoutineInfo> entry : rmtInfos.entrySet()) { |
| UUID routineId = entry.getKey(); |
| RemoteRoutineInfo rmtInfo = entry.getValue(); |
| |
| GridContinuousHandler hnd = rmtInfo.hnd; |
| |
| if (hnd.isQuery() && F.eq(ctx.name(), hnd.cacheName()) && rmtInfo.clearDelayedRegister()) { |
| GridContinuousHandler.RegisterStatus status = hnd.register(rmtInfo.nodeId, routineId, this.ctx); |
| |
| assert status != GridContinuousHandler.RegisterStatus.DELAYED; |
| |
| if (status == GridContinuousHandler.RegisterStatus.REGISTERED) |
| hnd.onListenerRegistered(routineId, this.ctx); |
| } |
| } |
| } |
| |
| /** |
| * @param ctx Callback invoked when cache is stopped. |
| */ |
| public void onCacheStop(GridCacheContext ctx) { |
| Iterator<Map.Entry<UUID, RemoteRoutineInfo>> it = rmtInfos.entrySet().iterator(); |
| |
| while (it.hasNext()) { |
| Map.Entry<UUID, RemoteRoutineInfo> entry = it.next(); |
| |
| GridContinuousHandler hnd = entry.getValue().hnd; |
| |
| if (hnd.isQuery() && F.eq(ctx.name(), hnd.cacheName())) |
| it.remove(); |
| } |
| } |
| |
| /** |
| * @param hnd Handler. |
| * @param bufSize Buffer size. |
| * @param interval Time interval. |
| * @param autoUnsubscribe Automatic unsubscribe flag. |
| * @param locOnly Local only flag. |
| * @param prjPred Projection predicate. |
| * @return Future. |
| */ |
| @SuppressWarnings("TooBroadScope") |
| public IgniteInternalFuture<UUID> startRoutine(GridContinuousHandler hnd, |
| boolean locOnly, |
| int bufSize, |
| long interval, |
| boolean autoUnsubscribe, |
| @Nullable IgnitePredicate<ClusterNode> prjPred) { |
| assert hnd != null; |
| assert bufSize > 0; |
| assert interval >= 0; |
| |
| // Generate ID. |
| final UUID routineId = UUID.randomUUID(); |
| |
| // Register routine locally. |
| locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe)); |
| |
| if (locOnly) { |
| try { |
| registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true); |
| |
| hnd.onListenerRegistered(routineId, ctx); |
| |
| return new GridFinishedFuture<>(routineId); |
| } |
| catch (IgniteCheckedException e) { |
| unregisterHandler(routineId, hnd, true); |
| |
| return new GridFinishedFuture<>(e); |
| } |
| } |
| |
| // Whether local node is included in routine. |
| boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode()); |
| |
| StartRequestData reqData = new StartRequestData(prjPred, hnd.clone(), bufSize, interval, autoUnsubscribe); |
| |
| try { |
| if (ctx.config().isPeerClassLoadingEnabled()) { |
| // Handle peer deployment for projection predicate. |
| if (prjPred != null && !U.isGrid(prjPred.getClass())) { |
| Class cls = U.detectClass(prjPred); |
| |
| String clsName = cls.getName(); |
| |
| GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); |
| |
| if (dep == null) |
| throw new IgniteDeploymentCheckedException("Failed to deploy projection predicate: " + prjPred); |
| |
| reqData.className(clsName); |
| reqData.deploymentInfo(new GridDeploymentInfoBean(dep)); |
| |
| reqData.p2pMarshal(marsh); |
| } |
| |
| // Handle peer deployment for other handler-specific objects. |
| reqData.handler().p2pMarshal(ctx); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture<>(e); |
| } |
| |
| // Register per-routine notifications listener if ordered messaging is used. |
| if (hnd.orderedTopic() != null) { |
| ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() { |
| @Override public void onMessage(UUID nodeId, Object obj) { |
| GridContinuousMessage msg = (GridContinuousMessage)obj; |
| |
| // Only notification can be ordered. |
| assert msg.type() == MSG_EVT_NOTIFICATION; |
| |
| if (msg.data() == null && msg.dataBytes() != null) { |
| try { |
| msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config()))); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to process message (ignoring): " + msg, e); |
| |
| return; |
| } |
| } |
| |
| processNotification(nodeId, msg); |
| } |
| }); |
| } |
| |
| StartFuture fut = new StartFuture(ctx, routineId); |
| |
| startFuts.put(routineId, fut); |
| |
| try { |
| if (locIncluded |
| && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true)) |
| hnd.onListenerRegistered(routineId, ctx); |
| |
| ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData, |
| reqData.handler().keepBinary())); |
| } |
| catch (IgniteCheckedException e) { |
| startFuts.remove(routineId); |
| locInfos.remove(routineId); |
| |
| unregisterHandler(routineId, hnd, true); |
| |
| fut.onDone(e); |
| |
| return fut; |
| } |
| |
| // Handler is registered locally. |
| fut.onLocalRegistered(); |
| |
| return fut; |
| } |
| |
| /** |
| * @param routineId Consume ID. |
| * @return Future. |
| */ |
| public IgniteInternalFuture<?> stopRoutine(UUID routineId) { |
| assert routineId != null; |
| |
| boolean doStop = false; |
| |
| StopFuture fut = stopFuts.get(routineId); |
| |
| // Only one thread will stop routine with provided ID. |
| if (fut == null) { |
| StopFuture old = stopFuts.putIfAbsent(routineId, fut = new StopFuture(ctx)); |
| |
| if (old != null) |
| fut = old; |
| else |
| doStop = true; |
| } |
| |
| if (doStop) { |
| // Unregister routine locally. |
| LocalRoutineInfo routine = locInfos.remove(routineId); |
| |
| // Finish if routine is not found (wrong ID is provided). |
| if (routine == null) { |
| stopFuts.remove(routineId); |
| |
| fut.onDone(); |
| |
| return fut; |
| } |
| |
| // Unregister handler locally. |
| unregisterHandler(routineId, routine.hnd, true); |
| |
| try { |
| ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId)); |
| } |
| catch (IgniteCheckedException e) { |
| fut.onDone(e); |
| } |
| |
| if (ctx.isStopping()) |
| fut.onDone(); |
| } |
| |
| return fut; |
| } |
| |
| /** |
| * @param nodeId ID of the node that started routine. |
| * @param routineId Routine ID. |
| * @param objs Notification objects. |
| * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| public void addBackupNotification(UUID nodeId, |
| final UUID routineId, |
| Collection<?> objs, |
| @Nullable Object orderedTopic) |
| throws IgniteCheckedException { |
| if (processorStopped) |
| return; |
| |
| final RemoteRoutineInfo info = rmtInfos.get(routineId); |
| |
| if (info != null) { |
| final GridContinuousBatch batch = info.addAll(objs); |
| |
| sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, true, null); |
| } |
| } |
| |
| /** |
| * @param nodeId ID of the node that started routine. |
| * @param routineId Routine ID. |
| * @param obj Notification object. |
| * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. |
| * @param sync If {@code true} then waits for event acknowledgment. |
| * @param msg If {@code true} then sent data is message. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| public void addNotification(UUID nodeId, |
| final UUID routineId, |
| @Nullable Object obj, |
| @Nullable Object orderedTopic, |
| boolean sync, |
| boolean msg) |
| throws IgniteCheckedException { |
| assert nodeId != null; |
| assert routineId != null; |
| assert !msg || obj instanceof Message : obj; |
| |
| assert !nodeId.equals(ctx.localNodeId()); |
| |
| if (processorStopped) |
| return; |
| |
| final RemoteRoutineInfo info = rmtInfos.get(routineId); |
| |
| if (info != null) { |
| assert info.interval == 0 || !sync; |
| |
| if (sync) { |
| SyncMessageAckFuture fut = new SyncMessageAckFuture(nodeId); |
| |
| IgniteUuid futId = IgniteUuid.randomUuid(); |
| |
| syncMsgFuts.put(futId, fut); |
| |
| try { |
| sendNotification(nodeId, routineId, futId, F.asList(obj), null, msg, null); |
| } |
| catch (IgniteCheckedException e) { |
| syncMsgFuts.remove(futId); |
| |
| throw e; |
| } |
| |
| fut.get(); |
| } |
| else { |
| final GridContinuousBatch batch = info.add(obj); |
| |
| if (batch != null) { |
| CI1<IgniteException> ackC = new CI1<IgniteException>() { |
| @Override public void apply(IgniteException e) { |
| if (e == null) |
| info.hnd.onBatchAcknowledged(routineId, batch, ctx); |
| } |
| }; |
| |
| sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, msg, ackC); |
| } |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { |
| cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected.")); |
| |
| for (UUID rmtId : rmtInfos.keySet()) |
| unregisterRemote(rmtId); |
| |
| rmtInfos.clear(); |
| |
| clientInfos.clear(); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param routineId Routine ID. |
| * @param futId Future ID. |
| * @param toSnd Notification object to send. |
| * @param orderedTopic Topic for ordered notifications. |
| * If {@code null}, non-ordered message will be sent. |
| * @param msg If {@code true} then sent data is collection of messages. |
| * @param ackC Ack closure. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private void sendNotification(UUID nodeId, |
| UUID routineId, |
| @Nullable IgniteUuid futId, |
| Collection<Object> toSnd, |
| @Nullable Object orderedTopic, |
| boolean msg, |
| IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { |
| assert nodeId != null; |
| assert routineId != null; |
| assert toSnd != null; |
| assert !toSnd.isEmpty(); |
| |
| sendWithRetries(nodeId, |
| new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg), |
| orderedTopic, |
| ackC); |
| } |
| |
| /** |
| * @param node Sender. |
| * @param req Start request. |
| */ |
| private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage req) { |
| UUID routineId = req.routineId(); |
| StartRequestData data = req.startRequestData(); |
| |
| GridContinuousHandler hnd = data.handler(); |
| |
| if (req.keepBinary()) { |
| assert hnd instanceof CacheContinuousQueryHandler; |
| |
| ((CacheContinuousQueryHandler)hnd).keepBinary(true); |
| } |
| |
| IgniteCheckedException err = null; |
| |
| try { |
| if (ctx.config().isPeerClassLoadingEnabled()) { |
| String clsName = data.className(); |
| |
| if (clsName != null) { |
| GridDeploymentInfo depInfo = data.deploymentInfo(); |
| |
| GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, |
| depInfo.userVersion(), node.id(), depInfo.classLoaderId(), depInfo.participants(), null); |
| |
| if (dep == null) |
| throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); |
| |
| data.p2pUnmarshal(marsh, U.resolveClassLoader(dep.classLoader(), ctx.config())); |
| } |
| |
| hnd.p2pUnmarshal(node.id(), ctx); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| err = e; |
| |
| U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); |
| } |
| |
| GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ? |
| new GridMessageListenHandler((GridMessageListenHandler)hnd) : |
| hnd; |
| |
| if (node.isClient()) { |
| Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(node.id()); |
| |
| if (clientRoutineMap == null) { |
| clientRoutineMap = new HashMap<>(); |
| |
| Map<UUID, LocalRoutineInfo> old = clientInfos.put(node.id(), clientRoutineMap); |
| |
| assert old == null; |
| } |
| |
| clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(), |
| hnd0, |
| data.bufferSize(), |
| data.interval(), |
| data.autoUnsubscribe())); |
| } |
| |
| boolean registered = false; |
| |
| if (err == null) { |
| try { |
| IgnitePredicate<ClusterNode> prjPred = data.projectionPredicate(); |
| |
| if (prjPred != null) |
| ctx.resource().injectGeneric(prjPred); |
| |
| if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) && |
| !locInfos.containsKey(routineId)) { |
| registered = registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(), |
| data.autoUnsubscribe(), false); |
| } |
| |
| if (!data.autoUnsubscribe()) |
| // Register routine locally. |
| locInfos.putIfAbsent(routineId, new LocalRoutineInfo( |
| prjPred, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe())); |
| } |
| catch (IgniteCheckedException e) { |
| err = e; |
| |
| U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); |
| } |
| } |
| |
| // Load partition counters. |
| if (hnd0.isQuery()) { |
| GridCacheProcessor proc = ctx.cache(); |
| |
| if (proc != null) { |
| GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName()); |
| |
| if (cache != null && !cache.isLocal() && cache.context().userCache()) |
| req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters()); |
| } |
| } |
| |
| if (err != null) |
| req.addError(ctx.localNodeId(), err); |
| |
| if (registered) |
| hnd0.onListenerRegistered(routineId, ctx); |
| } |
| |
| /** |
| * @param msg Message. |
| */ |
| private void processMessageAck(GridContinuousMessage msg) { |
| assert msg.futureId() != null; |
| |
| SyncMessageAckFuture fut = syncMsgFuts.remove(msg.futureId()); |
| |
| if (fut != null) |
| fut.onDone(); |
| } |
| |
| /** |
| * @param nodeId Sender ID. |
| * @param msg Message. |
| */ |
| private void processNotification(UUID nodeId, GridContinuousMessage msg) { |
| assert nodeId != null; |
| assert msg != null; |
| |
| UUID routineId = msg.routineId(); |
| |
| try { |
| LocalRoutineInfo routine = locInfos.get(routineId); |
| |
| if (routine != null) |
| routine.hnd.notifyCallback(nodeId, routineId, (Collection<?>)msg.data(), ctx); |
| } |
| finally { |
| if (msg.futureId() != null) { |
| try { |
| sendWithRetries(nodeId, |
| new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null, false), |
| null, |
| null); |
| } |
| catch (IgniteCheckedException e) { |
| log.error("Failed to send event acknowledgment to node: " + nodeId, e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param routineId Consume ID. |
| * @param hnd Handler. |
| * @param bufSize Buffer size. |
| * @param interval Time interval. |
| * @param autoUnsubscribe Automatic unsubscribe flag. |
| * @param loc Local registration flag. |
| * @return Whether listener was actually registered. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private boolean registerHandler(final UUID nodeId, |
| final UUID routineId, |
| final GridContinuousHandler hnd, |
| int bufSize, |
| final long interval, |
| boolean autoUnsubscribe, |
| boolean loc) throws IgniteCheckedException { |
| assert nodeId != null; |
| assert routineId != null; |
| assert hnd != null; |
| assert bufSize > 0; |
| assert interval >= 0; |
| |
| final RemoteRoutineInfo info = new RemoteRoutineInfo(nodeId, hnd, bufSize, interval, autoUnsubscribe); |
| |
| boolean doRegister = loc; |
| |
| if (!doRegister) { |
| stopLock.lock(); |
| |
| try { |
| doRegister = !stopped.remove(routineId) && rmtInfos.putIfAbsent(routineId, info) == null; |
| } |
| finally { |
| stopLock.unlock(); |
| } |
| } |
| |
| if (doRegister) { |
| if (interval > 0) { |
| IgniteThread checker = new IgniteThread(new GridWorker(ctx.gridName(), "continuous-buffer-checker", log) { |
| @SuppressWarnings("ConstantConditions") |
| @Override protected void body() { |
| long interval0 = interval; |
| |
| while (!isCancelled()) { |
| try { |
| U.sleep(interval0); |
| } |
| catch (IgniteInterruptedCheckedException ignored) { |
| break; |
| } |
| |
| IgniteBiTuple<GridContinuousBatch, Long> t = info.checkInterval(); |
| |
| final GridContinuousBatch batch = t.get1(); |
| |
| if (batch != null && batch.size() > 0) { |
| try { |
| Collection<Object> toSnd = batch.collect(); |
| |
| boolean msg = toSnd.iterator().next() instanceof Message; |
| |
| CI1<IgniteException> ackC = new CI1<IgniteException>() { |
| @Override public void apply(IgniteException e) { |
| if (e == null) |
| info.hnd.onBatchAcknowledged(routineId, batch, ctx); |
| } |
| }; |
| |
| sendNotification(nodeId, |
| routineId, |
| null, |
| toSnd, |
| hnd.orderedTopic(), |
| msg, |
| ackC); |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send notification to node (is node alive?): " + nodeId); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send notification to node: " + nodeId, e); |
| } |
| } |
| |
| interval0 = t.get2(); |
| } |
| } |
| }); |
| |
| bufCheckThreads.put(routineId, checker); |
| |
| checker.start(); |
| } |
| |
| GridContinuousHandler.RegisterStatus status = hnd.register(nodeId, routineId, ctx); |
| |
| if (status == GridContinuousHandler.RegisterStatus.DELAYED) { |
| info.markDelayedRegister(); |
| |
| return false; |
| } |
| else |
| return status == GridContinuousHandler.RegisterStatus.REGISTERED; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param routineId Routine ID. |
| * @param hnd Handler |
| * @param loc If Handler unregistered on master node. |
| */ |
| private void unregisterHandler(UUID routineId, GridContinuousHandler hnd, boolean loc) { |
| assert routineId != null; |
| assert hnd != null; |
| |
| if (loc && hnd.orderedTopic() != null) |
| ctx.io().removeMessageListener(hnd.orderedTopic()); |
| |
| hnd.unregister(routineId, ctx); |
| |
| IgniteThread checker = bufCheckThreads.remove(routineId); |
| |
| if (checker != null) |
| checker.interrupt(); |
| } |
| |
| /** |
| * @param routineId Routine ID. |
| */ |
| @SuppressWarnings("TooBroadScope") |
| private void unregisterRemote(UUID routineId) { |
| RemoteRoutineInfo remote; |
| LocalRoutineInfo loc; |
| |
| stopLock.lock(); |
| |
| try { |
| remote = rmtInfos.remove(routineId); |
| |
| loc = locInfos.remove(routineId); |
| |
| if (remote == null) |
| stopped.add(routineId); |
| } |
| finally { |
| stopLock.unlock(); |
| } |
| |
| if (remote != null) |
| unregisterHandler(routineId, remote.hnd, false); |
| else if (loc != null) { |
| // Removes routine at node started it when stopRoutine called from another node. |
| unregisterHandler(routineId, loc.hnd, false); |
| } |
| } |
| |
| /** |
| * @param nodeId Destination node ID. |
| * @param msg Message. |
| * @param orderedTopic Topic for ordered notifications. |
| * If {@code null}, non-ordered message will be sent. |
| * @param ackC Ack closure. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic, |
| IgniteInClosure<IgniteException> ackC) |
| throws IgniteCheckedException { |
| assert nodeId != null; |
| assert msg != null; |
| |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| if (node != null) |
| sendWithRetries(node, msg, orderedTopic, ackC); |
| else |
| throw new ClusterTopologyCheckedException("Node for provided ID doesn't exist (did it leave the grid?): " + nodeId); |
| } |
| |
| /** |
| * @param node Destination node. |
| * @param msg Message. |
| * @param orderedTopic Topic for ordered notifications. |
| * If {@code null}, non-ordered message will be sent. |
| * @param ackC Ack closure. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic, |
| IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { |
| assert node != null; |
| assert msg != null; |
| |
| sendWithRetries(F.asList(node), msg, orderedTopic, ackC); |
| } |
| |
| /** |
| * @param nodes Destination nodes. |
| * @param msg Message. |
| * @param orderedTopic Topic for ordered notifications. |
| * If {@code null}, non-ordered message will be sent. |
| * @param ackC Ack closure. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private void sendWithRetries(Collection<? extends ClusterNode> nodes, GridContinuousMessage msg, |
| @Nullable Object orderedTopic, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { |
| assert !F.isEmpty(nodes); |
| assert msg != null; |
| |
| if (!msg.messages() && |
| msg.data() != null && |
| (nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id()))) |
| msg.dataBytes(marsh.marshal(msg.data())); |
| |
| for (ClusterNode node : nodes) { |
| int cnt = 0; |
| |
| while (cnt <= retryCnt) { |
| try { |
| cnt++; |
| |
| if (orderedTopic != null) { |
| ctx.io().sendOrderedMessage( |
| node, |
| orderedTopic, |
| msg, |
| SYSTEM_POOL, |
| 0, |
| true, |
| ackC); |
| } |
| else |
| ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC); |
| |
| break; |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw e; |
| } |
| catch (IgniteCheckedException e) { |
| if (!ctx.discovery().alive(node.id())) |
| throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e); |
| |
| if (cnt == retryCnt) |
| throw e; |
| else if (log.isDebugEnabled()) |
| log.debug("Failed to send message to node (will retry): " + node.id()); |
| } |
| |
| U.sleep(retryDelay); |
| } |
| } |
| } |
| |
| /** |
| * Local routine info. |
| */ |
| @SuppressWarnings("PackageVisibleInnerClass") |
| static class LocalRoutineInfo implements Serializable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Projection predicate. */ |
| private final IgnitePredicate<ClusterNode> prjPred; |
| |
| /** Continuous routine handler. */ |
| private final GridContinuousHandler hnd; |
| |
| /** Buffer size. */ |
| private final int bufSize; |
| |
| /** Time interval. */ |
| private final long interval; |
| |
| /** Automatic unsubscribe flag. */ |
| private boolean autoUnsubscribe; |
| |
| /** |
| * @param prjPred Projection predicate. |
| * @param hnd Continuous routine handler. |
| * @param bufSize Buffer size. |
| * @param interval Interval. |
| * @param autoUnsubscribe Automatic unsubscribe flag. |
| */ |
| LocalRoutineInfo(@Nullable IgnitePredicate<ClusterNode> prjPred, |
| GridContinuousHandler hnd, |
| int bufSize, |
| long interval, |
| boolean autoUnsubscribe) |
| { |
| assert hnd != null; |
| assert bufSize > 0; |
| assert interval >= 0; |
| |
| this.prjPred = prjPred; |
| this.hnd = hnd; |
| this.bufSize = bufSize; |
| this.interval = interval; |
| this.autoUnsubscribe = autoUnsubscribe; |
| } |
| |
| /** |
| * @return Handler. |
| */ |
| GridContinuousHandler handler() { |
| return hnd; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(LocalRoutineInfo.class, this); |
| } |
| } |
| |
| /** |
| * Remote routine info. |
| */ |
| private static class RemoteRoutineInfo { |
| /** Master node ID. */ |
| private UUID nodeId; |
| |
| /** Continuous routine handler. */ |
| private final GridContinuousHandler hnd; |
| |
| /** Buffer size. */ |
| private final int bufSize; |
| |
| /** Time interval. */ |
| private final long interval; |
| |
| /** Lock. */ |
| private final ReadWriteLock lock = new ReentrantReadWriteLock(); |
| |
| /** Batch. */ |
| private GridContinuousBatch batch; |
| |
| /** Last send time. */ |
| private long lastSndTime = U.currentTimeMillis(); |
| |
| /** Automatic unsubscribe flag. */ |
| private boolean autoUnsubscribe; |
| |
| /** Delayed register flag. */ |
| private boolean delayedRegister; |
| |
| /** |
| * @param nodeId Master node ID. |
| * @param hnd Continuous routine handler. |
| * @param bufSize Buffer size. |
| * @param interval Interval. |
| * @param autoUnsubscribe Automatic unsubscribe flag. |
| */ |
| RemoteRoutineInfo(UUID nodeId, GridContinuousHandler hnd, int bufSize, long interval, |
| boolean autoUnsubscribe) { |
| assert nodeId != null; |
| assert hnd != null; |
| assert bufSize > 0; |
| assert interval >= 0; |
| |
| this.nodeId = nodeId; |
| this.hnd = hnd; |
| this.bufSize = bufSize; |
| this.interval = interval; |
| this.autoUnsubscribe = autoUnsubscribe; |
| |
| batch = hnd.createBatch(); |
| } |
| |
| /** |
| * Marks info to be registered when cache is started. |
| */ |
| public void markDelayedRegister() { |
| assert hnd.isQuery(); |
| |
| delayedRegister = true; |
| } |
| |
| /** |
| * Clears delayed register flag if it was set. |
| * |
| * @return {@code True} if flag was cleared. |
| */ |
| public boolean clearDelayedRegister() { |
| if (delayedRegister) { |
| delayedRegister = false; |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param objs Objects to add. |
| * @return Batch to send. |
| */ |
| GridContinuousBatch addAll(Collection<?> objs) { |
| assert objs != null; |
| assert objs.size() > 0; |
| |
| GridContinuousBatch toSnd = null; |
| |
| lock.writeLock().lock(); |
| |
| try { |
| for (Object obj : objs) |
| batch.add(obj); |
| |
| toSnd = batch; |
| |
| batch = hnd.createBatch(); |
| |
| if (interval > 0) |
| lastSndTime = U.currentTimeMillis(); |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| |
| return toSnd; |
| } |
| |
| /** |
| * @param obj Object to add. |
| * @return Batch to send or {@code null} if there is nothing to send for now. |
| */ |
| @Nullable GridContinuousBatch add(Object obj) { |
| assert obj != null; |
| |
| GridContinuousBatch toSnd = null; |
| |
| if (batch.size() >= bufSize - 1) { |
| lock.writeLock().lock(); |
| |
| try { |
| batch.add(obj); |
| |
| toSnd = batch; |
| |
| batch = hnd.createBatch(); |
| |
| if (interval > 0) |
| lastSndTime = U.currentTimeMillis(); |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| else { |
| lock.readLock().lock(); |
| |
| try { |
| batch.add(obj); |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| return toSnd; |
| } |
| |
| /** |
| * @return Tuple with batch to send (or {@code null} if there is nothing to |
| * send for now) and time interval after next check is needed. |
| */ |
| @SuppressWarnings("TooBroadScope") |
| IgniteBiTuple<GridContinuousBatch, Long> checkInterval() { |
| assert interval > 0; |
| |
| GridContinuousBatch toSnd = null; |
| long diff; |
| |
| long now = U.currentTimeMillis(); |
| |
| lock.writeLock().lock(); |
| |
| try { |
| diff = now - lastSndTime; |
| |
| if (diff >= interval && batch.size() > 0) { |
| toSnd = batch; |
| |
| batch = hnd.createBatch(); |
| |
| lastSndTime = now; |
| } |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| |
| return F.t(toSnd, diff < interval ? interval - diff : interval); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(RemoteRoutineInfo.class, this); |
| } |
| } |
| |
| /** |
| * Discovery data. |
| */ |
| private static class DiscoveryData implements Externalizable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Node ID. */ |
| private UUID nodeId; |
| |
| /** Items. */ |
| @GridToStringInclude |
| private Collection<DiscoveryDataItem> items; |
| |
| /** */ |
| private Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos; |
| |
| /** |
| * Required by {@link Externalizable}. |
| */ |
| public DiscoveryData() { |
| // No-op. |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param clientInfos Client information. |
| */ |
| DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) { |
| assert nodeId != null; |
| |
| this.nodeId = nodeId; |
| |
| this.clientInfos = clientInfos; |
| |
| items = new ArrayList<>(); |
| } |
| |
| /** |
| * @param item Item. |
| */ |
| public void addItem(DiscoveryDataItem item) { |
| items.add(item); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| U.writeUuid(out, nodeId); |
| U.writeCollection(out, items); |
| U.writeMap(out, clientInfos); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| nodeId = U.readUuid(in); |
| items = U.readCollection(in); |
| clientInfos = U.readMap(in); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(DiscoveryData.class, this); |
| } |
| } |
| |
| /** |
| * Discovery data item. |
| */ |
| private static class DiscoveryDataItem implements Externalizable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Consume ID. */ |
| private UUID routineId; |
| |
| /** Projection predicate. */ |
| private IgnitePredicate<ClusterNode> prjPred; |
| |
| /** Handler. */ |
| private GridContinuousHandler hnd; |
| |
| /** Buffer size. */ |
| private int bufSize; |
| |
| /** Time interval. */ |
| private long interval; |
| |
| /** Automatic unsubscribe flag. */ |
| private boolean autoUnsubscribe; |
| |
| /** |
| * Required by {@link Externalizable}. |
| */ |
| public DiscoveryDataItem() { |
| // No-op. |
| } |
| |
| /** |
| * @param routineId Consume ID. |
| * @param prjPred Projection predicate. |
| * @param hnd Handler. |
| * @param bufSize Buffer size. |
| * @param interval Time interval. |
| * @param autoUnsubscribe Automatic unsubscribe flag. |
| */ |
| DiscoveryDataItem(UUID routineId, |
| @Nullable IgnitePredicate<ClusterNode> prjPred, |
| GridContinuousHandler hnd, |
| int bufSize, |
| long interval, |
| boolean autoUnsubscribe) |
| { |
| assert routineId != null; |
| assert hnd != null; |
| assert bufSize > 0; |
| assert interval >= 0; |
| |
| this.routineId = routineId; |
| this.prjPred = prjPred; |
| this.hnd = hnd; |
| this.bufSize = bufSize; |
| this.interval = interval; |
| this.autoUnsubscribe = autoUnsubscribe; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| U.writeUuid(out, routineId); |
| out.writeObject(prjPred); |
| out.writeObject(hnd); |
| out.writeInt(bufSize); |
| out.writeLong(interval); |
| out.writeBoolean(autoUnsubscribe); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| routineId = U.readUuid(in); |
| prjPred = (IgnitePredicate<ClusterNode>)in.readObject(); |
| hnd = (GridContinuousHandler)in.readObject(); |
| bufSize = in.readInt(); |
| interval = in.readLong(); |
| autoUnsubscribe = in.readBoolean(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(DiscoveryDataItem.class, this); |
| } |
| } |
| |
| /** |
| * Future for start routine. |
| */ |
| private static class StartFuture extends GridFutureAdapter<UUID> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private GridKernalContext ctx; |
| |
| /** Consume ID. */ |
| private UUID routineId; |
| |
| /** Local listener is registered. */ |
| private volatile boolean loc; |
| |
| /** All remote listeners are registered. */ |
| private volatile boolean rmt; |
| |
| /** Timeout object. */ |
| private volatile GridTimeoutObject timeoutObj; |
| |
| /** |
| * @param ctx Kernal context. |
| * @param routineId Consume ID. |
| */ |
| StartFuture(GridKernalContext ctx, UUID routineId) { |
| this.ctx = ctx; |
| |
| this.routineId = routineId; |
| } |
| |
| /** |
| * Called when local listener is registered. |
| */ |
| public void onLocalRegistered() { |
| loc = true; |
| |
| if (rmt && !isDone()) |
| onDone(routineId); |
| } |
| |
| /** |
| * Called when all remote listeners are registered. |
| */ |
| public void onRemoteRegistered() { |
| rmt = true; |
| |
| if (loc && !isDone()) |
| onDone(routineId); |
| } |
| |
| /** |
| * @param timeoutObj Timeout object. |
| */ |
| public void addTimeoutObject(GridTimeoutObject timeoutObj) { |
| assert timeoutObj != null; |
| |
| this.timeoutObj = timeoutObj; |
| |
| ctx.timeout().addTimeoutObject(timeoutObj); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onDone(@Nullable UUID res, @Nullable Throwable err) { |
| if (timeoutObj != null) |
| ctx.timeout().removeTimeoutObject(timeoutObj); |
| |
| return super.onDone(res, err); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(StartFuture.class, this); |
| } |
| } |
| |
| /** |
| * Future for stop routine. |
| */ |
| private static class StopFuture extends GridFutureAdapter<Object> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Timeout object. */ |
| private volatile GridTimeoutObject timeoutObj; |
| |
| /** */ |
| private GridKernalContext ctx; |
| |
| /** |
| * @param ctx Kernal context. |
| */ |
| StopFuture(GridKernalContext ctx) { |
| this.ctx = ctx; |
| } |
| |
| /** |
| * @param timeoutObj Timeout object. |
| */ |
| public void addTimeoutObject(GridTimeoutObject timeoutObj) { |
| assert timeoutObj != null; |
| |
| this.timeoutObj = timeoutObj; |
| |
| ctx.timeout().addTimeoutObject(timeoutObj); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { |
| if (timeoutObj != null) |
| ctx.timeout().removeTimeoutObject(timeoutObj); |
| |
| return super.onDone(res, err); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(StopFuture.class, this); |
| } |
| } |
| |
| /** |
| * Synchronous message acknowledgement future. |
| */ |
| private static class SyncMessageAckFuture extends GridFutureAdapter<Object> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private UUID nodeId; |
| |
| /** |
| * @param nodeId Master node ID. |
| */ |
| SyncMessageAckFuture(UUID nodeId) { |
| this.nodeId = nodeId; |
| } |
| |
| /** |
| * @return Master node ID. |
| */ |
| UUID nodeId() { |
| return nodeId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(SyncMessageAckFuture.class, this); |
| } |
| } |
| } |