blob: d6fabb2dc3e52b22b3c72b23b2a9f45446a90e8b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;
import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Implements a memory manager that keeps a global context of how many Parquet
* writers there are and manages the memory between them. For use cases with
* dynamic partitions, it is easy to end up with many writers in the same task.
* By managing the size of each allocation, we try to cut down the size of each
* allocation and keep the task from running out of memory.
*
* This class balances the allocation size of each writer by scaling all of them evenly.
* When the sum of the writers' allocation sizes does not exceed the total memory pool,
* each writer keeps its original value.
* When the sum exceeds the pool, each writer's allocation size is decreased by the same ratio.
*/
public class MemoryManager {
  private static final Logger LOG = LoggerFactory.getLogger(MemoryManager.class);

  static final float DEFAULT_MEMORY_POOL_RATIO = 0.95f;
  static final long DEFAULT_MIN_MEMORY_ALLOCATION = 1 * 1024 * 1024; // 1MB

  /** Fraction of the maximum heap that the writers may share. */
  private final float memoryPoolRatio;
  /** Size of the shared pool in bytes (maximum heap * memoryPoolRatio). */
  private final long totalMemoryPool;
  /** Smallest per-writer allocation tolerated after scaling; values <= 0 disable the check. */
  private final long minMemoryAllocation;
  /** Registered writers mapped to the allocation each one originally requested. */
  private final Map<InternalParquetRecordWriter, Long> writerList =
      new HashMap<InternalParquetRecordWriter, Long>();
  /** Callbacks, keyed by name, that are run whenever allocations have to be scaled down. */
  private final Map<String, Runnable> callBacks = new HashMap<String, Runnable>();
  /** Factor currently applied to every requested allocation; 1.0 means no scaling. */
  private double scale = 1.0;

  /**
   * Create a memory manager whose pool is a fraction of the JVM's maximum heap.
   * @param ratio the fraction of the maximum heap to use as the pool; must be in (0, 1]
   * @param minAllocation the minimum per-writer allocation in bytes that is accepted after
   *                      scaling; a value less than or equal to 0 disables the check
   * @throws IllegalArgumentException if ratio is not in (0, 1]
   */
  public MemoryManager(float ratio, long minAllocation) {
    checkRatio(ratio);
    memoryPoolRatio = ratio;
    minMemoryAllocation = minAllocation;
    // NOTE(review): MemoryUsage.getMax() is documented to return -1 when the maximum is
    // undefined; that case is not handled here and would yield a negative pool size.
    totalMemoryPool = Math.round(
        (double) ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() * ratio);
    LOG.debug("Allocated total memory pool is: {}", totalMemoryPool);
  }

  /**
   * Validate that the configured pool ratio lies in (0, 1].
   * @param ratio the ratio to validate
   * @throws IllegalArgumentException if the ratio is out of range
   */
  private void checkRatio(float ratio) {
    if (ratio <= 0 || ratio > 1) {
      throw new IllegalArgumentException("The configured memory pool ratio " + ratio + " is " +
          "not between 0 and 1.");
    }
  }

  /**
   * Add a new writer and its memory allocation to the memory manager, then rebalance
   * the allocations of all registered writers.
   * A null allocation is rejected by {@code Preconditions.checkNotNull} before the writer
   * is recorded, so a bad argument cannot leave a null entry in the writer map.
   * @param writer the new created writer
   * @param allocation the requested buffer size; must not be null
   * @throws IllegalArgumentException if the writer has already been added
   */
  synchronized void addWriter(InternalParquetRecordWriter writer, Long allocation) {
    Preconditions.checkNotNull(allocation, "allocation");
    if (writerList.containsKey(writer)) {
      throw new IllegalArgumentException("[BUG] The Parquet Memory Manager should not add an " +
          "instance of InternalParquetRecordWriter more than once. The Manager already contains " +
          "the writer: " + writer);
    }
    writerList.put(writer, allocation);
    updateAllocation();
  }

  /**
   * Remove the given writer from the memory manager and rebalance the remaining writers.
   * No rebalancing happens when the removed writer was the last one.
   * @param writer the writer that has been closed
   */
  synchronized void removeWriter(InternalParquetRecordWriter writer) {
    writerList.remove(writer);
    if (!writerList.isEmpty()) {
      updateAllocation();
    }
  }

