/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.distributed.internal;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.Logger;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.logging.DateFormatter;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.util.internal.GeodeGlossary;
* DSClock tracks the system time. The most useful method is cacheTimeMillis(). The rest are for
* clock adjustments.
* Clock adjustments can be turned off with gemfire.disable-distributed-clock
public class DSClock implements CacheTime {
private static final Logger logger = LogService.getLogger();
private final int MAX_TIME_OFFSET_DIFF = 100; /* in milliseconds */
* Time shift received from server must be at least this far off in order for the cacheTimeMillis
* clock to be changed. Servers are much more aggressive about it.
private static final long MINIMUM_TIME_DIFF = 5000;
* cacheTimeMillis offset from System.currentTimeMillis
private volatile long cacheTimeDelta;
* a task to slow down the clock if cacheTimeDelta decreases in value
private SystemTimerTask cacheTimeTask = null; // task to slow down the clock
* State variable used by the cacheTimeTask
private final AtomicLong suspendedTime = new AtomicLong(0L);
* Is this a client "distributed system"
private final boolean isLoner;
/** GemFire internal test hook for unit testing */
private DSClockTestHook testHook;
protected DSClock(boolean lonerDS) {
isLoner = lonerDS;
public long cacheTimeMillis() {
long result;
final long offset = getCacheTimeOffset();
final long st = getStopTime();
if (st != 0) {
result = st + offset;
if (result < 0 || result > MAX_CACHE_TIME_MILLIS) {
throw new IllegalStateException("Expected cacheTimeMillis " + result + " to be >= 0 and <= "
+ MAX_CACHE_TIME_MILLIS + " stopTime=" + st + " offset=" + offset);
} else {
long ct = System.currentTimeMillis();
result = ct + offset;
if (result < 0 || result > MAX_CACHE_TIME_MILLIS) {
throw new IllegalStateException("Expected cacheTimeMillis " + result + " to be >= 0 and <= "
+ MAX_CACHE_TIME_MILLIS + " curTime=" + ct + " offset=" + offset);
return result;
* @return Offset for system time, calculated by distributed system coordinator.
* @since GemFire 8.0
public long getCacheTimeOffset() {
return cacheTimeDelta;
* Sets the deviation of this process's local time from the rest of the GemFire distributed
* system.
* @since GemFire 8.0
public void setCacheTimeOffset(DistributedMember coord, long offset, boolean isJoin) {
if (Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + "disable-distributed-clock")) {
if (isLoner) {
} else {
setServerCacheTimeOffset(coord, offset, isJoin);
// set the time offset in a client cache
private void setLonerCacheTimeOffset(long offset) {
if (offset > (cacheTimeDelta + MINIMUM_TIME_DIFF)) {
long theTime = System.currentTimeMillis();
cacheTimeDelta = offset;
String cacheTime = DateFormatter.formatDate(new Date(theTime + offset));"The current cache time is {}. Delta from the system clock is {} milliseconds.",
new Object[] {cacheTime, cacheTimeDelta});
} else if (offset < (cacheTimeDelta - MINIMUM_TIME_DIFF)) {
// We don't issue a warning for client caches
// if ((this.cacheTimeDelta - offset) >= MAX_TIME_OFFSET_DIFF /* Max offset difference allowed
// */) {
// this.logger.warning(String.format("New cache time offset calculated is off more than %s ms
// from earlier offset.",
// (this.cacheTimeDelta - offset)));
// }
cacheTimeDelta = offset;
// We need to suspend the cacheTimeMillis for (cacheTimeDelta - offset) ms.
private void setServerCacheTimeOffset(DistributedMember coord, long offset, boolean isJoin) {
if (isJoin || offset > cacheTimeDelta) {
long theTime = System.currentTimeMillis();
cacheTimeDelta = offset;
if (cacheTimeDelta <= -300000 || 300000 <= cacheTimeDelta) {
"The clock for this machine may be more than 5 minutes different than the negotiated cache time received from {}",
String cacheTime = DateFormatter.formatDate(new Date(theTime + offset));
if (Math.abs(cacheTimeDelta) > 1000) {
Object src = coord;
if (src == null) {
src = "local clock adjustment";
"The negotiated cache time from {} is {}. Delta from the system clock is {} milliseconds.",
new Object[] {src, cacheTime, cacheTimeDelta});
} else if (!isJoin && offset < cacheTimeDelta) {
// We need to suspend the cacheTimeMillis for (cacheTimeDelta - offset) ms.
if ((cacheTimeDelta
- offset) >= MAX_TIME_OFFSET_DIFF /* Max offset difference allowed */) {
logger.warn("New cache time offset calculated is off more than {} ms from earlier offset.",
(cacheTimeDelta - offset));
* This method is called by a timer task which takes control of cache time and increments the
* cache time at each call of this method.
* The timer task must be called each millisecond. We need to revisit the method implementation if
* that condition is changed.
* @param stw True if Stop the world for this cache for a while.
private void suspendCacheTimeMillis(boolean stw) {
// Increment stop time at each call of this method.
if (stw) {
long oldSt;
long newSt;
do {
oldSt = suspendedTime.get();
if (oldSt == 0) {
newSt = System.currentTimeMillis();
} else {
newSt = oldSt + 1;
} while (!suspendedTime.compareAndSet(oldSt, newSt));
} else {
* Cancel the previous slow down task (If it exists) and schedule a new one.
private void cancelAndScheduleNewCacheTimerTask(long offset) {
InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null && !cache.isClosed()) {
if (cacheTimeTask != null) {
cacheTimeTask = new CacheTimeTask(offset);
SystemTimer timer = cache.getCCPTimer();
timer.scheduleAtFixedRate(cacheTimeTask, 1/* Start after 1ms */ , 2 /* Run task every 2ms */);
if (logger.isDebugEnabled()) {
"Started a timer task to suspend cache time for new lower offset of {}ms and current offset is: {}",
offset, cacheTimeDelta);
public long getStopTime() {
return suspendedTime.get();
* This timer task makes the cache dependent on this DM, to wait (OR in other words stop it's
* cacheTimeMillis() to return constant value until System.currentTimeMillis() + newOffset
* reaches/crosses over that constant time) for difference between old time offset and new one if
* new one is < old one. Because then we need to slow down the cache time aggressively.
private class CacheTimeTask extends SystemTimerTask {
private long lowerCacheTimeOffset = 0L;
public CacheTimeTask(long cacheTimeOffset) {
lowerCacheTimeOffset = cacheTimeOffset;
public void run2() {
boolean isCancelled = false;
long currTime = System.currentTimeMillis();
long cacheTime = cacheTimeMillis();
if (testHook != null) {
testHook.addInformation("CacheTime", cacheTime);
testHook.addInformation("AwaitedTime", currTime + lowerCacheTimeOffset);
if (logger.isDebugEnabled()) {
logger.debug("CacheTime: {}ms and currTime with offset: {}", cacheTime,
(currTime + lowerCacheTimeOffset) + "ms");
// Resume cache time as system time once cache time has slowed down enough.
long systemTime = currTime + lowerCacheTimeOffset;
if (cacheTime <= systemTime) {
setCacheTimeOffset(null, lowerCacheTimeOffset, true);
isCancelled = true;
if (testHook != null) {
testHook.addInformation("FinalCacheTime", cacheTimeMillis());
if (testHook != null && isCancelled) {
testHook.addInformation("TimerTaskCancelled", true);
public boolean cancel() {
InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null && !cache.isClosed()) {
return super.cancel();
public interface DSClockTestHook {
void suspendAtBreakPoint(int breakPoint);
void addInformation(Object key, Object value);
Object getInformation(Object key);
public DSClockTestHook getTestHook() {
return testHook;
public void setTestHook(DSClockTestHook th) {
testHook = th;