blob: dc71affd905ce1c4303265c5d75d4f34143a0918 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.query.continuous;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListener;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.SystemProperty;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssues;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.toCountersMap;
import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry.createFilteredEntry;
/**
* Continuous query handler.
*/
public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** Serialization version UID. */
private static final long serialVersionUID = 0L;
/**
 * Default value used for {@link #BACKUP_ACK_THRESHOLD} when the system property is not set.
 *
 * @see #IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD
 */
public static final int DFLT_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD = 100;
/**
 * Default value used for {@link #LSNR_MAX_BUF_SIZE} when the system property is not set.
 *
 * @see #IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE
 */
public static final int DFLT_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE = 10_000;
/**
 * Name of the system property that configures the backup acknowledge threshold.
 * NOTE(review): the annotation declares {@code type = Long.class} while the value is read below with
 * {@code IgniteSystemProperties.getInteger(...)} - confirm which type is intended.
 */
@SystemProperty(value = "The size of the buffer with acknowledgment events that are sent to backup nodes",
type = Long.class, defaults = "" + DFLT_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD)
public static final String IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD = "IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD";
/**
 * Name of the system property that configures the maximum listener buffer size.
 * NOTE(review): the annotation declares {@code type = Long.class} while the value is read below with
 * {@code IgniteSystemProperties.getInteger(...)} - confirm which type is intended.
 */
@SystemProperty(value = "The maximum size of the continuous query listener buffer. " +
"10% of events are dropped once the buffer is full", type = Long.class,
defaults = "" + DFLT_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE)
public static final String IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE = "IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE";
/**
 * Backup acknowledge threshold, read once at class initialization from
 * {@link #IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD} with
 * {@link #DFLT_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD} as the fallback.
 */
static final int BACKUP_ACK_THRESHOLD =
IgniteSystemProperties.getInteger(IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD,
DFLT_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD);
/**
 * Maximum listener buffer size, read once at class initialization from
 * {@link #IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE} with
 * {@link #DFLT_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE} as the fallback.
 */
static final int LSNR_MAX_BUF_SIZE =
IgniteSystemProperties.getInteger(IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE,
DFLT_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE);
/**
 * Transformer implementation for processing received remote events.
 * They are already transformed so we simply return transformed value for event.
 * <p>
 * The closure asserts that the event carries no key and returns the event value as is.
 * It is passed to the local listener notification when remote entries are delivered via the notify callback.
 */
private transient IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> returnValTrans =
new IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, Object>() {
@Override public Object apply(CacheEntryEvent<? extends K, ? extends V> evt) {
// An already-transformed event is expected to hold only the transformed value.
assert evt.getKey() == null;
return evt.getValue();
}
};
/** Cache name. */
private String cacheName;
/** Topic for ordered messages. */
private Object topic;
/** P2P unmarshalling future. Initialized as an already finished future. */
protected transient IgniteInternalFuture<Void> p2pUnmarshalFut = new GridFinishedFuture<>();
/** Initialization future. Created lazily in {@code register(...)} as a chain over {@link #p2pUnmarshalFut}. */
protected transient IgniteInternalFuture<Void> initFut;
/** Local listener. */
private transient CacheEntryUpdatedListener<K, V> locLsnr;
/** Remote filter. */
private CacheEntryEventSerializableFilter<K, V> rmtFilter;
/** Deployable object for filter. */
private CacheContinuousQueryDeployableObject rmtFilterDep;
/** Internal flag. */
private boolean internal;
/** Notify existing flag. */
private boolean notifyExisting;
/** Old value required flag. */
private boolean oldValRequired;
/** Synchronous flag. */
private boolean sync;
/** Ignore expired events flag. */
private boolean ignoreExpired;
/** Task name hash code. */
private int taskHash;
/** Whether to skip primary check for REPLICATED cache. */
private transient boolean skipPrimaryCheck;
/** Local only flag, see {@link #localOnly(boolean)}. */
private transient boolean locOnly;
/** Keep binary flag, see {@link #keepBinary(boolean)}. */
private boolean keepBinary;
/** Partition recoveries keyed by partition id. Created in {@code register(...)}. */
private transient ConcurrentMap<Integer, CacheContinuousQueryPartitionRecovery> rcvs;
/** Event buffers keyed by partition id. Created in {@code register(...)}. */
private transient ConcurrentMap<Integer, CacheContinuousQueryEventBuffer> entryBufs;
/** Acknowledge buffer whose results are passed on as backup acknowledgements. Created in {@code register(...)}. */
private transient CacheContinuousQueryAcknowledgeBuffer ackBuf;
/** Cache ID, computed from the cache name in the constructor. */
private transient int cacheId;
/** Initial update counters per partition, set by {@code updateCounters(topVer, cntrsPerNode, cntrs)}. */
private transient volatile Map<Integer, T2<Long, Long>> initUpdCntrs;
/** Initial update counters per node and partition, set by {@code updateCounters(topVer, cntrsPerNode, cntrs)}. */
private transient volatile Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode;
/** Topology version supplied together with the initial update counters. */
private transient volatile AffinityTopologyVersion initTopVer;
/**
 * When {@code true}, backup entries are not buffered (see {@code handleBackupEntry}).
 * NOTE(review): the place where this flag is set is outside the visible part of the file -
 * presumably it marks that the node which started the routine has left; confirm.
 */
private transient volatile boolean nodeLeft;
/** When {@code true}, failures while processing a received entry in the notify callback are not logged. */
private transient boolean ignoreClsNotFound;
/** Set when the local listener or the remote filter is annotated with {@link IgniteAsyncCallback}. */
transient boolean asyncCb;
/** Node ID passed to {@code register(...)}. */
private transient UUID nodeId;
/** Routine ID passed to {@code register(...)}. */
private transient UUID routineId;
/** Local update counters values on listener start. Used for skipping events fired before the listener start. */
private transient volatile Map<Integer, T2<Long, Long>> locInitUpdCntrs;
/** Kernal context passed to {@code register(...)}. */
private transient GridKernalContext ctx;
/** Logger of the continuous query log category, obtained in {@code register(...)}. */
private transient IgniteLogger log;
/**
 * Empty constructor, needed so the handler can be instantiated by {@link Externalizable} deserialization.
 */
public CacheContinuousQueryHandler() {
    // Intentionally empty.
}
/**
 * Creates a handler for the given cache.
 *
 * @param cacheName Cache name; the cache ID is derived from it.
 * @param topic Topic for ordered messages, must not be {@code null}.
 * @param locLsnr Local listener.
 * @param rmtFilter Remote filter.
 * @param oldValRequired Old value required flag.
 * @param sync Synchronous flag.
 * @param ignoreExpired Ignore expired events flag.
 * @param ignoreClsNotFound Ignore class not found flag.
 */
