/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.memory;

import static org.apache.drill.exec.memory.BaseAllocator.indent;
import io.netty.buffer.DrillBuf;
import io.netty.buffer.PooledByteBufAllocatorL;
import io.netty.buffer.UnsafeDirectLittleEndian;

import java.util.IdentityHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.drill.common.AutoCloseables.Closeable;
import org.apache.drill.common.HistoricalLog;
import org.apache.drill.common.concurrent.AutoCloseableLock;
import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.ops.BufferManager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

/**
 * Manages the relationship between one or more allocators and a particular
 * UDLE ({@code UnsafeDirectLittleEndian} buffer). Ensures that one allocator
 * owns the memory that multiple allocators may be referencing. Manages a
 * BufferLedger between each of its associated allocators. This class is also
 * responsible for managing when memory is allocated and returned to the
 * Netty-based {@code PooledByteBufAllocatorL}.
 * <p>
 * The only reason that this isn't package private is we're forced to put
 * DrillBuf in Netty's package which need access to these objects or methods.
 * <p>
 * Threading: AllocationManager manages thread-safety internally. Operations
 * within the context of a single BufferLedger are lockless in nature and can be
 * leveraged by multiple threads. Operations that cross the context of two
 * ledgers will acquire a lock on the AllocationManager instance. Important
 * note, there is one AllocationManager per UnsafeDirectLittleEndian buffer
 * allocation. As such, there will be thousands of these in a typical query. The
 * contention of acquiring a lock on AllocationManager should be very low.
 */
public class AllocationManager {

  private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
  private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
  static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(DrillMetrics.getRegistry());

