| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.memory; |
| |
| import static org.apache.drill.exec.memory.BaseAllocator.indent; |
| import io.netty.buffer.DrillBuf; |
| import io.netty.buffer.PooledByteBufAllocatorL; |
| import io.netty.buffer.UnsafeDirectLittleEndian; |
| |
| import java.util.IdentityHashMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.drill.common.AutoCloseables.Closeable; |
| import org.apache.drill.common.HistoricalLog; |
| import org.apache.drill.common.concurrent.AutoCloseableLock; |
| import org.apache.drill.exec.memory.BaseAllocator.Verbosity; |
| import org.apache.drill.exec.metrics.DrillMetrics; |
| import org.apache.drill.exec.ops.BufferManager; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| |
| /** |
| * Manages the relationship between one or more allocators and a particular |
| * UDLE. Ensures that one allocator owns the memory that multiple allocators may |
| * be referencing. Manages a BufferLedger between each of its associated |
| * allocators. This class is also responsible for managing when memory is |
| * allocated and returned to the Netty-based {code PooledByteBufAllocatorL}. |
| * <p> |
| * The only reason that this isn't package private is we're forced to put |
| * DrillBuf in Netty's package which need access to these objects or methods. |
| * <p> |
| * Threading: AllocationManager manages thread-safety internally. Operations |
| * within the context of a single BufferLedger are lockless in nature and can be |
| * leveraged by multiple threads. Operations that cross the context of two |
| * ledgers will acquire a lock on the AllocationManager instance. Important |
| * note, there is one AllocationManager per UnsafeDirectLittleEndian buffer |
| * allocation. As such, there will be thousands of these in a typical query. The |
| * contention of acquiring a lock on AllocationManager should be very low. |
| */ |
| public class AllocationManager { |
| |
| private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0); |
| private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0); |
| static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(DrillMetrics.getRegistry()); |
| |
| private final RootAllocator root; |
| private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet(); |
| private final int size; |
| private final UnsafeDirectLittleEndian underlying; |
| private final IdentityHashMap<BufferAllocator, BufferLedger> map = new IdentityHashMap<>(); |
| private final ReadWriteLock lock = new ReentrantReadWriteLock(); |
| private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock()); |
| private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock()); |
| private final long amCreationTime = System.nanoTime(); |
| |
| private volatile BufferLedger owningLedger; |
| private volatile long amDestructionTime = 0; |
| |
| AllocationManager(BaseAllocator accountingAllocator, int size) { |
| Preconditions.checkNotNull(accountingAllocator); |
| accountingAllocator.assertOpen(); |
| |
| this.root = accountingAllocator.root; |
| this.underlying = INNER_ALLOCATOR.allocate(size); |
| |
| // we do a no retain association since our creator will want to retrieve the newly created ledger and will create a |
| // reference count at that point |
| this.owningLedger = associate(accountingAllocator, false); |
| this.size = underlying.capacity(); |
| } |
| |
| /** |
| * Associate the existing underlying buffer with a new allocator. This will |
| * increase the reference count to the provided ledger by 1. |
| * |
| * @param allocator |
| * The target allocator to associate this buffer with. |
| * @return The Ledger (new or existing) that associates the underlying buffer |
| * to this new ledger. |
| */ |
| BufferLedger associate(final BaseAllocator allocator) { |
| return associate(allocator, true); |
| } |
| |
| private BufferLedger associate(final BaseAllocator allocator, final boolean retain) { |
| allocator.assertOpen(); |
| |
| if (root != allocator.root) { |
| throw new IllegalStateException( |
| "A buffer can only be associated between two allocators that share the same root."); |
| } |
| |
| try (@SuppressWarnings("unused") Closeable read = readLock.open()) { |
| |
| final BufferLedger ledger = map.get(allocator); |
| if (ledger != null) { |
| if (retain) { |
| ledger.inc(); |
| } |
| return ledger; |
| } |
| } |
| try (@SuppressWarnings("unused") Closeable write = writeLock.open()) { |
| // we have to recheck existing ledger since a second reader => writer could be competing with us. |
| |
| final BufferLedger existingLedger = map.get(allocator); |
| if (existingLedger != null) { |
| if (retain) { |
| existingLedger.inc(); |
| } |
| return existingLedger; |
| } |
| |
| final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator)); |
| if (retain) { |
| ledger.inc(); |
| } |
| BufferLedger oldLedger = map.put(allocator, ledger); |
| Preconditions.checkArgument(oldLedger == null); |
| allocator.associateLedger(ledger); |
| return ledger; |
| } |
| } |
| |
| public static int chunkSize() { |
| return INNER_ALLOCATOR.getChunkSize(); |
| } |
| |
| /** |
| * The way that a particular BufferLedger communicates back to the |
| * AllocationManager that it now longer needs to hold a reference to |
| * particular piece of memory. |
| */ |
| private class ReleaseListener { |
| |
| private final BufferAllocator allocator; |
| |
| public ReleaseListener(BufferAllocator allocator) { |
| this.allocator = allocator; |
| } |
| |
| /** |
| * Can only be called when you already hold the writeLock. |
| */ |
| public void release() { |
| allocator.assertOpen(); |
| |
| final BufferLedger oldLedger = map.remove(allocator); |
| oldLedger.allocator.dissociateLedger(oldLedger); |
| |
| if (oldLedger == owningLedger) { |
| if (map.isEmpty()) { |
| // no one else owns, lets release. |
| oldLedger.allocator.releaseBytes(size); |
| underlying.release(); |
| amDestructionTime = System.nanoTime(); |
| owningLedger = null; |
| } else { |
| // we need to change the owning allocator. we've been removed so we'll get whatever is top of list |
| BufferLedger newLedger = map.values().iterator().next(); |
| |
| // we'll forcefully transfer the ownership and not worry about whether we exceeded the limit |
| // since this consumer can't do anything with this. |
| oldLedger.transferBalance(newLedger); |
| } |
| } else { |
| if (map.isEmpty()) { |
| throw new IllegalStateException("The final removal of a ledger should be connected to the owning ledger."); |
| } |
| } |
| } |
| } |
| |
| /** |
| * The reference manager that binds an allocator manager to a particular |
| * BaseAllocator. Also responsible for creating a set of DrillBufs that share |
| * a common fate and set of reference counts. As with AllocationManager, the |
| * only reason this is public is due to DrillBuf being in io.netty.buffer |
| * package. |
| */ |
| public class BufferLedger { |
| |
| private final IdentityHashMap<DrillBuf, Object> buffers = |
| BaseAllocator.DEBUG ? new IdentityHashMap<>() : null; |
| |
| private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet(); // unique ID assigned to each ledger |
| private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can manage request for retain |
| // correctly |
| private final long lCreationTime = System.nanoTime(); |
| private volatile long lDestructionTime = 0; |
| private final BaseAllocator allocator; |
| private final ReleaseListener listener; |
| private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, |
| "BufferLedger[%d]", 1) |
| : null; |
| |
| private BufferLedger(BaseAllocator allocator, ReleaseListener listener) { |
| this.allocator = allocator; |
| this.listener = listener; |
| } |
| |
| /** |
| * Transfer any balance the current ledger has to the target ledger. In the |
| * case that the current ledger holds no memory, no transfer is made to the |
| * new ledger. |
| * |
| * @param target |
| * The ledger to transfer ownership account to. |
| * @return Whether transfer fit within target ledgers limits. |
| */ |
| public boolean transferBalance(final BufferLedger target) { |
| Preconditions.checkNotNull(target); |
| Preconditions.checkArgument(allocator.root == target.allocator.root, |
| "You can only transfer between two allocators that share the same root."); |
| allocator.assertOpen(); |
| |
| target.allocator.assertOpen(); |
| // if we're transferring to ourself, just return. |
| if (target == this) { |
| return true; |
| } |
| |
| // since two balance transfers out from the allocator manager could cause incorrect accounting, |
| // we need to ensure that this won't happen by synchronizing on the allocator manager instance. |
| try (@SuppressWarnings("unused") Closeable write = writeLock.open()) { |
| if (owningLedger != this) { |
| return true; |
| } |
| |
| if (BaseAllocator.DEBUG) { |
| this.historicalLog.recordEvent("transferBalance(%s)", target.allocator.name); |
| target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name); |
| } |
| |
| boolean overlimit = target.allocator.forceAllocate(size); |
| allocator.releaseBytes(size); |
| owningLedger = target; |
| return overlimit; |
| } |
| } |
| |
| /** |
| * Print the current ledger state to a the provided StringBuilder. |
| * @param sb |
| * The StringBuilder to populate. |
| * @param indent |
| * The level of indentation to position the data. |
| * @param verbosity |
| * The level of verbosity to print. |
| */ |
| public void print(StringBuilder sb, int indent, Verbosity verbosity) { |
| indent(sb, indent) |
| .append("ledger[") |
| .append(ledgerId) |
| .append("] allocator: ") |
| .append(allocator.name) |
| .append("), isOwning: ") |
| .append(owningLedger == this) |
| .append(", size: ") |
| .append(size) |
| .append(", references: ") |
| .append(bufRefCnt.get()) |
| .append(", life: ") |
| .append(lCreationTime) |
| .append("..") |
| .append(lDestructionTime) |
| .append(", allocatorManager: [") |
| .append(AllocationManager.this.allocatorManagerId) |
| .append(", life: ") |
| .append(amCreationTime) |
| .append("..") |
| .append(amDestructionTime); |
| |
| if (!BaseAllocator.DEBUG) { |
| sb.append("]\n"); |
| } else { |
| synchronized (buffers) { |
| sb.append("] holds ") |
| .append(buffers.size()) |
| .append(" buffers. \n"); |
| for (DrillBuf buf : buffers.keySet()) { |
| buf.print(sb, indent + 2, verbosity); |
| sb.append('\n'); |
| } |
| } |
| } |
| } |
| |
| private void inc() { |
| bufRefCnt.incrementAndGet(); |
| } |
| |
| /** |
| * Decrement the ledger's reference count. If the ledger is decremented to |
| * zero, this ledger should release its ownership back to the |
| * AllocationManager |
| */ |
| public int decrement(int decrement) { |
| allocator.assertOpen(); |
| |
| final int outcome; |
| try (@SuppressWarnings("unused") Closeable write = writeLock.open()) { |
| outcome = bufRefCnt.addAndGet(-decrement); |
| if (outcome == 0) { |
| lDestructionTime = System.nanoTime(); |
| listener.release(); |
| } |
| } |
| |
| return outcome; |
| } |
| |
| /** |
| * Returns the ledger associated with a particular BufferAllocator. If the |
| * BufferAllocator doesn't currently have a ledger associated with this |
| * AllocationManager, a new one is created. This is placed on BufferLedger |
| * rather than AllocationManager directly because DrillBufs don't have |
| * access to AllocationManager and they are the ones responsible for |
| * exposing the ability to associate multiple allocators with a particular |
| * piece of underlying memory. Note that this will increment the reference |
| * count of this ledger by one to ensure the ledger isn't destroyed before |
| * use. |
| * |
| * @param allocator |
| * @return The ledger associated with a particular BufferAllocator. |
| */ |
| public BufferLedger getLedgerForAllocator(BufferAllocator allocator) { |
| return associate((BaseAllocator) allocator); |
| } |
| |
| /** |
| * Create a new DrillBuf associated with this AllocationManager and memory. |
| * Does not impact reference count. Typically used for slicing. |
| * |
| * @param offset |
| * The offset in bytes to start this new DrillBuf. |
| * @param length |
| * The length in bytes that this DrillBuf will provide access to. |
| * @return A new DrillBuf that shares references with all DrillBufs |
| * associated with this BufferLedger |
| */ |
| public DrillBuf newDrillBuf(int offset, int length) { |
| allocator.assertOpen(); |
| return newDrillBuf(offset, length, null); |
| } |
| |
| /** |
| * Create a new DrillBuf associated with this AllocationManager and memory. |
| * @param offset |
| * The offset in bytes to start this new DrillBuf. |
| * @param length |
| * The length in bytes that this DrillBuf will provide access to. |
| * @param manager |
| * An optional BufferManager argument that can be used to manage expansion of this DrillBuf. |
| * @return A new DrillBuf that shares references with all DrillBufs associated with this BufferLedger. |
| */ |
| public DrillBuf newDrillBuf(int offset, int length, BufferManager manager) { |
| allocator.assertOpen(); |
| |
| final DrillBuf buf = new DrillBuf( |
| bufRefCnt, |
| this, |
| underlying, |
| manager, |
| allocator.getAsByteBufAllocator(), |
| offset, |
| length, |
| false); |
| |
| if (BaseAllocator.DEBUG) { |
| historicalLog.recordEvent( |
| "DrillBuf(BufferLedger, BufferAllocator[%s], UnsafeDirectLittleEndian[identityHashCode == " |
| + "%d](%s)) => ledger hc == %d", |
| allocator.name, System.identityHashCode(buf), buf.toString(), |
| System.identityHashCode(this)); |
| |
| synchronized (buffers) { |
| buffers.put(buf, null); |
| } |
| } |
| |
| return buf; |
| } |
| |
| /** |
| * The total size (in bytes) of memory underlying this ledger. |
| * |
| * @return Size in bytes |
| */ |
| public int getSize() { |
| return size; |
| } |
| |
| /** |
| * Amount of memory accounted for by this ledger. This is either getSize() if this is the owning ledger for the |
| * memory or zero in the case that this is not the owning ledger associated with this memory. |
| * |
| * @return Amount of accounted(owned) memory associated with this ledger. |
| */ |
| public int getAccountedSize() { |
| try (@SuppressWarnings("unused") Closeable read = readLock.open()) { |
| if (owningLedger == this) { |
| return size; |
| } else { |
| return 0; |
| } |
| } |
| } |
| |
| /** |
| * Package visible for debugging/verification only. |
| */ |
| @VisibleForTesting |
| protected UnsafeDirectLittleEndian getUnderlying() { |
| return underlying; |
| } |
| |
| /** |
| * Package visible for debugging/verification only. |
| */ |
| @VisibleForTesting |
| protected boolean isOwningLedger() { |
| return this == owningLedger; |
| } |
| } |
| } |