/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.event;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelCriterion;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheEvent;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch;
import org.apache.geode.logging.internal.log4j.api.LogService;

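/**
 * Tracks the events that have been applied to a region so that duplicates (for example,
 * operations re-sent by a retrying client or during initial image transfer) can be detected and
 * their version tags recovered. State is keyed by {@link ThreadIdentifier}; the highest sequence
 * number seen from each originator is kept in an {@link EventSequenceNumberHolder}.
 *
 * <p>
 * An illustrative sketch (not the authoritative call sequence), assuming {@code cache} is the
 * {@link InternalCache}, {@code stopper} is the region's {@link CancelCriterion}, and
 * {@code event} is an {@link InternalCacheEvent} for the region:
 *
 * <pre>{@code
 * DistributedEventTracker tracker = new DistributedEventTracker(cache, stopper, "exampleRegion");
 * tracker.start();
 * tracker.setInitialized();
 * if (!tracker.hasSeenEvent(event)) {
 *   // apply the operation, then record it so a later retry is recognized as a duplicate
 *   tracker.recordEvent(event);
 * }
 * }</pre>
 */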
public class DistributedEventTracker implements EventTracker {
private static final Logger logger = LogService.getLogger();
/**
* a mapping of originator to the last event applied to this cache
*
* Keys are instances of {@link ThreadIdentifier}, values are instances of
* {@link EventSequenceNumberHolder}.
*/
private final ConcurrentMap<ThreadIdentifier, EventSequenceNumberHolder> recordedEvents =
new ConcurrentHashMap<>(100);
  /**
   * a mapping of originator to the synchronization object for that originator's in-progress bulk
   * operations
   *
   * Keys are instances of {@link ThreadIdentifier}
   */
private final ConcurrentMap<ThreadIdentifier, Object> recordedBulkOps =
new ConcurrentHashMap<>(100);
  /**
   * a mapping of originator to a bulk operation's last version tags. This map differs from
   * {@link #recordedBulkOps} in that the thread identifier used here is the base member id and
   * thread id of the bulk op, as opposed to the fake thread id which is assigned for each bucket.
   *
   * recordedBulkOps are only tracked on the secondary for partitioned regions;
   * recordedBulkOpVersionTags are tracked on both the primary and secondary.
   *
   * Keys are instances of {@link ThreadIdentifier}, values are instances of
   * {@link BulkOperationHolder}.
   */
private final ConcurrentMap<ThreadIdentifier, BulkOperationHolder> recordedBulkOpVersionTags =
new ConcurrentHashMap<>(100);
/**
* The member that the region corresponding to this tracker (if any) received its initial image
* from (if a replicate)
*/
private volatile InternalDistributedMember initialImageProvider;
/**
* The cache associated with this tracker
*/
private InternalCache cache;
/**
* The name of this tracker
*/
private String name;
  /**
   * whether or not this tracker has been initialized to allow entry operations. A replicate
   * region does not initialize its event tracker from its replicates.
   */
private volatile boolean initialized;
/**
* object used to wait for initialization
*/
private final StoppableCountDownLatch initializationLatch;
/**
* Create an event tracker
*
* @param cache the cache of the region to associate with this tracker
* @param stopper the CancelCriterion for the region
* @param regionName name of the region
*/
public DistributedEventTracker(InternalCache cache, CancelCriterion stopper, String regionName) {
this.cache = cache;
this.name = "Event Tracker for " + regionName;
this.initializationLatch = new StoppableCountDownLatch(stopper, 1);
}
@Override
public void start() {
if (cache.getEventTrackerTask() != null) {
cache.getEventTrackerTask().addTracker(this);
}
}
@Override
public void stop() {
if (cache.getEventTrackerTask() != null) {
cache.getEventTrackerTask().removeTracker(this);
}
}
@Override
public void clear() {
recordedEvents.clear();
recordedBulkOps.clear();
recordedBulkOpVersionTags.clear();
}
@Override
public Map<ThreadIdentifier, EventSequenceNumberHolder> getState() {
Map<ThreadIdentifier, EventSequenceNumberHolder> result = new HashMap<>(recordedEvents.size());
for (Map.Entry<ThreadIdentifier, EventSequenceNumberHolder> entry : recordedEvents.entrySet()) {
EventSequenceNumberHolder holder = entry.getValue();
// don't transfer version tags - adds too much bulk just so we can do client tag recovery
result.put(entry.getKey(),
new EventSequenceNumberHolder(holder.getLastSequenceNumber(), null));
}
return result;
}
@Override
public void recordState(InternalDistributedMember provider,
Map<ThreadIdentifier, EventSequenceNumberHolder> state) {
this.initialImageProvider = provider;
StringBuffer sb = null;
if (logger.isDebugEnabled()) {
sb = new StringBuffer(200);
sb.append("Recording initial state for ").append(this.name).append(": ");
}
for (Map.Entry<ThreadIdentifier, EventSequenceNumberHolder> entry : state.entrySet()) {
if (sb != null) {
sb.append("\n ").append(entry.getKey().expensiveToString()).append("; sequenceID=")
.append(entry.getValue());
}
// record only if we haven't received an event that is newer
recordSequenceNumber(entry.getKey(), entry.getValue(), true);
}
if (sb != null) {
logger.debug(sb);
}
// fix for bug 41622 - hang in GII. This keeps ops from waiting for the
// full GII to complete
setInitialized();
}
@Override
public void setInitialized() {
initializationLatch.countDown();
initialized = true;
}
@Override
public void waitOnInitialization() throws InterruptedException {
initializationLatch.await();
}
/**
* Record an event sequence id if it is higher than what we currently have. This is intended for
* use during initial image transfer.
*
* @param membershipID the key of an entry in the map obtained from getEventState()
* @param evhObj the value of an entry in the map obtained from getEventState()
*/
protected void recordSequenceNumber(ThreadIdentifier membershipID,
EventSequenceNumberHolder evhObj) {
recordSequenceNumber(membershipID, evhObj, false);
}
/**
* Record an event sequence id if it is higher than what we currently have. This is intended for
* use during initial image transfer.
*
* @param threadID the key of an entry in the map obtained from getEventState()
* @param evh the value of an entry in the map obtained from getEventState()
   * @param ifAbsent only record this state if there's not already an entry for this threadID
*/
private void recordSequenceNumber(ThreadIdentifier threadID, EventSequenceNumberHolder evh,
boolean ifAbsent) {
boolean removed;
if (logger.isDebugEnabled()) {
logger.debug("recording {} {}", threadID.expensiveToString(), evh.toString());
}
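    // Retry loop: putIfAbsent either installs the new holder or returns the existing one for
    // this thread. If the existing holder has already been marked removed by the sweeper, loop
    // until it is actually gone from the map; otherwise update it in place while synchronized on
    // it.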
do {
removed = false;
EventSequenceNumberHolder oldEvh = recordedEvents.putIfAbsent(threadID, evh);
if (oldEvh != null) {
synchronized (oldEvh) {
if (oldEvh.isRemoved()) {
// need to wait for an entry being removed by the sweeper to go away
removed = true;
continue;
} else {
if (ifAbsent) {
break;
}
oldEvh.setEndOfLifeTimestamp(0);
if (oldEvh.getLastSequenceNumber() < evh.getLastSequenceNumber()) {
oldEvh.setLastSequenceNumber(evh.getLastSequenceNumber());
oldEvh.setVersionTag(evh.getVersionTag());
}
}
}
} else {
evh.setEndOfLifeTimestamp(0);
}
} while (removed);
}
@Override
public void recordEvent(InternalCacheEvent event) {
EventID eventID = event.getEventId();
if (ignoreEvent(event, eventID)) {
return; // not tracked
}
LocalRegion lr = (LocalRegion) event.getRegion();
ThreadIdentifier membershipID = createThreadIDFromEvent(eventID);
VersionTag tag = null;
if (lr.getServerProxy() == null) {
tag = event.getVersionTag();
RegionVersionVector v = ((LocalRegion) event.getRegion()).getVersionVector();
canonicalizeIDs(tag, v);
}
EventSequenceNumberHolder newEvh = new EventSequenceNumberHolder(eventID.getSequenceID(), tag);
if (logger.isTraceEnabled()) {
logger.trace("region event tracker recording {}", event);
}
recordSequenceNumber(membershipID, newEvh);
// If this is a bulkOp, and concurrency checks are enabled, we need to
// save the version tag in case we retry.
    // Record the bulkOp version tag after recordSequenceNumber, so that recordBulkOpStart
    // during a retried bulk op does not incorrectly remove the saved version tag from
    // recordedBulkOpVersionTags
if (lr.getConcurrencyChecksEnabled()
&& (event.getOperation().isPutAll() || event.getOperation().isRemoveAll())
&& lr.getServerProxy() == null) {
recordBulkOpEvent(event, membershipID);
}
}
private ThreadIdentifier createThreadIDFromEvent(EventID eventID) {
return new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
}
/**
* Record a version tag for a bulk operation.
*/
private void recordBulkOpEvent(InternalCacheEvent event, ThreadIdentifier threadID) {
EventID eventID = event.getEventId();
VersionTag tag = event.getVersionTag();
if (tag == null) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("recording bulkOp event {} {} {} op={}", threadID.expensiveToString(), eventID,
tag, event.getOperation());
}
RegionVersionVector versionVector = ((LocalRegion) event.getRegion()).getVersionVector();
canonicalizeIDs(tag, versionVector);
    // Loop until we can successfully update the recorded bulk operations
    // for this thread id.
boolean retry = false;
do {
BulkOperationHolder bulkOpTracker = recordedBulkOpVersionTags.get(threadID);
if (bulkOpTracker == null) {
bulkOpTracker = new BulkOperationHolder();
BulkOperationHolder old = recordedBulkOpVersionTags.putIfAbsent(threadID, bulkOpTracker);
if (old != null) {
retry = true;
continue;
}
}
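      // Synchronize on the holder so a concurrent removal by the sweeper is detected (isRemoved)
      // before the version tag is added; if it was removed, loop and look the holder up again.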
synchronized (bulkOpTracker) {
if (bulkOpTracker.isRemoved()) {
retry = true;
continue;
}
// Add the version tag for bulkOp event.
bulkOpTracker.putVersionTag(eventID, event.getVersionTag());
retry = false;
}
} while (retry);
}
private void canonicalizeIDs(VersionTag tag, RegionVersionVector versionVector) {
if (tag != null && versionVector != null) {
tag.setMemberID(versionVector.getCanonicalId(tag.getMemberID()));
if (tag.getPreviousMemberID() != null) {
tag.setPreviousMemberID(versionVector.getCanonicalId(tag.getPreviousMemberID()));
}
}
}
@Override
public boolean hasSeenEvent(InternalCacheEvent event) {
EventID eventID = event.getEventId();
if (ignoreEvent(event, eventID)) {
return false; // not tracked
}
return hasSeenEvent(eventID, event);
}
@Override
public boolean hasSeenEvent(EventID eventID) {
return hasSeenEvent(eventID, null);
}
@Override
public boolean hasSeenEvent(EventID eventID, InternalCacheEvent tagHolder) {
if (eventID == null) {
return false;
}
EventSequenceNumberHolder evh = getSequenceHolderForEvent(eventID);
if (evh == null) {
return false;
}
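    // Synchronize on the holder so the removed flag, sequence number, and version tag are read
    // consistently with respect to concurrent updates and sweeper removal.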
synchronized (evh) {
if (evh.isRemoved() || evh.getLastSequenceNumber() < eventID.getSequenceID()) {
return false;
}
      // log at trace because partitioned regions can send the same event multiple times
      // during normal operation while bucket regions are being initialized
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER_VERBOSE)) {
logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER_VERBOSE,
"Cache encountered replay of event with ID {}. Highest recorded for this source is {}",
eventID, evh.getLastSequenceNumber());
}
// bug #44956 - recover version tag for duplicate event
if (evh.getLastSequenceNumber() == eventID.getSequenceID() && tagHolder != null
&& evh.getVersionTag() != null) {
((EntryEventImpl) tagHolder).setVersionTag(evh.getVersionTag());
}
return true;
}
}
private EventSequenceNumberHolder getSequenceHolderForEvent(EventID eventID) {
ThreadIdentifier membershipID = createThreadIDFromEvent(eventID);
return recordedEvents.get(membershipID);
}
@Override
public VersionTag findVersionTagForSequence(EventID eventID) {
EventSequenceNumberHolder evh = getSequenceHolderForEvent(eventID);
if (evh == null) {
if (logger.isDebugEnabled()) {
logger.debug("search for version tag failed as no event is recorded for {}",
createThreadIDFromEvent(eventID).expensiveToString());
}
return null;
}
synchronized (evh) {
if (logger.isDebugEnabled()) {
logger.debug("search for version tag located last event for {}: {}",
createThreadIDFromEvent(eventID).expensiveToString(), evh);
}
if (evh.getLastSequenceNumber() != eventID.getSequenceID()) {
return null;
}
      // log at trace because partitioned regions can send the same event multiple times
      // during normal operation while bucket regions are being initialized
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER_VERBOSE)
&& evh.getVersionTag() == null) {
logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER_VERBOSE,
"Could not recover version tag. Found event holder with no version tag for {}",
eventID);
}
return evh.getVersionTag();
}
}
@Override
public VersionTag findVersionTagForBulkOp(EventID eventID) {
if (eventID == null) {
return null;
}
ThreadIdentifier threadID = createThreadIDFromEvent(eventID);
BulkOperationHolder evh = recordedBulkOpVersionTags.get(threadID);
if (evh == null) {
if (logger.isDebugEnabled()) {
logger.debug("search for version tag failed as no events are recorded for {}",
threadID.expensiveToString());
}
return null;
}
synchronized (evh) {
if (logger.isDebugEnabled()) {
logger.debug("search for version tag located event holder for {}: {}",
threadID.expensiveToString(), evh);
}
return evh.getEntryVersionTags().get(eventID);
}
}
@Override
public String getName() {
return name;
}
/**
* @return true if the event should not be tracked, false otherwise
*/
private boolean ignoreEvent(InternalCacheEvent event, EventID eventID) {
if (eventID == null) {
return true;
} else {
boolean isVersioned = (event.getVersionTag() != null);
boolean isClient = event.hasClientOrigin();
if (isVersioned && isClient) {
return false; // version tags for client events are kept for retries by the client
}
boolean isEntry = event.getOperation().isEntry();
boolean isPr = event.getRegion().getAttributes().getDataPolicy().withPartitioning()
|| ((LocalRegion) event.getRegion()).isUsedForPartitionedRegionBucket();
return (!isClient && // ignore if it originated on a server, and
isEntry && // it affects an entry and
!isPr); // is not on a PR
}
}
@Override
public void syncBulkOp(Runnable r, EventID eventID, boolean partOfTransaction) {
if (partOfTransaction) {
r.run();
return;
}
Assert.assertTrue(eventID != null);
ThreadIdentifier membershipID = createThreadIDFromEvent(eventID);
Object opSyncObj = null;
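    // Atomically obtain (or create) the per-thread sync object so that bulk operations retried
    // from the same originating thread are serialized against each other.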
do {
opSyncObj = recordedBulkOps.putIfAbsent(membershipID, new Object());
if (opSyncObj == null) {
opSyncObj = recordedBulkOps.get(membershipID);
}
} while (opSyncObj == null);
synchronized (opSyncObj) {
try {
recordBulkOpStart(eventID, membershipID);
// Perform the bulk op
r.run();
} finally {
recordedBulkOps.remove(membershipID);
}
}
}
@Override
public void recordBulkOpStart(EventID eventID, ThreadIdentifier tid) {
if (logger.isDebugEnabled()) {
logger.debug("recording bulkOp start for {}", tid.expensiveToString());
}
EventSequenceNumberHolder evh = recordedEvents.get(tid);
if (evh == null) {
return;
}
synchronized (evh) {
// only remove it when a new bulk op occurs
if (eventID.getSequenceID() > evh.getLastSequenceNumber()) {
this.recordedBulkOpVersionTags.remove(tid);
}
}
}
@Override
public boolean isInitialized() {
return this.initialized;
}
@Override
public boolean isInitialImageProvider(DistributedMember mbr) {
return (this.initialImageProvider != null) && (mbr != null)
&& this.initialImageProvider.equals(mbr);
}
@Override
public ConcurrentMap<ThreadIdentifier, BulkOperationHolder> getRecordedBulkOpVersionTags() {
return recordedBulkOpVersionTags;
}
@Override
public ConcurrentMap<ThreadIdentifier, EventSequenceNumberHolder> getRecordedEvents() {
return recordedEvents;
}
@Override
public String toString() {
return "" + this.name + "(initialized=" + this.initialized + ")";
}
}