blob: 87b66f01296d82ac81133ceb04282d01916e685a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.ExpirationAction;
import org.apache.geode.cache.ExpirationAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingExecutors;
import org.apache.geode.internal.tcp.ConnectionTable;
/**
* ExpiryTask represents a timeout event for expiration
*/
/**
 * ExpiryTask represents a timeout event for expiration.
 *
 * <p>
 * Subclasses supply the time-to-live and idle-timeout attributes, the last modified/accessed
 * timestamps and the concrete expiration actions. This base class works out when the task is due,
 * which action applies, and runs the task either inline on the timer thread or on a dedicated
 * "Expiry" thread pool when the {@code EXPIRY_THREADS} system property (with the
 * {@code DistributionConfig.GEMFIRE_PREFIX} prefix) is greater than zero.
 */
public abstract class ExpiryTask extends SystemTimer.SystemTimerTask {
  private static final Logger logger = LogService.getLogger();

  /**
   * Region this task belongs to. No longer final so cancel can null it out, see bug 37574. Code
   * that may run on a cancelled task reads it once into a local and null-checks that local.
   *
   * NOTE(review): the field is written by {@link #cancel()} and read by whichever thread runs the
   * task; consider making it volatile if those are confirmed to be different threads.
   */
  private LocalRegion region;

  /** Pool used to run expirations, or {@code null} when expiration runs inline. */
  @MakeNotStatic
  private static final ExecutorService executor;