public CacheContinuousQueryHandler(
    String cacheName,
    Object topic,
    @Nullable CacheEntryUpdatedListener<K, V> locLsnr,
    @Nullable CacheEntryEventSerializableFilter<K, V> rmtFilter,
    boolean oldValRequired,
    boolean sync,
    boolean ignoreExpired,
    boolean ignoreClsNotFound) {
    assert topic != null;

    // Cache identity.
    this.cacheName = cacheName;
    cacheId = CU.cacheId(cacheName);

    // Messaging and callbacks.
    this.topic = topic;
    this.locLsnr = locLsnr;
    this.rmtFilter = rmtFilter;

    // Behaviour flags.
    this.oldValRequired = oldValRequired;
    this.sync = sync;
    this.ignoreExpired = ignoreExpired;
    this.ignoreClsNotFound = ignoreClsNotFound;
}
/**
 * Sets the internal query flag.
 *
 * @param internal {@code True} if the query is internal.
 */
public void internal(boolean internal) {
    this.internal = internal;
}
/**
 * Sets the notify existing flag.
 *
 * @param notifyExisting {@code True} if handler should obtain existing entries.
 */
public void notifyExisting(boolean notifyExisting) {
    this.notifyExisting = notifyExisting;
}
/**
 * Tells whether existing entries should be obtained by this handler.
 *
 * @return Current value of the notify existing flag.
 */
public boolean notifyExisting() {
    return notifyExisting;
}
/**
 * Tells whether the old value is required for this handler.
 *
 * @return Current value of the old value required flag.
 */
public boolean oldValueRequired() {
    return oldValRequired;
}
/**
 * Gives access to the local listener this handler was created with.
 *
 * @return Local listener, possibly {@code null}.
 */
public CacheEntryUpdatedListener<K, V> localListener() {
    return locLsnr;
}
/**
 * Sets the local only flag.
 *
 * @param locOnly {@code True} if handler is local only.
 */
public void localOnly(boolean locOnly) {
    this.locOnly = locOnly;
}
/**
 * Tells whether this handler is local only.
 *
 * @return Current value of the local only flag.
 */
public boolean localOnly() {
    return locOnly;
}
/**
 * Sets the task name hash code, later used to resolve the task name.
 *
 * @param taskHash Task name hash code.
 */
public void taskNameHash(int taskHash) {
    this.taskHash = taskHash;
}
/**
 * Sets the flag that disables the primary check for a REPLICATED cache.
 *
 * @param skipPrimaryCheck {@code True} to skip the primary check.
 */
public void skipPrimaryCheck(boolean skipPrimaryCheck) {
    this.skipPrimaryCheck = skipPrimaryCheck;
}
/** {@inheritDoc} */
@Override public boolean isEvents() {
    // This handler reports itself as a query handler only, see isQuery().
    return false;
}
/** {@inheritDoc} */
@Override public boolean isMessaging() {
    // This handler reports itself as a query handler only, see isQuery().
    return false;
}
/** {@inheritDoc} */
@Override public boolean isQuery() {
    // Always a query handler.
    return true;
}
/** {@inheritDoc} */
@Override public boolean keepBinary() {
    // Value configured through keepBinary(boolean).
    return keepBinary;
}
/**
 * Sets the keep binary flag.
 *
 * @param keepBinary New value of the keep binary flag.
 */
public void keepBinary(boolean keepBinary) {
    this.keepBinary = keepBinary;
}
/** {@inheritDoc} */
@Override public String cacheName() {
    // Name supplied to the constructor.
    return cacheName;
}
/** {@inheritDoc} */
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
    Map<Integer, T2<Long, Long>> cntrs) {
    // Remember the initial counters and the topology version they were supplied with.
    initUpdCntrsPerNode = cntrsPerNode;
    initUpdCntrs = cntrs;
    initTopVer = topVer;
}
/** {@inheritDoc} */
@Override public Map<Integer, T2<Long, Long>> updateCounters() {
    // Local counters captured when the listener was registered.
    return locInitUpdCntrs;
}
/**
 * {@inheritDoc}
 * <p>
 * Initializes the local listener, lazily creates the initialization future (remote filter and transformer
 * resource injection chained after P2P unmarshalling), allocates per-routine buffers and registers a
 * {@link CacheContinuousQueryListener} with the cache's continuous query manager.
 * Returns {@code RegisterStatus.DELAYED} when the continuous query manager cannot be resolved.
 */
@Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
assert nodeId != null;
assert routineId != null;
assert ctx != null;
initLocalListener(locLsnr, ctx);
// The initialization future is created only once; repeated calls reuse it.
if (initFut == null) {
initFut = p2pUnmarshalFut.chain((fut) -> {
try {
// Propagate a P2P unmarshalling failure, if any.
fut.get();
initRemoteFilter(getEventFilter0(), ctx);
IgniteClosure trans = getTransformer0();
if (trans != null)
ctx.resource().injectGeneric(trans);
}
catch (IgniteCheckedException | ExceptionInInitializerError e) {
throw new IgniteException("Failed to initialize a continuous query.", e);
}
return null;
});
}
// Fail fast if initialization has already completed with an error.
if (initFut.error() != null)
throw new IgniteCheckedException("Failed to initialize a continuous query.", initFut.error());
entryBufs = new ConcurrentHashMap<>();
ackBuf = new CacheContinuousQueryAcknowledgeBuffer();
rcvs = new ConcurrentHashMap<>();
this.nodeId = nodeId;
this.routineId = routineId;
this.ctx = ctx;
// True when the routine was started by the local node.
final boolean loc = nodeId.equals(ctx.localNodeId());
assert !skipPrimaryCheck || loc;
log = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY);
CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
// Takes the topology read lock for non-local caches; released in onAfterRegister().
@Override public void onBeforeRegister() {
GridCacheContext<K, V> cctx = cacheContext(ctx);
if (cctx != null && !cctx.isLocal())
cctx.topology().readLock();
}
// Releases the topology read lock taken in onBeforeRegister().
@Override public void onAfterRegister() {
GridCacheContext<K, V> cctx = cacheContext(ctx);
if (cctx != null && !cctx.isLocal())
cctx.topology().readUnlock();
}
// Captures the local update counters at registration time for non-local caches.
@Override public void onRegister() {
GridCacheContext<K, V> cctx = cacheContext(ctx);
if (cctx != null && !cctx.isLocal())
locInitUpdCntrs = toCountersMap(cctx.topology().localUpdateCounters(false));
}
@Override public boolean keepBinary() {
return keepBinary;
}
// Entry point for cache updates: filters the event and either notifies (primary) or buffers it (backup).
@Override public void onEntryUpdated(final CacheContinuousQueryEvent<K, V> evt,
boolean primary,
final boolean recordIgniteEvt,
GridDhtAtomicAbstractUpdateFuture fut) {
if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
return;
if (log.isDebugEnabled())
log.debug("Entry updated on affinity node [evt=" + evt + ", primary=" + primary + ']');
final GridCacheContext<K, V> cctx = cacheContext(ctx);
// Check that cache stopped.
if (cctx == null)
return;
if (!needNotify(false, cctx, -1, -1, evt))
return;
// skipPrimaryCheck is set only when listen locally for replicated cache events.
assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
if (asyncCb) {
// Asynchronous callbacks are executed in the async callback pool, keyed by partition id.
ContinuousQueryAsyncClosure clsr = new ContinuousQueryAsyncClosure(
primary,
evt,
recordIgniteEvt,
fut);
ctx.pools().asyncCallbackPool().execute(clsr, evt.partitionId());
}
else {
final boolean notify = filter(evt);
if (log.isDebugEnabled())
log.debug("Filter invoked for event [evt=" + evt + ", primary=" + primary
+ ", notify=" + notify + ']');
if (primary || skipPrimaryCheck)
onEntryUpdate(evt, notify, loc, recordIgniteEvt);
else
handleBackupEntry(cctx, evt.entry());
}
}
// Lets a platform filter know that the query is being unregistered.
@Override public void onUnregister() {
try {
CacheEntryEventFilter filter = getEventFilter();
if (filter instanceof PlatformContinuousQueryFilter)
((PlatformContinuousQueryFilter)filter).onQueryUnregister();
}
catch (IgniteCheckedException e) {
// NOTE(review): the message below contains the typo "continuoue".
if (log.isDebugEnabled()) {
log.debug("Failed to execute the onUnregister callback " +
"on the continuoue query listener. " +
"[nodeId=" + nodeId + ", routineId=" + routineId + ", cacheName=" + cacheName +
", err=" + e + "]");
}
}
}
// Forwards acknowledged update counters to the matching partition buffers.
@Override public void cleanupOnAck(Map<Integer, Long> updateCntrs) {
for (Map.Entry<Integer, Long> e : updateCntrs.entrySet()) {
CacheContinuousQueryEventBuffer buf = entryBufs.get(e.getKey());
if (buf != null)
buf.cleanupOnAck(e.getValue());
}
}
// Flushes every partition buffer and sends the flushed entries as backup notifications.
@Override public void flushOnExchangeDone(GridKernalContext ctx, AffinityTopologyVersion topVer) {
assert topVer != null;
try {
// NOTE(review): unlike onEntryUpdated(), cctx is not checked for null here before use.
GridCacheContext<K, V> cctx = cacheContext(ctx);
ClusterNode node = ctx.discovery().node(nodeId);
for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) {
CacheContinuousQueryEventBuffer buf = bufE.getValue();
Collection<CacheContinuousQueryEntry> entries = buf.flushOnExchange((cntr, filtered) ->
createFilteredEntry(cctx.cacheId(), bufE.getKey(), topVer, cntr, filtered));
// The buffer is still flushed when the target node is not found, but nothing is sent.
if (entries == null || node == null)
continue;
for (CacheContinuousQueryEntry e : entries) {
e.markBackup();
if (!e.isFiltered())
prepareEntry(cctx, nodeId, e);
}
ctx.continuous().addBackupNotification(nodeId, routineId, entries, topic);
}
}
catch (IgniteCheckedException e) {
U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY),
"Failed to send backup event notification to node: " + nodeId, e);
}
}
@Override public void acknowledgeBackupOnTimeout(GridKernalContext ctx) {
sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
}
// Marks the entry as filtered and routes it through onEntryUpdated() without recording an Ignite event.
@Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt,
AffinityTopologyVersion topVer, boolean primary) {
assert evt != null;
CacheContinuousQueryEntry e = evt.entry();
e.markFiltered();
onEntryUpdated(evt, primary, false, null);
}
// Handles an update counter that has no event attached; work is either executed in the async pool
// or added to the skip context as a process closure.
@Override public CounterSkipContext skipUpdateCounter(final GridCacheContext cctx,
@Nullable CounterSkipContext skipCtx,
int part,
long cntr,
AffinityTopologyVersion topVer,
boolean primary) {
if (skipCtx == null)
skipCtx = new CounterSkipContext(part, cntr, topVer);
if (!needNotify(true, cctx, part, cntr, null))
return skipCtx;
if (loc) {
assert !locOnly;
final Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, skipCtx.entry());
if (!evts.isEmpty()) {
if (asyncCb) {
ctx.pools().asyncCallbackPool().execute(new Runnable() {
@Override public void run() {
try {
notifyLocalListener(evts, getTransformer());
}
catch (IgniteCheckedException ex) {
U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY),
"Failed to notify local listener.", ex);
}
}
}, part);
}
else
skipCtx.addProcessClosure(new Runnable() {
@Override public void run() {
try {
notifyLocalListener(evts, getTransformer());
}
catch (IgniteCheckedException ex) {
U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY),
"Failed to notify local listener.", ex);
}
}
});
}
return skipCtx;
}
// Remote listener: pass the skip entry through the partition buffer and send whatever it releases.
CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part);
final Object entryOrList = buf.processEntry(skipCtx.entry(), !primary);
if (entryOrList != null) {
skipCtx.addProcessClosure(new Runnable() {
@Override public void run() {
try {
ctx.continuous().addNotification(nodeId,
routineId,
entryOrList,
topic,
false,
true);
}
catch (ClusterTopologyCheckedException ex) {
if (log.isDebugEnabled())
log.debug("Failed to send event notification to node, node left cluster " +
"[node=" + nodeId + ", err=" + ex + ']');
}
catch (IgniteCheckedException ex) {
U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY),
"Failed to send event notification to node: " + nodeId, ex);
}
}
});
}
return skipCtx;
}
// Drops the event buffer of an evicted partition.
@Override public void onPartitionEvicted(int part) {
entryBufs.remove(part);
}
@Override public boolean oldValueRequired() {
return oldValRequired;
}
@Override public boolean notifyExisting() {
return notifyExisting;
}
// NOTE(review): no caller of this method is visible inside the anonymous class; the enclosing class
// declares a taskName() with the same body - confirm whether this copy is needed.
private String taskName() {
return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null;
}
@Override public boolean isPrimaryOnly() {
return locOnly && !skipPrimaryCheck;
}
/**
* Checks whether it is need to notify listeners.
* Always returns {@code true} when MVCC is not enabled for the cache or the cache is local.
*
* @param skipEvt {@code True} if this is a skip counter event.
* @param cctx Cache context.
* @param part Partition id.
* @param cntr Update counter.
* @param evt CQ event.
* @return {@code True} if listeners should be notified, {@code false} if the update happened before the
*      local initial counter was captured (callers in this listener then return without notifying).
*/
private boolean needNotify(boolean skipEvt,
GridCacheContext cctx,
int part,
long cntr,
CacheContinuousQueryEvent evt) {
assert !skipEvt || evt == null;
assert skipEvt || part == -1 && cntr == -1; // For a regular event callers pass -1/-1; real values are taken from evt below.
if (!cctx.mvccEnabled() || cctx.isLocal())
return true;
assert locInitUpdCntrs != null;
cntr = skipEvt ? cntr : evt.getPartitionUpdateCounter();
part = skipEvt ? part : evt.partitionId();
T2<Long, Long> initCntr = locInitUpdCntrs.get(part);
// Do not notify listener if entry was updated before the query is started.
return initCntr == null || cntr >= initCntr.get2();
}
};
CacheContinuousQueryManager mgr = manager(ctx);
// No continuous query manager could be resolved for the cache: registration is delayed.
if (mgr == null)
return RegisterStatus.DELAYED;
RegisterStatus regStatus = mgr.registerListener(routineId, lsnr, internal);
// Fire the "query executed" event once initialization has completed.
if (regStatus == RegisterStatus.REGISTERED)
initFut.listen(res -> sendQueryExecutedEvent());
return regStatus;
}
/**
 * Fires continuous query execution event.
 * <p>
 * The event is recorded only if the cache context can be resolved and the event type is recordable.
 * If the event filter cannot be obtained, the failure is logged at debug level and nothing is recorded.
 *
 * @see org.apache.ignite.events.EventType#EVT_CACHE_QUERY_EXECUTED
 */
