/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.control;

import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
import org.apache.geode.internal.cache.control.MemoryThresholds.MemoryState;
import org.apache.geode.internal.cache.control.ResourceAdvisor.ResourceManagerProfile;
import org.apache.geode.internal.offheap.MemoryAllocator;
import org.apache.geode.internal.offheap.MemoryUsageListener;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * Allows for the setting of eviction and critical thresholds. These thresholds are compared against
 * current off-heap usage and, with the help of {#link InternalResourceManager}, dispatches events
 * when the thresholds are crossed.
 *
 * @since Geode 1.0
 */
public class OffHeapMemoryMonitor implements MemoryMonitor, MemoryUsageListener {
  private static final Logger logger = LogService.getLogger();
  private volatile MemoryThresholds thresholds = new MemoryThresholds(0);
  private volatile MemoryEvent mostRecentEvent = new MemoryEvent(ResourceType.OFFHEAP_MEMORY,
      MemoryState.DISABLED, MemoryState.DISABLED, null, 0L, true, this.thresholds);
  private volatile MemoryState currentState = MemoryState.DISABLED;

  // Set when startMonitoring() and stopMonitoring() are called
  // Package private for testing
  Boolean started = false;

  // Set to true when setEvictionThreshold(...) is called.
  private boolean hasEvictionThreshold = false;

  private Thread memoryListenerThread;

  private final OffHeapMemoryUsageListener offHeapMemoryUsageListener;
  private final InternalResourceManager resourceManager;
  private final ResourceAdvisor resourceAdvisor;
  private final InternalCache cache;
  private final ResourceManagerStats stats;
  /**
   * InternalResoruceManager insists on creating a OffHeapMemoryMonitor even when it does not have
   * off-heap memory. So we need to handle memoryAllocator being null.
   */
  private final MemoryAllocator memoryAllocator;

  OffHeapMemoryMonitor(final InternalResourceManager resourceManager, final InternalCache cache,
      final MemoryAllocator memoryAllocator, final ResourceManagerStats stats) {
    this.resourceManager = resourceManager;
    this.resourceAdvisor = (ResourceAdvisor) cache.getDistributionAdvisor();
    this.cache = cache;
    this.stats = stats;

    this.memoryAllocator = memoryAllocator;
    if (memoryAllocator != null) {
      this.thresholds = new MemoryThresholds(this.memoryAllocator.getTotalMemory());
    }

    this.offHeapMemoryUsageListener = new OffHeapMemoryUsageListener();
  }

  /**
   * Start monitoring off-heap memory usage by adding this as a listener to the off-heap memory
   * allocator.
   */
  private void startMonitoring() {
    synchronized (this) {
      if (this.started) {
        return;
      }

      Thread t =
          new LoggingThread("OffHeapMemoryListener", this.offHeapMemoryUsageListener);
      t.setPriority(Thread.MAX_PRIORITY);
      t.start();
      this.memoryListenerThread = t;

      this.memoryAllocator.addMemoryUsageListener(this);

      this.started = true;
    }
  }

  /**
   * Stop monitoring off-heap usage.
   */
  @Override
  public void stopMonitoring() {
    stopMonitoring(false);
  }

  public void stopMonitoring(boolean waitForThread) {
    Thread threadToWaitFor = null;
    synchronized (this) {
      if (!this.started) {
        return;
      }

      this.memoryAllocator.removeMemoryUsageListener(this);

      this.offHeapMemoryUsageListener.stop();
      if (waitForThread) {
        threadToWaitFor = this.memoryListenerThread;
      }
      this.memoryListenerThread = null;
      this.started = false;
    }
    if (threadToWaitFor != null) {
      try {
        threadToWaitFor.join();
      } catch (InterruptedException ignore) {
        Thread.currentThread().interrupt();
      }
    }
  }

  public volatile OffHeapMemoryMonitorObserver testHook;

  /**
   * Used by unit tests to be notified when OffHeapMemoryMonitor does something.
   */
  public interface OffHeapMemoryMonitorObserver {
    /**
     * Called at the beginning of updateMemoryUsed.
     *
     * @param bytesUsed the number of bytes of off-heap memory currently used
     * @param willSendEvent true if an event will be sent to the OffHeapMemoryUsageListener.
     */
    void beginUpdateMemoryUsed(long bytesUsed, boolean willSendEvent);

    void afterNotifyUpdateMemoryUsed(long bytesUsed);

    /**
     * Called at the beginning of updateStateAndSendEvent.
     *
     * @param bytesUsed the number of bytes of off-heap memory currently used
     * @param willSendEvent true if an event will be sent to the OffHeapMemoryUsageListener.
     */
    void beginUpdateStateAndSendEvent(long bytesUsed, boolean willSendEvent);

    void updateStateAndSendEventBeforeProcess(long bytesUsed, MemoryEvent event);

    void updateStateAndSendEventBeforeAbnormalProcess(long bytesUsed, MemoryEvent event);

    void updateStateAndSendEventIgnore(long bytesUsed, MemoryState oldState, MemoryState newState,
        long mostRecentBytesUsed, boolean deliverNextAbnormalEvent);
  }

  @Override
  public void updateMemoryUsed(final long bytesUsed) {
    final boolean willSendEvent = mightSendEvent(bytesUsed);
    final OffHeapMemoryMonitorObserver _testHook = this.testHook;
    if (_testHook != null) {
      _testHook.beginUpdateMemoryUsed(bytesUsed, willSendEvent);
    }
    if (!willSendEvent) {
      return;
    }
    this.offHeapMemoryUsageListener.deliverEvent();
    if (_testHook != null) {
      _testHook.afterNotifyUpdateMemoryUsed(bytesUsed);
    }
  }

  void setCriticalThreshold(final float criticalThreshold) {
    synchronized (this) {
      // If the threshold isn't changing then don't do anything.
      if (criticalThreshold == this.thresholds.getCriticalThreshold()) {
        return;
      }

      // Do some basic sanity checking on the new threshold
      if (criticalThreshold > 100.0f || criticalThreshold < 0.0f) {
        throw new IllegalArgumentException(
            "Critical percentage must be greater than 0.0 and less than or equal to 100.0.");
      }
      if (this.memoryAllocator == null) {
        throw new IllegalStateException(
            "No off-heap memory has been configured.");
      }
      if (criticalThreshold != 0 && this.thresholds.isEvictionThresholdEnabled()
          && criticalThreshold <= this.thresholds.getEvictionThreshold()) {
        throw new IllegalArgumentException(
            "Critical percentage must be greater than the eviction percentage.");
      }

      this.cache.setQueryMonitorRequiredForResourceManager(criticalThreshold != 0);

      this.thresholds = new MemoryThresholds(this.thresholds.getMaxMemoryBytes(), criticalThreshold,
          this.thresholds.getEvictionThreshold());

      updateStateAndSendEvent(getBytesUsed());

      // Start or stop monitoring based upon whether a threshold has been set
      if (this.thresholds.isEvictionThresholdEnabled()
          || this.thresholds.isCriticalThresholdEnabled()) {
        startMonitoring();
      } else if (!this.thresholds.isEvictionThresholdEnabled()
          && !this.thresholds.isCriticalThresholdEnabled()) {
        stopMonitoring();
      }

      this.stats.changeOffHeapCriticalThreshold(this.thresholds.getCriticalThresholdBytes());
    }
  }

  @Override
  public boolean hasEvictionThreshold() {
    return this.hasEvictionThreshold;
  }

  void setEvictionThreshold(final float evictionThreshold) {
    this.hasEvictionThreshold = true;

    synchronized (this) {
      // If the threshold isn't changing then don't do anything.
      if (evictionThreshold == this.thresholds.getEvictionThreshold()) {
        return;
      }

      // Do some basic sanity checking on the new threshold
      if (evictionThreshold > 100.0f || evictionThreshold < 0.0f) {
        throw new IllegalArgumentException(
            "Eviction percentage must be greater than 0.0 and less than or equal to 100.0.");
      }
      if (this.memoryAllocator == null) {
        throw new IllegalStateException(
            "No off-heap memory has been configured.");
      }
      if (evictionThreshold != 0 && this.thresholds.isCriticalThresholdEnabled()
          && evictionThreshold >= this.thresholds.getCriticalThreshold()) {
        throw new IllegalArgumentException(
            "Eviction percentage must be less than the critical percentage.");
      }

      this.thresholds = new MemoryThresholds(this.thresholds.getMaxMemoryBytes(),
          this.thresholds.getCriticalThreshold(), evictionThreshold);

      updateStateAndSendEvent(getBytesUsed());

      // Start or stop monitoring based upon whether a threshold has been set
      if (this.thresholds.isEvictionThresholdEnabled()
          || this.thresholds.isCriticalThresholdEnabled()) {
        startMonitoring();
      } else if (!this.thresholds.isEvictionThresholdEnabled()
          && !this.thresholds.isCriticalThresholdEnabled()) {
        stopMonitoring();
      }

      this.stats.changeOffHeapEvictionThreshold(this.thresholds.getEvictionThresholdBytes());
    }
  }

  /**
   * Compare the number of bytes used (fetched from the JVM) to the thresholds. If necessary, change
   * the state and send an event for the state change.
   *
   * @return true if an event was sent
   */
  public boolean updateStateAndSendEvent() {
    return updateStateAndSendEvent(getBytesUsed());
  }

  /**
   * Compare the number of bytes used to the thresholds. If necessary, change the state and send an
   * event for the state change.
   *
   * Public for testing.
   *
   * @param bytesUsed Number of bytes of off-heap memory currently used.
   * @return true if an event was sent
   */
  public boolean updateStateAndSendEvent(long bytesUsed) {
    boolean result = false;
    synchronized (this) {
      final MemoryEvent mre = this.mostRecentEvent;
      final MemoryState oldState = mre.getState();
      final MemoryThresholds thresholds = this.thresholds;
      final OffHeapMemoryMonitorObserver _testHook = this.testHook;
      MemoryState newState = thresholds.computeNextState(oldState, bytesUsed);
      if (oldState != newState) {
        this.currentState = newState;

        MemoryEvent event = new MemoryEvent(ResourceType.OFFHEAP_MEMORY, oldState, newState,
            this.cache.getMyId(), bytesUsed, true, thresholds);
        if (_testHook != null) {
          _testHook.updateStateAndSendEventBeforeProcess(bytesUsed, event);
        }
        this.mostRecentEvent = event;
        processLocalEvent(event);
        updateStatsFromEvent(event);
        result = true;

      } else if (!oldState.isNormal() && bytesUsed != mre.getBytesUsed()
          && this.deliverNextAbnormalEvent) {
        this.deliverNextAbnormalEvent = false;
        MemoryEvent event = new MemoryEvent(ResourceType.OFFHEAP_MEMORY, oldState, newState,
            this.cache.getMyId(), bytesUsed, true, thresholds);
        if (_testHook != null) {
          _testHook.updateStateAndSendEventBeforeAbnormalProcess(bytesUsed, event);
        }
        this.mostRecentEvent = event;
        processLocalEvent(event);
        result = true;
      } else {
        if (_testHook != null) {
          _testHook.updateStateAndSendEventIgnore(bytesUsed, oldState, newState, mre.getBytesUsed(),
              this.deliverNextAbnormalEvent);
        }

      }
    }
    return result;
  }

  /**
   * Return true if the given number of bytes compared to the current monitor state would generate a
   * new memory event.
   *
   * @param bytesUsed Number of bytes of off-heap memory currently used.
   * @return true if a new event might need to be sent
   */
  private boolean mightSendEvent(long bytesUsed) {
    final MemoryEvent mre = this.mostRecentEvent;
    final MemoryState oldState = mre.getState();
    final MemoryThresholds thresholds = mre.getThresholds();
    MemoryState newState = thresholds.computeNextState(oldState, bytesUsed);
    if (oldState != newState) {
      return true;
    } else if (!oldState.isNormal() && bytesUsed != mre.getBytesUsed()
        && this.deliverNextAbnormalEvent) {
      return true;
    }
    return false;
  }

  private volatile boolean deliverNextAbnormalEvent = false;

  /**
   * Used by the OffHeapMemoryUsageListener to tell us that the next abnormal event should be
   * delivered even if the state does not change as long as the memory usage changed. For some
   * reason, unknown to me, if we stay in an abnormal state for more than a second then we want to
   * send another event to update the memory usage.
   */
  void deliverNextAbnormalEvent() {
    this.deliverNextAbnormalEvent = true;
  }

  /**
   * Update resource manager stats based upon the given event.
   *
   * @param event Event from which to derive data for updating stats.
   */
  private void updateStatsFromEvent(MemoryEvent event) {
    if (event.isLocal()) {
      if (event.getState().isCritical() && !event.getPreviousState().isCritical()) {
        this.stats.incOffHeapCriticalEvents();
      } else if (!event.getState().isCritical() && event.getPreviousState().isCritical()) {
        this.stats.incOffHeapSafeEvents();
      }

      if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
        this.stats.incOffHeapEvictionStartEvents();
      } else if (!event.getState().isEviction() && event.getPreviousState().isEviction()) {
        this.stats.incOffHeapEvictionStopEvents();
      }
    }
  }

  /**
   * Populate off-heap memory data in the given profile.
   *
   * @param profile Profile to populate.
   */
  @Override
  public void fillInProfile(final ResourceManagerProfile profile) {
    final MemoryEvent eventToPopulate = this.mostRecentEvent;
    profile.setOffHeapData(eventToPopulate.getBytesUsed(), eventToPopulate.getState(),
        eventToPopulate.getThresholds());
  }

  @Override
  public MemoryState getState() {
    return this.currentState;
  }

  @Override
  public MemoryThresholds getThresholds() {
    MemoryThresholds saveThresholds = this.thresholds;

    return new MemoryThresholds(saveThresholds.getMaxMemoryBytes(),
        saveThresholds.getCriticalThreshold(), saveThresholds.getEvictionThreshold());
  }

  /**
   * Returns the number of bytes of memory reported by the memory allocator as currently in use.
   */
  @Override
  public long getBytesUsed() {
    if (this.memoryAllocator == null) {
      return 0;
    }
    return this.memoryAllocator.getUsedMemory();
  }

  /**
   * Deliver a memory event from one of the monitors to both local listeners and remote resource
   * managers. Also, if a critical event is received and a query monitor has been enabled, then the
   * query monitor will be notified.
   *
   * Package private for testing.
   *
   * @param event Event to process.
   */
  void processLocalEvent(MemoryEvent event) {
    assert event.isLocal();

    if (logger.isDebugEnabled()) {
      logger.debug("Handling new local event {}", event);
    }

    if (event.getState().isCritical() && !event.getPreviousState().isCritical()) {
      logger.error("Member: {} above {} critical threshold",
          new Object[] {event.getMember(), "off-heap"});
    } else if (!event.getState().isCritical() && event.getPreviousState().isCritical()) {
      logger.error("Member: {} below {} critical threshold",
          new Object[] {event.getMember(), "off-heap"});
    }

    if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
      logger
          .info("Member: {} above {} eviction threshold",
              new Object[] {event.getMember(), "off-heap"});
    } else if (!event.getState().isEviction() && event.getPreviousState().isEviction()) {
      logger
          .info("Member: {} below {} eviction threshold",
              new Object[] {event.getMember(), "off-heap"});
    }

    if (logger.isDebugEnabled()) {
      logger.debug("Informing remote members of event {}", event);
    }

    this.resourceAdvisor.updateRemoteProfile();
    this.resourceManager.deliverLocalEvent(event);
  }

  @Override
  public void notifyListeners(final Set<ResourceListener> listeners, final ResourceEvent event) {
    for (ResourceListener listener : listeners) {
      try {
        listener.onEvent(event);
      } catch (CancelException ignore) {
        // ignore
      } catch (Throwable t) {
        logger.error("Exception occurred when notifying listeners ", t);
      }
    }
  }

  @Override
  public String toString() {
    return "OffHeapMemoryMonitor [thresholds=" + this.thresholds + ", mostRecentEvent="
        + this.mostRecentEvent + "]";
  }

  class OffHeapMemoryUsageListener implements Runnable {
    private boolean deliverEvent = false;
    private boolean stopRequested = false;

    OffHeapMemoryUsageListener() {}

    public synchronized void deliverEvent() {
      this.deliverEvent = true;
      this.notifyAll();
    }

    public synchronized void stop() {
      this.stopRequested = true;
      this.notifyAll();
    }

    @Override
    public void run() {
      if (logger.isDebugEnabled()) {
        logger.debug("OffHeapMemoryUsageListener is starting {}", this);
      }
      int callsWithNoEvent = 0;
      final int MS_TIMEOUT = 10;
      final int MAX_CALLS_WITH_NO_EVENT = 1000 / MS_TIMEOUT;
      boolean exitRunLoop = false;
      while (!exitRunLoop) {
        if (!updateStateAndSendEvent()) {
          callsWithNoEvent++;
          if (callsWithNoEvent > MAX_CALLS_WITH_NO_EVENT) {
            deliverNextAbnormalEvent();
            callsWithNoEvent = 0;
          }
        } else {
          callsWithNoEvent = 0;
        }

        synchronized (this) {
          if (this.stopRequested) {
            exitRunLoop = true;
          } else if (this.deliverEvent) {
            // No need to wait.
            // Loop around and call updateStateAndSendEvent.
            this.deliverEvent = false;
          } else {
            // Wait to be notified that off-heap memory changed
            // or for the wait to timeout.
            // In some cases we need to generate an event even
            // when we have not been notified (see GEODE-438).
            // So we don't want this wait to be for very long.
            try {
              this.wait(MS_TIMEOUT);
              this.deliverEvent = false;
            } catch (InterruptedException ignore) {
              logger.warn("OffHeapMemoryUsageListener was interrupted {}", this);
              this.stopRequested = true;
              exitRunLoop = true;
            }
          }
        }
      }

      if (logger.isDebugEnabled()) {
        logger.debug("OffHeapMemoryUsageListener is stopping {}", this);
      }
    }

    @Override
    public String toString() {
      final StringBuilder sb = new StringBuilder(getClass().getSimpleName());
      sb.append(" Thread").append(" #").append(System.identityHashCode(this));
      return sb.toString();
    }
  }
}
