/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.distributed.internal;

import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.logging.log4j.Logger;

import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.logging.DateFormatter;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.util.internal.GeodeGlossary;

/**
 * DSClock tracks the system time. The most useful method is cacheTimeMillis(). The rest are for
 * clock adjustments.
 *
 * Clock adjustments can be turned off with gemfire.disable-distributed-clock
 *
 */

public class DSClock implements CacheTime {

  private static final Logger logger = LogService.getLogger();

  private final int MAX_TIME_OFFSET_DIFF = 100; /* in milliseconds */
  private final long MAX_CACHE_TIME_MILLIS = 0x00FFFFFFFFFFFFFFL;
  /**
   * Time shift received from server must be at least this far off in order for the cacheTimeMillis
   * clock to be changed. Servers are much more aggressive about it.
   */
  private static final long MINIMUM_TIME_DIFF = 5000;

  /**
   * cacheTimeMillis offset from System.currentTimeMillis
   */
  private volatile long cacheTimeDelta;
  /**
   * a task to slow down the clock if cacheTimeDelta decreases in value
   */
  private SystemTimerTask cacheTimeTask = null; // task to slow down the clock
  /**
   * State variable used by the cacheTimeTask
   */
  private final AtomicLong suspendedTime = new AtomicLong(0L);

  /**
   * Is this a client "distributed system"
   */
  private final boolean isLoner;

  /** GemFire internal test hook for unit testing */
  private DSClockTestHook testHook;


  protected DSClock(boolean lonerDS) {
    isLoner = lonerDS;
  }

  @Override
  public long cacheTimeMillis() {
    long result;
    final long offset = getCacheTimeOffset();
    final long st = getStopTime();
    if (st != 0) {
      result = st + offset;
      if (result < 0 || result > MAX_CACHE_TIME_MILLIS) {
        throw new IllegalStateException("Expected cacheTimeMillis " + result + " to be >= 0 and <= "
            + MAX_CACHE_TIME_MILLIS + " stopTime=" + st + " offset=" + offset);
      }
    } else {
      long ct = System.currentTimeMillis();
      result = ct + offset;
      if (result < 0 || result > MAX_CACHE_TIME_MILLIS) {
        throw new IllegalStateException("Expected cacheTimeMillis " + result + " to be >= 0 and <= "
            + MAX_CACHE_TIME_MILLIS + " curTime=" + ct + " offset=" + offset);
      }
    }
    return result;
  }

  /**
   * @return Offset for system time, calculated by distributed system coordinator.
   * @since GemFire 8.0
   */
  public long getCacheTimeOffset() {
    return cacheTimeDelta;
  }