private void sendQueryExecutedEvent() {
    GridCacheContext<K, V> cctx = cacheContext(ctx);

    CacheEntryEventFilter filter;

    try {
        filter = getEventFilter();
    }
    catch (IgniteCheckedException e) {
        if (log.isDebugEnabled()) {
            // Message spelling fixed: "continuoue" -> "continuous".
            log.debug("Failed to trigger the continuous query executed event. " +
                "[routineId=" + routineId + ", cacheName=" + cacheName + ", err=" + e + "]");
        }

        return;
    }

    if (cctx != null && cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
        //noinspection unchecked
        ctx.event().record(new CacheQueryExecutedEvent<K, V>(
            ctx.discovery().localNode(),
            "Continuous query executed.",
            EVT_CACHE_QUERY_EXECUTED,
            CacheQueryType.CONTINUOUS.name(),
            cacheName,
            null,
            null,
            null,
            // Only serializable filters are attached to the event.
            filter instanceof CacheEntryEventSerializableFilter ?
                (CacheEntryEventSerializableFilter)filter : null,
            null,
            nodeId,
            taskName()
        ));
    }
}
/**
 * Injects resources into the given local listener and records in {@link #asyncCb} whether it is
 * annotated with {@link IgniteAsyncCallback}. A {@link JCacheQueryLocalListener} wrapper is unwrapped
 * first so that the underlying implementation is processed.
 *
 * @param lsnr Local listener; nothing is done when it is {@code null}.
 * @param ctx Kernal context.
 * @throws IgniteCheckedException If failed to perform resource injection.
 */
private void initLocalListener(CacheEntryListener lsnr, GridKernalContext ctx) throws IgniteCheckedException {
    if (lsnr == null)
        return;

    CacheEntryListener target = lsnr;

    if (lsnr instanceof JCacheQueryLocalListener)
        target = ((JCacheQueryLocalListener)lsnr).impl;

    ctx.resource().injectGeneric(target);

    asyncCb = U.hasAnnotation(target, IgniteAsyncCallback.class);
}
/**
 * Injects resources into the given remote filter and, unless {@link #asyncCb} is already set, records
 * whether the filter is annotated with {@link IgniteAsyncCallback}. A {@link JCacheQueryRemoteFilter}
 * wrapper is unwrapped first so that the underlying implementation is processed.
 *
 * @param filter Remote filter; nothing is done when the (unwrapped) filter is {@code null}.
 * @param ctx Kernal context.
 * @throws IgniteCheckedException If failed to perform resource injection.
 */
protected void initRemoteFilter(CacheEntryEventFilter filter, GridKernalContext ctx) throws IgniteCheckedException {
    CacheEntryEventFilter target = filter;

    if (filter instanceof JCacheQueryRemoteFilter)
        target = ((JCacheQueryRemoteFilter)filter).impl;

    if (target == null)
        return;

    ctx.resource().injectGeneric(target);

    // Keep an already detected asynchronous mode.
    if (!asyncCb)
        asyncCb = U.hasAnnotation(target, IgniteAsyncCallback.class);
}
/**
 * Waits for the initialization future and then returns the event filter.
 *
 * @return Cache entry event filter.
 * @throws IgniteCheckedException If the initialization future completed with an error
 *      (for example, P2P unmarshalling failed).
 */
public CacheEntryEventFilter getEventFilter() throws IgniteCheckedException {
    // Block until initialization has finished; rethrows its failure, if any.
    initFut.get();

    return getEventFilter0();
}
/**
 * Non-blocking variant of {@link #getEventFilter()}: does not wait on the unmarshalling future.
 *
 * @return Cache entry event filter.
 */
protected CacheEntryEventFilter getEventFilter0() {
    return rmtFilter;
}
/**
 * Waits for the initialization future and then returns the event transformer.
 *
 * @return Cache entry event transformer.
 * @throws IgniteCheckedException If the initialization future completed with an error.
 */
@Nullable public IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> getTransformer() throws IgniteCheckedException {
    // Block until initialization has finished; rethrows its failure, if any.
    initFut.get();

    return getTransformer0();
}
/**
 * Non-blocking variant of {@link #getTransformer()}. This implementation has no transformer.
 *
 * @return Cache entry event transformer, always {@code null} here.
 */
public IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> getTransformer0() {
    return null;
}
/**
 * Gives the listener that receives transformed events. This implementation has none.
 *
 * @return Local listener of transformed events, always {@code null} here.
 */
@Nullable public EventListener<?> localTransformedEventListener() {
    return null;
}
/**
 * Prepares the entry for marshalling and, when peer class loading is enabled and the target node is
 * known to discovery, additionally prepares it with the deployment manager.
 *
 * @param cctx Context.
 * @param nodeId ID of the node that started routine.
 * @param entry Entry.
 * @throws IgniteCheckedException In case of error.
 */
private void prepareEntry(GridCacheContext cctx, UUID nodeId, CacheContinuousQueryEntry entry)
    throws IgniteCheckedException {
    // Evaluated before marshalling to keep the original order of calls.
    boolean deployNeeded = cctx.kernalContext().config().isPeerClassLoadingEnabled()
        && cctx.discovery().node(nodeId) != null;

    entry.prepareMarshal(cctx);

    if (deployNeeded)
        cctx.deploy().prepare(entry);
}
/**
 * For a non-local cache, waits until affinity is ready for the initial topology version and then makes
 * sure a partition recovery exists for every partition of the cache.
 *
 * @param ctx Context.
 * @throws IgniteCheckedException In case of error.
 */
void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
    GridCacheContext<K, V> cctx = cacheContext(ctx);

    if (cctx.isLocal())
        return;

    AffinityTopologyVersion readyVer = initTopVer;

    cacheContext(ctx).shared().exchange().affinityReadyFuture(readyVer).get();

    int part = 0;

    // The cache context and partition count are re-read on every iteration, as before.
    while (part < cacheContext(ctx).affinity().partitions()) {
        getOrCreatePartitionRecovery(ctx, part, readyVer);

        part++;
    }
}
/** {@inheritDoc} */
@Override public void unregister(UUID routineId, GridKernalContext ctx) {
    assert routineId != null;
    assert ctx != null;

    GridCacheAdapter<K, V> cacheAdapter = ctx.cache().internalCache(cacheName);

    // Nothing to unregister when the cache cannot be found.
    if (cacheAdapter == null)
        return;

    cacheAdapter.context().continuousQueries().unregisterListener(internal, routineId);
}
/**
 * Resolves the continuous query manager of this handler's cache.
 *
 * @param ctx Kernal context.
 * @return Continuous query manager, or {@code null} if the cache context cannot be resolved.
 */
