blob: 6dbd78d00a05bb8eab665a1db9b104f633fbb1ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.memory;
import com.google.common.annotations.VisibleForTesting;
import org.apache.http.annotation.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* Global memory manager to track coarse-grained memory usage across all requests.
*
*
* @since 0.1
*/
public class GlobalMemoryManager implements MemoryManager {
    private static final Logger logger = LoggerFactory.getLogger(GlobalMemoryManager.class);

    /**
     * Monitor that guards {@link #usedMemoryBytes} and the size of every chunk handed out by
     * this manager. It is also the object that blocked allocations wait on and that frees notify.
     */
    private final Object sync = new Object();
    /** Fixed size of the pool, in bytes. */
    private final long maxMemoryBytes;
    /** Upper bound, in milliseconds, on how long one allocation may block waiting for memory. */
    private final int maxWaitMs;
    /** Bytes currently handed out to chunks. */
    @GuardedBy("sync")
    private volatile long usedMemoryBytes;

    /**
     * Creates a manager over a fixed-size pool.
     *
     * @param maxBytes total number of bytes in the pool; must be greater than zero
     * @param maxWaitMs maximum time in milliseconds an allocation may wait for memory to be
     *        freed; must be greater than or equal to zero
     * @throws IllegalStateException if either argument is out of range
     */
    public GlobalMemoryManager(long maxBytes, int maxWaitMs) {
        if (maxBytes <= 0) {
            throw new IllegalStateException("Total number of available bytes (" + maxBytes + ") must be greater than zero");
        }
        if (maxWaitMs < 0) {
            throw new IllegalStateException("Maximum wait time (" + maxWaitMs + ") must be greater than or equal to zero");
        }
        this.maxMemoryBytes = maxBytes;
        this.maxWaitMs = maxWaitMs;
        this.usedMemoryBytes = 0;
    }

    /**
     * @return the number of bytes in the pool that are not currently allocated
     */
    @Override
    public long getAvailableMemory() {
        synchronized (sync) {
            return maxMemoryBytes - usedMemoryBytes;
        }
    }

    /**
     * @return the fixed size of the pool in bytes
     */
    @Override
    public long getMaxMemory() {
        return maxMemoryBytes;
    }

    /**
     * Reserves between {@code minBytes} and {@code reqBytes} bytes from the pool, blocking for
     * up to {@code maxWaitMs} milliseconds until at least {@code minBytes} are free.
     *
     * @param minBytes smallest acceptable reservation; must be greater than or equal to zero
     * @param reqBytes preferred reservation; must be greater than or equal to zero
     * @return the number of bytes actually reserved
     * @throws IllegalStateException if an argument is negative, or if the reservation would be
     *         smaller than {@code minBytes} (which happens when {@code reqBytes < minBytes})
     * @throws InsufficientMemoryException if {@code minBytes} exceeds the pool size or does not
     *         become available within the wait limit
     */
    // TODO: Work on fairness: One big memory request can cause all others to block here.
    private long allocateBytes(long minBytes, long reqBytes) {
        if (minBytes < 0 || reqBytes < 0) {
            throw new IllegalStateException("Minimum requested bytes (" + minBytes + ") and requested bytes (" + reqBytes + ") must be greater than or equal to zero");
        }
        if (minBytes > maxMemoryBytes) { // No need to wait, since we'll never have this much available
            throw new InsufficientMemoryException("Requested memory of " + minBytes + " bytes is larger than global pool of " + maxMemoryBytes + " bytes.");
        }
        // Taken before acquiring the lock so that time spent contending for it counts against the wait limit.
        long startTimeMs = System.currentTimeMillis();
        long nBytes;
        synchronized (sync) {
            // Compare against the free space instead of summing used + min, which could overflow
            // a long for very large pools. Loop because a wake-up does not guarantee enough was freed.
            while (minBytes > maxMemoryBytes - usedMemoryBytes) {
                waitForBytesToFree(minBytes, startTimeMs);
            }
            // Allocate at most reqBytes, but at least minBytes
            nBytes = Math.min(reqBytes, maxMemoryBytes - usedMemoryBytes);
            if (nBytes < minBytes) {
                throw new IllegalStateException("Allocated bytes (" + nBytes + ") should be at least the minimum requested bytes (" + minBytes + ")");
            }
            usedMemoryBytes += nBytes;
        }
        return nBytes;
    }

    /**
     * Blocks on {@code sync} until notified or until the remaining share of the wait limit
     * elapses. The caller must hold the {@code sync} monitor, as {@link Object#wait(long)}
     * requires, and must re-check its condition after this method returns.
     *
     * @param minBytes number of bytes the caller is waiting for (used in messages only)
     * @param startTimeMs {@link System#currentTimeMillis()} value taken when the allocation began
     * @throws InsufficientMemoryException if the wait limit has already been used up
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt flag is
     *         restored before throwing
     */
    @VisibleForTesting
    void waitForBytesToFree(long minBytes, long startTimeMs) {
        // Shortfall expressed as min - free so that the arithmetic cannot overflow.
        logger.debug("Waiting for {} bytes to be free {}", minBytes - (maxMemoryBytes - usedMemoryBytes), startTimeMs);
        long remainingWaitTimeMs = maxWaitMs - (System.currentTimeMillis() - startTimeMs);
        // A non-positive timeout must never reach wait(): wait(0) would block indefinitely.
        if (remainingWaitTimeMs <= 0) { // Ran out of time waiting for some memory to get freed up
            throw new InsufficientMemoryException("Requested memory of " + minBytes + " bytes could not be allocated. Using memory of " + usedMemoryBytes + " bytes from global pool of " + maxMemoryBytes + " bytes after waiting for " + maxWaitMs + "ms.");
        }
        try {
            sync.wait(remainingWaitTimeMs);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted allocation of " + minBytes + " bytes", ie);
        }
    }

