| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.query.continuous; |
| |
| import java.io.Closeable; |
| import java.io.Externalizable; |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.BiFunction; |
| import javax.cache.Cache; |
| import javax.cache.configuration.CacheEntryListenerConfiguration; |
| import javax.cache.configuration.Factory; |
| import javax.cache.event.CacheEntryCreatedListener; |
| import javax.cache.event.CacheEntryEvent; |
| import javax.cache.event.CacheEntryEventFilter; |
| import javax.cache.event.CacheEntryExpiredListener; |
| import javax.cache.event.CacheEntryListener; |
| import javax.cache.event.CacheEntryRemovedListener; |
| import javax.cache.event.CacheEntryUpdatedListener; |
| import javax.cache.event.EventType; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cache.CacheEntryEventSerializableFilter; |
| import org.apache.ignite.cache.CacheMode; |
| import org.apache.ignite.cache.query.CacheQueryEntryEvent; |
| import org.apache.ignite.cache.query.ContinuousQuery; |
| import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.NodeStoppingException; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheGroupContext; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; |
| import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; |
| import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; |
| import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; |
| import org.apache.ignite.internal.util.GridLongList; |
| import org.apache.ignite.internal.util.lang.gridfunc.IsAllPredicate; |
| import org.apache.ignite.internal.util.tostring.GridToStringInclude; |
| import org.apache.ignite.internal.util.typedef.CI2; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteClosure; |
| import org.apache.ignite.lang.IgniteOutClosure; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.plugin.security.SecurityPermission; |
| import org.apache.ignite.resources.LoggerResource; |
| import org.apache.ignite.util.AttributeNodeFilter; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static javax.cache.event.EventType.CREATED; |
| import static javax.cache.event.EventType.EXPIRED; |
| import static javax.cache.event.EventType.REMOVED; |
| import static javax.cache.event.EventType.UPDATED; |
| import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; |
| import static org.apache.ignite.internal.IgniteFeatures.CONT_QRY_SECURITY_AWARE; |
| import static org.apache.ignite.internal.IgniteFeatures.allNodesSupports; |
| import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE; |
| |
| /** |
| * Continuous queries manager. |
| */ |
| public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K, V> { |
| /** */ |
| private static final byte CREATED_FLAG = 0b0001; |
| |
| /** */ |
| private static final byte UPDATED_FLAG = 0b0010; |
| |
| /** */ |
| private static final byte REMOVED_FLAG = 0b0100; |
| |
| /** */ |
| private static final byte EXPIRED_FLAG = 0b1000; |
| |
| /** */ |
| private static final long BACKUP_ACK_FREQ = 5000; |
| |
| /** Listeners. */ |
| private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap<>(); |
| |
| /** Listeners count. */ |
| private final AtomicInteger lsnrCnt = new AtomicInteger(); |
| |
| /** Internal entries listeners. */ |
| private final ConcurrentMap<UUID, CacheContinuousQueryListener> intLsnrs = new ConcurrentHashMap<>(); |
| |
| /** Internal listeners count. */ |
| private final AtomicInteger intLsnrCnt = new AtomicInteger(); |
| |
| /** Query sequence number for message topic. */ |
| private final AtomicLong seq = new AtomicLong(); |
| |
| /** JCache listeners. */ |
| private final ConcurrentMap<CacheEntryListenerConfiguration, JCacheQuery> jCacheLsnrs = |
| new ConcurrentHashMap<>(); |
| |
| /** Ordered topic prefix. */ |
| private String topicPrefix; |
| |
| /** Cancelable future task for backup cleaner */ |
| private GridTimeoutProcessor.CancelableTask cancelableTask; |
| |
| /** {@inheritDoc} */ |
| @Override protected void stop0(boolean cancel, boolean destroy) { |
| if (cancelableTask != null) { |
| cancelableTask.close(); |
| |
| cancelableTask = null; |
| } |
| } |
| |
| /** |
| * USED ONLY FOR TESTING. |
| * |
| * @return Internal cancelable future task for backup cleaner. |
| */ |
| protected GridTimeoutProcessor.CancelableTask getCancelableTask() { |
| return cancelableTask; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void start0() throws IgniteCheckedException { |
| // Append cache name to the topic. |
| topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name()); |
| |
| if (cctx.affinityNode()) { |
| cctx.io().addCacheHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class, |
| new CI2<UUID, CacheContinuousQueryBatchAck>() { |
| @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) { |
| CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId()); |
| |
| if (lsnr != null) |
| lsnr.cleanupOnAck(msg.updateCntrs()); |
| } |
| }); |
| |
| cancelableTask = cctx.time().schedule(new BackupCleaner(lsnrs, cctx.kernalContext()), BACKUP_ACK_FREQ, BACKUP_ACK_FREQ); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override protected void onKernalStart0() throws IgniteCheckedException { |
| Iterable<CacheEntryListenerConfiguration> cfgs = cctx.config().getCacheEntryListenerConfigurations(); |
| |
| if (cfgs != null) { |
| for (CacheEntryListenerConfiguration cfg : cfgs) |
| executeJCacheQuery(cfg, true, false); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void onKernalStop0(boolean cancel) { |
| super.onKernalStop0(cancel); |
| |
| for (JCacheQuery lsnr : jCacheLsnrs.values()) { |
| try { |
| lsnr.cancel(); |
| } |
| catch (IgniteCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to stop JCache entry listener: " + e.getMessage()); |
| } |
| } |
| } |
| |
| /** |
| * @param tx Transaction. |
| * @return {@code True} if should notify continuous query manager. |
| */ |
| public boolean notifyContinuousQueries(@Nullable IgniteInternalTx tx) { |
| return cctx.isReplicated() || |
| (!cctx.isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())); |
| } |
| |
| /** |
| * @param lsnrs Listeners to notify. |
| * @param key Entry key. |
| * @param partId Partition id. |
| * @param updCntr Updated counter. |
| * @param primary Primary. |
| * @param topVer Topology version. |
| */ |
| private void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, |
| KeyCacheObject key, |
| int partId, |
| long updCntr, |
| boolean primary, |
| AffinityTopologyVersion topVer) { |
| assert lsnrs != null; |
| |
| for (CacheContinuousQueryListener lsnr : lsnrs.values()) { |
| CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( |
| cctx.cacheId(), |
| UPDATED, |
| key, |
| null, |
| null, |
| lsnr.keepBinary(), |
| partId, |
| updCntr, |
| topVer, |
| (byte)0); |
| |
| CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( |
| cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); |
| |
| lsnr.skipUpdateEvent(evt, topVer, primary); |
| } |
| } |
| |
| /** |
| * @param skipCtx Context. |
| * @param part Partition number. |
| * @param cntr Update counter. |
| * @param topVer Topology version. |
| * @return Context. |
| */ |
| @Nullable public CounterSkipContext skipUpdateCounter(@Nullable CounterSkipContext skipCtx, |
| int part, |
| long cntr, |
| AffinityTopologyVersion topVer, |
| boolean primary) { |
| for (CacheContinuousQueryListener lsnr : lsnrs.values()) |
| skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer, primary); |
| |
| return skipCtx; |
| } |
| |
| /** |
| * For cache updates in shared cache group need notify others caches CQ listeners |
| * that generated counter should be skipped. |
| * |
| * @param cctx Cache context. |
| * @param part Partition. |
| * @param topVer Topology version. |
| * @param gaps Even-length array of pairs [start, end] for each gap. |
| */ |
| @Nullable public void closeBackupUpdateCountersGaps(GridCacheContext cctx, |
| int part, |
| AffinityTopologyVersion topVer, |
| GridLongList gaps) { |
| assert gaps != null && gaps.size() % 2 == 0; |
| |
| for (int i = 0; i < gaps.size() / 2; i++) { |
| long gapStart = gaps.get(i * 2); |
| long gapStop = gaps.get(i * 2 + 1); |
| |
| /* |
| * No user listeners should be called by this invocation. In the common case of partitioned cache or |
| * replicated cache with non-local-only listener gaps (dummy filtered CQ events) will be added to the |
| * backup queue without passing it to any listener. In the special case of local-only listener on |
| * replicated cache there is no backup queues used at all and therefore no gaps occur - all unfiltered |
| * events are passed to listeners upon arrive. |
| */ |
| for (long cntr = gapStart; cntr <= gapStop; cntr++) |
| skipUpdateEvent(lsnrs, null, part, cntr, false, topVer); |
| } |
| } |
| |
| /** |
| * @param internal Internal entry flag (internal key or not user cache). |
| * @param preload Whether update happened during preloading. |
| * @return Registered listeners. |
| */ |
| @Nullable public Map<UUID, CacheContinuousQueryListener> updateListeners( |
| boolean internal, |
| boolean preload) { |
| if (preload && !internal) |
| return null; |
| |
| ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol; |
| |
| if (internal) |
| lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null; |
| else |
| lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null; |
| |
| return F.isEmpty(lsnrCol) ? null : lsnrCol; |
| } |
| |
| /** |
| * @param key Key. |
| * @param newVal New value. |
| * @param oldVal Old value. |
| * @param internal Internal entry (internal key or not user cache). |
| * @param partId Partition. |
| * @param primary {@code True} if called on primary node. |
| * @param preload Whether update happened during preloading. |
| * @param updateCntr Update counter. |
| * @param fut Dht atomic future. |
| * @param topVer Topology version. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| public void onEntryUpdated( |
| KeyCacheObject key, |
| CacheObject newVal, |
| CacheObject oldVal, |
| boolean internal, |
| int partId, |
| boolean primary, |
| boolean preload, |
| long updateCntr, |
| @Nullable GridDhtAtomicAbstractUpdateFuture fut, |
| AffinityTopologyVersion topVer |
| ) throws IgniteCheckedException { |
| Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload); |
| |
| if (lsnrCol != null) { |
| onEntryUpdated( |
| lsnrCol, |
| key, |
| newVal, |
| oldVal, |
| internal, |
| partId, |
| primary, |
| preload, |
| updateCntr, |
| fut, |
| topVer); |
| } |
| } |
| |
| /** |
| * @param lsnrCol Listeners to notify. |
| * @param key Key. |
| * @param newVal New value. |
| * @param oldVal Old value. |
| * @param internal Internal entry (internal key or not user cache), |
| * @param partId Partition. |
| * @param primary {@code True} if called on primary node. |
| * @param preload Whether update happened during preloading. |
| * @param updateCntr Update counter. |
| * @param topVer Topology version. |
| * @param fut Dht atomic future. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| public void onEntryUpdated( |
| Map<UUID, CacheContinuousQueryListener> lsnrCol, |
| KeyCacheObject key, |
| CacheObject newVal, |
| CacheObject oldVal, |
| boolean internal, |
| int partId, |
| boolean primary, |
| boolean preload, |
| long updateCntr, |
| @Nullable GridDhtAtomicAbstractUpdateFuture fut, |
| AffinityTopologyVersion topVer |
| ) throws IgniteCheckedException { |
| assert key != null; |
| assert lsnrCol != null; |
| |
| boolean hasNewVal = newVal != null; |
| boolean hasOldVal = oldVal != null; |
| |
| if (!hasNewVal && !hasOldVal) { |
| skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, topVer); |
| |
| return; |
| } |
| |
| EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED; |
| |
| boolean initialized = false; |
| |
| boolean recordIgniteEvt = primary && !internal && cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); |
| |
| for (CacheContinuousQueryListener lsnr : lsnrCol.values()) { |
| if (preload && !lsnr.notifyExisting() || lsnr.isPrimaryOnly() && !primary) |
| continue; |
| |
| if (!initialized) { |
| if (lsnr.oldValueRequired()) { |
| oldVal = (CacheObject)cctx.unwrapTemporary(oldVal); |
| |
| if (oldVal != null) |
| oldVal.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); |
| } |
| |
| if (newVal != null) |
| newVal.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); |
| |
| initialized = true; |
| } |
| |
| CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( |
| cctx.cacheId(), |
| evtType, |
| key, |
| (!internal && evtType == REMOVED && lsnr.oldValueRequired()) ? oldVal : newVal, |
| lsnr.oldValueRequired() ? oldVal : null, |
| lsnr.keepBinary(), |
| partId, |
| updateCntr, |
| topVer, |
| (byte)0); |
| |
| IgniteCacheProxy jcache = cctx.kernalContext().cache().jcacheProxy(cctx.name(), true); |
| |
| assert jcache != null : "Failed to get cache proxy [name=" + cctx.name() + |
| ", locStart=" + cctx.startTopologyVersion() + |
| ", locNode=" + cctx.localNode() + |
| ", stopping=" + cctx.kernalContext().isStopping(); |
| |
| CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(jcache, cctx, e0); |
| |
| lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fut); |
| } |
| } |
| |
| /** |
| * @param e Entry. |
| * @param key Key. |
| * @param oldVal Old value. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| public void onEntryExpired(GridCacheEntryEx e, KeyCacheObject key, CacheObject oldVal) |
| throws IgniteCheckedException { |
| assert e != null; |
| assert key != null; |
| |
| if (e.isInternal()) |
| return; |
| |
| ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null; |
| |
| if (F.isEmpty(lsnrCol)) |
| return; |
| |
| boolean primary = cctx.affinity().primaryByPartition(cctx.localNode(), e.partition(), AffinityTopologyVersion.NONE); |
| |
| if (cctx.isReplicated() || primary) { |
| boolean recordIgniteEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); |
| |
| boolean initialized = false; |
| |
| for (CacheContinuousQueryListener lsnr : lsnrCol.values()) { |
| if (!initialized) { |
| if (lsnr.oldValueRequired()) |
| oldVal = (CacheObject)cctx.unwrapTemporary(oldVal); |
| |
| if (oldVal != null) |
| oldVal.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); |
| |
| initialized = true; |
| } |
| |
| CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( |
| cctx.cacheId(), |
| EXPIRED, |
| key, |
| lsnr.oldValueRequired() ? oldVal : null, |
| lsnr.oldValueRequired() ? oldVal : null, |
| lsnr.keepBinary(), |
| e.partition(), |
| -1, |
| null, |
| (byte)0); |
| |
| CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent( |
| cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); |
| |
| lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, null); |
| } |
| } |
| } |
| |
| /** |
| * @param locLsnr Local listener. |
| * @param rmtFilter Remote filter. |
| * @param rmtFilterFactory Remote filter factory |
| * @param bufSize Buffer size. |
| * @param timeInterval Time interval. |
| * @param autoUnsubscribe Auto unsubscribe flag. |
| * @param loc Local flag. |
| * @return Continuous routine ID. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| public UUID executeQuery(@Nullable final CacheEntryUpdatedListener locLsnr, |
| @Nullable final EventListener locTransLsnr, |
| @Nullable final CacheEntryEventSerializableFilter<K, V> rmtFilter, |
| @Nullable final Factory<CacheEntryEventFilter<K, V>> rmtFilterFactory, |
| @Nullable final Factory<IgniteClosure<K, V>> rmtTransFactory, |
| int bufSize, |
| long timeInterval, |
| boolean autoUnsubscribe, |
| boolean loc, |
| final boolean keepBinary, |
| final boolean includeExpired |
| ) throws IgniteCheckedException { |
| IgniteOutClosure<CacheContinuousQueryHandler> clsr; |
| |
| if (rmtTransFactory != null) { |
| clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() { |
| @Override public CacheContinuousQueryHandler apply() { |
| return new CacheContinuousQueryHandlerV3( |
| cctx.name(), |
| TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), |
| locTransLsnr, |
| securityAwareFilterFactory(rmtFilterFactory), |
| securityAwareTransformerFactory(rmtTransFactory), |
| true, |
| false, |
| !includeExpired, |
| false); |
| } |
| }; |
| } |
| else if (rmtFilterFactory != null) { |
| clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() { |
| @Override public CacheContinuousQueryHandler apply() { |
| return new CacheContinuousQueryHandlerV2( |
| cctx.name(), |
| TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), |
| locLsnr, |
| securityAwareFilterFactory(rmtFilterFactory), |
| true, |
| false, |
| !includeExpired, |
| false, |
| null); |
| } |
| }; |
| } |
| else { |
| clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() { |
| @Override public CacheContinuousQueryHandler apply() { |
| assert locTransLsnr == null; |
| |
| return new CacheContinuousQueryHandler( |
| cctx.name(), |
| TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), |
| locLsnr, |
| securityAwareFilter(rmtFilter), |
| true, |
| false, |
| !includeExpired, |
| false); |
| } |
| }; |
| } |
| |
| return executeQuery0( |
| locLsnr, |
| clsr, |
| bufSize, |
| timeInterval, |
| autoUnsubscribe, |
| false, |
| false, |
| loc, |
| keepBinary, |
| false); |
| } |
| |
| /** |
| * @param locLsnr Local listener. |
| * @param rmtFilter Remote filter. |
| * @param loc Local flag. |
| * @param notifyExisting Notify existing flag. |
| * @param sync Synchronous flag. |
| * @return Continuous routine ID. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| public UUID executeInternalQuery(final CacheEntryUpdatedListener<?, ?> locLsnr, |
| final CacheEntryEventSerializableFilter rmtFilter, |
| final boolean loc, |
| final boolean notifyExisting, |
| final boolean ignoreClassNotFound, |
| final boolean sync |
| ) throws IgniteCheckedException { |
| return executeQuery0( |
| locLsnr, |
| new IgniteOutClosure<CacheContinuousQueryHandler>() { |
| @Override public CacheContinuousQueryHandler apply() { |
| return new CacheContinuousQueryHandler( |
| cctx.name(), |
| TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), |
| locLsnr, |
| rmtFilter, |
| true, |
| sync, |
| true, |
| ignoreClassNotFound); |
| } |
| }, |
| ContinuousQuery.DFLT_PAGE_SIZE, |
| ContinuousQuery.DFLT_TIME_INTERVAL, |
| ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, |
| true, |
| notifyExisting, |
| loc, |
| false, |
| false); |
| } |
| |
| /** |
| * @param routineId Consume ID. |
| */ |
| public void cancelInternalQuery(UUID routineId) { |
| try { |
| cctx.kernalContext().continuous().stopRoutine(routineId).get(); |
| } |
| catch (IgniteCheckedException | IgniteException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to stop internal continuous query: " + e.getMessage()); |
| } |
| } |
| |
| /** |
| * @param cfg Listener configuration. |
| * @param onStart Whether listener is created on node start. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void executeJCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart, boolean keepBinary) |
| throws IgniteCheckedException { |
| JCacheQuery lsnr = new JCacheQuery(cfg, onStart, keepBinary); |
| |
| JCacheQuery old = jCacheLsnrs.putIfAbsent(cfg, lsnr); |
| |
| if (old != null) |
| throw new IllegalArgumentException("Listener is already registered for configuration: " + cfg); |
| |
| try { |
| lsnr.execute(); |
| } |
| catch (IgniteCheckedException e) { |
| cancelJCacheQuery(cfg); |
| |
| throw e; |
| } |
| } |
| |
| /** |
| * @param cfg Listener configuration. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| public void cancelJCacheQuery(CacheEntryListenerConfiguration cfg) throws IgniteCheckedException { |
| JCacheQuery lsnr = jCacheLsnrs.remove(cfg); |
| |
| if (lsnr != null) |
| lsnr.cancel(); |
| } |
| |
| /** |
| * @param topVer Finished exchange topology version. |
| */ |
| public void flushOnExchangeDone(AffinityTopologyVersion topVer) { |
| for (CacheContinuousQueryListener lsnr : lsnrs.values()) |
| lsnr.flushOnExchangeDone(cctx.kernalContext(), topVer); |
| } |
| |
| /** |
| * Partition evicted callback. |
| * |
| * @param part Partition number. |
| */ |
| public void onPartitionEvicted(int part) { |
| for (CacheContinuousQueryListener lsnr : lsnrs.values()) |
| lsnr.onPartitionEvicted(part); |
| |
| for (CacheContinuousQueryListener lsnr : intLsnrs.values()) |
| lsnr.onPartitionEvicted(part); |
| } |
| |
| /** |
| * @param locLsnr Local listener. |
| * @param clsr Closure to create CacheContinuousQueryHandler. |
| * @param bufSize Buffer size. |
| * @param timeInterval Time interval. |
| * @param autoUnsubscribe Auto unsubscribe flag. |
| * @param internal Internal flag. |
| * @param notifyExisting Notify existing flag. |
| * @param loc Local flag. |
| * @param keepBinary Keep binary flag. |
| * @param onStart Waiting topology exchange. |
| * @return Continuous routine ID. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, |
| IgniteOutClosure<CacheContinuousQueryHandler> clsr, |
| int bufSize, |
| long timeInterval, |
| boolean autoUnsubscribe, |
| boolean internal, |
| boolean notifyExisting, |
| boolean loc, |
| final boolean keepBinary, |
| boolean onStart |
| ) throws IgniteCheckedException { |
| cctx.checkSecurity(SecurityPermission.CACHE_READ); |
| |
| int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? |
| cctx.kernalContext().job().currentTaskNameHash() : 0; |
| |
| boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode(); |
| |
| final CacheContinuousQueryHandler hnd = clsr.apply(); |
| |
| hnd.taskNameHash(taskNameHash); |
| hnd.skipPrimaryCheck(skipPrimaryCheck); |
| hnd.notifyExisting(notifyExisting); |
| hnd.internal(internal); |
| hnd.keepBinary(keepBinary); |
| hnd.localOnly(loc); |
| |
| IgnitePredicate<ClusterNode> pred = loc ? F.nodeForNodeId(cctx.localNodeId()) |
| : new IsAllPredicate<>(cctx.group().nodeFilter(), new AttributeNodeFilter(ATTR_CLIENT_MODE, false)); |
| |
| assert pred != null : cctx.config(); |
| |
| UUID id = null; |
| |
| try { |
| id = cctx.kernalContext().continuous().startRoutine( |
| hnd, |
| loc, |
| bufSize, |
| timeInterval, |
| autoUnsubscribe, |
| pred).get(); |
| |
| if (hnd.isQuery() && cctx.userCache() && !loc && !onStart) |
| hnd.waitTopologyFuture(cctx.kernalContext()); |
| } |
| catch (NodeStoppingException e) { |
| // Wrap original exception to show the source of continuous query start stacktrace. |
| throw new NodeStoppingException(e); |
| } |
| catch (IgniteCheckedException e) { |
| log.warning("Failed to start continuous query.", e); |
| |
| if (id != null) |
| cctx.kernalContext().continuous().stopRoutine(id); |
| |
| throw new IgniteCheckedException("Failed to start continuous query.", e); |
| } |
| |
| if (notifyExisting) { |
| assert locLsnr != null : "Local listener can't be null if notification for existing entries are enabled"; |
| |
| final Iterator<CacheDataRow> it = cctx.offheap().cacheIterator( |
| cctx.cacheId(), |
| true, |
| true, |
| AffinityTopologyVersion.NONE, |
| null, |
| null); |
| |
| locLsnr.onUpdated(new Iterable<CacheEntryEvent>() { |
| @Override public Iterator<CacheEntryEvent> iterator() { |
| return new Iterator<CacheEntryEvent>() { |
| private CacheContinuousQueryEvent next; |
| |
| { |
| advance(); |
| } |
| |
| @Override public boolean hasNext() { |
| return next != null; |
| } |
| |
| @Override public CacheEntryEvent next() { |
| if (!hasNext()) |
| throw new NoSuchElementException(); |
| |
| CacheEntryEvent next0 = next; |
| |
| advance(); |
| |
| return next0; |
| } |
| |
| @Override public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| private void advance() { |
| next = null; |
| |
| while (next == null) { |
| if (!it.hasNext()) |
| break; |
| |
| CacheDataRow e = it.next(); |
| |
| CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry( |
| cctx.cacheId(), |
| CREATED, |
| e.key(), |
| e.value(), |
| null, |
| keepBinary, |
| 0, |
| -1, |
| null, |
| (byte)0); |
| |
| next = new CacheContinuousQueryEvent<>( |
| cctx.kernalContext().cache().jcache(cctx.name()), |
| cctx, entry); |
| |
| if (!hnd.filter(next)) |
| next = null; |
| } |
| } |
| }; |
| } |
| }); |
| } |
| |
| return id; |
| } |
| |
| /** |
| * @param factory Original factory. |
| * @return Security aware factory. |
| */ |
| private Factory<IgniteClosure<K, V>> securityAwareTransformerFactory(Factory<IgniteClosure<K, V>> factory) { |
| return securityAwareComponent(factory, SecurityAwareTransformerFactory::new); |
| } |
| |
| /** |
| * @param factory Original factory. |
| * @return Security aware factory. |
| */ |
| private Factory<CacheEntryEventFilter<K, V>> securityAwareFilterFactory(Factory<CacheEntryEventFilter<K, V>> factory) { |
| return securityAwareComponent(factory, SecurityAwareFilterFactory::new); |
| } |
| |
| /** |
| * @param filter Original filter. |
| * @return Security aware filter. |
| */ |
| private CacheEntryEventSerializableFilter<K, V> securityAwareFilter(CacheEntryEventSerializableFilter<K, V> filter) { |
| return securityAwareComponent(filter, SecurityAwareFilter::new); |
| } |
| |
| /** |
| * @param component Original component. |
| * @param f Function that converts the original component to a security aware component. |
| * @return Security aware component. |
| */ |
| private <T> T securityAwareComponent(T component, BiFunction<UUID, T, T> f) { |
| if (component == null) |
| return null; |
| |
| GridKernalContext ctx = cctx.kernalContext(); |
| |
| if (ctx.security().enabled() && allNodesSupports(ctx.discovery().allNodes(), CONT_QRY_SECURITY_AWARE)) { |
| final UUID subjectId = ctx.security().securityContext().subject().id(); |
| |
| return f.apply(subjectId, component); |
| } |
| |
| return component; |
| } |
| |
| /** |
| * @param keepBinary Keep binary flag. |
| * @param filter Filter. |
| * @return Iterable for events created for existing cache entries. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public Iterable<CacheEntryEvent<?, ?>> existingEntries(final boolean keepBinary, final CacheEntryEventFilter filter) |
| throws IgniteCheckedException { |
| final Iterator<Cache.Entry<?, ?>> it = cctx.cache().igniteIterator(keepBinary); |
| |
| final Cache cache = cctx.kernalContext().cache().jcache(cctx.name()); |
| |
| return new Iterable<CacheEntryEvent<?, ?>>() { |
| @Override public Iterator<CacheEntryEvent<?, ?>> iterator() { |
| return new Iterator<CacheEntryEvent<?, ?>>() { |
| private CacheQueryEntryEvent<?, ?> next; |
| |
| { |
| advance(); |
| } |
| |
| @Override public boolean hasNext() { |
| return next != null; |
| } |
| |
| @Override public CacheEntryEvent<?, ?> next() { |
| if (!hasNext()) |
| throw new NoSuchElementException(); |
| |
| CacheEntryEvent next0 = next; |
| |
| advance(); |
| |
| return next0; |
| } |
| |
| @Override public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| private void advance() { |
| next = null; |
| |
| while (next == null) { |
| if (!it.hasNext()) |
| break; |
| |
| Cache.Entry e = it.next(); |
| |
| next = new CacheEntryEventImpl( |
| cache, |
| CREATED, |
| e.getKey(), |
| e.getValue()); |
| |
| if (filter != null && !filter.evaluate(next)) |
| next = null; |
| } |
| } |
| }; |
| } |
| }; |
| } |
| |
| /** |
| * @param lsnrId Listener ID. |
| * @param lsnr Listener. |
| * @param internal Internal flag. |
| * @return Whether listener was actually registered. |
| */ |
| GridContinuousHandler.RegisterStatus registerListener( |
| UUID lsnrId, |
| CacheContinuousQueryListener lsnr, |
| boolean internal |
| ) { |
| boolean added; |
| |
| if (internal) { |
| added = intLsnrs.putIfAbsent(lsnrId, lsnr) == null; |
| |
| if (added) |
| intLsnrCnt.incrementAndGet(); |
| } |
| else { |
| lsnr.onBeforeRegister(); |
| |
| try { |
| CacheGroupContext grp = cctx.group(); |
| |
| grp.listenerLock().writeLock().lock(); |
| |
| try { |
| added = lsnrs.putIfAbsent(lsnrId, lsnr) == null; |
| |
| if (added) { |
| lsnrCnt.incrementAndGet(); |
| |
| lsnr.onRegister(); |
| |
| if (lsnrCnt.get() == 1) { |
| if (grp.sharedGroup()) |
| grp.addCacheWithContinuousQuery(cctx); |
| } |
| } |
| } |
| finally { |
| grp.listenerLock().writeLock().unlock(); |
| } |
| } |
| finally { |
| lsnr.onAfterRegister(); |
| } |
| } |
| |
| return added ? GridContinuousHandler.RegisterStatus.REGISTERED |
| : GridContinuousHandler.RegisterStatus.NOT_REGISTERED; |
| } |
| |
| /** |
| * @param internal Internal flag. |
| * @param id Listener ID. |
| */ |
| void unregisterListener(boolean internal, UUID id) { |
| CacheContinuousQueryListener lsnr; |
| |
| if (internal) { |
| if ((lsnr = intLsnrs.remove(id)) != null) { |
| intLsnrCnt.decrementAndGet(); |
| |
| lsnr.onUnregister(); |
| } |
| } |
| else { |
| cctx.group().listenerLock().writeLock().lock(); |
| |
| try { |
| if ((lsnr = lsnrs.remove(id)) != null) { |
| int cnt = lsnrCnt.decrementAndGet(); |
| |
| if (cctx.group().sharedGroup() && cnt == 0) |
| cctx.group().removeCacheWithContinuousQuery(cctx); |
| } |
| } |
| finally { |
| cctx.group().listenerLock().writeLock().unlock(); |
| } |
| |
| if (lsnr != null) |
| lsnr.onUnregister(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private class JCacheQuery { |
| /** */ |
| private final CacheEntryListenerConfiguration cfg; |
| |
| /** */ |
| private final boolean onStart; |
| |
| /** */ |
| private final boolean keepBinary; |
| |
| /** */ |
| private volatile UUID routineId; |
| |
| /** */ |
| private volatile JCacheQueryLocalListener locLsnr; |
| |
| /** |
| * @param cfg Listener configuration. |
| * @param onStart {@code True} if executed on cache start. |
| */ |
| private JCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart, boolean keepBinary) { |
| this.cfg = cfg; |
| this.onStart = onStart; |
| this.keepBinary = keepBinary; |
| } |
| |
| /** |
| * @throws IgniteCheckedException In case of error. |
| */ |
| @SuppressWarnings("unchecked") |
| void execute() throws IgniteCheckedException { |
| if (!onStart) |
| cctx.config().addCacheEntryListenerConfiguration(cfg); |
| |
| CacheEntryListener locLsnrImpl = (CacheEntryListener)cfg.getCacheEntryListenerFactory().create(); |
| |
| if (locLsnrImpl == null) |
| throw new IgniteCheckedException("Local CacheEntryListener is mandatory and can't be null."); |
| |
| byte types = 0; |
| |
| types |= locLsnrImpl instanceof CacheEntryCreatedListener ? CREATED_FLAG : 0; |
| types |= locLsnrImpl instanceof CacheEntryUpdatedListener ? UPDATED_FLAG : 0; |
| types |= locLsnrImpl instanceof CacheEntryRemovedListener ? REMOVED_FLAG : 0; |
| types |= locLsnrImpl instanceof CacheEntryExpiredListener ? EXPIRED_FLAG : 0; |
| |
| if (types == 0) |
| throw new IgniteCheckedException("Listener must implement one of CacheEntryListener sub-interfaces."); |
| |
| final byte types0 = types; |
| |
| locLsnr = new JCacheQueryLocalListener( |
| locLsnrImpl, |
| log); |
| |
| routineId = executeQuery0( |
| locLsnr, |
| new IgniteOutClosure<CacheContinuousQueryHandler>() { |
| @Override public CacheContinuousQueryHandler apply() { |
| CacheContinuousQueryHandler hnd; |
| Factory<CacheEntryEventFilter<K, V>> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory(); |
| |
| if (rmtFilterFactory != null) |
| hnd = new CacheContinuousQueryHandlerV2( |
| cctx.name(), |
| TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), |
| locLsnr, |
| securityAwareFilterFactory(rmtFilterFactory), |
| cfg.isOldValueRequired(), |
| cfg.isSynchronous(), |
| false, |
| false, |
| types0); |
| else { |
| JCacheQueryRemoteFilter jCacheFilter; |
| |
| CacheEntryEventFilter filter = null; |
| |
| if (rmtFilterFactory != null) { |
| filter = rmtFilterFactory.create(); |
| |
| if (!(filter instanceof Serializable)) |
| throw new IgniteException("Topology has nodes of the old versions. " + |
| "In this case EntryEventFilter must implement java.io.Serializable " + |
| "interface. Filter: " + filter); |
| } |
| |
| jCacheFilter = new JCacheQueryRemoteFilter(filter, types0); |
| |
| hnd = new CacheContinuousQueryHandler( |
| cctx.name(), |
| TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), |
| locLsnr, |
| securityAwareFilter(jCacheFilter), |
| cfg.isOldValueRequired(), |
| cfg.isSynchronous(), |
| false, |
| false); |
| } |
| |
| return hnd; |
| } |
| }, |
| ContinuousQuery.DFLT_PAGE_SIZE, |
| ContinuousQuery.DFLT_TIME_INTERVAL, |
| ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, |
| false, |
| false, |
| false, |
| keepBinary, |
| onStart |
| ); |
| } |
| |
| /** |
| * @throws IgniteCheckedException In case of error. |
| */ |
| @SuppressWarnings("unchecked") |
| void cancel() throws IgniteCheckedException { |
| UUID routineId0 = routineId; |
| |
| if (routineId0 != null) |
| cctx.kernalContext().continuous().stopRoutine(routineId0).get(); |
| |
| cctx.config().removeCacheEntryListenerConfiguration(cfg); |
| |
| U.closeQuiet(locLsnr); |
| } |
| } |
| |
| /** |
| * |
| */ |
| static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V>, Closeable { |
| /** */ |
| final CacheEntryListener<K, V> impl; |
| |
| /** */ |
| private final IgniteLogger log; |
| |
| /** |
| * @param impl Listener. |
| * @param log Logger. |
| */ |
| JCacheQueryLocalListener(CacheEntryListener<K, V> impl, IgniteLogger log) { |
| assert impl != null; |
| assert log != null; |
| |
| this.impl = impl; |
| |
| this.log = log; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) { |
| for (CacheEntryEvent<? extends K, ? extends V> evt : evts) { |
| try { |
| switch (evt.getEventType()) { |
| case CREATED: |
| assert impl instanceof CacheEntryCreatedListener : evt; |
| |
| ((CacheEntryCreatedListener<K, V>)impl).onCreated(singleton(evt)); |
| |
| break; |
| |
| case UPDATED: |
| assert impl instanceof CacheEntryUpdatedListener : evt; |
| |
| ((CacheEntryUpdatedListener<K, V>)impl).onUpdated(singleton(evt)); |
| |
| break; |
| |
| case REMOVED: |
| assert impl instanceof CacheEntryRemovedListener : evt; |
| |
| ((CacheEntryRemovedListener<K, V>)impl).onRemoved(singleton(evt)); |
| |
| break; |
| |
| case EXPIRED: |
| assert impl instanceof CacheEntryExpiredListener : evt; |
| |
| ((CacheEntryExpiredListener<K, V>)impl).onExpired(singleton(evt)); |
| |
| break; |
| |
| default: |
| throw new IllegalStateException("Unknown type: " + evt.getEventType()); |
| } |
| } |
| catch (Exception e) { |
| U.error(log, "CacheEntryListener failed: " + e); |
| } |
| } |
| } |
| |
| /** |
| * @param evt Event. |
| * @return Singleton iterable. |
| */ |
| private Iterable<CacheEntryEvent<? extends K, ? extends V>> singleton( |
| CacheEntryEvent<? extends K, ? extends V> evt) { |
| assert evt instanceof CacheContinuousQueryEvent; |
| |
| Collection<CacheEntryEvent<? extends K, ? extends V>> evts = new ArrayList<>(1); |
| |
| evts.add(evt); |
| |
| return evts; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void close() throws IOException { |
| if (impl instanceof Closeable) |
| U.closeQuiet((Closeable)impl); |
| } |
| } |
| |
| /** |
| * For handler version 2.0 this filter should not be serialized. |
| */ |
| protected static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| protected CacheEntryEventFilter impl; |
| |
| /** */ |
| private byte types; |
| |
| /** */ |
| @LoggerResource |
| private IgniteLogger log; |
| |
| /** |
| * For {@link Externalizable}. |
| */ |
| public JCacheQueryRemoteFilter() { |
| // no-op. |
| } |
| |
| /** |
| * @param impl Filter. |
| * @param types Types. |
| */ |
| JCacheQueryRemoteFilter(@Nullable CacheEntryEventFilter impl, byte types) { |
| assert types != 0; |
| |
| this.impl = impl; |
| this.types = types; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean evaluate(CacheEntryEvent evt) { |
| try { |
| return (types & flag(evt.getEventType())) != 0 && (impl == null || impl.evaluate(evt)); |
| } |
| catch (Exception e) { |
| U.error(log, "CacheEntryEventFilter failed: " + e); |
| |
| return true; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| out.writeObject(impl); |
| out.writeByte(types); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| impl = (CacheEntryEventFilter)in.readObject(); |
| types = in.readByte(); |
| } |
| |
| /** |
| * @param evtType Type. |
| * @return Flag value. |
| */ |
| private byte flag(EventType evtType) { |
| switch (evtType) { |
| case CREATED: |
| return CREATED_FLAG; |
| |
| case UPDATED: |
| return UPDATED_FLAG; |
| |
| case REMOVED: |
| return REMOVED_FLAG; |
| |
| case EXPIRED: |
| return EXPIRED_FLAG; |
| |
| default: |
| throw new IllegalStateException("Unknown type: " + evtType); |
| } |
| } |
| } |
| |
| /** |
| * Task flash backup queue. |
| */ |
| private static final class BackupCleaner implements Runnable { |
| /** Listeners. */ |
| private final Map<UUID, CacheContinuousQueryListener> lsnrs; |
| |
| /** Context. */ |
| private final GridKernalContext ctx; |
| |
| /** |
| * @param lsnrs Listeners. |
| */ |
| public BackupCleaner(Map<UUID, CacheContinuousQueryListener> lsnrs, GridKernalContext ctx) { |
| this.lsnrs = lsnrs; |
| this.ctx = ctx; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| for (CacheContinuousQueryListener lsnr : lsnrs.values()) |
| lsnr.acknowledgeBackupOnTimeout(ctx); |
| } |
| } |
| |
| /** |
| * |
| */ |
| public static class CacheEntryEventImpl extends CacheQueryEntryEvent { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| @GridToStringInclude(sensitive = true) |
| private Object key; |
| |
| /** */ |
| @GridToStringInclude(sensitive = true) |
| private Object val; |
| |
| /** |
| * @param src Event source. |
| * @param evtType Event type. |
| * @param key Key. |
| * @param val Value. |
| */ |
| public CacheEntryEventImpl(Cache src, EventType evtType, Object key, Object val) { |
| super(src, evtType); |
| |
| this.key = key; |
| this.val = val; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long getPartitionUpdateCounter() { |
| return 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object getOldValue() { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isOldValueAvailable() { |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object getKey() { |
| return key; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object getValue() { |
| return val; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object unwrap(Class cls) { |
| if (cls.isAssignableFrom(getClass())) |
| return cls.cast(this); |
| |
| throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(CacheEntryEventImpl.class, this); |
| } |
| } |
| } |