private CacheContinuousQueryManager manager(GridKernalContext ctx) {
    GridCacheContext<K, V> cctx = cacheContext(ctx);

    if (cctx == null)
        return null;

    return cctx.continuousQueries();
}
/**
 * {@inheritDoc}
 * <p>
 * In synchronous mode all objects are processed in the calling thread. In asynchronous mode the entries
 * are split into consecutive runs that map to the same thread of the async callback pool, and every run
 * is submitted to that thread.
 */
@SuppressWarnings("unchecked")
@Override public void notifyCallback(final UUID nodeId,
    final UUID routineId,
    Collection<?> objs,
    final GridKernalContext ctx) {
    assert nodeId != null;
    assert routineId != null;
    assert objs != null;
    assert ctx != null;

    if (objs.isEmpty())
        return;

    if (!asyncCb) {
        notifyCallback0(nodeId, ctx, (Collection)objs);

        return;
    }

    final List<CacheContinuousQueryEntry> entries = objs instanceof List ? (List)objs : new ArrayList(objs);

    IgniteStripedThreadPoolExecutor pool = ctx.pools().asyncCallbackPool();

    int stripe = pool.threadId(entries.get(0).partition());
    int from = 0;

    for (int idx = 1; idx < entries.size(); idx++) {
        int nextStripe = pool.threadId(entries.get(idx).partition());

        // Same pool thread as the current run: keep extending the run.
        if (nextStripe != stripe) {
            final int lo = from;
            final int hi = idx;

            pool.execute(() -> notifyCallback0(nodeId, ctx, entries.subList(lo, hi)), stripe);

            from = idx;
            stripe = nextStripe;
        }
    }

    final int tailStart = from;

    // Submit the last run; when nothing was split off, the original list is passed without a sub-list view.
    pool.execute(
        () -> notifyCallback0(nodeId, ctx, tailStart == 0 ? entries : entries.subList(tailStart, entries.size())),
        stripe);
}
/**
 * Unmarshals the received entries, converts them to events and notifies the local listener.
 * Does nothing except debug logging when the cache context cannot be resolved.
 *
 * @param nodeId Node id.
 * @param ctx Kernal context.
 * @param entries Entries.
 */
private void notifyCallback0(UUID nodeId,
    final GridKernalContext ctx,
    Collection<CacheContinuousQueryEntry> entries) {
    final GridCacheContext cctx = cacheContext(ctx);

    if (cctx == null) {
        IgniteLogger log = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY);

        if (log.isDebugEnabled())
            log.debug("Failed to notify callback, cache is not found: " + cacheId);

        return;
    }

    final Collection<CacheEntryEvent<? extends K, ? extends V>> collected = new ArrayList<>(entries.size());

    for (CacheContinuousQueryEntry qryEntry : entries) {
        GridCacheDeploymentManager depMgr = cctx.deploy();

        ClassLoader ldr = depMgr.globalLoader();

        try {
            // Deployment info is consulted only when peer class loading is enabled.
            GridDeploymentInfo depInfo = ctx.config().isPeerClassLoadingEnabled() ? qryEntry.deployInfo() : null;

            if (depInfo != null) {
                depMgr.p2pContext(
                    nodeId,
                    depInfo.classLoaderId(),
                    depInfo.userVersion(),
                    depInfo.deployMode(),
                    depInfo.participants()
                );
            }

            qryEntry.unmarshal(cctx, ldr);

            Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, qryEntry);

            if (!F.isEmpty(evts))
                collected.addAll(evts);
        }
        catch (IgniteCheckedException ex) {
            // With ignoreClsNotFound set the failure is silently skipped (expected for internal queries only).
            if (!ignoreClsNotFound)
                U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), "Failed to unmarshal entry.", ex);
            else
                assert internal;
        }
    }

    notifyLocalListener(collected, returnValTrans);
}
/**
 * Converts an entry into the events that can be delivered to the listener right now.
 * Entries of internal queries and entries without an update counter bypass partition recovery.
 *
 * @param ctx Context.
 * @param e entry.
 * @return Entry collection.
 */
private Collection<CacheEntryEvent<? extends K, ? extends V>> handleEvent(GridKernalContext ctx,
    CacheContinuousQueryEntry e) {
    assert e != null;

    GridCacheContext<K, V> cctx = cacheContext(ctx);

    IgniteCache<?, ?> cache = ctx.cache().jcache(cctx.name());

    boolean fireImmediately = internal || e.updateCounter() == -1L;

    if (!fireImmediately) {
        CacheContinuousQueryPartitionRecovery recovery =
            getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion());

        return recovery.collectEntries(e, cctx, cache);
    }

    // Initial query entry or evicted entry. These events should be fired immediately.
    if (e.isFiltered())
        return Collections.emptyList();

    return F.asList(new CacheContinuousQueryEvent<>(cache, cctx, e));
}
/**
 * Evaluates the event against the remote filter, if any.
 * <p>
 * An entry that is already marked as filtered is never passed to the filter. If the filter throws an
 * exception, the failure is logged together with its stack trace and the current decision is kept.
 * An event that did not pass is marked as filtered.
 *
 * @param evt Query event.
 * @return {@code True} if event passed filter otherwise {@code false}.
 */
public boolean filter(CacheContinuousQueryEvent evt) {
    CacheContinuousQueryEntry entry = evt.entry();

    boolean notify = !entry.isFiltered();

    try {
        if (notify) {
            // Resolve the filter once instead of waiting on the initialization future twice.
            CacheEntryEventFilter evtFilter = getEventFilter();

            if (evtFilter != null)
                notify = evtFilter.evaluate(evt);
        }
    }
    catch (NoClassDefFoundError e) {
        P2PClassLoadingIssues.rethrowDisarmedP2PClassLoadingFailure(e);
    }
    catch (Exception e) {
        // Pass the exception itself so that the stack trace is not lost.
        U.error(log, "CacheEntryEventFilter failed: " + e, e);
    }

    if (!notify)
        entry.markFiltered();

    return notify;
}
/**
 * Delivers an update either to the local listener (listener deployed on this node) or to the remote
 * node that started the routine, and optionally records a {@link CacheQueryReadEvent}.
 *
 * @param evt Continuous query event.
 * @param notify Notify flag.
 * @param loc Listener deployed on this node.
 * @param recordIgniteEvt Record ignite event.
 */