  static {
    // default to inline expiry to fix bug 37115
    int nThreads = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "EXPIRY_THREADS", 0);
    if (nThreads > 0) {
      executor = LoggingExecutors.newThreadPoolWithSynchronousFeed("Expiry ",
          (Runnable command) -> doExpiryThread(command),
          nThreads);
    } else {
      executor = null;
    }
  }

  /**
   * Runs the given command on an expiry pool thread, bracketing it with the
   * {@link ConnectionTable} calls that mark the thread as wanting shared resources and that
   * release the thread's sockets afterwards (even if the command throws).
   */
  private static void doExpiryThread(Runnable command) {
    ConnectionTable.threadWantsSharedResources();
    try {
      command.run();
    } finally {
      ConnectionTable.releaseThreadsSockets();
    }
  }

  /**
   * @param region the region this task expires (or whose entry it expires)
   */
  protected ExpiryTask(LocalRegion region) {
    this.region = region;
  }

  /** @return the idle-timeout attributes that apply to this task */
  protected abstract ExpirationAttributes getIdleAttributes();

  /** @return the time-to-live attributes that apply to this task */
  protected abstract ExpirationAttributes getTTLAttributes();

  /**
   * @return the absolute time (ms since Jan 1, 1970) at which this region expires, due to either
   *         time-to-live or idle-timeout (whichever will occur first), or 0 if neither are used.
   */
  public long getExpirationTime() throws EntryNotFoundException {
    final long ttl = getTTLExpirationTime();
    final long idle = getIdleExpirationTime();
    if (ttl == 0) {
      return idle;
    }
    if (idle == 0) {
      return ttl;
    }
    return Math.min(ttl, idle);
  }

  /** Return the absolute time when TTL expiration occurs, or 0 if not used */
  public long getTTLExpirationTime() throws EntryNotFoundException {
    final long ttl = getTTLAttributes().getTimeout();
    if (ttl <= 0) {
      return 0L;
    }
    final long ttlMillis = toMillis(ttl);
    return getLastModifiedTime() + ttlMillis;
  }

  /** Return the absolute time when idle expiration occurs, or 0 if not used */
  public long getIdleExpirationTime() throws EntryNotFoundException {
    final long idle = getIdleTimeoutInMillis();
    if (idle > 0) {
      return getLastAccessedTime() + idle;
    }
    return 0L;
  }

  /**
   * @return the configured idle timeout converted to milliseconds; values that are not positive
   *         are returned unchanged
   */
  protected long getIdleTimeoutInMillis() {
    final long idle = getIdleAttributes().getTimeout();
    if (idle > 0) {
      return toMillis(idle);
    }
    return idle;
  }

  /**
   * Converts a positive configured timeout to milliseconds. The value is multiplied by 1000
   * unless the region's {@code EXPIRY_UNITS_MS} flag is set; it is returned unchanged when the
   * flag is set or when this task no longer has a region.
   *
   * <p>
   * The region is read once so that a concurrent {@link #cancel()} cannot null it between the
   * null check and the flag access.
   */
  private long toMillis(long timeout) {
    final LocalRegion lr = getLocalRegion();
    if (lr != null && !lr.EXPIRY_UNITS_MS) {
      return timeout * 1000;
    }
    return timeout;
  }

  /**
   * Returns the number of milliseconds until this task should expire. The return value will never
   * be negative.
   */
  long getExpiryMillis() throws EntryNotFoundException {
    return Math.max(0L, getExpirationTime() - getNow());
  }

  /**
   * Return true if current task could have expired. Return false if expiration is impossible.
   */
  protected boolean isExpirationPossible() throws EntryNotFoundException {
    long expTime = getExpirationTime();
    if (expTime <= 0L) {
      return false;
    }
    final long now = getNow();
    if (now < expTime) {
      return false;
    }
    if (isIdleExpiredOnOthers()) {
      return true;
    }
    // our last access time was reset so recheck
    expTime = getExpirationTime();
    return expTime > 0L && now >= expTime;
  }

  /**
   * Added for GEODE-3764.
   *
   * @return true if other members last access time indicates we have expired
   */
  protected boolean isIdleExpiredOnOthers() throws EntryNotFoundException {
    // by default return true since we don't need to check with others
    return true;
  }

  /**
   * Returns false if the region reliability state does not allow this expiry task to fire.
   */
  protected boolean isExpirationAllowed() {
    return getLocalRegion().isExpirationAllowed(this);
  }

  /**
   * Hands this task to its region via {@code performExpiryTimeout}, logging the current
   * expiration time first when debug logging is enabled.
   */
  protected void performTimeout() throws CacheException {
    if (logger.isDebugEnabled()) {
      logger.debug("{}.performTimeout(): getExpirationTime() returns {}", this.toString(),
          getExpirationTime());
    }
    getLocalRegion().performExpiryTimeout(this);
  }

  protected abstract void basicPerformTimeout(boolean isPending) throws CacheException;

  /**
   * @guarded.By suspendLock
   */
  @MakeNotStatic
  private static boolean expirationSuspended = false;

  @MakeNotStatic
  private static final Object suspendLock = new Object();

  /**
   * Test method that causes expiration to be suspended until permitExpiration is called.
   *
   * @since GemFire 5.0
   */
  public static void suspendExpiration() {
    synchronized (suspendLock) {
      expirationSuspended = true;
    }
  }

  /**
   * Ends a suspension started by {@link #suspendExpiration()} and wakes every thread waiting for
   * permission to expire.
   */
  public static void permitExpiration() {
    synchronized (suspendLock) {
      expirationSuspended = false;
      suspendLock.notifyAll();
    }
  }

  /**
   * Wait until permission is given for expiration to be done. Tests are allowed to suspend
   * expiration.
   *
   * <p>
   * An interrupt received while waiting does not end the wait: the interrupt status is restored,
   * the region's cancel criterion is consulted, and the wait is retried.
   *
   * @since GemFire 5.0
   */
  private void waitOnExpirationSuspension() {
    for (;;) {
      getLocalRegion().getCancelCriterion().checkCancelInProgress(null);
      synchronized (suspendLock) {
        // remember (and clear) a pending interrupt so that wait() can be entered
        boolean interrupted = Thread.interrupted();
        try {
          while (expirationSuspended) {
            suspendLock.wait();
          }
          break;
        } catch (InterruptedException ex) {
          interrupted = true;
          getLocalRegion().getCancelCriterion().checkCancelInProgress(null);
          // keep going, we can't cancel
        } finally {
          if (interrupted) {
            Thread.currentThread().interrupt();
          }
        }
      } // synchronized
    } // for
  }

  /**
   * Performs the expiration action currently selected by {@link #getAction()} and, when it
   * succeeds, notifies the test listener (if one is installed).
   *
   * @return true if an action was performed and reported success; false if there is no action or
   *         the action reported failure
   */
  protected boolean expire(boolean isPending) throws CacheException {
    final ExpirationAction action = getAction();
    if (action == null) {
      return false;
    }
    final boolean result = expire(action, isPending);
    // read the mutable static once so it cannot become null between the check and the call
    final ExpiryTaskListener listener = expiryTaskListener;
    if (result && listener != null) {
      listener.afterExpire(this);
    }
    return result;
  }

  /**
   * Why did this expire?
   *
   * @return the action to perform or null if NONE
   */
  protected ExpirationAction getAction() {
    final long ttl = getTTLExpirationTime();
    final long idle = getIdleExpirationTime();
    if (ttl == 0) {
      return idle == 0 ? null : getIdleAttributes().getAction();
    }
    if (idle == 0) {
      // we know ttl != 0
      return getTTLAttributes().getAction();
    }
    // Neither is 0: the earlier deadline decides, ties go to time-to-live
    if (idle < ttl) {
      return getIdleAttributes().getAction();
    }
    return getTTLAttributes().getAction();
  }

  /** Returns true if the ExpirationAction is a distributed action. */
  protected boolean isDistributedAction() {
    final ExpirationAction action = getAction();
    return action != null && (action.isInvalidate() || action.isDestroy());
  }

  /** @return the region of this task, or {@code null} once the task has been cancelled */
  LocalRegion getLocalRegion() {
    return this.region;
  }

  /**
   * Dispatches to the subclass operation that implements the given action.
   *
   * @return the result reported by the subclass operation
   * @throws InternalGemFireError if the action is none of invalidate, destroy, local invalidate or
   *         local destroy
   */
  protected boolean expire(ExpirationAction action, boolean isPending) throws CacheException {
    if (action.isInvalidate()) {
      return invalidate();
    }
    if (action.isDestroy()) {
      return destroy(isPending);
    }
    if (action.isLocalInvalidate()) {
      return localInvalidate();
    }
    if (action.isLocalDestroy()) {
      return localDestroy();
    }
    throw new InternalGemFireError(
        String.format("unrecognized expiration action: %s", action));
  }

  /**
   * Cancel this task. When the superclass reports a successful cancel the reference to the region
   * is dropped.
   */
  @Override
  public boolean cancel() {
    final boolean superCancel = super.cancel();
    if (superCancel) {
      this.region = null; // this is the only place it is nulled
    }
    return superCancel;
  }

  /**
   * An ExpiryTask is sent run() to perform its task. Note that this run() method should never throw
   * an exception - otherwise, it takes out the java.util.Timer thread, causing an exception
   * whenever we try to schedule more expiration tasks.
   */
  @Override
  public void run2() {
    try {
      if (executor != null) {
        executor.execute(this::runInThreadPool);
      } else {
        // inline
        runInThreadPool();
      }
    } catch (RejectedExecutionException ex) {
      try {
        if (logger.isDebugEnabled()) {
          logger.debug("Rejected execution in expiration task", ex);
        }
      } catch (VirtualMachineError err) {
        SystemFailure.initiateFailure(err);
        // If this ever returns, rethrow the error. We're poisoned
        // now, so don't let this thread continue.
        throw err;
      } catch (Throwable t) {
        // Whenever you catch Error or Throwable, you must also
        // catch VirtualMachineError (see above). However, there is
        // _still_ a possibility that you are dealing with a cascading
        // error condition, so you also need to check to see if the JVM
        // is still usable:
        SystemFailure.checkFailure();
        // for surviving and debugging exceptions getting the logger
        t.printStackTrace();
      }
    } catch (CancelException e) {
      return; // just bail
    } catch (VirtualMachineError err) {
      SystemFailure.initiateFailure(err);
      // If this ever returns, rethrow the error. We're poisoned
      // now, so don't let this thread continue.
      throw err;
    } catch (Throwable ex) {
      // Whenever you catch Error or Throwable, you must also
      // catch VirtualMachineError (see above). However, there is
      // _still_ a possibility that you are dealing with a cascading
      // error condition, so you also need to check to see if the JVM
      // is still usable:
      SystemFailure.checkFailure();
      logger.fatal("Exception in expiration task", ex);
    }
  }

  /**
   * Body of the task: skips the work when the task has been cancelled or its cache or region is
   * closed or destroyed, otherwise waits for any test-imposed suspension to end and performs the
   * timeout. The test listener's {@code afterTaskRan} is invoked in every case.
   */
  protected void runInThreadPool() {
    try {
      // A task that was cancelled after being handed to the pool has no region left; treat that
      // as "nothing to do" instead of failing with a NullPointerException.
      final LocalRegion lr = getLocalRegion();
      if (lr == null || isCacheClosing() || lr.isClosed() || lr.isDestroyed()) {
        return;
      }
      waitOnExpirationSuspension();
      if (logger.isTraceEnabled()) {
        logger.trace("{} is fired", this);
      }
      // do our work...
      performTimeout();
    } catch (RegionDestroyedException re) {
      // Ignore - our job is done
    } catch (EntryNotFoundException ex) {
      // Ignore
    } catch (CancelException ex) {
      // ignore
    } catch (VirtualMachineError err) {
      SystemFailure.initiateFailure(err);
      // If this ever returns, rethrow the error. We're poisoned
      // now, so don't let this thread continue.
      throw err;
    } catch (Throwable ex) {
      // Whenever you catch Error or Throwable, you must also
      // catch VirtualMachineError (see above). However, there is
      // _still_ a possibility that you are dealing with a cascading
      // error condition, so you also need to check to see if the JVM
      // is still usable:
      SystemFailure.checkFailure();
      logger.fatal("Exception in expiration task", ex);
    } finally {
      // read the mutable static once so it cannot become null between the check and the call
      final ExpiryTaskListener listener = expiryTaskListener;
      if (listener != null) {
        listener.afterTaskRan(this);
      }
    }
  }

  /**
   * @return true if the cache of this task's region reports that it is closed, or if this task no
   *         longer has a region (it was cancelled) and therefore has nothing left to expire
   */
  protected boolean isCacheClosing() {
    final LocalRegion lr = getLocalRegion();
    return lr == null || lr.getCache().isClosed();
  }

  /**
   * Reschedule (or not) this task for later consideration
   */
  protected abstract void reschedule() throws CacheException;

  /**
   * Describes the task with its region, both expiration times and the current time. Any failure
   * while computing the expiration times leaves the corresponding value as
   * {@code <unavailable>}.
   */
  @Override
  public String toString() {
    String expTtl = "<unavailable>";
    String expIdle = "<unavailable>";
    try {
      if (getTTLAttributes() != null) {
        expTtl = String.valueOf(getTTLExpirationTime());
      }
      if (getIdleAttributes() != null) {
        expIdle = String.valueOf(getIdleExpirationTime());
      }
    } catch (VirtualMachineError err) {
      SystemFailure.initiateFailure(err);
      // If this ever returns, rethrow the error. We're poisoned
      // now, so don't let this thread continue.
      throw err;
    } catch (Throwable t) {
      // Whenever you catch Error or Throwable, you must also
      // catch VirtualMachineError (see above). However, there is
      // _still_ a possibility that you are dealing with a cascading
      // error condition, so you also need to check to see if the JVM
      // is still usable:
      SystemFailure.checkFailure();
    }
    return super.toString() + " for " + getLocalRegion() + ", ttl expiration time: " + expTtl
        + ", idle expiration time: " + expIdle + ("[now:" + calculateNow() + "]");
  }

  ////// Abstract methods ///////
  protected abstract long getLastModifiedTime() throws EntryNotFoundException;

  protected abstract long getLastAccessedTime() throws EntryNotFoundException;

  protected abstract boolean invalidate() throws CacheException;

  protected abstract boolean destroy(boolean isPending) throws CacheException;

  protected abstract boolean localInvalidate() throws EntryNotFoundException;

  protected abstract boolean localDestroy() throws CacheException;

  protected abstract void addExpiryTask() throws EntryNotFoundException;

  public abstract boolean isPending();

  public abstract Object getKey();

  /** Per-thread cached "now", set only for the duration of {@link #doWithNowSet}. */
  private static final ThreadLocal<Long> now = new ThreadLocal<>();

  /**
   * To reduce the number of times we need to call calculateNow, you can call this method to set now
   * in a thread local. When the run returns the thread local is cleared.
   */
  static void doWithNowSet(LocalRegion lr, Runnable runnable) {
    now.set(calculateNow(lr.getCache()));
    try {
      runnable.run();
    } finally {
      now.remove();
    }
  }

  /**
   * Returns the current time in milliseconds. If the current thread has set the now thread local
   * then that time is returned. Otherwise now is calculated and returned.
   *
   * @return the current time in milliseconds
   */
  protected long getNow() {
    final Long cached = now.get();
    if (cached != null) {
      return cached.longValue();
    }
    return calculateNow();
  }

  /**
   * @return the current cache time in milliseconds for this task's region, or 0 if this task no
   *         longer has a region (it was cancelled) or no clock can be reached
   */
  public long calculateNow() {
    // The region is nulled by cancel(); the static overload already maps a null cache to 0.
    final LocalRegion lr = getLocalRegion();
    return calculateNow(lr == null ? null : lr.getCache());
  }

  /**
   * @param cache the cache whose distributed system clock is consulted; may be {@code null}
   * @return the cache time in milliseconds, or 0 if the cache or its distributed system is
   *         {@code null}
   */
  public static long calculateNow(InternalCache cache) {
    if (cache != null) {
      // Use cache.cacheTimeMillis here. See bug 52267.
      InternalDistributedSystem ids = cache.getInternalDistributedSystem();
      if (ids != null) {
        return ids.getClock().cacheTimeMillis();
      }
    }
    return 0L;
  }

  // Should only be set by unit tests
  @MutableForTesting
  public static ExpiryTaskListener expiryTaskListener;

  /**
   * Used by tests to determine if events related to an ExpiryTask have happened.
   */
  public interface ExpiryTaskListener {
    /**
     * Called after entry is scheduled for expiration.
     */
    void afterSchedule(ExpiryTask et);

    /**
     * Called after the given expiry task has run. This means that the time it was originally
     * scheduled to run has elapsed and the scheduler has run the task. While running the task it
     * may decide to expire it or reschedule it.
     */
    void afterTaskRan(ExpiryTask et);

    /**
     * Called after the given expiry task has been rescheduled. afterTaskRan can still be called on
     * the same task. In some cases a task is rescheduled without expiring it. In others it is
     * expired and rescheduled.
     */
    void afterReschedule(ExpiryTask et);

    /**
     * Called after the given expiry task has expired.
     */
    void afterExpire(ExpiryTask et);

    /**
     * Called when task has been canceled
     */
    void afterCancel(ExpiryTask et);
  }
}