  // Root shared by every allocator that is allowed to be associated with this buffer.
  private final RootAllocator root;
  private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();
  // Capacity of the underlying buffer; this is the amount charged to the owning allocator.
  private final int size;
  private final UnsafeDirectLittleEndian underlying;
  // One ledger per associated allocator, keyed by allocator identity. Read under readLock, mutated under writeLock.
  private final IdentityHashMap<BufferAllocator, BufferLedger> map = new IdentityHashMap<>();
  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock());
  private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock());
  private final long amCreationTime = System.nanoTime();

  // The ledger whose allocator is currently charged for the memory; null once the memory has been released.
  private volatile BufferLedger owningLedger;
  private volatile long amDestructionTime = 0;

  /**
   * Allocates a new underlying buffer and makes the given allocator its initial
   * owner.
   *
   * @param accountingAllocator
   *          The allocator that initially owns the memory. Must be non-null and open.
   * @param size
   *          The requested size in bytes. The recorded size is the capacity
   *          actually reported by the underlying buffer.
   */
  AllocationManager(BaseAllocator accountingAllocator, int size) {
    Preconditions.checkNotNull(accountingAllocator);
    accountingAllocator.assertOpen();

    this.root = accountingAllocator.root;
    this.underlying = INNER_ALLOCATOR.allocate(size);
    // Record the size before associating so that anything reached during association sees the real capacity.
    this.size = underlying.capacity();

    try {
      // we do a no retain association since our creator will want to retrieve the newly created ledger and will
      // create a reference count at that point
      this.owningLedger = associate(accountingAllocator, false);
    } catch (RuntimeException | Error e) {
      // The manager is never published if construction fails, so nobody else could return the buffer to the pool.
      underlying.release();
      throw e;
    }
  }

  /**
   * Associate the existing underlying buffer with a new allocator. This will
   * increase the reference count to the provided ledger by 1.
   *
   * @param allocator
   *          The target allocator to associate this buffer with.
   * @return The Ledger (new or existing) that associates the underlying buffer
   *         to this allocator.
   */
  BufferLedger associate(final BaseAllocator allocator) {
    return associate(allocator, true);
  }

  /**
   * Looks up, or creates, the ledger that binds the given allocator to this
   * manager's buffer.
   *
   * @param allocator
   *          The allocator to associate; must be open and share this manager's root.
   * @param retain
   *          Whether to increment the returned ledger's reference count.
   * @return The existing or newly created ledger for the allocator.
   * @throws IllegalStateException
   *           If the allocator does not share this manager's root.
   */
  private BufferLedger associate(final BaseAllocator allocator, final boolean retain) {
    allocator.assertOpen();

    if (root != allocator.root) {
      throw new IllegalStateException(
          "A buffer can only be associated between two allocators that share the same root.");
    }

    // Fast path: the ledger usually exists already, so try with the shared lock first.
    try (@SuppressWarnings("unused") Closeable read = readLock.open()) {

      final BufferLedger ledger = map.get(allocator);
      if (ledger != null) {
        if (retain) {
          ledger.inc();
        }
        return ledger;
      }
    }
    try (@SuppressWarnings("unused") Closeable write = writeLock.open()) {
      // we have to recheck existing ledger since a second reader => writer could be competing with us.

      final BufferLedger existingLedger = map.get(allocator);
      if (existingLedger != null) {
        if (retain) {
          existingLedger.inc();
        }
        return existingLedger;
      }

      final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator));
      if (retain) {
        ledger.inc();
      }
      BufferLedger oldLedger = map.put(allocator, ledger);
      Preconditions.checkArgument(oldLedger == null);
      allocator.associateLedger(ledger);
      return ledger;
    }
  }

  /**
   * @return The chunk size reported by the shared inner pooled allocator.
   */
  public static int chunkSize() {
    return INNER_ALLOCATOR.getChunkSize();
  }

  /**
   * The way that a particular BufferLedger communicates back to the
   * AllocationManager that it no longer needs to hold a reference to a
   * particular piece of memory.
   */
  private class ReleaseListener {

    private final BufferAllocator allocator;

    public ReleaseListener(BufferAllocator allocator) {
      this.allocator = allocator;
    }

    /**
     * Removes this listener's ledger from the manager. If that ledger owned the
     * memory, the memory is either released (when no other ledger remains) or
     * its ownership is handed to one of the remaining ledgers.
     * <p>
     * Can only be called when you already hold the writeLock.
     *
     * @throws IllegalStateException
     *           If no ledger is registered for this listener's allocator, or if
     *           the last ledger removed is not the owning ledger.
     */
    public void release() {
      allocator.assertOpen();

      final BufferLedger oldLedger = map.remove(allocator);
      // Fail with a clear message rather than a bare NullPointerException if the ledger was already removed.
      Preconditions.checkState(oldLedger != null,
          "No ledger is registered for the releasing allocator; the ledger may already have been released.");
      oldLedger.allocator.dissociateLedger(oldLedger);

      if (oldLedger == owningLedger) {
        if (map.isEmpty()) {
          // no one else owns, lets release.
          oldLedger.allocator.releaseBytes(size);
          underlying.release();
          amDestructionTime = System.nanoTime();
          owningLedger = null;
        } else {
          // we need to change the owning allocator. we've been removed so we'll get whatever is top of list
          BufferLedger newLedger = map.values().iterator().next();

          // we'll forcefully transfer the ownership and not worry about whether we exceeded the limit
          // since this consumer can't do anything with this.
          oldLedger.transferBalance(newLedger);
        }
      } else {
        if (map.isEmpty()) {
          throw new IllegalStateException("The final removal of a ledger should be connected to the owning ledger.");
        }
      }
    }
  }

  /**
   * The reference manager that binds an allocator manager to a particular
   * BaseAllocator. Also responsible for creating a set of DrillBufs that share
   * a common fate and set of reference counts. As with AllocationManager, the
   * only reason this is public is due to DrillBuf being in io.netty.buffer
   * package.
   */
  public class BufferLedger {

    // Only populated when BaseAllocator.DEBUG is set; null otherwise. Guarded by synchronizing on itself.
    private final IdentityHashMap<DrillBuf, Object> buffers =
        BaseAllocator.DEBUG ? new IdentityHashMap<>() : null;

    private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet(); // unique ID assigned to each ledger
    private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can manage request for retain
                                                                  // correctly
    private final long lCreationTime = System.nanoTime();
    private volatile long lDestructionTime = 0;
    private final BaseAllocator allocator;
    private final ReleaseListener listener;
    // Only created when BaseAllocator.DEBUG is set; null otherwise.
    private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH,
        "BufferLedger[%d]", 1)
        : null;

    private BufferLedger(BaseAllocator allocator, ReleaseListener listener) {
      this.allocator = allocator;
      this.listener = listener;
    }

    /**
     * Transfer any balance the current ledger has to the target ledger. In the
     * case that the current ledger holds no memory, no transfer is made to the
     * new ledger.
     *
     * @param target
     *          The ledger to transfer ownership account to.
     * @return Whether transfer fit within target ledgers limits. Also
     *         {@code true} when nothing had to be transferred.
     */
    public boolean transferBalance(final BufferLedger target) {
      Preconditions.checkNotNull(target);
      Preconditions.checkArgument(allocator.root == target.allocator.root,
          "You can only transfer between two allocators that share the same root.");
      allocator.assertOpen();

      target.allocator.assertOpen();
      // if we're transferring to ourself, just return.
      if (target == this) {
        return true;
      }

      // since two balance transfers out from the allocator manager could cause incorrect accounting,
      // we need to ensure that this won't happen by holding the allocation manager's write lock.
      try (@SuppressWarnings("unused") Closeable write = writeLock.open()) {
        // Only the owning ledger carries a balance; any other ledger has nothing to hand over.
        if (owningLedger != this) {
          return true;
        }

        if (BaseAllocator.DEBUG) {
          this.historicalLog.recordEvent("transferBalance(%s)", target.allocator.name);
          target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name);
        }

        // Charge the target first (forced, so it cannot be refused), then credit this ledger's allocator.
        final boolean withinLimit = target.allocator.forceAllocate(size);
        allocator.releaseBytes(size);
        owningLedger = target;
        return withinLimit;
      }
    }

    /**
     * Print the current ledger state to the provided StringBuilder.
     *
     * @param sb
     *          The StringBuilder to populate.
     * @param indent
     *          The level of indentation to position the data.
     * @param verbosity
     *          The level of verbosity to print.
     */
    public void print(StringBuilder sb, int indent, Verbosity verbosity) {
      indent(sb, indent)
          .append("ledger[")
          .append(ledgerId)
          .append("] allocator: ")
          .append(allocator.name)
          .append(", isOwning: ")
          .append(owningLedger == this)
          .append(", size: ")
          .append(size)
          .append(", references: ")
          .append(bufRefCnt.get())
          .append(", life: ")
          .append(lCreationTime)
          .append("..")
          .append(lDestructionTime)
          .append(", allocatorManager: [")
          .append(AllocationManager.this.allocatorManagerId)
          .append(", life: ")
          .append(amCreationTime)
          .append("..")
          .append(amDestructionTime);

      if (!BaseAllocator.DEBUG) {
        sb.append("]\n");
      } else {
        // The buffer registry only exists in DEBUG mode; list every buffer created from this ledger.
        synchronized (buffers) {
          sb.append("] holds ")
              .append(buffers.size())
              .append(" buffers. \n");
          for (DrillBuf buf : buffers.keySet()) {
            buf.print(sb, indent + 2, verbosity);
            sb.append('\n');
          }
        }
      }
    }

    /** Adds one reference to this ledger. */
    private void inc() {
      bufRefCnt.incrementAndGet();
    }

    /**
     * Decrement the ledger's reference count. If the ledger is decremented to
     * zero, this ledger should release its ownership back to the
     * AllocationManager
     *
     * @param decrement
     *          The number of references to remove.
     * @return The reference count after the decrement has been applied.
     */
    public int decrement(int decrement) {
      allocator.assertOpen();

      final int outcome;
      // The write lock is held across the decrement so that reaching zero and releasing happen as one step.
      try (@SuppressWarnings("unused") Closeable write = writeLock.open()) {
        outcome = bufRefCnt.addAndGet(-decrement);
        if (outcome == 0) {
          lDestructionTime = System.nanoTime();
          listener.release();
        }
      }

      return outcome;
    }

    /**
     * Returns the ledger associated with a particular BufferAllocator. If the
     * BufferAllocator doesn't currently have a ledger associated with this
     * AllocationManager, a new one is created. This is placed on BufferLedger
     * rather than AllocationManager directly because DrillBufs don't have
     * access to AllocationManager and they are the ones responsible for
     * exposing the ability to associate multiple allocators with a particular
     * piece of underlying memory. Note that this will increment the reference
     * count of the returned ledger by one to ensure the ledger isn't destroyed
     * before use.
     *
     * @param allocator
     *          The allocator whose ledger is wanted. It is cast to
     *          BaseAllocator, so it must be an instance of that class.
     * @return The ledger associated with a particular BufferAllocator.
     */
    public BufferLedger getLedgerForAllocator(BufferAllocator allocator) {
      return associate((BaseAllocator) allocator);
    }

    /**
     * Create a new DrillBuf associated with this AllocationManager and memory.
     * Does not impact reference count. Typically used for slicing.
     *
     * @param offset
     *          The offset in bytes to start this new DrillBuf.
     * @param length
     *          The length in bytes that this DrillBuf will provide access to.
     * @return A new DrillBuf that shares references with all DrillBufs
     *         associated with this BufferLedger
     */
    public DrillBuf newDrillBuf(int offset, int length) {
      allocator.assertOpen();
      return newDrillBuf(offset, length, null);
    }

    /**
     * Create a new DrillBuf associated with this AllocationManager and memory.
     *
     * @param offset
     *          The offset in bytes to start this new DrillBuf.
     * @param length
     *          The length in bytes that this DrillBuf will provide access to.
     * @param manager
     *          An optional BufferManager argument that can be used to manage expansion of this DrillBuf.
     * @return A new DrillBuf that shares references with all DrillBufs associated with this BufferLedger.
     */
    public DrillBuf newDrillBuf(int offset, int length, BufferManager manager) {
      allocator.assertOpen();

      // The buffer is handed this ledger's own counter, so every DrillBuf from this ledger shares one count.
      final DrillBuf buf = new DrillBuf(
          bufRefCnt,
          this,
          underlying,
          manager,
          allocator.getAsByteBufAllocator(),
          offset,
          length,
          false);

      if (BaseAllocator.DEBUG) {
        historicalLog.recordEvent(
            "DrillBuf(BufferLedger, BufferAllocator[%s], UnsafeDirectLittleEndian[identityHashCode == "
                + "%d](%s)) => ledger hc == %d",
            allocator.name, System.identityHashCode(buf), buf.toString(),
            System.identityHashCode(this));

        // Track the buffer so that print() can list it.
        synchronized (buffers) {
          buffers.put(buf, null);
        }
      }

      return buf;
    }

    /**
     * The total size (in bytes) of memory underlying this ledger.
     *
     * @return Size in bytes
     */
    public int getSize() {
      return size;
    }

    /**
     * Amount of memory accounted for by this ledger. This is either getSize() if this is the owning ledger for the
     * memory or zero in the case that this is not the owning ledger associated with this memory.
     *
     * @return Amount of accounted(owned) memory associated with this ledger.
     */
    public int getAccountedSize() {
      try (@SuppressWarnings("unused") Closeable read = readLock.open()) {
        if (owningLedger == this) {
          return size;
        } else {
          return 0;
        }
      }
    }

    /**
     * Exposed for debugging/verification only.
     *
     * @return The underlying buffer shared by every ledger of this manager.
     */
    @VisibleForTesting
    protected UnsafeDirectLittleEndian getUnderlying() {
      return underlying;
    }

    /**
     * Exposed for debugging/verification only.
     *
     * @return Whether this ledger is currently the owning ledger.
     */
    @VisibleForTesting
    protected boolean isOwningLedger() {
      return this == owningLedger;
    }
  }
}
