blob: e7d6967de0d72136156cbfa457300ec37499147a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_ENTRY_EVICTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_NODES_LEFT;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED;
import static org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
/**
* Cache event manager.
*/
public class GridCacheEventManager extends GridCacheManagerAdapter {
/** Force keep binary flag. Will be set if event notification encountered exception during unmarshalling. */
private boolean forceKeepBinary;
/**
* Adds local event listener.
*
* @param lsnr Listener.
* @param evts Types of events.
*/
public void addListener(GridLocalEventListener lsnr, int... evts) {
cctx.gridEvents().addLocalEventListener(lsnr, evts);
}
/**
* Removes local event listener.
*
* @param lsnr Local event listener.
*/
public void removeListener(GridLocalEventListener lsnr) {
cctx.gridEvents().removeLocalEventListener(lsnr);
}
/**
* @param key Key for event.
* @param tx Possible surrounding transaction.
* @param txLbl Possible lable of possible surrounding transaction.
* @param val Read value.
* @param taskName Task name.
* @param keepBinary Keep binary flag.
*/
public void readEvent(KeyCacheObject key,
@Nullable IgniteInternalTx tx,
@Nullable String txLbl,
@Nullable CacheObject val,
@Nullable String taskName,
boolean keepBinary) {
if (isRecordable(EVT_CACHE_OBJECT_READ)) {
addEvent(cctx.affinity().partition(key),
key,
cctx.localNodeId(),
tx,
txLbl,
null,
EVT_CACHE_OBJECT_READ,
val,
val != null,
val,
val != null,
null,
taskName,
keepBinary);
}
}
/**
* @param part Partition.
* @param key Key for the event.
* @param tx Possible surrounding transaction.
* @param owner Possible surrounding lock.
* @param type Event type.
* @param newVal New value.
* @param hasNewVal Whether new value is present or not.
* @param oldVal Old value.
* @param hasOldVal Whether old value is present or not.
* @param cloClsName Closure class name.
* @param taskName Task name.
*/
public void addEvent(int part,
KeyCacheObject key,
@Nullable IgniteInternalTx tx,
@Nullable GridCacheMvccCandidate owner,
int type,
@Nullable CacheObject newVal,
boolean hasNewVal,
@Nullable CacheObject oldVal,
boolean hasOldVal,
String cloClsName,
String taskName,
boolean keepBinary
) {
addEvent(part,
key,
cctx.localNodeId(),
tx,
owner,
type,
newVal,
hasNewVal,
oldVal,
hasOldVal,
cloClsName,
taskName,
keepBinary);
}
/**
* @param type Event type (start or stop).
*/
public void addEvent(int type) {
addEvent(
0,
null,
cctx.localNodeId(),
null,
null,
null,
type,
null,
false,
null,
false,
null,
null,
false);
}
/**
* @param part Partition.
* @param key Key for the event.
* @param nodeId Node ID.
* @param tx Possible surrounding transaction.
* @param owner Possible surrounding lock.
* @param type Event type.
* @param newVal New value.
* @param hasNewVal Whether new value is present or not.
* @param oldVal Old value.
* @param hasOldVal Whether old value is present or not.
* @param cloClsName Closure class name.
* @param taskName Task name.
*/
public void addEvent(int part,
KeyCacheObject key,
UUID nodeId,
@Nullable IgniteInternalTx tx,
GridCacheMvccCandidate owner,
int type,
CacheObject newVal,
boolean hasNewVal,
CacheObject oldVal,
boolean hasOldVal,
String cloClsName,
String taskName,
boolean keepBinary
) {
addEvent(part,
key,
nodeId,
tx,
null,
owner == null ? null : owner.version(),
type,
newVal,
hasNewVal,
oldVal,
hasOldVal,
cloClsName,
taskName,
keepBinary);
}
/**
* @param part Partition.
* @param key Key for the event.
* @param evtNodeId Node ID.
* @param owner Possible surrounding lock.
* @param type Event type.
* @param newVal New value.
* @param hasNewVal Whether new value is present or not.
* @param oldVal Old value.
* @param hasOldVal Whether old value is present or not.
* @param cloClsName Closure class name.
* @param taskName Task name.
*/
public void addEvent(int part,
KeyCacheObject key,
UUID evtNodeId,
@Nullable GridCacheMvccCandidate owner,
int type,
@Nullable CacheObject newVal,
boolean hasNewVal,
CacheObject oldVal,
boolean hasOldVal,
String cloClsName,
String taskName,
boolean keepBinary
) {
IgniteInternalTx tx = owner == null ? null : cctx.tm().tx(owner.version());
addEvent(part,
key,
evtNodeId,
tx,
null,
owner == null ? null : owner.version(),
type,
newVal,
hasNewVal,
oldVal,
hasOldVal,
cloClsName,
taskName,
keepBinary);
}
/**
* @param part Partition.
* @param key Key for the event.
* @param evtNodeId Event node ID.
* @param tx Possible surrounding transaction.
* @param txLbl Possible label of possible surrounding transaction.
* @param lockId Lock ID.
* @param type Event type.
* @param newVal New value.
* @param hasNewVal Whether new value is present or not.
* @param oldVal Old value.
* @param hasOldVal Whether old value is present or not.
* @param cloClsName Closure class name.
* @param taskName Task class name.
*/
public void addEvent(
int part,
KeyCacheObject key,
UUID evtNodeId,
@Nullable IgniteInternalTx tx,
@Nullable String txLbl,
@Nullable Object lockId,
int type,
@Nullable CacheObject newVal,
boolean hasNewVal,
@Nullable CacheObject oldVal,
boolean hasOldVal,
@Nullable String cloClsName,
@Nullable String taskName,
boolean keepBinary
) {
assert key != null || type == EVT_CACHE_STARTED || type == EVT_CACHE_STOPPED;
if (!cctx.events().isRecordable(type))
LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type));
// Events are not fired for internal entry.
if (key == null || !key.internal()) {
ClusterNode evtNode = cctx.discovery().node(evtNodeId);
if (evtNode == null)
evtNode = findNodeInHistory(evtNodeId);
if (evtNode == null)
LT.warn(log, "Failed to find event node in grid topology history " +
"(try to increase topology history size configuration property of configured " +
"discovery SPI): " + evtNodeId);
keepBinary = keepBinary || forceKeepBinary;
Object key0;
Object val0;
Object oldVal0;
try {
key0 = cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false, null);
val0 = cctx.cacheObjectContext().unwrapBinaryIfNeeded(newVal, keepBinary, false, null);
oldVal0 = cctx.cacheObjectContext().unwrapBinaryIfNeeded(oldVal, keepBinary, false, null);
}
catch (Exception e) {
if (!cctx.cacheObjectContext().kernalContext().cacheObjects().isBinaryEnabled(cctx.config()))
throw e;
if (log.isDebugEnabled())
log.debug("Failed to unmarshall cache object value for the event notification: " + e);
if (!forceKeepBinary)
LT.warn(log, "Failed to unmarshall cache object value for the event notification " +
"(all further notifications will keep binary object format).");
forceKeepBinary = true;
key0 = cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, true, false, null);
val0 = cctx.cacheObjectContext().unwrapBinaryIfNeeded(newVal, true, false, null);
oldVal0 = cctx.cacheObjectContext().unwrapBinaryIfNeeded(oldVal, true, false, null);
}
IgniteUuid xid = tx == null ? null : tx.xid();
String finalTxLbl = (tx == null || tx.label() == null) ? txLbl : tx.label();
cctx.gridEvents().record(new CacheEvent(cctx.name(),
cctx.localNode(),
evtNode,
"Cache event.",
type,
part,
cctx.isNear(),
key0,
xid,
finalTxLbl,
lockId,
val0,
hasNewVal,
oldVal0,
hasOldVal,
cacheEventSubjectId(type),
cloClsName,
taskName));
}
}
/** @return The ID of security subject that was an initiator of the event with the specified type. */
private UUID cacheEventSubjectId(int type) {
return type == EVT_CACHE_NODES_LEFT ||
type == EVT_CACHE_ENTRY_EVICTED ||
type == EVT_CACHE_OBJECT_EXPIRED
? null : securitySubjectId(cctx);
}
/**
* Tries to find node in history by specified ID.
*
* @param nodeId Node ID.
* @return Found node or {@code null} if history doesn't contain this node.
*/
@Nullable private ClusterNode findNodeInHistory(UUID nodeId) {
for (long topVer = cctx.discovery().topologyVersion() - 1; topVer > 0; topVer--) {
Collection<ClusterNode> top = cctx.discovery().topology(topVer);
if (top == null)
break;
for (ClusterNode node : top)
if (F.eq(node.id(), nodeId))
return node;
}
return null;
}
/**
* @param type Event type.
* @return {@code True} if event is recordable.
*/
public boolean isRecordable(int type) {
GridCacheContext cctx0 = cctx;
// Event recording is impossible in recovery mode.
if (cctx0 == null || cctx0.kernalContext().recoveryMode())
return false;
try {
CacheConfiguration cfg = cctx0.config();
return cctx0.userCache() && cctx0.gridEvents().isRecordable(type) && !cfg.isEventsDisabled();
}
catch (IllegalStateException e) {
// Cache context was cleaned up.
return false;
}
}
/** {@inheritDoc} */
@Override public void printMemoryStats() {
X.println(">>> ");
X.println(">>> Cache event manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() +
", cache=" + cctx.name() + ", stats=" + "N/A" + ']');
}
}