blob: 55d973ed35804c066cef8aed201a9cf43828d154 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.memory;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import javax.annotation.concurrent.GuardedBy;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;
/**
* Code ported from Apache Spark {org.apache.spark.unsafe.memory} package
* A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array.
*/
public class HeapMemoryAllocator implements MemoryAllocator {
@GuardedBy("this") private final Map<Long, LinkedList<WeakReference<long[]>>>
bufferPoolsBySize = new HashMap<>();
private int poolingThresholdBytes;
private boolean shouldPooling = true;
public HeapMemoryAllocator() {
poolingThresholdBytes = CarbonProperties.getInstance().getHeapMemoryPoolingThresholdBytes();
boolean isDriver = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE,
CarbonCommonConstants.IS_DRIVER_INSTANCE_DEFAULT));
// if set 'poolingThresholdBytes' to -1 or the object creation call is in driver,
// it should not go through the pooling mechanism.
if (poolingThresholdBytes == -1 || isDriver) {
shouldPooling = false;
}
}
/**
* Returns true if allocations of the given size should go through the pooling mechanism and
* false otherwise.
*/
private boolean shouldPool(long size) {
// Very small allocations are less likely to benefit from pooling.
return shouldPooling && (size >= poolingThresholdBytes);
}
@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
int numWords = (int) ((size + 7) / 8);
long alignedSize = numWords * 8L;
assert (alignedSize >= size);
if (shouldPool(alignedSize)) {
synchronized (this) {
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool != null) {
while (!pool.isEmpty()) {
final WeakReference<long[]> arrayReference = pool.pop();
final long[] array = arrayReference.get();
if (array != null) {
assert (array.length * 8L >= size);
MemoryBlock memory =
new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size, MemoryType.ONHEAP);
// reuse this MemoryBlock
memory.setFreedStatus(false);
return memory;
}
}
bufferPoolsBySize.remove(alignedSize);
}
}
}
long[] array = new long[numWords];
return new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size, MemoryType.ONHEAP);
}
@Override
public void free(MemoryBlock memory) {
final long size = memory.size();
// As an additional layer of defense against use-after-free bugs, we mutate the
// MemoryBlock to null out its reference to the long[] array.
long[] array = (long[]) memory.obj;
long alignedSize = ((size + 7) / 8) * 8;
if (shouldPool(alignedSize)) {
synchronized (this) {
LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool == null) {
pool = new LinkedList<>();
bufferPoolsBySize.put(alignedSize, pool);
}
pool.add(new WeakReference<>(array));
}
}
memory.setFreedStatus(true);
}
}