| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.query.continuous; |
| |
| import java.io.Externalizable; |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import javax.cache.event.CacheEntryEvent; |
| import javax.cache.event.CacheEntryEventFilter; |
| import javax.cache.event.CacheEntryListener; |
| import javax.cache.event.CacheEntryUpdatedListener; |
| import javax.cache.event.EventType; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.SystemProperty; |
| import org.apache.ignite.cache.CacheEntryEventSerializableFilter; |
| import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.events.CacheQueryExecutedEvent; |
| import org.apache.ignite.events.CacheQueryReadEvent; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.managers.communication.GridIoPolicy; |
| import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; |
| import org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssues; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.GridCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; |
| import org.apache.ignite.internal.processors.cache.query.CacheQueryType; |
| import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener; |
| import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter; |
| import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; |
| import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; |
| import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch; |
| import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.lang.GridPlainRunnable; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteAsyncCallback; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteClosure; |
| import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; |
| import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.toCountersMap; |
| import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry.createFilteredEntry; |
| |
| /** |
| * Continuous query handler. |
| */ |
| public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** @see #IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD */ |
| public static final int DFLT_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD = 100; |
| |
| /** @see #IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE */ |
| public static final int DFLT_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE = 10_000; |
| |
| /** */ |
| @SystemProperty(value = "The size of the buffer with acknowledgment events that are sent to backup nodes", |
| type = Long.class, defaults = "" + DFLT_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD) |
| public static final String IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD = "IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD"; |
| |
| /** */ |
| @SystemProperty(value = "The maximum size of the continuous query listener buffer. " + |
| "10% of events are dropped once the buffer is full", type = Long.class, |
| defaults = "" + DFLT_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE) |
| public static final String IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE = "IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE"; |
| |
| /** */ |
| static final int BACKUP_ACK_THRESHOLD = |
| IgniteSystemProperties.getInteger(IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD, |
| DFLT_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD); |
| |
| /** */ |
| static final int LSNR_MAX_BUF_SIZE = |
| IgniteSystemProperties.getInteger(IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE, |
| DFLT_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE); |
| |
| /** |
| * Transformer implementation for processing received remote events. |
| * They are already transformed so we simply return transformed value for event. |
| */ |
| private transient IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> returnValTrans = |
| new IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, Object>() { |
| @Override public Object apply(CacheEntryEvent<? extends K, ? extends V> evt) { |
| assert evt.getKey() == null; |
| |
| return evt.getValue(); |
| } |
| }; |
| |
| /** Cache name. */ |
| private String cacheName; |
| |
| /** Topic for ordered messages. */ |
| private Object topic; |
| |
| /** P2P unmarshalling future. */ |
| protected transient IgniteInternalFuture<Void> p2pUnmarshalFut = new GridFinishedFuture<>(); |
| |
| /** Initialization future. */ |
| protected transient IgniteInternalFuture<Void> initFut; |
| |
| /** Local listener. */ |
| private transient CacheEntryUpdatedListener<K, V> locLsnr; |
| |
| /** Remote filter. */ |
| private CacheEntryEventSerializableFilter<K, V> rmtFilter; |
| |
| /** Deployable object for filter. */ |
| private CacheContinuousQueryDeployableObject rmtFilterDep; |
| |
| /** Internal flag. */ |
| private boolean internal; |
| |
| /** Notify existing flag. */ |
| private boolean notifyExisting; |
| |
| /** Old value required flag. */ |
| private boolean oldValRequired; |
| |
| /** Synchronous flag. */ |
| private boolean sync; |
| |
| /** Ignore expired events flag. */ |
| private boolean ignoreExpired; |
| |
| /** Task name hash code. */ |
| private int taskHash; |
| |
| /** Whether to skip primary check for REPLICATED cache. */ |
| private transient boolean skipPrimaryCheck; |
| |
| /** */ |
| private transient boolean locOnly; |
| |
| /** */ |
| private boolean keepBinary; |
| |
| /** */ |
| private transient ConcurrentMap<Integer, CacheContinuousQueryPartitionRecovery> rcvs; |
| |
| /** */ |
| private transient ConcurrentMap<Integer, CacheContinuousQueryEventBuffer> entryBufs; |
| |
| /** */ |
| private transient CacheContinuousQueryAcknowledgeBuffer ackBuf; |
| |
| /** */ |
| private transient int cacheId; |
| |
| /** */ |
| private transient volatile Map<Integer, T2<Long, Long>> initUpdCntrs; |
| |
| /** */ |
| private transient volatile Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode; |
| |
| /** */ |
| private transient volatile AffinityTopologyVersion initTopVer; |
| |
| /** */ |
| private transient volatile boolean nodeLeft; |
| |
| /** */ |
| private transient boolean ignoreClsNotFound; |
| |
| /** */ |
| transient boolean asyncCb; |
| |
| /** */ |
| private transient UUID nodeId; |
| |
| /** */ |
| private transient UUID routineId; |
| |
| /** Local update counters values on listener start. Used for skipping events fired before the listener start. */ |
| private transient volatile Map<Integer, T2<Long, Long>> locInitUpdCntrs; |
| |
| /** */ |
| private transient GridKernalContext ctx; |
| |
| /** */ |
| private transient IgniteLogger log; |
| |
| /** |
| * Required by {@link Externalizable}. |
| */ |
| public CacheContinuousQueryHandler() { |
| // No-op. |
| } |
| |
| /** |
| * Constructor. |
| * |
| * @param cacheName Cache name. |
| * @param topic Topic for ordered messages. |
| * @param locLsnr Local listener. |
| * @param rmtFilter Remote filter. |
| * @param oldValRequired Old value required flag. |
| * @param sync Synchronous flag. |
| * @param ignoreExpired Ignore expired events flag. |
| */ |
| public CacheContinuousQueryHandler( |
| String cacheName, |
| Object topic, |
| @Nullable CacheEntryUpdatedListener<K, V> locLsnr, |
| @Nullable CacheEntryEventSerializableFilter<K, V> rmtFilter, |
| boolean oldValRequired, |
| boolean sync, |
| boolean ignoreExpired, |
| boolean ignoreClsNotFound) { |
| assert topic != null; |
| |
| this.cacheName = cacheName; |
| this.topic = topic; |
| this.locLsnr = locLsnr; |
| this.rmtFilter = rmtFilter; |
| this.oldValRequired = oldValRequired; |
| this.sync = sync; |
| this.ignoreExpired = ignoreExpired; |
| this.ignoreClsNotFound = ignoreClsNotFound; |
| |
| cacheId = CU.cacheId(cacheName); |
| } |
| |
| /** |
| * @param internal Internal query. |
| */ |
| public void internal(boolean internal) { |
| this.internal = internal; |
| } |
| |
| /** |
| * @param notifyExisting Notify existing. |
| */ |
| public void notifyExisting(boolean notifyExisting) { |
| this.notifyExisting = notifyExisting; |
| } |
| |
| /** @return {@code True} if handler should obtain existing entries,{@code false} otherwise. */ |
| public boolean notifyExisting() { |
| return notifyExisting; |
| } |
| |
| /** @return {@code True} if old value required for handler, {@code false} otherwise. */ |
| public boolean oldValueRequired() { |
| return oldValRequired; |
| } |
| |
| /** @return Local listener. */ |
| public CacheEntryUpdatedListener<K, V> localListener() { |
| return locLsnr; |
| } |
| |
| /** |
| * @param locOnly Local only. |
| */ |
| public void localOnly(boolean locOnly) { |
| this.locOnly = locOnly; |
| } |
| |
| /** @return {@code True} if handler are local only, {@code false} otherwise. */ |
| public boolean localOnly() { |
| return locOnly; |
| } |
| |
| /** |
| * @param taskHash Task hash. |
| */ |
| public void taskNameHash(int taskHash) { |
| this.taskHash = taskHash; |
| } |
| |
| /** |
| * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. |
| */ |
| public void skipPrimaryCheck(boolean skipPrimaryCheck) { |
| this.skipPrimaryCheck = skipPrimaryCheck; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isEvents() { |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isMessaging() { |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isQuery() { |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean keepBinary() { |
| return keepBinary; |
| } |
| |
| /** |
| * @param keepBinary Keep binary flag. |
| */ |
| public void keepBinary(boolean keepBinary) { |
| this.keepBinary = keepBinary; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String cacheName() { |
| return cacheName; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode, |
| Map<Integer, T2<Long, Long>> cntrs) { |
| this.initUpdCntrsPerNode = cntrsPerNode; |
| this.initUpdCntrs = cntrs; |
| this.initTopVer = topVer; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Map<Integer, T2<Long, Long>> updateCounters() { |
| return locInitUpdCntrs; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) |
| throws IgniteCheckedException { |
| assert nodeId != null; |
| assert routineId != null; |
| assert ctx != null; |
| |
| initLocalListener(locLsnr, ctx); |
| |
| if (initFut == null) { |
| initFut = p2pUnmarshalFut.chain((fut) -> { |
| try { |
| fut.get(); |
| |
| initRemoteFilter(getEventFilter0(), ctx); |
| |
| IgniteClosure trans = getTransformer0(); |
| |
| if (trans != null) |
| ctx.resource().injectGeneric(trans); |
| } |
| catch (IgniteCheckedException | ExceptionInInitializerError e) { |
| throw new IgniteException("Failed to initialize a continuous query.", e); |
| } |
| |
| return null; |
| }); |
| } |
| |
| if (initFut.error() != null) |
| throw new IgniteCheckedException("Failed to initialize a continuous query.", initFut.error()); |
| |
| entryBufs = new ConcurrentHashMap<>(); |
| |
| ackBuf = new CacheContinuousQueryAcknowledgeBuffer(); |
| |
| rcvs = new ConcurrentHashMap<>(); |
| |
| this.nodeId = nodeId; |
| |
| this.routineId = routineId; |
| |
| this.ctx = ctx; |
| |
| final boolean loc = nodeId.equals(ctx.localNodeId()); |
| |
| assert !skipPrimaryCheck || loc; |
| |
| log = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY); |
| |
| CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { |
| @Override public void onBeforeRegister() { |
| GridCacheContext<K, V> cctx = cacheContext(ctx); |
| |
| if (cctx != null && !cctx.isLocal()) |
| cctx.topology().readLock(); |
| } |
| |
| @Override public void onAfterRegister() { |
| GridCacheContext<K, V> cctx = cacheContext(ctx); |
| |
| if (cctx != null && !cctx.isLocal()) |
| cctx.topology().readUnlock(); |
| } |
| |
| @Override public void onRegister() { |
| GridCacheContext<K, V> cctx = cacheContext(ctx); |
| |
| if (cctx != null && !cctx.isLocal()) |
| locInitUpdCntrs = toCountersMap(cctx.topology().localUpdateCounters(false)); |
| } |
| |
| @Override public boolean keepBinary() { |
| return keepBinary; |
| } |
| |
| @Override public void onEntryUpdated(final CacheContinuousQueryEvent<K, V> evt, |
| boolean primary, |
| final boolean recordIgniteEvt, |
| GridDhtAtomicAbstractUpdateFuture fut) { |
| if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) |
| return; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Entry updated on affinity node [evt=" + evt + ", primary=" + primary + ']'); |
| |
| final GridCacheContext<K, V> cctx = cacheContext(ctx); |
| |
| // Check that cache stopped. |
| if (cctx == null) |
| return; |
| |
| if (!needNotify(false, cctx, -1, -1, evt)) |
| return; |
| |
| // skipPrimaryCheck is set only when listen locally for replicated cache events. |
| assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId)); |
| |
| if (asyncCb) { |
| ContinuousQueryAsyncClosure clsr = new ContinuousQueryAsyncClosure( |
| primary, |
| evt, |
| recordIgniteEvt, |
| fut); |
| |
| ctx.pools().asyncCallbackPool().execute(clsr, evt.partitionId()); |
| } |
| else { |
| final boolean notify = filter(evt); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Filter invoked for event [evt=" + evt + ", primary=" + primary |
| + ", notify=" + notify + ']'); |
| |
| if (primary || skipPrimaryCheck) |
| onEntryUpdate(evt, notify, loc, recordIgniteEvt); |
| else |
| handleBackupEntry(cctx, evt.entry()); |
| } |
| } |
| |
| @Override public void onUnregister() { |
| try { |
| CacheEntryEventFilter filter = getEventFilter(); |
| |
| if (filter instanceof PlatformContinuousQueryFilter) |
| ((PlatformContinuousQueryFilter)filter).onQueryUnregister(); |
| } |
| catch (IgniteCheckedException e) { |
| if (log.isDebugEnabled()) { |
| log.debug("Failed to execute the onUnregister callback " + |
| "on the continuoue query listener. " + |
| "[nodeId=" + nodeId + ", routineId=" + routineId + ", cacheName=" + cacheName + |
| ", err=" + e + "]"); |
| } |
| } |
| } |
| |
| @Override public void cleanupOnAck(Map<Integer, Long> updateCntrs) { |
| for (Map.Entry<Integer, Long> e : updateCntrs.entrySet()) { |
| CacheContinuousQueryEventBuffer buf = entryBufs.get(e.getKey()); |
| |
| if (buf != null) |
| buf.cleanupOnAck(e.getValue()); |
| } |
| } |
| |
| @Override public void flushOnExchangeDone(GridKernalContext ctx, AffinityTopologyVersion topVer) { |
| assert topVer != null; |
| |
| try { |
| GridCacheContext<K, V> cctx = cacheContext(ctx); |
| |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) { |
| CacheContinuousQueryEventBuffer buf = bufE.getValue(); |
| |
| Collection<CacheContinuousQueryEntry> entries = buf.flushOnExchange((cntr, filtered) -> |
| createFilteredEntry(cctx.cacheId(), bufE.getKey(), topVer, cntr, filtered)); |
| |
| if (entries == null || node == null) |
| continue; |
| |
| for (CacheContinuousQueryEntry e : entries) { |
| e.markBackup(); |
| |
| if (!e.isFiltered()) |
| prepareEntry(cctx, nodeId, e); |
| } |
| |
| ctx.continuous().addBackupNotification(nodeId, routineId, entries, topic); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), |
| "Failed to send backup event notification to node: " + nodeId, e); |
| } |
| } |
| |
| @Override public void acknowledgeBackupOnTimeout(GridKernalContext ctx) { |
| sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx); |
| } |
| |
| @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, |
| AffinityTopologyVersion topVer, boolean primary) { |
| assert evt != null; |
| |
| CacheContinuousQueryEntry e = evt.entry(); |
| |
| e.markFiltered(); |
| |
| onEntryUpdated(evt, primary, false, null); |
| } |
| |
| @Override public CounterSkipContext skipUpdateCounter(final GridCacheContext cctx, |
| @Nullable CounterSkipContext skipCtx, |
| int part, |
| long cntr, |
| AffinityTopologyVersion topVer, |
| boolean primary) { |
| if (skipCtx == null) |
| skipCtx = new CounterSkipContext(part, cntr, topVer); |
| |
| if (!needNotify(true, cctx, part, cntr, null)) |
| return skipCtx; |
| |
| if (loc) { |
| assert !locOnly; |
| |
| final Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, skipCtx.entry()); |
| |
| if (!evts.isEmpty()) { |
| if (asyncCb) { |
| ctx.pools().asyncCallbackPool().execute(new Runnable() { |
| @Override public void run() { |
| try { |
| notifyLocalListener(evts, getTransformer()); |
| } |
| catch (IgniteCheckedException ex) { |
| U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), |
| "Failed to notify local listener.", ex); |
| } |
| } |
| }, part); |
| } |
| else |
| skipCtx.addProcessClosure(new Runnable() { |
| @Override public void run() { |
| try { |
| notifyLocalListener(evts, getTransformer()); |
| } |
| catch (IgniteCheckedException ex) { |
| U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), |
| "Failed to notify local listener.", ex); |
| } |
| } |
| }); |
| } |
| |
| return skipCtx; |
| } |
| |
| CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part); |
| |
| final Object entryOrList = buf.processEntry(skipCtx.entry(), !primary); |
| |
| if (entryOrList != null) { |
| skipCtx.addProcessClosure(new Runnable() { |
| @Override public void run() { |
| try { |
| ctx.continuous().addNotification(nodeId, |
| routineId, |
| entryOrList, |
| topic, |
| false, |
| true); |
| } |
| catch (ClusterTopologyCheckedException ex) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send event notification to node, node left cluster " + |
| "[node=" + nodeId + ", err=" + ex + ']'); |
| } |
| catch (IgniteCheckedException ex) { |
| U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), |
| "Failed to send event notification to node: " + nodeId, ex); |
| } |
| } |
| }); |
| } |
| |
| return skipCtx; |
| } |
| |
| @Override public void onPartitionEvicted(int part) { |
| entryBufs.remove(part); |
| } |
| |
| @Override public boolean oldValueRequired() { |
| return oldValRequired; |
| } |
| |
| @Override public boolean notifyExisting() { |
| return notifyExisting; |
| } |
| |
| private String taskName() { |
| return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; |
| } |
| |
| @Override public boolean isPrimaryOnly() { |
| return locOnly && !skipPrimaryCheck; |
| } |
| |
| /** |
| * Checks whether it is need to notify listeners. |
| * |
| * @param skipEvt {@code True} if this is a skip counter event. |
| * @param cctx Cache context. |
| * @param part Partition id. |
| * @param cntr Update counter. |
| * @param evt CQ event. |
| * @return {@code True} if notification should happen immediately, or {@code false} if it should be delayed. |
| */ |
| private boolean needNotify(boolean skipEvt, |
| GridCacheContext cctx, |
| int part, |
| long cntr, |
| CacheContinuousQueryEvent evt) { |
| assert !skipEvt || evt == null; |
| assert skipEvt || part == -1 && cntr == -1; // part == -1 && cntr == -1 means skip counter. |
| |
| if (!cctx.mvccEnabled() || cctx.isLocal()) |
| return true; |
| |
| assert locInitUpdCntrs != null; |
| |
| cntr = skipEvt ? cntr : evt.getPartitionUpdateCounter(); |
| part = skipEvt ? part : evt.partitionId(); |
| |
| T2<Long, Long> initCntr = locInitUpdCntrs.get(part); |
| |
| // Do not notify listener if entry was updated before the query is started. |
| return initCntr == null || cntr >= initCntr.get2(); |
| } |
| }; |
| |
| CacheContinuousQueryManager mgr = manager(ctx); |
| |
| if (mgr == null) |
| return RegisterStatus.DELAYED; |
| |
| RegisterStatus regStatus = mgr.registerListener(routineId, lsnr, internal); |
| |
| if (regStatus == RegisterStatus.REGISTERED) |
| initFut.listen(res -> sendQueryExecutedEvent()); |
| |
| return regStatus; |
| } |
| |
| /** |
| * Fires continuous query execution event. |
| * @see org.apache.ignite.events.EventType#EVT_CACHE_QUERY_EXECUTED |
| */ |
| private void sendQueryExecutedEvent() { |
| GridCacheContext<K, V> cctx = cacheContext(ctx); |
| |
| CacheEntryEventFilter filter; |
| try { |
| filter = getEventFilter(); |
| } |
| catch (IgniteCheckedException e) { |
| if (log.isDebugEnabled()) { |
| log.debug("Failed to trigger the continuoue query executed event. " + |
| "[routineId=" + routineId + ", cacheName=" + cacheName + ", err=" + e + "]"); |
| } |
| |
| return; |
| } |
| |
| if (cctx != null && cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { |
| //noinspection unchecked |
| ctx.event().record(new CacheQueryExecutedEvent<K, V>( |
| ctx.discovery().localNode(), |
| "Continuous query executed.", |
| EVT_CACHE_QUERY_EXECUTED, |
| CacheQueryType.CONTINUOUS.name(), |
| cacheName, |
| null, |
| null, |
| null, |
| filter instanceof CacheEntryEventSerializableFilter ? |
| (CacheEntryEventSerializableFilter)filter : null, |
| null, |
| nodeId, |
| taskName() |
| )); |
| } |
| } |
| |
| /** |
| * Performs resource injection and checks asynchrony for the provided local listener. |
| * |
| * @param lsnr Local listener. |
| * @param ctx Kernal context. |
| * @throws IgniteCheckedException If failed to perform resource injection. |
| */ |
| private void initLocalListener(CacheEntryListener lsnr, GridKernalContext ctx) throws IgniteCheckedException { |
| if (lsnr != null) { |
| CacheEntryListener impl = |
| lsnr instanceof JCacheQueryLocalListener |
| ? ((JCacheQueryLocalListener)lsnr).impl |
| : lsnr; |
| |
| ctx.resource().injectGeneric(impl); |
| |
| asyncCb = U.hasAnnotation(impl, IgniteAsyncCallback.class); |
| } |
| } |
| |
| /** |
| * Performs resource injection and checks asynchrony for the provided remote filter. |
| * |
| * @param filter Remote filter. |
| * @param ctx Kernal context. |
| * @throws IgniteCheckedException If failed to perform resource injection. |
| */ |
| protected void initRemoteFilter(CacheEntryEventFilter filter, GridKernalContext ctx) throws IgniteCheckedException { |
| CacheEntryEventFilter impl = |
| filter instanceof JCacheQueryRemoteFilter |
| ? ((JCacheQueryRemoteFilter)filter).impl |
| : filter; |
| |
| if (impl != null) { |
| ctx.resource().injectGeneric(impl); |
| |
| if (!asyncCb) |
| asyncCb = U.hasAnnotation(impl, IgniteAsyncCallback.class); |
| } |
| } |
| |
| /** |
| * @return Cache entry event filter. |
| * |
| * @throws IgniteCheckedException If P2P unmarshalling failed. |
| */ |
| public CacheEntryEventFilter getEventFilter() throws IgniteCheckedException { |
| initFut.get(); |
| |
| return getEventFilter0(); |
| } |
| |
| /** |
| * Returns an event filter without waiting on the unmarshalling future. |
| * |
| * @return Cache entry event filter. |
| */ |
| protected CacheEntryEventFilter getEventFilter0() { |
| return rmtFilter; |
| } |
| |
| /** |
| * @return Cache entry event transformer. |
| */ |
| @Nullable public IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> getTransformer() throws IgniteCheckedException { |
| initFut.get(); |
| |
| return getTransformer0(); |
| } |
| |
| /** |
| * @return Cache entry event transformer. |
| */ |
| public IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> getTransformer0() { |
| return null; |
| } |
| |
| /** |
| * @return Local listener of transformed events. |
| */ |
| @Nullable public EventListener<?> localTransformedEventListener() { |
| return null; |
| } |
| |
| /** |
| * @param cctx Context. |
| * @param nodeId ID of the node that started routine. |
| * @param entry Entry. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private void prepareEntry(GridCacheContext cctx, UUID nodeId, CacheContinuousQueryEntry entry) |
| throws IgniteCheckedException { |
| if (cctx.kernalContext().config().isPeerClassLoadingEnabled() && cctx.discovery().node(nodeId) != null) { |
| entry.prepareMarshal(cctx); |
| |
| cctx.deploy().prepare(entry); |
| } |
| else |
| entry.prepareMarshal(cctx); |
| } |
| |
| /** |
| * @param ctx Context. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException { |
| GridCacheContext<K, V> cctx = cacheContext(ctx); |
| |
| if (!cctx.isLocal()) { |
| AffinityTopologyVersion topVer = initTopVer; |
| |
| cacheContext(ctx).shared().exchange().affinityReadyFuture(topVer).get(); |
| |
| for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++) |
| getOrCreatePartitionRecovery(ctx, partId, topVer); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void unregister(UUID routineId, GridKernalContext ctx) { |
| assert routineId != null; |
| assert ctx != null; |
| |
| GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName); |
| |
| if (cache != null) |
| cache.context().continuousQueries().unregisterListener(internal, routineId); |
| } |
| |
| /** |
| * @param ctx Kernal context. |
| * @return Continuous query manager. |
| */ |
| private CacheContinuousQueryManager manager(GridKernalContext ctx) { |
| GridCacheContext<K, V> cacheCtx = cacheContext(ctx); |
| |
| return cacheCtx == null ? null : cacheCtx.continuousQueries(); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public void notifyCallback(final UUID nodeId, |
| final UUID routineId, |
| Collection<?> objs, |
| final GridKernalContext ctx) { |
| assert nodeId != null; |
| assert routineId != null; |
| assert objs != null; |
| assert ctx != null; |
| |
| if (objs.isEmpty()) |
| return; |
| |
| if (asyncCb) { |
| final List<CacheContinuousQueryEntry> entries = objs instanceof List ? (List)objs : new ArrayList(objs); |
| |
| IgniteStripedThreadPoolExecutor asyncPool = ctx.pools().asyncCallbackPool(); |
| |
| int threadId = asyncPool.threadId(entries.get(0).partition()); |
| |
| int startIdx = 0; |
| |
| if (entries.size() != 1) { |
| for (int i = 1; i < entries.size(); i++) { |
| int curThreadId = asyncPool.threadId(entries.get(i).partition()); |
| |
| // If all entries from one partition avoid creation new collections. |
| if (curThreadId == threadId) |
| continue; |
| |
| final int i0 = i; |
| final int startIdx0 = startIdx; |
| |
| asyncPool.execute(new Runnable() { |
| @Override public void run() { |
| notifyCallback0(nodeId, ctx, entries.subList(startIdx0, i0)); |
| } |
| }, threadId); |
| |
| startIdx = i0; |
| threadId = curThreadId; |
| } |
| } |
| |
| final int startIdx0 = startIdx; |
| |
| asyncPool.execute(new Runnable() { |
| @Override public void run() { |
| notifyCallback0(nodeId, ctx, |
| startIdx0 == 0 ? entries : entries.subList(startIdx0, entries.size())); |
| } |
| }, threadId); |
| } |
| else |
| notifyCallback0(nodeId, ctx, (Collection)objs); |
| } |
| |
| /** |
| * @param nodeId Node id. |
| * @param ctx Kernal context. |
| * @param entries Entries. |
| */ |
| private void notifyCallback0(UUID nodeId, |
| final GridKernalContext ctx, |
| Collection<CacheContinuousQueryEntry> entries) { |
| final GridCacheContext cctx = cacheContext(ctx); |
| |
| if (cctx == null) { |
| IgniteLogger log = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Failed to notify callback, cache is not found: " + cacheId); |
| |
| return; |
| } |
| |
| final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>(entries.size()); |
| |
| for (CacheContinuousQueryEntry e : entries) { |
| GridCacheDeploymentManager depMgr = cctx.deploy(); |
| |
| ClassLoader ldr = depMgr.globalLoader(); |
| |
| try { |
| if (ctx.config().isPeerClassLoadingEnabled()) { |
| GridDeploymentInfo depInfo = e.deployInfo(); |
| |
| if (depInfo != null) { |
| depMgr.p2pContext( |
| nodeId, |
| depInfo.classLoaderId(), |
| depInfo.userVersion(), |
| depInfo.deployMode(), |
| depInfo.participants() |
| ); |
| } |
| } |
| |
| e.unmarshal(cctx, ldr); |
| |
| Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, e); |
| |
| if (evts != null && !evts.isEmpty()) |
| entries0.addAll(evts); |
| } |
| catch (IgniteCheckedException ex) { |
| if (ignoreClsNotFound) |
| assert internal; |
| else |
| U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), "Failed to unmarshal entry.", ex); |
| } |
| } |
| |
| notifyLocalListener(entries0, returnValTrans); |
| } |
| |
| /** |
| * @param ctx Context. |
| * @param e entry. |
| * @return Entry collection. |
| */ |
| private Collection<CacheEntryEvent<? extends K, ? extends V>> handleEvent(GridKernalContext ctx, |
| CacheContinuousQueryEntry e) { |
| assert e != null; |
| |
| GridCacheContext<K, V> cctx = cacheContext(ctx); |
| |
| IgniteCache<?, ?> cache = ctx.cache().jcache(cctx.name()); |
| |
| // Initial query entry or evicted entry. These events should be fired immediately. |
| if (internal || e.updateCounter() == -1L) |
| return e.isFiltered() ? Collections.emptyList() : F.asList(new CacheContinuousQueryEvent<>(cache, cctx, e)); |
| |
| CacheContinuousQueryPartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion()); |
| |
| return rec.collectEntries(e, cctx, cache); |
| } |
| |
| /** |
| * @param evt Query event. |
| * @return {@code True} if event passed filter otherwise {@code false}. |
| */ |
| public boolean filter(CacheContinuousQueryEvent evt) { |
| CacheContinuousQueryEntry entry = evt.entry(); |
| |
| boolean notify = !entry.isFiltered(); |
| |
| try { |
| if (notify && getEventFilter() != null) |
| notify = getEventFilter().evaluate(evt); |
| } |
| catch (NoClassDefFoundError e) { |
| P2PClassLoadingIssues.rethrowDisarmedP2PClassLoadingFailure(e); |
| } |
| catch (Exception e) { |
| U.error(log, "CacheEntryEventFilter failed: " + e); |
| } |
| |
| if (!notify) |
| entry.markFiltered(); |
| |
| return notify; |
| } |
| |
| /** |
| * @param evt Continuous query event. |
| * @param notify Notify flag. |
| * @param loc Listener deployed on this node. |
| * @param recordIgniteEvt Record ignite event. |
| */ |
| private void onEntryUpdate(CacheContinuousQueryEvent<K, V> evt, |
| boolean notify, boolean loc, boolean recordIgniteEvt) { |
| try { |
| GridCacheContext<K, V> cctx = cacheContext(ctx); |
| |
| if (cctx == null) |
| return; |
| |
| CacheContinuousQueryEntry entry = evt.entry(); |
| |
| IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans = getTransformer(); |
| |
| if (loc) { |
| if (!locOnly) { |
| Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry); |
| |
| notifyLocalListener(evts, trans); |
| |
| if (!internal && !skipPrimaryCheck) |
| sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); |
| } |
| else if (!entry.isFiltered()) |
| notifyLocalListener(F.asList(evt), trans); |
| } |
| else { |
| if (!entry.isFiltered()) { |
| if (trans != null) |
| entry = transformToEntry(trans, evt); |
| |
| prepareEntry(cctx, nodeId, entry); |
| } |
| |
| Object entryOrList = handleEntry(cctx, entry); |
| |
| if (entryOrList != null) { |
| if (log.isDebugEnabled()) |
| log.debug("Send the following event to listener: " + entryOrList); |
| |
| ctx.continuous().addNotification(nodeId, routineId, entryOrList, topic, sync, true); |
| } |
| } |
| } |
| catch (ClusterTopologyCheckedException ex) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send event notification to node, node left cluster " + |
| "[node=" + nodeId + ", err=" + ex + ']'); |
| } |
| catch (IgniteCheckedException ex) { |
| U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), "Failed to send event notification to node: " + nodeId, ex); |
| } |
| |
| if (recordIgniteEvt && notify) { |
| CacheEntryEventFilter filter; |
| try { |
| filter = getEventFilter(); |
| } |
| catch (IgniteCheckedException e) { |
| if (log.isDebugEnabled()) { |
| log.debug("Failed to trigger a continuous query event. " + |
| "[routineId=" + routineId + ", cacheName=" + cacheName + ", err=" + e + "]"); |
| } |
| |
| return; |
| } |
| |
| //noinspection unchecked |
| ctx.event().record(new CacheQueryReadEvent<K, V>( |
| ctx.discovery().localNode(), |
| "Continuous query executed.", |
| EVT_CACHE_QUERY_OBJECT_READ, |
| CacheQueryType.CONTINUOUS.name(), |
| cacheName, |
| null, |
| null, |
| null, |
| filter instanceof CacheEntryEventSerializableFilter ? |
| (CacheEntryEventSerializableFilter)filter : null, |
| null, |
| nodeId, |
| taskName(), |
| evt.getKey(), |
| evt.getValue(), |
| evt.getOldValue(), |
| null |
| )); |
| } |
| } |
| |
| /** |
| * Notifies local listener. |
| * |
| * @param evts Events. |
| * @param trans Transformer |
| */ |
| private void notifyLocalListener(Collection<CacheEntryEvent<? extends K, ? extends V>> evts, |
| @Nullable IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans) { |
| EventListener locTransLsnr = localTransformedEventListener(); |
| |
| assert locLsnr == null || locTransLsnr == null; |
| |
| if (F.isEmpty(evts)) |
| return; |
| |
| if (locLsnr != null) |
| locLsnr.onUpdated(evts); |
| |
| if (locTransLsnr != null) |
| locTransLsnr.onUpdated(transform(trans, evts)); |
| } |
| |
| /** |
| * @return Task name. |
| */ |
| private String taskName() { |
| return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onClientDisconnected() { |
| if (internal) |
| return; |
| |
| for (CacheContinuousQueryPartitionRecovery rec : rcvs.values()) |
| rec.resetTopologyCache(); |
| } |
| |
| /** |
| * @param ctx Context. |
| * @param partId Partition id. |
| * @param topVer Topology version for current operation. |
| * @return Partition recovery. |
| */ |
| @NotNull private CacheContinuousQueryPartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, |
| int partId, |
| AffinityTopologyVersion topVer) { |
| assert topVer != null && topVer.topologyVersion() > 0 : topVer; |
| |
| CacheContinuousQueryPartitionRecovery rec = rcvs.get(partId); |
| |
| if (rec == null) { |
| T2<Long, Long> partCntrs = null; |
| |
| Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode = this.initUpdCntrsPerNode; |
| |
| if (initUpdCntrsPerNode != null) { |
| GridCacheContext<K, V> cctx = cacheContext(ctx); |
| |
| GridCacheAffinityManager aff = cctx.affinity(); |
| |
| for (ClusterNode node : aff.nodesByPartition(partId, topVer)) { |
| Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id()); |
| |
| if (map != null) { |
| partCntrs = map.get(partId); |
| |
| break; |
| } |
| } |
| } |
| else if (initUpdCntrs != null) |
| partCntrs = initUpdCntrs.get(partId); |
| |
| rec = new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer, |
| partCntrs != null ? partCntrs.get2() : null); |
| |
| CacheContinuousQueryPartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec); |
| |
| if (oldRec != null) |
| rec = oldRec; |
| } |
| |
| return rec; |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param e Entry. |
| */ |
| private void handleBackupEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) { |
| if (internal || e.updateCounter() == -1L || nodeLeft) // Skip internal query and expire entries. |
| return; |
| |
| CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, e.partition()); |
| |
| buf.processEntry(e.copyWithDataReset(), true); |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param e Entry. |
| * @return Entry. |
| */ |
| private Object handleEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) { |
| assert e != null; |
| assert entryBufs != null; |
| |
| if (internal) { |
| if (e.isFiltered()) |
| return null; |
| else |
| return e; |
| } |
| |
| // Initial query entry. |
| // This events should be fired immediately. |
| if (e.updateCounter() == -1L) |
| return e; |
| |
| CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, e.partition()); |
| |
| return buf.processEntry(e, false); |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param partId Partition id. |
| * @return Event buffer. |
| */ |
| CacheContinuousQueryEventBuffer partitionBuffer(GridCacheContext<?, ?> cctx, int partId) { |
| return entryBufs.computeIfAbsent(partId, |
| id -> new CacheContinuousQueryEventBuffer((backup) -> { |
| GridDhtLocalPartition locPart = cctx.topology().localPartition(id, null, false); |
| |
| if (locPart == null) |
| return -1L; |
| |
| // Use HWM for primary, LWM for backup. |
| return backup > 0 ? locPart.updateCounter() : locPart.reservedCounter(); |
| }, ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY))); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void flushOnNodeLeft() { |
| nodeLeft = true; |
| |
| for (CacheContinuousQueryEventBuffer buf : entryBufs.values()) |
| buf.flushOnExchange(null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { |
| assert ctx != null; |
| assert ctx.config().isPeerClassLoadingEnabled(); |
| |
| if (rmtFilter != null && !U.isGrid(rmtFilter.getClass())) |
| rmtFilterDep = new CacheContinuousQueryDeployableObject(rmtFilter, ctx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { |
| assert nodeId != null; |
| assert ctx != null; |
| assert ctx.config().isPeerClassLoadingEnabled(); |
| |
| if (rmtFilterDep != null) |
| rmtFilter = p2pUnmarshal(rmtFilterDep, nodeId, ctx); |
| |
| if (!p2pUnmarshalFut.isDone()) |
| ((GridFutureAdapter)p2pUnmarshalFut).onDone(); |
| } |
| |
| /** |
| * @return Whether the handler is marshalled for peer class loading. |
| */ |
| public boolean isMarshalled() { |
| return rmtFilter == null || U.isGrid(rmtFilter.getClass()) || rmtFilterDep != null; |
| } |
| |
| /** |
| * @param depObj Deployable object to unmarshal. |
| * @param nodeId Sender node Id. |
| * @param ctx Kernal context. |
| * @param <T> Result type. |
| * @return Unmarshalled object. |
| * @throws IgniteCheckedException In case of unmarshalling failures. |
| */ |
| protected <T> T p2pUnmarshal(CacheContinuousQueryDeployableObject depObj, |
| UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { |
| if (depObj != null) { |
| try { |
| return depObj.unmarshal(nodeId, ctx); |
| } |
| catch (IgniteCheckedException e) { |
| ((GridFutureAdapter)p2pUnmarshalFut).onDone(e); |
| |
| throw e; |
| } |
| catch (ExceptionInInitializerError e) { |
| IgniteCheckedException err = new IgniteCheckedException("Failed to unmarshal deployable object.", e); |
| |
| ((GridFutureAdapter)p2pUnmarshalFut).onDone(err); |
| |
| throw err; |
| } |
| } |
| else |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridContinuousBatch createBatch() { |
| return new GridContinuousQueryBatch(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onBatchAcknowledged(final UUID routineId, |
| GridContinuousBatch batch, |
| final GridKernalContext ctx) { |
| sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx); |
| } |
| |
| /** |
| * @param t Acknowledge information. |
| * @param routineId Routine ID. |
| * @param ctx Context. |
| */ |
| private void sendBackupAcknowledge(final IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> t, |
| final UUID routineId, |
| final GridKernalContext ctx) { |
| if (t != null) { |
| ctx.closure().runLocalSafe(new GridPlainRunnable() { |
| @Override public void run() { |
| GridCacheContext<K, V> cctx = cacheContext(ctx); |
| |
| CacheContinuousQueryBatchAck msg = new CacheContinuousQueryBatchAck(cctx.cacheId(), |
| routineId, |
| t.get1()); |
| |
| for (AffinityTopologyVersion topVer : t.get2()) { |
| for (ClusterNode node : ctx.discovery().cacheGroupAffinityNodes(cctx.groupId(), topVer)) { |
| if (!node.isLocal()) { |
| try { |
| cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL); |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| IgniteLogger log = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send acknowledge message, node left " + |
| "[msg=" + msg + ", node=" + node + ']'); |
| } |
| catch (IgniteCheckedException e) { |
| IgniteLogger log = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY); |
| |
| U.error(log, "Failed to send acknowledge message " + |
| "[msg=" + msg + ", node=" + node + ']', e); |
| } |
| } |
| } |
| } |
| } |
| }); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Object orderedTopic() { |
| return topic; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridContinuousHandler clone() { |
| try { |
| return (GridContinuousHandler)super.clone(); |
| } |
| catch (CloneNotSupportedException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(CacheContinuousQueryHandler.class, this); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| U.writeString(out, cacheName); |
| out.writeObject(topic); |
| |
| boolean b = rmtFilterDep != null; |
| |
| out.writeBoolean(b); |
| |
| if (b) |
| out.writeObject(rmtFilterDep); |
| else |
| out.writeObject(rmtFilter); |
| |
| out.writeBoolean(internal); |
| out.writeBoolean(notifyExisting); |
| out.writeBoolean(oldValRequired); |
| out.writeBoolean(sync); |
| out.writeBoolean(ignoreExpired); |
| out.writeInt(taskHash); |
| out.writeBoolean(keepBinary); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| cacheName = U.readString(in); |
| topic = in.readObject(); |
| |
| boolean b = in.readBoolean(); |
| |
| if (b) { |
| rmtFilterDep = (CacheContinuousQueryDeployableObject)in.readObject(); |
| |
| p2pUnmarshalFut = new GridFutureAdapter<>(); |
| } |
| else |
| rmtFilter = (CacheEntryEventSerializableFilter<K, V>)in.readObject(); |
| |
| internal = in.readBoolean(); |
| notifyExisting = in.readBoolean(); |
| oldValRequired = in.readBoolean(); |
| sync = in.readBoolean(); |
| ignoreExpired = in.readBoolean(); |
| taskHash = in.readInt(); |
| keepBinary = in.readBoolean(); |
| |
| cacheId = CU.cacheId(cacheName); |
| } |
| |
| /** |
| * @param ctx Kernal context. |
| * @return Cache context. |
| */ |
| private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) { |
| assert ctx != null; |
| |
| return ctx.cache().<K, V>context().cacheContext(cacheId); |
| } |
| |
| /** |
| * |
| */ |
| private class ContinuousQueryAsyncClosure implements Runnable { |
| /** */ |
| private final CacheContinuousQueryEvent<K, V> evt; |
| |
| /** */ |
| private final boolean primary; |
| |
| /** */ |
| private final boolean recordIgniteEvt; |
| |
| /** */ |
| private final IgniteInternalFuture<?> fut; |
| |
| /** |
| * @param primary Primary flag. |
| * @param evt Event. |
| * @param recordIgniteEvt Fired event. |
| * @param fut Dht future. |
| */ |
| ContinuousQueryAsyncClosure( |
| boolean primary, |
| CacheContinuousQueryEvent<K, V> evt, |
| boolean recordIgniteEvt, |
| IgniteInternalFuture<?> fut) { |
| this.primary = primary; |
| this.evt = evt; |
| this.recordIgniteEvt = recordIgniteEvt; |
| this.fut = fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| final boolean notify = filter(evt); |
| |
| if (primary || skipPrimaryCheck) { |
| if (fut == null) { |
| onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); |
| |
| return; |
| } |
| |
| if (fut.isDone()) { |
| if (fut.error() != null) |
| evt.entry().markFiltered(); |
| |
| onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); |
| } |
| else { |
| fut.listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> f) { |
| if (f.error() != null) |
| evt.entry().markFiltered(); |
| |
| ctx.pools().asyncCallbackPool().execute(new Runnable() { |
| @Override public void run() { |
| onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); |
| } |
| }, evt.entry().partition()); |
| } |
| }); |
| } |
| } |
| else |
| handleBackupEntry(cacheContext(ctx), evt.entry()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(ContinuousQueryAsyncClosure.class, this); |
| } |
| } |
| |
| /** |
| * @param trans Transformer. |
| * @param evts Source events. |
| * @return Collection of transformed values. |
| */ |
| private Iterable transform(final IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans, |
| Collection<CacheEntryEvent<? extends K, ? extends V>> evts) { |
| final Iterator<CacheEntryEvent<? extends K, ? extends V>> iter = evts.iterator(); |
| |
| return new Iterable() { |
| @NotNull @Override public Iterator iterator() { |
| return new Iterator() { |
| @Override public boolean hasNext() { |
| return iter.hasNext(); |
| } |
| |
| @Override public Object next() { |
| return transform(trans, iter.next()); |
| } |
| }; |
| } |
| }; |
| } |
| |
| /** |
| * Transform event data with {@link #getTransformer()} if exists. |
| * |
| * @param trans Transformer. |
| * @param evt Event to transform. |
| * @return Entry contains only transformed data if transformer exists. Unchanged event if transformer is not set. |
| * @see #getTransformer() |
| */ |
| private CacheContinuousQueryEntry transformToEntry(IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans, |
| CacheContinuousQueryEvent<? extends K, ? extends V> evt) { |
| Object transVal = transform(trans, evt); |
| |
| return new CacheContinuousQueryEntry(evt.entry().cacheId(), |
| evt.entry().eventType(), |
| null, |
| transVal == null ? null : cacheContext(ctx).toCacheObject(transVal), |
| null, |
| evt.entry().isKeepBinary(), |
| evt.entry().partition(), |
| evt.entry().updateCounter(), |
| evt.entry().topologyVersion(), |
| evt.entry().flags()); |
| } |
| |
| /** |
| * @param trans Transformer. |
| * @param evt Event. |
| * @return Transformed value. |
| */ |
| private Object transform(IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans, |
| CacheEntryEvent<? extends K, ? extends V> evt) { |
| assert trans != null; |
| |
| Object transVal = null; |
| |
| try { |
| transVal = trans.apply(evt); |
| } |
| catch (NoClassDefFoundError e) { |
| P2PClassLoadingIssues.rethrowDisarmedP2PClassLoadingFailure(e); |
| } |
| catch (Exception e) { |
| U.error(log, e); |
| } |
| |
| return transVal; |
| } |
| } |