blob: c8aee80f4c64292cc98a55b34ee37fe3bb008347 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.client.internal;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.Logger;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.client.internal.PoolImpl.PoolTask;
import org.apache.geode.internal.cache.ClientServerObserver;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.logging.LogService;
public class QueueStateImpl implements QueueState {
private static final Logger logger = LogService.getLogger();
protected QueueManager qManager = null;
private boolean processedMarker = false;
private final AtomicInteger invalidateCount = new AtomicInteger();
/**
* This will store the ThreadId to latest received sequence Id
*
* Keys are instances of {@link ThreadIdentifier} Values are instances of
* {@link SequenceIdAndExpirationObject}
*/
protected final Map threadIdToSequenceId = new LinkedHashMap();
public QueueStateImpl(QueueManager qm) {
this.qManager = qm;
}
@Override
public void processMarker() {
if (!this.processedMarker) {
handleMarker();
this.processedMarker = true;
} else {
if (logger.isDebugEnabled()) {
logger.debug("{}: extra marker received", this);
}
}
}
@Override
public boolean getProcessedMarker() {
return this.processedMarker;
}
public void handleMarker() {
ArrayList regions = new ArrayList();
Cache cache = GemFireCacheImpl.getInstance();
if (cache == null) {
return;
}
Set rootRegions = cache.rootRegions();
for (Iterator iter1 = rootRegions.iterator(); iter1.hasNext();) {
Region rootRegion = (Region) iter1.next();
regions.add(rootRegion);
try {
Set subRegions = rootRegion.subregions(true); // throws RDE
for (Iterator iter2 = subRegions.iterator(); iter2.hasNext();) {
regions.add(iter2.next());
}
} catch (RegionDestroyedException e) {
continue; // region is gone go to the next one bug 38705
}
}
for (Iterator iter = regions.iterator(); iter.hasNext();) {
LocalRegion region = (LocalRegion) iter.next();
try {
if (region.getAttributes().getPoolName() != null
&& region.getAttributes().getPoolName().equals(qManager.getPool().getName())) {
region.handleMarker(); // can this throw RDE??
}
} catch (RegionDestroyedException e) {
continue; // region is gone go to the next one bug 38705
}
}
}
@Override
public void incrementInvalidatedStats() {
this.invalidateCount.incrementAndGet();
}
public int getInvalidateCount() {
return this.invalidateCount.get();
}
/**
* test hook - access to this map should be synchronized on the map to avoid concurrent
* modification exceptions
*/
@Override
public Map getThreadIdToSequenceIdMap() {
return this.threadIdToSequenceId;
}
@Override
public boolean verifyIfDuplicate(EventID eid) {
return verifyIfDuplicate(eid, true);
}
@Override
public boolean verifyIfDuplicate(EventID eid, boolean addToMap) {
ThreadIdentifier tid = new ThreadIdentifier(eid.getMembershipID(), eid.getThreadID());
long seqId = eid.getSequenceID();
SequenceIdAndExpirationObject seo = null;
// Fix 36930: save the max sequence id for each non-putAll operation's thread
// There're totally 3 cases to consider:
// check the tid:
// 1) if duplicated, (both putall or non-putall): reject
// 2) if not duplicate
// 2.1)if putAll, check via real thread id again,
// if duplicate, reject (because one non-putall operation with bigger
// seqno has happened)
// otherwise save the putAllSeqno for real thread id
// and save seqno for tid
// 2.2) if not putAll,
// check putAllSequenceId with real thread id
// if request's seqno is smaller, reject (because one putAll operation
// with bigger seqno has happened)
// otherwise, update the seqno for tid
// lock taken to avoid concurrentModification
// while the objects are being expired
synchronized (this.threadIdToSequenceId) {
seo = (SequenceIdAndExpirationObject) this.threadIdToSequenceId.get(tid);
if (seo != null && seo.getSequenceId() >= seqId) {
if (logger.isDebugEnabled()) {
logger.debug(" got a duplicate entry with EventId {}. Ignoring the entry", eid);
}
seo.setAckSend(false);
return true;
} else if (addToMap) {
ThreadIdentifier real_tid = new ThreadIdentifier(eid.getMembershipID(),
ThreadIdentifier.getRealThreadIDIncludingWan(eid.getThreadID()));
if (ThreadIdentifier.isPutAllFakeThreadID(eid.getThreadID())) {
// it's a putAll
seo = (SequenceIdAndExpirationObject) this.threadIdToSequenceId.get(real_tid);
if (seo != null && seo.getSequenceId() >= seqId) {
if (logger.isDebugEnabled()) {
logger.debug(
"got a duplicate putAll entry with eventId {}. Other operation with same thread id and bigger seqno {} has happened. Ignoring the entry",
eid, seo.getSequenceId());
}
seo.setAckSend(false); // bug #41289: send ack to servers that send old events
return true;
} else {
// save the seqno for real thread id into a putAllSequenceId
this.threadIdToSequenceId.remove(real_tid);
this.threadIdToSequenceId.put(real_tid,
seo == null ? new SequenceIdAndExpirationObject(-1, seqId)
: new SequenceIdAndExpirationObject(seo.getSequenceId(), seqId));
// save seqno for tid
// here tid!=real_tid, for fake tid, putAllSeqno should be 0
this.threadIdToSequenceId.remove(tid);
this.threadIdToSequenceId.put(tid, new SequenceIdAndExpirationObject(seqId, -1));
}
} else {
// non-putAll operation:
// check putAllSeqno for real thread id
// if request's seqno is smaller, reject
// otherwise, update the seqno for tid
seo = (SequenceIdAndExpirationObject) this.threadIdToSequenceId.get(real_tid);
if (seo != null && seo.getPutAllSequenceId() >= seqId) {
if (logger.isDebugEnabled()) {
logger.debug(
"got a duplicate non-putAll entry with eventId {}. One putAll operation with same real thread id and bigger seqno {} has happened. Ignoring the entry",
eid, seo.getPutAllSequenceId());
}
seo.setAckSend(false); // bug #41289: send ack to servers that send old events
return true;
} else {
// here tid==real_tid
this.threadIdToSequenceId.remove(tid);
this.threadIdToSequenceId.put(tid,
seo == null ? new SequenceIdAndExpirationObject(seqId, -1)
: new SequenceIdAndExpirationObject(seqId, seo.getPutAllSequenceId()));
}
}
}
}
return false;
}
@Override
public void start(ScheduledExecutorService timer, int interval) {
timer.scheduleWithFixedDelay(new ThreadIdToSequenceIdExpiryTask(), interval, interval,
TimeUnit.MILLISECONDS);
}
/**
*
* Thread which will iterate over threadIdToSequenceId map
*
* 1)It will send an ack primary server for all threadIds for which it has not send an ack. 2)It
* will expire the entries which have exceeded the specified expiry time and for which ack has
* been alerady sent.
*
* @since GemFire 5.1
*
*/
private class ThreadIdToSequenceIdExpiryTask extends PoolTask {
/**
* The expiry time of the entries in the map
*/
private final long expiryTime;
/**
* constructs the Thread and initializes the expiry time
*
*/
public ThreadIdToSequenceIdExpiryTask() {
expiryTime = QueueStateImpl.this.qManager.getPool().getSubscriptionMessageTrackingTimeout();
}
@Override
public void run2() {
SystemFailure.checkFailure();
if (qManager.getPool().getCancelCriterion().isCancelInProgress()) {
return;
}
if (PoolImpl.BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG) {
ClientServerObserver bo = ClientServerObserverHolder.getInstance();
bo.beforeSendingClientAck();
}
sendPeriodicAck();
checkForExpiry();
}
void checkForExpiry() {
synchronized (threadIdToSequenceId) {
Iterator iterator = threadIdToSequenceId.entrySet().iterator();
long currentTime = System.currentTimeMillis();
Map.Entry entry;
SequenceIdAndExpirationObject seo;
while (iterator.hasNext()) {
entry = (Map.Entry) iterator.next();
seo = (SequenceIdAndExpirationObject) entry.getValue();
if ((currentTime - seo.getCreationTime() > this.expiryTime)) {
if (seo.getAckSend() || (qManager.getPool().getSubscriptionRedundancy() == 0
&& !qManager.getPool().isDurableClient())) {
iterator.remove();
}
} else {
break;
}
}
}
}
/**
* Sends Periodic ack to the primary server for all threadIds for which it has not send an ack.
*/
void sendPeriodicAck() {
List events = new ArrayList();
boolean success = false;
synchronized (threadIdToSequenceId) {
Iterator iterator = threadIdToSequenceId.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry entry = (Map.Entry) iterator.next();
SequenceIdAndExpirationObject seo = (SequenceIdAndExpirationObject) entry.getValue();
if (!seo.getAckSend()) {
ThreadIdentifier tid = (ThreadIdentifier) entry.getKey();
events.add(new EventID(tid.getMembershipID(), tid.getThreadID(), seo.getSequenceId()));
seo.setAckSend(true);
} // if ends
} // while ends
} // synchronized ends
if (events.size() > 0) {
try {
PrimaryAckOp.execute(qManager.getAllConnections().getPrimary(), qManager.getPool(),
events);
success = true;
} catch (Exception ex) {
if (logger.isDebugEnabled())
logger.debug("Exception while sending an ack to the primary server: {}", ex);
} finally {
if (!success) {
Iterator iter = events.iterator();
while (iter.hasNext()) {
EventID eid = (EventID) iter.next();
ThreadIdentifier tid = new ThreadIdentifier(eid.getMembershipID(), eid.getThreadID());
synchronized (threadIdToSequenceId) {
SequenceIdAndExpirationObject seo =
(SequenceIdAndExpirationObject) threadIdToSequenceId.get(tid);
if (seo != null && seo.getAckSend()) {
seo = (SequenceIdAndExpirationObject) threadIdToSequenceId.remove(tid);
if (seo != null) {
// put back the old seqId with a new time stamp
SequenceIdAndExpirationObject siaeo = new SequenceIdAndExpirationObject(
seo.getSequenceId(), seo.getPutAllSequenceId());
threadIdToSequenceId.put(tid, siaeo);
}
} // if ends
} // synchronized ends
} // while ends
} // if(!success) ends
} // finally ends
} // if(events.size() > 0)ends
}// method ends
}
/**
* A class to store sequenceId and the creation time of the object to be used for expiring the
* entry
*
* @since GemFire 5.1
*
*/
public static class SequenceIdAndExpirationObject {
/** The sequence Id of the entry * */
private final long sequenceId;
/** The sequence Id of the putAll operations * */
private final long putAllSequenceId;
/** The time of creation of the object* */
private final long creationTime;
/** Client ack is send to server or not* */
private boolean ackSend;
SequenceIdAndExpirationObject(long sequenceId, long putAllSequenceId) {
this.sequenceId = sequenceId;
this.putAllSequenceId = putAllSequenceId;
this.creationTime = System.currentTimeMillis();
this.ackSend = false;
}
/**
* @return Returns the creationTime.
*/
public long getCreationTime() {
return creationTime;
}
/**
* @return Returns the sequenceId.
*/
public long getSequenceId() {
return sequenceId;
}
/**
* @return Returns the putAllSequenceId.
*/
public long getPutAllSequenceId() {
return putAllSequenceId;
}
/**
*
* @return Returns the ackSend
*/
public boolean getAckSend() {
return ackSend;
}
/**
* Sets the ackSend
*
*/
public void setAckSend(boolean ackSend) {
this.ackSend = ackSend;
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("SequenceIdAndExpirationObject[");
sb.append("ackSend = " + this.ackSend);
sb.append("; creation = " + creationTime);
sb.append("; seq = " + sequenceId);
sb.append("; putAll seq = " + putAllSequenceId);
sb.append("]");
return sb.toString();
}
}
}