/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.drill.exec.memory; |
| |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import javax.annotation.concurrent.ThreadSafe; |
| |
| import org.apache.drill.exec.exception.OutOfMemoryException; |
| import org.apache.drill.exec.util.AssertionUtil; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| |
| /** |
| * Provides a concurrent way to manage account for memory usage without locking. Used as basis for Allocators. All |
| * operations are threadsafe (except for close). |
| */ |
| @ThreadSafe |
| @VisibleForTesting |
| public class Accountant implements AutoCloseable { |
| private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountant.class); |
| |
| // See DRILL-5808 |
| // Allow a "grace margin" above the allocator limit for those operators |
| // that are trying to stay within the limit, but are hindered by the |
| // current power-of-two allocations and random vector doublings. Instead |
| // of failing the query, the accountant allows an excess allocation within |
| // the grace, but issues a warning to the log. |
| // |
| // Normally, the "grace margin" (lenient) allocation is allowed only for |
| // operators that request leniency (because they attempt to manage memory) |
| // and only in production mode. Leniency is usually denied in a debug |
| // run (assertions enabled.) Use the following system option to force |
| // allowing lenient allocation in a debug run (or, to disable lenient |
| // allocation in a production run.) |
| // |
| // Force allowing lenient allocation: |
| // |
| // -Ddrill.memory.lenient.allocator=false |
| // |
| // Force strict allocation; |
| // |
| // -Ddrill.memory.lenient.allocator=false |
| |
| public static final String ALLOW_LENIENT_ALLOCATION = "drill.memory.lenient.allocator"; |
| |
| // Allow clients to allocate beyond the limit by this factor. |
| // If the limit is 10K, then with a margin of 1, the system |
| // will allow a grace of another 10K for a hard limit of 20K. |
| // Warnings are issued for allocations that use the grace. |
| |
| public static final int GRACE_MARGIN = 1; |
| |
| // In large-memory allocators, don't allow more than 100 MB |
| // grace. |
| |
| public static final int MAX_GRACE = 100 * 1024 * 1024; |
| |
| // Use the strictness set in a system property. If not set, then |
| // default to strict in debug mode, lenient in production mode. |
| |
| public static final boolean ALLOW_LENIENCY = |
| System.getProperty(ALLOW_LENIENT_ALLOCATION) == null |
| ? ! AssertionUtil.isAssertionsEnabled() |
| : Boolean.parseBoolean(System.getProperty(ALLOW_LENIENT_ALLOCATION)); |
| |
| // Whether leniency has been requested, and granted for this allocator. |
| |
| private boolean lenient = false; |
| |
| /** |
| * The parent allocator |
| */ |
| protected final Accountant parent; |
| |
| /** |
| * The amount of memory reserved for this allocator. Releases below this amount of memory will not be returned to the |
| * parent Accountant until this Accountant is closed. |
| */ |
| protected final long reservation; |
| |
| private final AtomicLong peakAllocation = new AtomicLong(); |
| |
| /** |
| * Maximum local memory that can be held. This can be externally updated. Changing it won't cause past memory to |
| * change but will change responses to future allocation efforts |
| */ |
| private final AtomicLong allocationLimit = new AtomicLong(); |
| |
| /** |
| * Currently allocated amount of memory; |
| */ |
| private final AtomicLong locallyHeldMemory = new AtomicLong(); |
| |
| public Accountant(Accountant parent, long reservation, long maxAllocation) { |
| Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be non-negative."); |
| Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be non-negative."); |
| Preconditions.checkArgument(reservation <= maxAllocation, |
| "The initial reservation size must be <= the maximum allocation."); |
| Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant can't reserve memory."); |
| |
| this.parent = parent; |
| this.reservation = reservation; |
| this.allocationLimit.set(maxAllocation); |
| |
| if (reservation != 0) { |
| // we will allocate a reservation from our parent. |
| final AllocationOutcome outcome = parent.allocateBytes(reservation); |
| if (!outcome.isOk()) { |
| throw new OutOfMemoryException(String.format( |
| "Failure trying to allocate initial reservation for Allocator. " |
| + "Attempted to allocate %d bytes and received an outcome of %s.", reservation, outcome.name())); |
| } |
| } |
| } |
| |
| /** |
| * Request lenient allocations: allows exceeding the allocation limit |
| * by the configured grace amount. The request is granted only if strict |
| * limits are not required. |
| * |
| * @return true if the leniency was granted, false if the current |
| * execution mode, or system property, disallows leniency |
| */ |
| |
| public boolean setLenient() { |
| lenient = ALLOW_LENIENCY; |
| return lenient; |
| } |
| |
| /** |
| * Force lenient allocation. Used for testing to avoid the need to muck |
| * with global settings (assertions or system properties.) <b>Do not</b> |
| * use in production code! |
| */ |
| |
| @VisibleForTesting |
| public void forceLenient() { |
| lenient = true; |
| } |
| |
| /** |
| * Attempt to allocate the requested amount of memory. Either completely succeeds or completely fails. Constructs a a |
| * log of delta |
| * |
| * If it fails, no changes are made to accounting. |
| * |
| * @param size |
| * The amount of memory to reserve in bytes. |
| * @return True if the allocation was successful, false if the allocation failed. |
| */ |
| AllocationOutcome allocateBytes(long size) { |
| final AllocationOutcome outcome = allocate(size, true, false); |
| if (!outcome.isOk()) { |
| releaseBytes(size); |
| } |
| return outcome; |
| } |
| |
| private void updatePeak() { |
| final long currentMemory = locallyHeldMemory.get(); |
| while (true) { |
| |
| final long previousPeak = peakAllocation.get(); |
| if (currentMemory > previousPeak) { |
| if (!peakAllocation.compareAndSet(previousPeak, currentMemory)) { |
| // peak allocation changed underneath us. try again. |
| continue; |
| } |
| } |
| |
| // we either succeeded to set peak allocation or we weren't above the previous peak, exit. |
| return; |
| } |
| } |
| |
| |
| /** |
| * Increase the accounting. Returns whether the allocation fit within limits. |
| * |
| * @param size |
| * to increase |
| * @return Whether the allocation fit within limits. |
| */ |
| boolean forceAllocate(long size) { |
| final AllocationOutcome outcome = allocate(size, true, true); |
| return outcome.isOk(); |
| } |
| |
| /** |
| * Internal method for allocation. This takes a forced approach to allocation to ensure that we manage reservation |
| * boundary issues consistently. Allocation is always done through the entire tree. The two options that we influence |
| * are whether the allocation should be forced and whether or not the peak memory allocation should be updated. If at |
| * some point during allocation escalation we determine that the allocation is no longer possible, we will continue to |
| * do a complete and consistent allocation but we will stop updating the peak allocation. We do this because we know |
| * that we will be directly unwinding this allocation (and thus never actually making the allocation). If force |
| * allocation is passed, then we continue to update the peak limits since we now know that this allocation will occur |
| * despite our moving past one or more limits. |
| * |
| * @param size |
| * The size of the allocation. |
| * @param incomingUpdatePeak |
| * Whether we should update the local peak for this allocation. |
| * @param forceAllocation |
| * Whether we should force the allocation. |
| * @return The outcome of the allocation. |
| */ |
| private AllocationOutcome allocate(final long size, final boolean incomingUpdatePeak, final boolean forceAllocation) { |
| final long newLocal = locallyHeldMemory.addAndGet(size); |
| final long beyondReservation = newLocal - reservation; |
| boolean beyondLimit = newLocal > allocationLimit.get(); |
| |
| // Non-strict allocation |
| |
| if (beyondLimit && lenient) { |
| long grace = Math.min(MAX_GRACE, allocationLimit.get() * GRACE_MARGIN); |
| long graceLimit = allocationLimit.get() + grace; |
| beyondLimit = newLocal > graceLimit; |
| if (! beyondLimit) { |
| logger.warn("Excess allocation of {} bytes beyond limit of {}, " + |
| "within grace of {}, new total allocation: {}", |
| size, allocationLimit.get(), grace, newLocal); |
| } |
| } |
| final boolean updatePeak = forceAllocation || (incomingUpdatePeak && !beyondLimit); |
| |
| AllocationOutcome parentOutcome = AllocationOutcome.SUCCESS; |
| if (beyondReservation > 0 && parent != null) { |
| // we need to get memory from our parent. |
| final long parentRequest = Math.min(beyondReservation, size); |
| parentOutcome = parent.allocate(parentRequest, updatePeak, forceAllocation); |
| } |
| |
| final AllocationOutcome finalOutcome = beyondLimit ? AllocationOutcome.FAILED_LOCAL : |
| parentOutcome.ok ? AllocationOutcome.SUCCESS : AllocationOutcome.FAILED_PARENT; |
| |
| if (updatePeak) { |
| updatePeak(); |
| } |
| |
| return finalOutcome; |
| } |
| |
| public void releaseBytes(long size) { |
| // reduce local memory. all memory released above reservation should be released up the tree. |
| final long newSize = locallyHeldMemory.addAndGet(-size); |
| |
| Preconditions.checkArgument(newSize >= 0, "Accounted size went negative."); |
| |
| final long originalSize = newSize + size; |
| if(originalSize > reservation && parent != null){ |
| // we deallocated memory that we should release to our parent. |
| final long possibleAmountToReleaseToParent = originalSize - reservation; |
| final long actualToReleaseToParent = Math.min(size, possibleAmountToReleaseToParent); |
| parent.releaseBytes(actualToReleaseToParent); |
| } |
| |
| } |
| |
| /** |
| * Set the maximum amount of memory that can be allocated in the this Accountant before failing an allocation. |
| * |
| * @param newLimit |
| * The limit in bytes. |
| */ |
| public void setLimit(long newLimit) { |
| allocationLimit.set(newLimit); |
| } |
| |
| public boolean isOverLimit() { |
| return getAllocatedMemory() > getLimit() || (parent != null && parent.isOverLimit()); |
| } |
| |
| /** |
| * Close this Accountant. This will release any reservation bytes back to a parent Accountant. |
| */ |
| @Override |
| public void close() { |
| // return memory reservation to parent allocator. |
| if (parent != null) { |
| parent.releaseBytes(reservation); |
| } |
| } |
| |
| /** |
| * Return the current limit of this Accountant. |
| * |
| * @return Limit in bytes. |
| */ |
| public long getLimit() { |
| return allocationLimit.get(); |
| } |
| |
| /** |
| * Return the current amount of allocated memory that this Accountant is managing accounting for. Note this does not |
| * include reservation memory that hasn't been allocated. |
| * |
| * @return Currently allocate memory in bytes. |
| */ |
| public long getAllocatedMemory() { |
| return locallyHeldMemory.get(); |
| } |
| |
| /** |
| * The peak memory allocated by this Accountant. |
| * |
| * @return The peak allocated memory in bytes. |
| */ |
| public long getPeakMemoryAllocation() { |
| return peakAllocation.get(); |
| } |
| |
| /** |
| * Describes the type of outcome that occurred when trying to account for allocation of memory. |
| */ |
| public static enum AllocationOutcome { |
| |
| /** |
| * Allocation succeeded. |
| */ |
| SUCCESS(true), |
| |
| /** |
| * Allocation succeeded but only because the allocator was forced to move beyond a limit. |
| */ |
| FORCED_SUCESS(true), |
| |
| /** |
| * Allocation failed because the local allocator's limits were exceeded. |
| */ |
| FAILED_LOCAL(false), |
| |
| /** |
| * Allocation failed because a parent allocator's limits were exceeded. |
| */ |
| FAILED_PARENT(false); |
| |
| private final boolean ok; |
| |
| AllocationOutcome(boolean ok) { |
| this.ok = ok; |
| } |
| |
| public boolean isOk() { |
| return ok; |
| } |
| } |
| } |