private void onEntryUpdate(CacheContinuousQueryEvent<K, V> evt,
    boolean notify, boolean loc, boolean recordIgniteEvt) {
    try {
        GridCacheContext<K, V> cctx = cacheContext(ctx);

        if (cctx == null)
            return;

        CacheContinuousQueryEntry qryEntry = evt.entry();

        IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> transformer = getTransformer();

        if (!loc) {
            // Remote listener: transform (if configured) and marshal entries that passed the filter.
            if (!qryEntry.isFiltered()) {
                if (transformer != null)
                    qryEntry = transformToEntry(transformer, evt);

                prepareEntry(cctx, nodeId, qryEntry);
            }

            Object payload = handleEntry(cctx, qryEntry);

            if (payload != null) {
                if (log.isDebugEnabled())
                    log.debug("Send the following event to listener: " + payload);

                ctx.continuous().addNotification(nodeId, routineId, payload, topic, sync, true);
            }
        }
        else if (locOnly) {
            if (!qryEntry.isFiltered())
                notifyLocalListener(F.asList(evt), transformer);
        }
        else {
            Collection<CacheEntryEvent<? extends K, ? extends V>> ready = handleEvent(ctx, qryEntry);

            notifyLocalListener(ready, transformer);

            if (!(internal || skipPrimaryCheck))
                sendBackupAcknowledge(ackBuf.onAcknowledged(qryEntry), routineId, ctx);
        }
    }
    catch (ClusterTopologyCheckedException ex) {
        if (log.isDebugEnabled())
            log.debug("Failed to send event notification to node, node left cluster " +
                "[node=" + nodeId + ", err=" + ex + ']');
    }
    catch (IgniteCheckedException ex) {
        U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), "Failed to send event notification to node: " + nodeId, ex);
    }

    // The read event is recorded only for events that passed the filter and only on request.
    if (!recordIgniteEvt || !notify)
        return;

    CacheEntryEventFilter evtFilter;

    try {
        evtFilter = getEventFilter();
    }
    catch (IgniteCheckedException e) {
        if (log.isDebugEnabled()) {
            log.debug("Failed to trigger a continuous query event. " +
                "[routineId=" + routineId + ", cacheName=" + cacheName + ", err=" + e + "]");
        }

        return;
    }

    //noinspection unchecked
    ctx.event().record(new CacheQueryReadEvent<K, V>(
        ctx.discovery().localNode(),
        "Continuous query executed.",
        EVT_CACHE_QUERY_OBJECT_READ,
        CacheQueryType.CONTINUOUS.name(),
        cacheName,
        null,
        null,
        null,
        evtFilter instanceof CacheEntryEventSerializableFilter ?
            (CacheEntryEventSerializableFilter)evtFilter : null,
        null,
        nodeId,
        taskName(),
        evt.getKey(),
        evt.getValue(),
        evt.getOldValue(),
        null
    ));
}
/**
 * Passes the events to the local listener and/or, after transformation, to the listener of
 * transformed events. Does nothing for a {@code null} or empty collection.
 *
 * @param evts Events.
 * @param trans Transformer
 */
private void notifyLocalListener(Collection<CacheEntryEvent<? extends K, ? extends V>> evts,
    @Nullable IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans) {
    EventListener transformedLsnr = localTransformedEventListener();

    // At most one of the two listeners is expected to be configured.
    assert locLsnr == null || transformedLsnr == null;

    if (evts == null || evts.isEmpty())
        return;

    if (locLsnr != null)
        locLsnr.onUpdated(evts);

    if (transformedLsnr != null)
        transformedLsnr.onUpdated(transform(trans, evts));
}
/**
 * Resolves the task name from the task name hash, but only when security is enabled.
 *
 * @return Task name, or {@code null} if security is disabled.
 */
private String taskName() {
    if (!ctx.security().enabled())
        return null;

    return ctx.task().resolveTaskName(taskHash);
}
/** {@inheritDoc} */
@Override public void onClientDisconnected() {
    // Internal queries keep their recovery state untouched.
    if (!internal) {
        for (CacheContinuousQueryPartitionRecovery recovery : rcvs.values())
            recovery.resetTopologyCache();
    }
}
/**
 * Returns the recovery registered for the partition, creating and registering a new one when it
 * is missing. A new recovery is seeded with the second component of the initial update counters
 * known for the partition: the per-node counters of the first affinity node that has a counters
 * map, otherwise the plain initial counters. When no counters are known, {@code null} is passed.
 *
 * @param ctx Context.
 * @param partId Partition id.
 * @param topVer Topology version for current operation.
 * @return Partition recovery.
 */
@NotNull private CacheContinuousQueryPartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx,
    int partId,
    AffinityTopologyVersion topVer) {
    assert topVer != null && topVer.topologyVersion() > 0 : topVer;

    CacheContinuousQueryPartitionRecovery existing = rcvs.get(partId);

    if (existing != null)
        return existing;

    // Work with a local copy of the field, so the null check and the lookups use the same map.
    Map<UUID, Map<Integer, T2<Long, Long>>> perNodeCntrs = this.initUpdCntrsPerNode;

    T2<Long, Long> initCntrs = null;

    if (perNodeCntrs == null) {
        if (initUpdCntrs != null)
            initCntrs = initUpdCntrs.get(partId);
    }
    else {
        for (ClusterNode affNode : cacheContext(ctx).affinity().nodesByPartition(partId, topVer)) {
            Map<Integer, T2<Long, Long>> nodeCntrs = perNodeCntrs.get(affNode.id());

            if (nodeCntrs == null)
                continue;

            // The first node that has a counters map decides, even if it has no entry for the partition.
            initCntrs = nodeCntrs.get(partId);

            break;
        }
    }

    Long startCntr = initCntrs == null ? null : initCntrs.get2();

    CacheContinuousQueryPartitionRecovery created = new CacheContinuousQueryPartitionRecovery(
        ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer, startCntr);

    // Another thread may have registered a recovery in the meantime; the registered one wins.
    CacheContinuousQueryPartitionRecovery raced = rcvs.putIfAbsent(partId, created);

    return raced != null ? raced : created;
}
/**
 * Passes a copy of the entry with its data reset to the buffer of the entry's partition,
 * marking it as a backup entry.
 *
 * @param cctx Cache context.
 * @param e Entry.
 */
private void handleBackupEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) {
    // Skip internal query and expire entries; nothing is buffered once the node-left flag is set either.
    boolean skip = internal || e.updateCounter() == -1L || nodeLeft;

    if (!skip)
        partitionBuffer(cctx, e.partition()).processEntry(e.copyWithDataReset(), true);
}
/**
 * Decides what should be sent to the listener for the given entry.
 *
 * @param cctx Cache context.
 * @param e Entry.
 * @return For an internal query: the entry itself, or {@code null} if it is filtered. Otherwise the entry
 *      itself when its update counter is {@code -1}, or whatever the partition buffer returns for it.
 */
private Object handleEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) {
    assert e != null;
    assert entryBufs != null;

    // Internal queries bypass the partition buffers.
    if (internal)
        return e.isFiltered() ? null : e;

    if (e.updateCounter() != -1L)
        return partitionBuffer(cctx, e.partition()).processEntry(e, false);

    // Initial query entry: such events should be fired immediately.
    return e;
}
/**
 * Returns the event buffer of the partition, creating it on first access. A new buffer receives
 * a callback reporting the current counter of the local partition: {@code -1} when the partition
 * is not available locally, the update counter for a positive argument and the reserved counter
 * otherwise.
 *
 * @param cctx Cache context.
 * @param partId Partition id.
 * @return Event buffer.
 */