  /**
   * Sets the deviation of this process's local time from the rest of the GemFire distributed
   * system.
   *
   * @since GemFire 8.0
   */
  public void setCacheTimeOffset(DistributedMember coord, long offset, boolean isJoin) {
    if (Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + "disable-distributed-clock")) {
      return;
    }
    if (isLoner) {
      setLonerCacheTimeOffset(offset);
    } else {
      setServerCacheTimeOffset(coord, offset, isJoin);
    }
  }

  // set the time offset in a client cache
  private void setLonerCacheTimeOffset(long offset) {
    if (offset > (cacheTimeDelta + MINIMUM_TIME_DIFF)) {
      long theTime = System.currentTimeMillis();
      cacheTimeDelta = offset;

      String cacheTime = DateFormatter.formatDate(new Date(theTime + offset));
      logger.info("The current cache time is {}.  Delta from the system clock is {} milliseconds.",
          new Object[] {cacheTime, cacheTimeDelta});

    } else if (offset < (cacheTimeDelta - MINIMUM_TIME_DIFF)) {
      // We don't issue a warning for client caches
      // if ((this.cacheTimeDelta - offset) >= MAX_TIME_OFFSET_DIFF /* Max offset difference allowed
      // */) {
      // this.logger.warning(String.format("New cache time offset calculated is off more than %s ms
      // from earlier offset.",
      // (this.cacheTimeDelta - offset)));
      // }
      cacheTimeDelta = offset;
      // We need to suspend the cacheTimeMillis for (cacheTimeDelta - offset) ms.
      cancelAndScheduleNewCacheTimerTask(offset);
    }
  }

  private void setServerCacheTimeOffset(DistributedMember coord, long offset, boolean isJoin) {

    if (isJoin || offset > cacheTimeDelta) {
      long theTime = System.currentTimeMillis();
      cacheTimeDelta = offset;
      if (cacheTimeDelta <= -300000 || 300000 <= cacheTimeDelta) {
        logger.warn(
            "The clock for this machine may be more than 5 minutes different than the negotiated cache time received from {}",
            coord);
      }
      String cacheTime = DateFormatter.formatDate(new Date(theTime + offset));
      if (Math.abs(cacheTimeDelta) > 1000) {
        Object src = coord;
        if (src == null) {
          src = "local clock adjustment";
        }
        logger.info(
            "The negotiated cache time from {} is {}.  Delta from the system clock is {} milliseconds.",
            new Object[] {src, cacheTime, cacheTimeDelta});
      }
    } else if (!isJoin && offset < cacheTimeDelta) {
      // We need to suspend the cacheTimeMillis for (cacheTimeDelta - offset) ms.
      if ((cacheTimeDelta
          - offset) >= MAX_TIME_OFFSET_DIFF /* Max offset difference allowed */) {
        logger.warn("New cache time offset calculated is off more than {} ms from earlier offset.",
            (cacheTimeDelta - offset));
      }

      cancelAndScheduleNewCacheTimerTask(offset);
    }
  }

  /**
   * This method is called by a timer task which takes control of cache time and increments the
   * cache time at each call of this method.
   *
   * The timer task must be called each millisecond. We need to revisit the method implementation if
   * that condition is changed.
   *
   * @param stw True if Stop the world for this cache for a while.
   */
  private void suspendCacheTimeMillis(boolean stw) {
    // Increment stop time at each call of this method.
    if (stw) {
      long oldSt;
      long newSt;
      do {
        oldSt = suspendedTime.get();
        if (oldSt == 0) {
          newSt = System.currentTimeMillis();
        } else {
          newSt = oldSt + 1;
        }
      } while (!suspendedTime.compareAndSet(oldSt, newSt));
    } else {
      suspendedTime.set(0);
    }
  }

  /**
   * Cancel the previous slow down task (If it exists) and schedule a new one.
   */
  private void cancelAndScheduleNewCacheTimerTask(long offset) {

    InternalCache cache = GemFireCacheImpl.getInstance();

    if (cache != null && !cache.isClosed()) {
      if (cacheTimeTask != null) {
        cacheTimeTask.cancel();
      }
      cacheTimeTask = new CacheTimeTask(offset);
      SystemTimer timer = cache.getCCPTimer();
      timer.scheduleAtFixedRate(cacheTimeTask, 1/* Start after 1ms */ , 2 /* Run task every 2ms */);
      if (logger.isDebugEnabled()) {
        logger.debug(
            "Started a timer task to suspend cache time for new lower offset of {}ms and current offset is: {}",
            offset, cacheTimeDelta);
      }
    }
  }

  public long getStopTime() {
    return suspendedTime.get();
  }

  /**
   * This timer task makes the cache dependent on this DM, to wait (OR in other words stop it's
   * cacheTimeMillis() to return constant value until System.currentTimeMillis() + newOffset
   * reaches/crosses over that constant time) for difference between old time offset and new one if
   * new one is < old one. Because then we need to slow down the cache time aggressively.
   *
   *
   */
  private class CacheTimeTask extends SystemTimerTask {

    private long lowerCacheTimeOffset = 0L;


    public CacheTimeTask(long cacheTimeOffset) {
      super();
      lowerCacheTimeOffset = cacheTimeOffset;
    }

    @Override
    public void run2() {
      boolean isCancelled = false;

      suspendCacheTimeMillis(true);

      long currTime = System.currentTimeMillis();
      long cacheTime = cacheTimeMillis();

      if (testHook != null) {
        testHook.suspendAtBreakPoint(1);
        testHook.addInformation("CacheTime", cacheTime);
        testHook.addInformation("AwaitedTime", currTime + lowerCacheTimeOffset);
      }
      if (logger.isDebugEnabled()) {
        logger.debug("CacheTime: {}ms and currTime with offset: {}", cacheTime,
            (currTime + lowerCacheTimeOffset) + "ms");
      }

      // Resume cache time as system time once cache time has slowed down enough.
      long systemTime = currTime + lowerCacheTimeOffset;

      if (cacheTime <= systemTime) {
        setCacheTimeOffset(null, lowerCacheTimeOffset, true);
        suspendCacheTimeMillis(false);
        cancel();
        isCancelled = true;
        if (testHook != null) {
          testHook.suspendAtBreakPoint(2);
          testHook.addInformation("FinalCacheTime", cacheTimeMillis());
        }
      }

      if (testHook != null && isCancelled) {
        testHook.suspendAtBreakPoint(3);
        testHook.addInformation("TimerTaskCancelled", true);
      }
    }

    @Override
    public boolean cancel() {
      InternalCache cache = GemFireCacheImpl.getInstance();
      if (cache != null && !cache.isClosed()) {
        suspendCacheTimeMillis(false);
      }
      return super.cancel();
    }
  }

  public interface DSClockTestHook {
    void suspendAtBreakPoint(int breakPoint);

    void addInformation(Object key, Object value);

    Object getInformation(Object key);
  }

  public DSClockTestHook getTestHook() {
    return testHook;
  }

  public void setTestHook(DSClockTestHook th) {
    testHook = th;
  }

}
