| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.event; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelCriterion; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.cache.EntryEventImpl; |
| import org.apache.geode.internal.cache.EventID; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.InternalCacheEvent; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.ha.ThreadIdentifier; |
| import org.apache.geode.internal.cache.versions.RegionVersionVector; |
| import org.apache.geode.internal.cache.versions.VersionTag; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| |
| public class DistributedEventTracker implements EventTracker { |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * a mapping of originator to the last event applied to this cache |
| * |
| * Keys are instances of {@link ThreadIdentifier}, values are instances of |
| * {@link EventSequenceNumberHolder}. |
| */ |
| private final ConcurrentMap<ThreadIdentifier, EventSequenceNumberHolder> recordedEvents = |
| new ConcurrentHashMap<>(100); |
| |
| /** |
| * a mapping of originator to bulkOps |
| * |
| * Keys are instances of @link {@link ThreadIdentifier} |
| */ |
| private final ConcurrentMap<ThreadIdentifier, Object> recordedBulkOps = |
| new ConcurrentHashMap<>(100); |
| |
| /** |
| * a mapping of originator to bulkOperation's last version tags. This map differs from |
| * {@link #recordedBulkOps} in that the thread identifier used here is the base member id and |
| * thread id of the bulk op, as opposed to the fake thread id which is assigned for each bucket. |
| * |
| * recordedBulkOps are also only tracked on the secondary for partitioned regions |
| * recordedBulkOpVersionTags are tracked on both the primary and secondary. |
| * |
| * Keys are instances of @link {@link ThreadIdentifier}, values are instances of |
| * {@link BulkOperationHolder}. |
| */ |
| private final ConcurrentMap<ThreadIdentifier, BulkOperationHolder> recordedBulkOpVersionTags = |
| new ConcurrentHashMap<>(100); |
| |
| /** |
| * The member that the region corresponding to this tracker (if any) received its initial image |
| * from (if a replicate) |
| */ |
| private volatile InternalDistributedMember initialImageProvider; |
| |
| /** |
| * The cache associated with this tracker |
| */ |
| private InternalCache cache; |
| |
| /** |
| * The name of this tracker |
| */ |
| private String name; |
| |
| /** |
| * whether or not this tracker has been initialized to allow entry operation. replicate region |
| * does not initiate event tracker from its replicates. |
| */ |
| private volatile boolean initialized; |
| |
| /** |
| * object used to wait for initialization |
| */ |
| private final StoppableCountDownLatch initializationLatch; |
| |
| /** |
| * Create an event tracker |
| * |
| * @param cache the cache of the region to associate with this tracker |
| * @param stopper the CancelCriterion for the region |
| * @param regionName name of the region |
| */ |
| public DistributedEventTracker(InternalCache cache, CancelCriterion stopper, String regionName) { |
| |
| this.cache = cache; |
| this.name = "Event Tracker for " + regionName; |
| this.initializationLatch = new StoppableCountDownLatch(stopper, 1); |
| } |
| |
| @Override |
| public void start() { |
| if (cache.getEventTrackerTask() != null) { |
| cache.getEventTrackerTask().addTracker(this); |
| } |
| } |
| |
| @Override |
| public void stop() { |
| if (cache.getEventTrackerTask() != null) { |
| cache.getEventTrackerTask().removeTracker(this); |
| } |
| } |
| |
| @Override |
| public void clear() { |
| recordedEvents.clear(); |
| recordedBulkOps.clear(); |
| recordedBulkOpVersionTags.clear(); |
| } |
| |
| @Override |
| public Map<ThreadIdentifier, EventSequenceNumberHolder> getState() { |
| Map<ThreadIdentifier, EventSequenceNumberHolder> result = new HashMap<>(recordedEvents.size()); |
| for (Map.Entry<ThreadIdentifier, EventSequenceNumberHolder> entry : recordedEvents.entrySet()) { |
| EventSequenceNumberHolder holder = entry.getValue(); |
| // don't transfer version tags - adds too much bulk just so we can do client tag recovery |
| result.put(entry.getKey(), |
| new EventSequenceNumberHolder(holder.getLastSequenceNumber(), null)); |
| } |
| return result; |
| } |
| |
| @Override |
| public void recordState(InternalDistributedMember provider, |
| Map<ThreadIdentifier, EventSequenceNumberHolder> state) { |
| this.initialImageProvider = provider; |
| StringBuffer sb = null; |
| if (logger.isDebugEnabled()) { |
| sb = new StringBuffer(200); |
| sb.append("Recording initial state for ").append(this.name).append(": "); |
| } |
| for (Map.Entry<ThreadIdentifier, EventSequenceNumberHolder> entry : state.entrySet()) { |
| if (sb != null) { |
| sb.append("\n ").append(entry.getKey().expensiveToString()).append("; sequenceID=") |
| .append(entry.getValue()); |
| } |
| // record only if we haven't received an event that is newer |
| recordSequenceNumber(entry.getKey(), entry.getValue(), true); |
| } |
| if (sb != null) { |
| logger.debug(sb); |
| } |
| // fix for bug 41622 - hang in GII. This keeps ops from waiting for the |
| // full GII to complete |
| setInitialized(); |
| } |
| |
| @Override |
| public void setInitialized() { |
| initializationLatch.countDown(); |
| initialized = true; |
| } |
| |
| @Override |
| public void waitOnInitialization() throws InterruptedException { |
| initializationLatch.await(); |
| } |
| |
| /** |
| * Record an event sequence id if it is higher than what we currently have. This is intended for |
| * use during initial image transfer. |
| * |
| * @param membershipID the key of an entry in the map obtained from getEventState() |
| * @param evhObj the value of an entry in the map obtained from getEventState() |
| */ |
| protected void recordSequenceNumber(ThreadIdentifier membershipID, |
| EventSequenceNumberHolder evhObj) { |
| recordSequenceNumber(membershipID, evhObj, false); |
| } |
| |
| /** |
| * Record an event sequence id if it is higher than what we currently have. This is intended for |
| * use during initial image transfer. |
| * |
| * @param threadID the key of an entry in the map obtained from getEventState() |
| * @param evh the value of an entry in the map obtained from getEventState() |
| * @param ifAbsent only record this state if there's not already an entry for this memberID |
| */ |
| private void recordSequenceNumber(ThreadIdentifier threadID, EventSequenceNumberHolder evh, |
| boolean ifAbsent) { |
| boolean removed; |
| if (logger.isDebugEnabled()) { |
| logger.debug("recording {} {}", threadID.expensiveToString(), evh.toString()); |
| } |
| do { |
| removed = false; |
| EventSequenceNumberHolder oldEvh = recordedEvents.putIfAbsent(threadID, evh); |
| if (oldEvh != null) { |
| synchronized (oldEvh) { |
| if (oldEvh.isRemoved()) { |
| // need to wait for an entry being removed by the sweeper to go away |
| removed = true; |
| continue; |
| } else { |
| if (ifAbsent) { |
| break; |
| } |
| oldEvh.setEndOfLifeTimestamp(0); |
| if (oldEvh.getLastSequenceNumber() < evh.getLastSequenceNumber()) { |
| oldEvh.setLastSequenceNumber(evh.getLastSequenceNumber()); |
| oldEvh.setVersionTag(evh.getVersionTag()); |
| } |
| } |
| } |
| } else { |
| evh.setEndOfLifeTimestamp(0); |
| } |
| } while (removed); |
| } |
| |
| @Override |
| public void recordEvent(InternalCacheEvent event) { |
| EventID eventID = event.getEventId(); |
| if (ignoreEvent(event, eventID)) { |
| return; // not tracked |
| } |
| |
| LocalRegion lr = (LocalRegion) event.getRegion(); |
| ThreadIdentifier membershipID = createThreadIDFromEvent(eventID); |
| |
| VersionTag tag = null; |
| if (lr.getServerProxy() == null) { |
| tag = event.getVersionTag(); |
| RegionVersionVector v = ((LocalRegion) event.getRegion()).getVersionVector(); |
| canonicalizeIDs(tag, v); |
| } |
| |
| EventSequenceNumberHolder newEvh = new EventSequenceNumberHolder(eventID.getSequenceID(), tag); |
| if (logger.isTraceEnabled()) { |
| logger.trace("region event tracker recording {}", event); |
| } |
| recordSequenceNumber(membershipID, newEvh); |
| |
| // If this is a bulkOp, and concurrency checks are enabled, we need to |
| // save the version tag in case we retry. |
| // Make recordBulkOp version tag after recordSequenceNumber, so that recordBulkOpStart |
| // in a retry bulk op would not incorrectly remove the saved version tag in |
| // recordedBulkOpVersionTags |
| if (lr.getConcurrencyChecksEnabled() |
| && (event.getOperation().isPutAll() || event.getOperation().isRemoveAll()) |
| && lr.getServerProxy() == null) { |
| recordBulkOpEvent(event, membershipID); |
| } |
| } |
| |
| private ThreadIdentifier createThreadIDFromEvent(EventID eventID) { |
| return new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID()); |
| } |
| |
| /** |
| * Record a version tag for a bulk operation. |
| */ |
| private void recordBulkOpEvent(InternalCacheEvent event, ThreadIdentifier threadID) { |
| EventID eventID = event.getEventId(); |
| |
| VersionTag tag = event.getVersionTag(); |
| if (tag == null) { |
| return; |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("recording bulkOp event {} {} {} op={}", threadID.expensiveToString(), eventID, |
| tag, event.getOperation()); |
| } |
| |
| RegionVersionVector versionVector = ((LocalRegion) event.getRegion()).getVersionVector(); |
| canonicalizeIDs(tag, versionVector); |
| |
| // Loop until we can successfully update the recorded bulk operations |
| // For this thread id. |
| boolean retry = false; |
| do { |
| BulkOperationHolder bulkOpTracker = recordedBulkOpVersionTags.get(threadID); |
| if (bulkOpTracker == null) { |
| bulkOpTracker = new BulkOperationHolder(); |
| BulkOperationHolder old = recordedBulkOpVersionTags.putIfAbsent(threadID, bulkOpTracker); |
| if (old != null) { |
| retry = true; |
| continue; |
| } |
| } |
| synchronized (bulkOpTracker) { |
| if (bulkOpTracker.isRemoved()) { |
| retry = true; |
| continue; |
| } |
| |
| // Add the version tag for bulkOp event. |
| bulkOpTracker.putVersionTag(eventID, event.getVersionTag()); |
| retry = false; |
| } |
| } while (retry); |
| } |
| |
| private void canonicalizeIDs(VersionTag tag, RegionVersionVector versionVector) { |
| if (tag != null && versionVector != null) { |
| tag.setMemberID(versionVector.getCanonicalId(tag.getMemberID())); |
| if (tag.getPreviousMemberID() != null) { |
| tag.setPreviousMemberID(versionVector.getCanonicalId(tag.getPreviousMemberID())); |
| } |
| } |
| } |
| |
| @Override |
| public boolean hasSeenEvent(InternalCacheEvent event) { |
| EventID eventID = event.getEventId(); |
| if (ignoreEvent(event, eventID)) { |
| return false; // not tracked |
| } |
| return hasSeenEvent(eventID, event); |
| } |
| |
| @Override |
| public boolean hasSeenEvent(EventID eventID) { |
| return hasSeenEvent(eventID, null); |
| } |
| |
| @Override |
| public boolean hasSeenEvent(EventID eventID, InternalCacheEvent tagHolder) { |
| if (eventID == null) { |
| return false; |
| } |
| |
| EventSequenceNumberHolder evh = getSequenceHolderForEvent(eventID); |
| if (evh == null) { |
| return false; |
| } |
| |
| synchronized (evh) { |
| if (evh.isRemoved() || evh.getLastSequenceNumber() < eventID.getSequenceID()) { |
| return false; |
| } |
| // log at fine because partitioned regions can send event multiple times |
| // during normal operation during bucket region initialization |
| if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER_VERBOSE)) { |
| logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER_VERBOSE, |
| "Cache encountered replay of event with ID {}. Highest recorded for this source is {}", |
| eventID, evh.getLastSequenceNumber()); |
| } |
| // bug #44956 - recover version tag for duplicate event |
| if (evh.getLastSequenceNumber() == eventID.getSequenceID() && tagHolder != null |
| && evh.getVersionTag() != null) { |
| ((EntryEventImpl) tagHolder).setVersionTag(evh.getVersionTag()); |
| } |
| return true; |
| } |
| } |
| |
| private EventSequenceNumberHolder getSequenceHolderForEvent(EventID eventID) { |
| ThreadIdentifier membershipID = createThreadIDFromEvent(eventID); |
| return recordedEvents.get(membershipID); |
| } |
| |
| @Override |
| public VersionTag findVersionTagForSequence(EventID eventID) { |
| EventSequenceNumberHolder evh = getSequenceHolderForEvent(eventID); |
| if (evh == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("search for version tag failed as no event is recorded for {}", |
| createThreadIDFromEvent(eventID).expensiveToString()); |
| } |
| return null; |
| } |
| |
| synchronized (evh) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("search for version tag located last event for {}: {}", |
| createThreadIDFromEvent(eventID).expensiveToString(), evh); |
| } |
| if (evh.getLastSequenceNumber() != eventID.getSequenceID()) { |
| return null; |
| } |
| // log at fine because partitioned regions can send event multiple times |
| // during normal operation during bucket region initialization |
| if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER_VERBOSE) |
| && evh.getVersionTag() == null) { |
| logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER_VERBOSE, |
| "Could not recover version tag. Found event holder with no version tag for {}", |
| eventID); |
| } |
| return evh.getVersionTag(); |
| } |
| } |
| |
| @Override |
| public VersionTag findVersionTagForBulkOp(EventID eventID) { |
| if (eventID == null) { |
| return null; |
| } |
| ThreadIdentifier threadID = createThreadIDFromEvent(eventID); |
| BulkOperationHolder evh = recordedBulkOpVersionTags.get(threadID); |
| if (evh == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("search for version tag failed as no events are recorded for {}", |
| threadID.expensiveToString()); |
| } |
| return null; |
| } |
| |
| synchronized (evh) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("search for version tag located event holder for {}: {}", |
| threadID.expensiveToString(), evh); |
| } |
| return evh.getEntryVersionTags().get(eventID); |
| } |
| } |
| |
| @Override |
| public String getName() { |
| return name; |
| } |
| |
| /** |
| * @return true if the event should not be tracked, false otherwise |
| */ |
| private boolean ignoreEvent(InternalCacheEvent event, EventID eventID) { |
| if (eventID == null) { |
| return true; |
| } else { |
| boolean isVersioned = (event.getVersionTag() != null); |
| boolean isClient = event.hasClientOrigin(); |
| if (isVersioned && isClient) { |
| return false; // version tags for client events are kept for retries by the client |
| } |
| boolean isEntry = event.getOperation().isEntry(); |
| boolean isPr = event.getRegion().getAttributes().getDataPolicy().withPartitioning() |
| || ((LocalRegion) event.getRegion()).isUsedForPartitionedRegionBucket(); |
| return (!isClient && // ignore if it originated on a server, and |
| isEntry && // it affects an entry and |
| !isPr); // is not on a PR |
| } |
| } |
| |
| @Override |
| public void syncBulkOp(Runnable r, EventID eventID, boolean partOfTransaction) { |
| if (partOfTransaction) { |
| r.run(); |
| return; |
| } |
| Assert.assertTrue(eventID != null); |
| ThreadIdentifier membershipID = createThreadIDFromEvent(eventID); |
| Object opSyncObj = null; |
| do { |
| opSyncObj = recordedBulkOps.putIfAbsent(membershipID, new Object()); |
| if (opSyncObj == null) { |
| opSyncObj = recordedBulkOps.get(membershipID); |
| } |
| } while (opSyncObj == null); |
| |
| synchronized (opSyncObj) { |
| try { |
| recordBulkOpStart(eventID, membershipID); |
| // Perform the bulk op |
| r.run(); |
| } finally { |
| recordedBulkOps.remove(membershipID); |
| } |
| } |
| } |
| |
| @Override |
| public void recordBulkOpStart(EventID eventID, ThreadIdentifier tid) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("recording bulkOp start for {}", tid.expensiveToString()); |
| } |
| EventSequenceNumberHolder evh = recordedEvents.get(tid); |
| if (evh == null) { |
| return; |
| } |
| synchronized (evh) { |
| // only remove it when a new bulk op occurs |
| if (eventID.getSequenceID() > evh.getLastSequenceNumber()) { |
| this.recordedBulkOpVersionTags.remove(tid); |
| } |
| } |
| } |
| |
| @Override |
| public boolean isInitialized() { |
| return this.initialized; |
| } |
| |
| @Override |
| public boolean isInitialImageProvider(DistributedMember mbr) { |
| return (this.initialImageProvider != null) && (mbr != null) |
| && this.initialImageProvider.equals(mbr); |
| } |
| |
| @Override |
| public ConcurrentMap<ThreadIdentifier, BulkOperationHolder> getRecordedBulkOpVersionTags() { |
| return recordedBulkOpVersionTags; |
| } |
| |
| @Override |
| public ConcurrentMap<ThreadIdentifier, EventSequenceNumberHolder> getRecordedEvents() { |
| return recordedEvents; |
| } |
| |
| @Override |
| public String toString() { |
| return "" + this.name + "(initialized=" + this.initialized + ")"; |
| } |
| |
| } |