CacheContinuousQueryEventBuffer partitionBuffer(GridCacheContext<?, ?> cctx, int partId) {
    return entryBufs.computeIfAbsent(partId, part -> {
        IgniteLogger bufLog = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY);

        return new CacheContinuousQueryEventBuffer(backup -> {
            GridDhtLocalPartition locPart = cctx.topology().localPartition(part, null, false);

            if (locPart == null)
                return -1L;

            // Use HWM for primary, LWM for backup.
            return backup > 0 ? locPart.updateCounter() : locPart.reservedCounter();
        }, bufLog);
    });
}
/**
 * {@inheritDoc}
 * <p>
 * Raises the {@code nodeLeft} flag and then calls {@code flushOnExchange(null)} on every partition buffer.
 */
@Override public void flushOnNodeLeft() {
    nodeLeft = true;

    entryBufs.values().forEach(partBuf -> partBuf.flushOnExchange(null));
}
/**
 * {@inheritDoc}
 * <p>
 * Wraps the remote filter into a deployable object, unless there is no filter or
 * {@code U.isGrid} accepts the filter class.
 */
@Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
    assert ctx != null;
    assert ctx.config().isPeerClassLoadingEnabled();

    if (rmtFilter == null || U.isGrid(rmtFilter.getClass()))
        return;

    rmtFilterDep = new CacheContinuousQueryDeployableObject(rmtFilter, ctx);
}
/**
 * {@inheritDoc}
 * <p>
 * Restores the remote filter from its deployable object, if there is one, and then completes
 * the unmarshal future unless it is already done.
 */
@Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
    assert nodeId != null;
    assert ctx != null;
    assert ctx.config().isPeerClassLoadingEnabled();

    if (rmtFilterDep != null)
        rmtFilter = p2pUnmarshal(rmtFilterDep, nodeId, ctx);

    if (p2pUnmarshalFut.isDone())
        return;

    ((GridFutureAdapter)p2pUnmarshalFut).onDone();
}
/**
 * @return {@code true} if there is no remote filter, if {@code U.isGrid} accepts the filter class,
 *      or if the deployable object for the filter has already been created.
 */
public boolean isMarshalled() {
    if (rmtFilter == null)
        return true;

    if (U.isGrid(rmtFilter.getClass()))
        return true;

    return rmtFilterDep != null;
}
/**
 * Unmarshals a deployable object. Any failure is also reported to the unmarshal future before it
 * is thrown to the caller.
 *
 * @param depObj Deployable object to unmarshal.
 * @param nodeId Sender node Id.
 * @param ctx Kernal context.
 * @param <T> Result type.
 * @return Unmarshalled object, or {@code null} if {@code depObj} is {@code null}.
 * @throws IgniteCheckedException In case of unmarshalling failures.
 */
protected <T> T p2pUnmarshal(CacheContinuousQueryDeployableObject depObj,
    UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
    if (depObj == null)
        return null;

    try {
        return depObj.unmarshal(nodeId, ctx);
    }
    catch (IgniteCheckedException e) {
        ((GridFutureAdapter)p2pUnmarshalFut).onDone(e);

        throw e;
    }
    catch (ExceptionInInitializerError e) {
        // Convert the initializer error into the checked exception declared by this method.
        IgniteCheckedException wrapped = new IgniteCheckedException("Failed to unmarshal deployable object.", e);

        ((GridFutureAdapter)p2pUnmarshalFut).onDone(wrapped);

        throw wrapped;
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Always creates a new {@code GridContinuousQueryBatch}.
 */
@Override public GridContinuousBatch createBatch() {
    GridContinuousBatch batch = new GridContinuousQueryBatch();

    return batch;
}
/**
 * {@inheritDoc}
 * <p>
 * Registers the batch in the acknowledge buffer and forwards whatever the buffer returns
 * to {@code sendBackupAcknowledge}.
 */
@Override public void onBatchAcknowledged(final UUID routineId,
    GridContinuousBatch batch,
    final GridKernalContext ctx) {
    IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> ackInfo = ackBuf.onAcknowledged(batch);

    sendBackupAcknowledge(ackInfo, routineId, ctx);
}
/**
 * Schedules a local task that sends a {@code CacheContinuousQueryBatchAck} message to every
 * non-local affinity node of the cache group, for each topology version listed in {@code t}.
 * Send failures are logged and do not interrupt the remaining sends.
 *
 * @param t Acknowledge information; nothing is done when it is {@code null}.
 * @param routineId Routine ID.
 * @param ctx Context.
 */
private void sendBackupAcknowledge(final IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> t,
    final UUID routineId,
    final GridKernalContext ctx) {
    if (t == null)
        return;

    ctx.closure().runLocalSafe(new GridPlainRunnable() {
        @Override public void run() {
            GridCacheContext<K, V> cctx = cacheContext(ctx);

            // One message instance is shared by all recipients.
            CacheContinuousQueryBatchAck ack =
                new CacheContinuousQueryBatchAck(cctx.cacheId(), routineId, t.get1());

            for (AffinityTopologyVersion ver : t.get2()) {
                for (ClusterNode target : ctx.discovery().cacheGroupAffinityNodes(cctx.groupId(), ver)) {
                    if (target.isLocal())
                        continue;

                    try {
                        cctx.io().send(target, ack, GridIoPolicy.SYSTEM_POOL);
                    }
                    catch (ClusterTopologyCheckedException ignored) {
                        // The target left the cluster: reported on debug level only.
                        IgniteLogger qryLog = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY);

                        if (qryLog.isDebugEnabled())
                            qryLog.debug("Failed to send acknowledge message, node left " +
                                "[msg=" + ack + ", node=" + target + ']');
                    }
                    catch (IgniteCheckedException e) {
                        U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), "Failed to send acknowledge message " +
                            "[msg=" + ack + ", node=" + target + ']', e);
                    }
                }
            }
        }
    });
}
/**
 * {@inheritDoc}
 * <p>
 * Returns the value of the {@code topic} field as is.
 */