  /**
   * Update the allocated size of each writer based on the current allocations and pool size.
   * If the requested allocations fit in the pool the scale is reset to 1.0; otherwise the
   * scale becomes pool / total, a warning is logged and every registered callback is run.
   * Each writer's row group size threshold is then set to its requested allocation
   * multiplied by the scale (rounded down).
   * Callers are expected to hold this object's monitor.
   * @throws ParquetRuntimeException if scaling pushes a writer below the minimum allocation;
   *         writers visited before the offending one have already been resized at that point
   */
  private void updateAllocation() {
    long totalAllocations = 0;
    for (Long allocation : writerList.values()) {
      totalAllocations += allocation;
    }

    if (totalAllocations <= totalMemoryPool) {
      scale = 1.0;
    } else {
      scale = (double) totalMemoryPool / totalAllocations;
      LOG.warn(String.format(
          "Total allocation exceeds %.2f%% (%,d bytes) of heap memory\n" +
          "Scaling row group sizes to %.2f%% for %d writers",
          100*memoryPoolRatio, totalMemoryPool, 100*scale, writerList.size()));
      for (Runnable callBack : callBacks.values()) {
        // we do not really want to start a new thread here.
        callBack.run();
      }
    }

    // The message needs String.format for the %,d grouping, so guard it explicitly
    // instead of formatting one string per writer when debug logging is disabled.
    final boolean debugEnabled = LOG.isDebugEnabled();
    for (Map.Entry<InternalParquetRecordWriter, Long> entry : writerList.entrySet()) {
      long newSize = (long) Math.floor(entry.getValue() * scale);
      if (scale < 1.0 && minMemoryAllocation > 0 && newSize < minMemoryAllocation) {
        // Thrown as an anonymous subclass, presumably because ParquetRuntimeException
        // cannot be instantiated directly - confirm against its declaration.
        throw new ParquetRuntimeException(String.format("New Memory allocation %d bytes" +
            " is smaller than the minimum allocation size of %d bytes.",
            newSize, minMemoryAllocation)){};
      }
      entry.getKey().setRowGroupSizeThreshold(newSize);
      if (debugEnabled) {
        LOG.debug(String.format("Adjust block size from %,d to %,d for writer: %s",
            entry.getValue(), newSize, entry.getKey()));
      }
    }
  }

  /**
   * Get the total memory pool size that is available for writers.
   * @return the number of bytes in the memory pool
   */
  long getTotalMemoryPool() {
    return totalMemoryPool;
  }

  /**
   * Get the writers list.
   * The returned map is the manager's own internal map, not a copy.
   * @return the writers in this memory manager, mapped to their requested allocations
   */
  Map<InternalParquetRecordWriter, Long> getWriterList() {
    return writerList;
  }

  /**
   * Get the ratio of memory allocated for all the writers.
   * @return the memory pool ratio
   */
  float getMemoryPoolRatio() {
    return memoryPoolRatio;
  }

  /**
   * Register a callback that is run whenever the writers' allocations have to be scaled down.
   * Synchronized on this manager, like addWriter and removeWriter, because the callback map
   * is iterated during rebalancing.
   * @param callBackName the name of the callback; it must be unique among registered callbacks
   * @param callBack the callback passed in from upper layer, such as Hive.
   * @throws IllegalArgumentException if a callback with the same name is already registered
   */
  public synchronized void registerScaleCallBack(String callBackName, Runnable callBack) {
    Preconditions.checkNotNull(callBackName, "callBackName");
    Preconditions.checkNotNull(callBack, "callBack");
    if (callBacks.containsKey(callBackName)) {
      throw new IllegalArgumentException("The callBackName " + callBackName +
          " is duplicated and has been registered already.");
    }
    callBacks.put(callBackName, callBack);
  }

  /**
   * Get the registered callbacks.
   * @return an unmodifiable view of the registered callbacks, keyed by name
   */
  Map<String, Runnable> getScaleCallBacks() {
    return Collections.unmodifiableMap(callBacks);
  }

  /**
   * Get the internal scale value of MemoryManager.
   * @return the factor currently applied to each writer's requested allocation: 1.0 when the
   *         requested allocations fit in the pool, otherwise pool size / total requested
   */
  double getScale() {
    return scale;
  }
}