    /**
     * Allocates a chunk of between {@code minBytes} and {@code reqBytes} bytes.
     *
     * @param minBytes smallest acceptable chunk size
     * @param reqBytes preferred chunk size
     * @return a chunk whose size is the number of bytes actually reserved
     * @throws InsufficientMemoryException if {@code minBytes} cannot be reserved within the wait limit
     */
    @Override
    public MemoryChunk allocate(long minBytes, long reqBytes) {
        long nBytes = allocateBytes(minBytes, reqBytes);
        return newMemoryChunk(nBytes);
    }

    /**
     * Allocates a chunk of exactly {@code nBytes} bytes.
     *
     * @param nBytes the chunk size to reserve
     * @return the allocated chunk
     * @throws InsufficientMemoryException if {@code nBytes} cannot be reserved within the wait limit
     */
    @Override
    public MemoryChunk allocate(long nBytes) {
        return allocate(nBytes, nBytes);
    }

    /** Wraps an already-reserved number of bytes in a chunk tied to this manager. */
    private MemoryChunk newMemoryChunk(long sizeBytes) {
        return new GlobalMemoryChunk(sizeBytes);
    }

    /**
     * A reservation against the enclosing manager's pool. Closing the chunk, or finalizing it if
     * it was never closed, returns its bytes to the pool and wakes any waiting allocations.
     */
    private class GlobalMemoryChunk implements MemoryChunk {
        /** Bytes currently accounted to this chunk; zero once the chunk has been freed. */
        private volatile long size;
        // Debugging aid: to find where a leaked chunk was allocated, capture a stack trace here
        // (e.g. ExceptionUtils.getStackTrace(new Throwable())) on construction and on growth,
        // and append it to the warning logged in finalize().

        /**
         * @param size number of bytes that have already been reserved for this chunk; must be
         *        greater than or equal to zero
         * @throws IllegalStateException if {@code size} is negative
         */
        private GlobalMemoryChunk(long size) {
            if (size < 0) {
                throw new IllegalStateException("Size of memory chunk must be greater than or equal to zero, but instead is " + size);
            }
            this.size = size;
        }

        /**
         * @return the number of bytes currently held by this chunk
         */
        @Override
        public long getSize() {
            // size is volatile, so the lock is not needed for visibility; it is retained to keep
            // the original locking behavior.
            synchronized (sync) {
                return size;
            }
        }

        /**
         * Changes the size of this chunk to {@code nBytes}. Shrinking returns the difference to
         * the pool immediately and wakes waiters; growing reserves the difference and may block
         * like any other allocation.
         *
         * @param nBytes the new size; must be greater than or equal to zero
         * @throws IllegalStateException if {@code nBytes} is negative
         * @throws InsufficientMemoryException if the extra bytes cannot be reserved within the
         *         wait limit, in which case the chunk keeps its previous size
         */
        @Override
        public void resize(long nBytes) {
            if (nBytes < 0) {
                throw new IllegalStateException("Number of bytes to resize to must be greater than or equal to zero, but instead is " + nBytes);
            }
            synchronized (sync) {
                long nAdditionalBytes = nBytes - size;
                if (nAdditionalBytes < 0) {
                    // Shrinking: the delta is negative, so adding it releases bytes to the pool.
                    usedMemoryBytes += nAdditionalBytes;
                    size = nBytes;
                    sync.notifyAll();
                } else {
                    // Growing: re-enters the monitor already held here; size is only updated
                    // once the extra bytes have actually been reserved.
                    allocateBytes(nAdditionalBytes, nAdditionalBytes);
                    size = nBytes;
                }
            }
        }

        /**
         * Safety net for chunks that were never closed: logs a warning if the chunk still holds
         * bytes and then returns them to the pool.
         */
        @Override
        protected void finalize() throws Throwable {
            try {
                long orphanedBytes = size;
                if (orphanedBytes > 0) {
                    logger.warn("Orphaned chunk of {} bytes found during finalize", orphanedBytes);
                }
                freeMemory();
            } finally {
                super.finalize();
            }
        }

        /**
         * Returns this chunk's bytes to the pool and wakes waiting allocations. Calling it again
         * is harmless because the size is zeroed the first time.
         */
        private void freeMemory() {
            synchronized (sync) {
                usedMemoryBytes -= size;
                size = 0;
                sync.notifyAll();
            }
        }

        /** Releases this chunk's bytes back to the pool. */
        @Override
        public void close() {
            freeMemory();
        }
    }
}