blob: ce76461f43aacc9fac197bc2577fa4b852377e66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.utils.JvmUtils;
import org.skife.config.Config;
import java.util.concurrent.atomic.AtomicReference;
public abstract class DruidProcessingConfig extends ExecutorServiceConfig implements ColumnConfig
{
private static final Logger log = new Logger(DruidProcessingConfig.class);
public static final int DEFAULT_NUM_MERGE_BUFFERS = -1;
public static final int DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = -1;
public static final int MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = 1024 * 1024 * 1024;
public static final int DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS = 60_000;
private AtomicReference<Integer> computedBufferSizeBytes = new AtomicReference<>();
@Config({"druid.computation.buffer.size", "${base_path}.buffer.sizeBytes"})
public int intermediateComputeSizeBytesConfigured()
{
return DEFAULT_PROCESSING_BUFFER_SIZE_BYTES;
}
public int intermediateComputeSizeBytes()
{
int sizeBytesConfigured = intermediateComputeSizeBytesConfigured();
if (sizeBytesConfigured != DEFAULT_PROCESSING_BUFFER_SIZE_BYTES) {
return sizeBytesConfigured;
} else if (computedBufferSizeBytes.get() != null) {
return computedBufferSizeBytes.get();
}
long directSizeBytes;
try {
directSizeBytes = JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes();
log.info(
"Detected max direct memory size of [%,d] bytes",
directSizeBytes
);
}
catch (UnsupportedOperationException e) {
// max direct memory defaults to max heap size on recent JDK version, unless set explicitly
directSizeBytes = computeMaxMemoryFromMaxHeapSize();
log.info(
"Defaulting to at most [%,d] bytes (25%% of max heap size) of direct memory for computation buffers",
directSizeBytes
);
}
int numProcessingThreads = getNumThreads();
int numMergeBuffers = getNumMergeBuffers();
int totalNumBuffers = numMergeBuffers + numProcessingThreads;
int sizePerBuffer = (int) ((double) directSizeBytes / (double) (totalNumBuffers + 1));
final int computedSizePerBuffer = Math.min(sizePerBuffer, MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES);
if (computedBufferSizeBytes.compareAndSet(null, computedSizePerBuffer)) {
log.info(
"Auto sizing buffers to [%,d] bytes each for [%,d] processing and [%,d] merge buffers",
computedSizePerBuffer,
numProcessingThreads,
numMergeBuffers
);
}
return computedSizePerBuffer;
}
public static long computeMaxMemoryFromMaxHeapSize()
{
return Runtime.getRuntime().maxMemory() / 4;
}
@Config({"druid.computation.buffer.poolCacheMaxCount", "${base_path}.buffer.poolCacheMaxCount"})
public int poolCacheMaxCount()
{
return Integer.MAX_VALUE;
}
@Override
@Config(value = "${base_path}.numThreads")
public int getNumThreadsConfigured()
{
return DEFAULT_NUM_THREADS;
}
public int getNumMergeBuffers()
{
int numMergeBuffersConfigured = getNumMergeBuffersConfigured();
if (numMergeBuffersConfigured != DEFAULT_NUM_MERGE_BUFFERS) {
return numMergeBuffersConfigured;
} else {
return Math.max(2, getNumThreads() / 4);
}
}
/**
* Returns the number of merge buffers _explicitly_ configured, or -1 if it is not explicitly configured, that is not
* a valid number of buffers. To get the configured value or the default (valid) number, use {@link
* #getNumMergeBuffers()}. This method exists for ability to distinguish between the default value set when there is
* no explicit config, and an explicitly configured value.
*/
@Config("${base_path}.numMergeBuffers")
public int getNumMergeBuffersConfigured()
{
return DEFAULT_NUM_MERGE_BUFFERS;
}
@Override
@Config(value = "${base_path}.columnCache.sizeBytes")
public int columnCacheSizeBytes()
{
return 0;
}
@Config(value = "${base_path}.fifo")
public boolean isFifo()
{
return false;
}
@Config(value = "${base_path}.tmpDir")
public String getTmpDir()
{
return System.getProperty("java.io.tmpdir");
}
@Config(value = "${base_path}.merge.useParallelMergePool")
public boolean useParallelMergePoolConfigured()
{
return true;
}
public boolean useParallelMergePool()
{
final boolean useParallelMergePoolConfigured = useParallelMergePoolConfigured();
final int parallelism = getMergePoolParallelism();
// need at least 3 to do 2 layer merge
if (parallelism > 2) {
return useParallelMergePoolConfigured;
}
if (useParallelMergePoolConfigured) {
log.debug(
"Parallel merge pool is enabled, but there are not enough cores to enable parallel merges: %s",
parallelism
);
}
return false;
}
@Config(value = "${base_path}.merge.pool.parallelism")
public int getMergePoolParallelismConfigured()
{
return DEFAULT_NUM_THREADS;
}
public int getMergePoolParallelism()
{
int poolParallelismConfigured = getMergePoolParallelismConfigured();
if (poolParallelismConfigured != DEFAULT_NUM_THREADS) {
return poolParallelismConfigured;
} else {
// assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores * 1.5
return (int) Math.ceil(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.75);
}
}
@Config(value = "${base_path}.merge.pool.awaitShutdownMillis")
public long getMergePoolAwaitShutdownMillis()
{
return DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS;
}
@Config(value = "${base_path}.merge.pool.defaultMaxQueryParallelism")
public int getMergePoolDefaultMaxQueryParallelism()
{
// assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores
return (int) Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.5, 1);
}
@Config(value = "${base_path}.merge.task.targetRunTimeMillis")
public int getMergePoolTargetTaskRunTimeMillis()
{
return ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS;
}
@Config(value = "${base_path}.merge.task.initialYieldNumRows")
public int getMergePoolTaskInitialYieldRows()
{
return ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS;
}
@Config(value = "${base_path}.merge.task.smallBatchNumRows")
public int getMergePoolSmallBatchRows()
{
return ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS;
}
}