@Nullable @Override public Object orderedTopic() {
    final Object orderedTopic = topic;

    return orderedTopic;
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to {@code super.clone()}; a {@code CloneNotSupportedException} is rethrown
 * as {@code IllegalStateException}.
 */
@Override public GridContinuousHandler clone() {
    final Object copy;

    try {
        copy = super.clone();
    }
    catch (CloneNotSupportedException e) {
        throw new IllegalStateException(e);
    }

    return (GridContinuousHandler)copy;
}
/**
 * {@inheritDoc}
 * <p>
 * The string is built by {@code S.toString} for {@code CacheContinuousQueryHandler}.
 */
@Override public String toString() {
    final String str = S.toString(CacheContinuousQueryHandler.class, this);

    return str;
}
/**
 * {@inheritDoc}
 * <p>
 * Written in this order: cache name, topic, a flag telling whether the filter is written as
 * a deployable object, the deployable object or the filter itself, then the {@code internal},
 * {@code notifyExisting}, {@code oldValRequired}, {@code sync} and {@code ignoreExpired} flags,
 * the task hash and the {@code keepBinary} flag. {@code readExternal} has to read in the same order.
 */
@Override public void writeExternal(ObjectOutput out) throws IOException {
    U.writeString(out, cacheName);
    out.writeObject(topic);

    // The flag tells the reader which representation of the filter follows.
    final boolean deployed = rmtFilterDep != null;

    out.writeBoolean(deployed);
    out.writeObject(deployed ? rmtFilterDep : rmtFilter);

    out.writeBoolean(internal);
    out.writeBoolean(notifyExisting);
    out.writeBoolean(oldValRequired);
    out.writeBoolean(sync);
    out.writeBoolean(ignoreExpired);

    out.writeInt(taskHash);
    out.writeBoolean(keepBinary);
}
/**
 * {@inheritDoc}
 * <p>
 * Reads the fields in the order used by {@code writeExternal}. When the filter arrives as
 * a deployable object, a new unmarshal future is created as well. The cache id is derived
 * from the cache name at the end.
 */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
    cacheName = U.readString(in);
    topic = in.readObject();

    // The flag tells which representation of the filter the next object is.
    final boolean deployed = in.readBoolean();
    final Object filterObj = in.readObject();

    if (deployed) {
        rmtFilterDep = (CacheContinuousQueryDeployableObject)filterObj;

        p2pUnmarshalFut = new GridFutureAdapter<>();
    }
    else
        rmtFilter = (CacheEntryEventSerializableFilter<K, V>)filterObj;

    internal = in.readBoolean();
    notifyExisting = in.readBoolean();
    oldValRequired = in.readBoolean();
    sync = in.readBoolean();
    ignoreExpired = in.readBoolean();

    taskHash = in.readInt();
    keepBinary = in.readBoolean();

    cacheId = CU.cacheId(cacheName);
}
/**
 * Looks up the cache context by the {@code cacheId} of this handler.
 *
 * @param ctx Kernal context.
 * @return Cache context.
 */
private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) {
    assert ctx != null;

    GridCacheContext<K, V> cctx = ctx.cache().<K, V>context().cacheContext(cacheId);

    return cctx;
}
/**
 * Task that processes a single cache update event. The event filter is evaluated first; then
 * the update is either passed to {@code onEntryUpdate} (right away, or after the given future
 * completes) or handled as a backup entry.
 */
private class ContinuousQueryAsyncClosure implements Runnable {
    /** Event to process. */
    private final CacheContinuousQueryEvent<K, V> evt;

    /** Primary flag. */
    private final boolean primary;

    /** Flag passed to {@code onEntryUpdate} as {@code recordIgniteEvt}. */
    private final boolean recordIgniteEvt;

    /** Dht future, may be {@code null}. */
    private final IgniteInternalFuture<?> fut;

    /**
     * @param primary Primary flag.
     * @param evt Event.
     * @param recordIgniteEvt Fired event.
     * @param fut Dht future.
     */
    ContinuousQueryAsyncClosure(
        boolean primary,
        CacheContinuousQueryEvent<K, V> evt,
        boolean recordIgniteEvt,
        IgniteInternalFuture<?> fut) {
        this.primary = primary;
        this.evt = evt;
        this.recordIgniteEvt = recordIgniteEvt;
        this.fut = fut;
    }

    /** {@inheritDoc} */
    @Override public void run() {
        // The filter is evaluated up front, for backup updates as well.
        final boolean notify = filter(evt);

        if (!(primary || skipPrimaryCheck)) {
            handleBackupEntry(cacheContext(ctx), evt.entry());

            return;
        }

        if (fut != null && !fut.isDone()) {
            // Future is still in progress: continue in the async callback pool once it completes.
            fut.listen(new CI1<IgniteInternalFuture<?>>() {
                @Override public void apply(IgniteInternalFuture<?> done) {
                    if (done.error() != null)
                        evt.entry().markFiltered();

                    ctx.pools().asyncCallbackPool().execute(new Runnable() {
                        @Override public void run() {
                            fireEntryUpdate(notify);
                        }
                    }, evt.entry().partition());
                }
            });

            return;
        }

        // No future at all, or it is already completed: a failed future marks the entry as filtered.
        if (fut != null && fut.error() != null)
            evt.entry().markFiltered();

        fireEntryUpdate(notify);
    }

    /**
     * Passes the event to {@code onEntryUpdate}.
     *
     * @param notify Result of the event filter.
     */
    private void fireEntryUpdate(boolean notify) {
        onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(ContinuousQueryAsyncClosure.class, this);
    }
}
/**
 * Creates a lazy view over the events: the transformer is applied to an event only when
 * the corresponding element is requested from an iterator.
 * <p>
 * Every call of {@link Iterable#iterator()} on the result starts a new pass over {@code evts},
 * so the view can be traversed more than once. The transformer is applied again on each pass.
 *
 * @param trans Transformer.
 * @param evts Source events.
 * @return Collection of transformed values.
 */
private Iterable transform(final IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans,
    final Collection<CacheEntryEvent<? extends K, ? extends V>> evts) {
    return new Iterable() {
        @NotNull @Override public Iterator iterator() {
            // A fresh source iterator per traversal: a shared one would be exhausted after the first pass.
            final Iterator<CacheEntryEvent<? extends K, ? extends V>> src = evts.iterator();

            return new Iterator() {
                @Override public boolean hasNext() {
                    return src.hasNext();
                }

                @Override public Object next() {
                    return transform(trans, src.next());
                }
            };
        }
    };
}
/**
 * Builds an entry that carries only the transformed value of the event. The key and the old
 * value are not set; cache id, event type, keep binary flag, partition, update counter, topology
 * version and flags are copied from the entry of the source event.
 *
 * @param trans Transformer.
 * @param evt Event to transform.
 * @return Entry with the transformed value, or with no value if the transformation result is {@code null}.
 * @see #getTransformer()
 */
private CacheContinuousQueryEntry transformToEntry(IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans,
    CacheContinuousQueryEvent<? extends K, ? extends V> evt) {
    Object transformed = transform(trans, evt);

    CacheContinuousQueryEntry src = evt.entry();

    return new CacheContinuousQueryEntry(
        src.cacheId(),
        src.eventType(),
        null,
        transformed != null ? cacheContext(ctx).toCacheObject(transformed) : null,
        null,
        src.isKeepBinary(),
        src.partition(),
        src.updateCounter(),
        src.topologyVersion(),
        src.flags());
}
/**
 * Applies the transformer to a single event. A {@code NoClassDefFoundError} is handed over to
 * {@code P2PClassLoadingIssues.rethrowDisarmedP2PClassLoadingFailure}, any other exception
 * is logged.
 *
 * @param trans Transformer.
 * @param evt Event.
 * @return Transformed value, or {@code null} if the transformer failed and nothing was rethrown.
 */
private Object transform(IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans,
    CacheEntryEvent<? extends K, ? extends V> evt) {
    assert trans != null;

    try {
        return trans.apply(evt);
    }
    catch (NoClassDefFoundError e) {
        P2PClassLoadingIssues.rethrowDisarmedP2PClassLoadingFailure(e);
    }
    catch (Exception e) {
        U.error(log, e);
    }

    // Reached only when the transformer failed.
    return null;
}
}