DRILL-7675: Work around for partitions sender memory use
Adds an ad-hoc system/session option to limit partition sender
memory use. See DRILL-7686 for the underlying issue.
Also includes code cleanup and diagnostic tools.
closes #2047
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index c421c73..3a16353 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -162,7 +162,6 @@
public static final PositiveLongValidator HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME = new PositiveLongValidator(HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY, Character.MAX_VALUE, null);
-
// Hash Aggregate Options
public static final String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
public static final LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128,
@@ -189,6 +188,12 @@
public static final BooleanValidator HASHAGG_FALLBACK_ENABLED_VALIDATOR = new BooleanValidator(HASHAGG_FALLBACK_ENABLED_KEY,
new OptionDescription("Hash Aggregates ignore memory limits when enabled (true). When disabled (false), Hash Aggregates fail when memory is set too low."));
+ // Partitioner options
+ public static final String PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY = "exec.partition.mem_throttle";
+ public static final LongValidator PARTITIONER_MEMORY_REDUCTION_THRESHOLD_VALIDATOR =
+ new RangeLongValidator(PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY, 0, Integer.MAX_VALUE,
+ new OptionDescription("Linearly reduces partition sender buffer row count after this number of receivers. Default is 0 (disabled). (Since Drill 1.18)"));
+
public static final String SSL_PROVIDER = "drill.exec.ssl.provider"; // valid values are "JDK", "OPENSSL" // default JDK
public static final String SSL_PROTOCOL = "drill.exec.ssl.protocol"; // valid values are SSL, SSLV2, SSLV3, TLS, TLSV1, TLSv1.1, TLSv1.2(default)
public static final String SSL_KEYSTORE_TYPE = "drill.exec.ssl.keyStoreType";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java
index b4790c8..2c580da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java
@@ -29,6 +29,8 @@
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.server.options.OptionSet;
import org.codehaus.commons.compiler.CompileException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implements the "plain Java" method of code generation and
@@ -76,10 +78,9 @@
* The setting to prefer plain Java is ignored for any remaining generated
* classes not marked as plain Java capable.
*/
-
public class ClassBuilder {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClassBuilder.class);
+ private static final Logger logger = LoggerFactory.getLogger(ClassBuilder.class);
public static final String CODE_DIR_OPTION = CodeCompiler.COMPILE_BASE + ".code_dir";
private final DrillConfig config;
@@ -95,7 +96,6 @@
// point your debugger to the directory set below, and you
// can step into the code for debugging. Code is not saved
// be default because doing so is expensive and unnecessary.
-
codeDir = new File(config.getString(CODE_DIR_OPTION));
}
@@ -109,7 +109,6 @@
* @return the class that the code generator defines
* @throws ClassTransformationException
*/
-
public Class<?> getImplementationClass(CodeGenerator<?> cg) throws ClassTransformationException {
try {
return compileClass(cg);
@@ -133,23 +132,19 @@
final long t1 = System.nanoTime();
// Get the plain Java code.
-
String code = cg.getGeneratedCode();
// Get the class names (dotted, file path, etc.)
-
String className = cg.getMaterializedClassName();
ClassTransformer.ClassNames name = new ClassTransformer.ClassNames(className);
// A key advantage of this method is that the code can be
// saved and debugged, if needed.
-
if (cg.isCodeToBeSaved()) {
saveCode(code, name);
}
// Compile the code and load it into a class loader.
-
CachedClassLoader classLoader = new CachedClassLoader();
ClassCompilerSelector compilerSelector = new ClassCompilerSelector(classLoader, config, options);
Map<String,byte[]> results = compilerSelector.compile(name, code);
@@ -165,7 +160,6 @@
(System.nanoTime() - t1 + 500_000) / 1_000_000);
// Get the class from the class loader.
-
try {
return classLoader.findClass(className);
} catch (ClassNotFoundException e) {
@@ -184,7 +178,6 @@
* @param code the source code
* @param name the class name
*/
-
private void saveCode(String code, ClassNames name) {
String pathName = name.slash + ".java";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 843d20b..00d5bae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -945,7 +945,7 @@
// everything in memory
String message = String.format(
"When using the minimum number of partitions %d we require %s memory but only have %s available. "
- + "Forcing legacy behavoir of using unbounded memory in order to prevent regressions.",
+ + "Forcing legacy behavior of using unbounded memory in order to prevent regressions.",
numPartitions,
FileUtils.byteCountToDisplaySize(buildCalc.getMaxReservedMemory()),
FileUtils.byteCountToDisplaySize(allocator.getLimit()));
@@ -984,7 +984,7 @@
// is enabled
if (reason == null) {
boolean fallbackEnabled = context.getOptions()
- .getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val;
+ .getBoolean(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY);
if (fallbackEnabled) {
logger.warn(
"Spilling is disabled - not enough memory available for internal partitioning. Falling back"
@@ -992,8 +992,9 @@
} else {
throw UserException.resourceError().message(String.format(
"Not enough memory for internal partitioning and fallback mechanism for "
- + "HashJoin to use unbounded memory is disabled. Either enable fallback config %s using Alter "
- + "session/system command or increase memory limit for Drillbit",
+ + "HashJoin to use unbounded memory is disabled.\n" +
+ "Either enable fallback option %s using ALTER "
+ + "SESSION/SYSTEM command or increase the memory limit for the Drillbit",
ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger);
}
} else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 76c60e8..4f22c5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -48,8 +48,9 @@
void initialize();
void clear();
List<? extends PartitionOutgoingBatch> getOutgoingBatches();
+
/**
- * Method to get PartitionOutgoingBatch based on the fact that there can be > 1 Partitioner
+ * Get PartitionOutgoingBatch based on the fact that there can be > 1 Partitioner
* @param index
* @return PartitionOutgoingBatch that matches index within Partitioner. This method can
* return null if index does not fall within boundary of this Partitioner
@@ -58,4 +59,4 @@
OperatorStats getStats();
TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index a3149b8..95e62e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -24,10 +24,11 @@
import javax.inject.Named;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.ExchangeFragmentContext;
@@ -50,7 +51,6 @@
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,9 +104,44 @@
this.end = end;
doSetup(context, incoming, null);
- // Half the outgoing record batch size if the number of senders exceeds 1000 to reduce the total amount of memory
- // allocated.
- if (popConfig.getDestinations().size() > 1000) {
+ // Consider the system/session option to allow the buffer size to shrink
+ // linearly with the increase in slice count, over some limit:
+ // exec.partition.mem_throttle:
+ // The default is 0, which leaves the current logic unchanged.
+ // If set to a positive value, then when the slice count exceeds that
+ // amount, the buffer size per sender is reduced.
+ // The reduction factor is 1 / (slice count - threshold), with a minimum
+ // batch size of 256 records.
+ //
+ // So, if we set the threshold at 2, and run 10 slices, each slice will
+ // get 1024 / 8 = 256 records.
+ //
+ // This option controls memory, but at an obvious cost of increasing overhead.
+ // One could argue that this is a good thing. As the number of senders
+ // increases, the number of records going to each sender decreases, which
+ // increases the time that batches must accumulate before they are sent.
+ //
+ // If the option is enabled, and buffer size reduction kicks in, you'll
+ // find an info-level log message which details the reduction:
+ // exec.partition.mem_throttle is set to 2: 10 receivers,
+ // reduced send buffer size from 1024 to 256 rows
+ //
+ // See DRILL-7675, DRILL-7686.
+ int destinationCount = popConfig.getDestinations().size();
+ int reductionCutoff = oContext.getFragmentContext().getOptions().getInt(
+ ExecConstants.PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY);
+ if (reductionCutoff > 0 && destinationCount >= reductionCutoff) {
+ int reducedBatchSize = Math.max(256,
+ (DEFAULT_RECORD_BATCH_SIZE + 1) / (destinationCount - reductionCutoff));
+ outgoingRecordBatchSize = BaseAllocator.nextPowerOfTwo(reducedBatchSize) - 1;
+ logger.info("{} is set to {}: {} receivers, reduced send buffer size from {} to {} rows",
+ ExecConstants.PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY,
+ reductionCutoff, destinationCount,
+ DEFAULT_RECORD_BATCH_SIZE, outgoingRecordBatchSize);
+ } else if (destinationCount > 1000) {
+ // Half the outgoing record batch size if the number of senders exceeds 1000 to reduce the total amount of memory
+ // allocated.
+
// Always keep the recordCount as (2^x) - 1 to better utilize the memory allocation in ValueVectors
outgoingRecordBatchSize = (DEFAULT_RECORD_BATCH_SIZE + 1)/2 - 1;
}
@@ -114,7 +149,7 @@
int fieldId = 0;
for (MinorFragmentEndpoint destination : popConfig.getDestinations()) {
// create outgoingBatches only for subset of Destination Points
- if ( fieldId >= start && fieldId < end ) {
+ if (fieldId >= start && fieldId < end) {
logger.debug("start: {}, count: {}, fieldId: {}", start, end, fieldId);
outgoingBatches.add(newOutgoingRecordBatch(stats, popConfig,
context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId()));
@@ -149,7 +184,6 @@
* generated inner class. Byte-code manipulation appears to fix up the byte codes
* directly. The name is special, it must be "new" + inner class name.
*/
-
protected OutgoingRecordBatch newOutgoingRecordBatch(
OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel,
FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
@@ -259,24 +293,23 @@
private final AccountingDataTunnel tunnel;
private final HashPartitionSender operator;
private final FragmentContext context;
- private final BufferAllocator allocator;
- private final VectorContainer vectorContainer = new VectorContainer();
+ private final VectorContainer vectorContainer;
private final int oppositeMinorFragmentId;
private final OperatorStats stats;
- private boolean isLast = false;
- private boolean dropAll = false;
+ private boolean isLast;
+ private boolean dropAll;
private int recordCount;
private int totalRecords;
public OutgoingRecordBatch(OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel,
FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
this.context = context;
- this.allocator = allocator;
this.operator = operator;
this.tunnel = tunnel;
this.stats = stats;
this.oppositeMinorFragmentId = oppositeMinorFragmentId;
+ this.vectorContainer = new VectorContainer(allocator);
}
protected void copy(int inIndex) throws IOException {
@@ -376,9 +409,7 @@
}
private void allocateOutgoingRecordBatch() {
- for (VectorWrapper<?> v : vectorContainer) {
- v.getValueVector().allocateNew();
- }
+ vectorContainer.allocate(outgoingRecordBatchSize);
}
public void updateStats(FragmentWritableBatch writableBatch) {
@@ -391,12 +422,7 @@
* Initialize the OutgoingBatch based on the current schema in incoming RecordBatch
*/
public void initializeBatch() {
- for (VectorWrapper<?> v : incoming) {
- // create new vector
- ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator);
- outgoingVector.setInitialCapacity(outgoingRecordBatchSize);
- vectorContainer.add(outgoingVector);
- }
+ vectorContainer.buildFrom(incoming.getSchema());
allocateOutgoingRecordBatch();
try {
doSetup(incoming, vectorContainer);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 61850b7..6d8865d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -123,7 +123,6 @@
* Hence we should make use of {@link BatchSchema#isEquivalent(BatchSchema)} method instead since
* {@link MaterializedField#isEquivalent(MaterializedField)} method is updated to remove the reference check.
*/
-
@Override
public boolean equals(Object obj) {
if (this == obj) {
@@ -145,7 +144,6 @@
// Compare names.
// (DRILL-5525: actually compares all fields.)
-
if (!fields.equals(other.fields)) {
return false;
}
@@ -153,7 +151,6 @@
// Compare types
// (DRILL-5525: this code is redundant because any differences
// will fail above.)
-
for (int i = 0; i < fields.size(); i++) {
MajorType t1 = fields.get(i).getType();
MajorType t2 = other.fields.get(i).getType();
@@ -181,7 +178,6 @@
* the {@link MaterializedField#isEquivalent(MaterializedField)} rules,
* false otherwise
*/
-
public boolean isEquivalent(BatchSchema other) {
if (this == other) {
return true;
@@ -209,17 +205,15 @@
private boolean majorTypeEqual(MajorType t1, MajorType t2) {
if (t1.equals(t2)) {
return true;
- }
- if (!t1.getMinorType().equals(t2.getMinorType())) {
+ } else if (!t1.getMinorType().equals(t2.getMinorType())) {
return false;
- }
- if (!t1.getMode().equals(t2.getMode())) {
+ } else if (!t1.getMode().equals(t2.getMode())) {
return false;
- }
- if (!Sets.newHashSet(t1.getSubTypeList()).equals(Sets.newHashSet(t2.getSubTypeList()))) {
+ } else if (!Sets.newHashSet(t1.getSubTypeList()).equals(Sets.newHashSet(t2.getSubTypeList()))) {
return false;
+ } else {
+ return true;
}
- return true;
}
/**
@@ -237,7 +231,6 @@
* @param otherSchema the schema to merge with this one
* @return the new, merged, schema
*/
-
public BatchSchema merge(BatchSchema otherSchema) {
if (selectionVectorMode != SelectionVectorMode.NONE ||
otherSchema.selectionVectorMode != SelectionVectorMode.NONE) {
@@ -249,4 +242,17 @@
mergedFields.addAll(otherSchema.fields);
return new BatchSchema(selectionVectorMode, mergedFields);
}
+
+ /**
+ * Format the schema into a multi-line format. Useful when debugging a query with
+ * a very wide schema as the usual single-line format is far too hard to read.
+ */
+ public String format() {
+ StringBuilder buf = new StringBuilder();
+ buf.append("Batch Schema:\n");
+ for (MaterializedField field : fields) {
+ field.format(buf, 1);
+ }
+ return buf.toString();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 3796e5a..4ec0b8d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -440,6 +440,14 @@
return wrappers.size();
}
+ public void allocate(int recordCount) {
+ for (VectorWrapper<?> w : wrappers) {
+ ValueVector v = w.getValueVector();
+ v.setInitialCapacity(recordCount);
+ v.allocateNew();
+ }
+ }
+
public void allocateNew() {
for (VectorWrapper<?> w : wrappers) {
w.getValueVector().allocateNew();
@@ -559,4 +567,10 @@
addOrGet(wrapper.getField());
}
}
+
+ public void buildFrom(BatchSchema sourceSchema) {
+ for (MaterializedField field : sourceSchema) {
+ add(TypeHelper.getNewVector(field, allocator));
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
index a502e07..ee786c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
@@ -69,6 +69,11 @@
}
@Override
+ public int getInt(String name) {
+ return (int) getLong(name);
+ }
+
+ @Override
public long getLong(String name) {
return getByType(name, Kind.LONG).num_val;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java
index e96d571..7e85a76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java
@@ -83,10 +83,19 @@
* @throws IllegalArgumentException if the option is undefined or
* is not of the correct data type
*/
-
boolean getBoolean(String name);
/**
+ * Return the value of a long option as an int
+ *
+ * @param name option name
+ * @return the long value
+ * @throws IllegalArgumentException if the option is undefined or
+ * is not of the correct data type
+ */
+ int getInt(String name);
+
+ /**
* Return the value of a long option.
*
* @param name option name
@@ -94,7 +103,6 @@
* @throws IllegalArgumentException if the option is undefined or
* is not of the correct data type
*/
-
long getLong(String name);
/**
@@ -105,7 +113,6 @@
* @throws IllegalArgumentException if the option is undefined or
* is not of the correct data type
*/
-
double getDouble(String name);
/**
@@ -116,6 +123,5 @@
* @throws IllegalArgumentException if the option is undefined or
* is not of the correct data type
*/
-
String getString(String name);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 5d2598c..3eb643c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -188,6 +188,7 @@
new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.PARQUET_COMPLEX_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+ new OptionDefinition(ExecConstants.PARTITIONER_MEMORY_REDUCTION_THRESHOLD_VALIDATOR),
new OptionDefinition(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR),
new OptionDefinition(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR),
new OptionDefinition(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
index 28a98aa..f24ea09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
@@ -28,15 +28,18 @@
import com.sun.codemodel.JVar;
public class CopyUtil {
- public static void generateCopies(ClassGenerator<?> g, VectorAccessible batch, boolean hyper){
+
+ public static void generateCopies(ClassGenerator<?> g, VectorAccessible batch, boolean hyper) {
// we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all.
int fieldId = 0;
JExpression inIndex = JExpr.direct("inIndex");
JExpression outIndex = JExpr.direct("outIndex");
- for(VectorWrapper<?> vv : batch) {
+ for (VectorWrapper<?> vv : batch) {
String copyMethod;
- if (!Types.isFixedWidthType(vv.getField().getType()) || Types.isRepeated(vv.getField().getType()) || Types.isComplex(vv.getField().getType())) {
+ if (!Types.isFixedWidthType(vv.getField().getType()) ||
+ Types.isRepeated(vv.getField().getType()) ||
+ Types.isComplex(vv.getField().getType())) {
copyMethod = "copyFromSafe";
} else {
copyMethod = "copyFrom";
@@ -53,19 +56,13 @@
.build();
JVar outVV = g.declareVectorValueSetupAndMember("outgoing", outFieldId);
- if(hyper){
-
- g.getEvalBlock().add(
- outVV
+ if (hyper) {
+ g.getEvalBlock().add(outVV
.invoke(copyMethod)
- .arg(
- inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+ .arg(inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
.arg(outIndex)
- .arg(
- inVV.component(inIndex.shrz(JExpr.lit(16)))
- )
- );
- }else{
+ .arg(inVV.component(inIndex.shrz(JExpr.lit(16)))));
+ } else {
g.getEvalBlock().add(outVV.invoke(copyMethod).arg(inIndex).arg(outIndex).arg(inVV));
}
@@ -73,5 +70,4 @@
fieldId++;
}
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 962d74e..1cecbd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -55,7 +55,7 @@
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.FailureUtils;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.Pointer;
@@ -65,6 +65,8 @@
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Date;
@@ -93,8 +95,8 @@
*/
public class Foreman implements Runnable {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
- private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger");
+ private static final Logger logger = LoggerFactory.getLogger(Foreman.class);
+ private static final Logger queryLogger = LoggerFactory.getLogger("query.logger");
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(Foreman.class);
public enum ProfileOption { SYNC, ASYNC, NONE }
@@ -108,8 +110,7 @@
private final QueryManager queryManager; // handles lower-level details of query execution
private final DrillbitContext drillbitContext;
private final UserClientConnection initiatingClient; // used to send responses
- private boolean resume = false;
- private final ProfileOption profileOption;
+ private boolean resume;
private final QueryResourceManager queryRM;
@@ -122,7 +123,7 @@
private String queryText;
private RuntimeFilterRouter runtimeFilterRouter;
- private boolean enableRuntimeFilter;
+ private final boolean enableRuntimeFilter;
/**
* Constructor. Sets up the Foreman, but does not initiate any execution.
@@ -154,11 +155,9 @@
this.queryRM = drillbitContext.getResourceManager().newQueryRM(this);
this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this);
this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult());
- this.profileOption = setProfileOption(queryContext.getOptions());
this.enableRuntimeFilter = queryContext.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val;
}
-
/**
* @return query id
*/
@@ -326,11 +325,15 @@
}
}
- private ProfileOption setProfileOption(OptionManager options) {
- if (! options.getOption(ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR)) {
+ private ProfileOption getProfileOption(QueryContext queryContext) {
+ if (queryContext.isSkipProfileWrite()) {
return ProfileOption.NONE;
}
- if (options.getOption(ExecConstants.QUERY_PROFILE_DEBUG_VALIDATOR)) {
+ OptionSet options = queryContext.getOptions();
+ if (!options.getBoolean(ExecConstants.ENABLE_QUERY_PROFILE_OPTION)) {
+ return ProfileOption.NONE;
+ }
+ if (options.getBoolean(ExecConstants.QUERY_PROFILE_DEBUG_OPTION)) {
return ProfileOption.SYNC;
} else {
return ProfileOption.ASYNC;
@@ -792,8 +795,8 @@
// Debug option: write query profile before sending final results so that
// the client can be certain the profile exists.
- final boolean skipProfileWrite = queryContext.isSkipProfileWrite();
- if (profileOption == ProfileOption.SYNC && !skipProfileWrite) {
+ ProfileOption profileOption = getProfileOption(queryContext);
+ if (profileOption == ProfileOption.SYNC) {
queryManager.writeFinalProfile(uex);
}
@@ -823,7 +826,7 @@
// storage write; query completion occurs in parallel with profile
// persistence.
- if (profileOption == ProfileOption.ASYNC && !skipProfileWrite) {
+ if (profileOption == ProfileOption.ASYNC) {
queryManager.writeFinalProfile(uex);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 0c140a4..66739de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -58,12 +58,15 @@
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Each Foreman holds its own QueryManager. This manages the events associated with execution of a particular query across all fragments.
+ * Each Foreman holds its own QueryManager. This manages the events associated
+ * with execution of a particular query across all fragments.
*/
public class QueryManager implements AutoCloseable {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
+ private static final Logger logger = LoggerFactory.getLogger(QueryManager.class);
private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();
private final QueryId queryId;
@@ -240,10 +243,10 @@
public void close() throws Exception { }
/*
- * This assumes that the FragmentStatusListener implementation takes action when it hears
- * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
- * but log messages.
- */
+ * This assumes that the FragmentStatusListener implementation takes action when it hears
+ * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
+ * but log messages.
+ */
private static class SignalListener extends EndpointListener<Ack, FragmentHandle> {
/**
* An enum of possible signals that {@link SignalListener} listens to.
@@ -461,13 +464,10 @@
* there is a node failure, we can then correctly track how many outstanding messages will never arrive.
*/
private class NodeTracker {
- private final DrillbitEndpoint endpoint;
private final AtomicInteger totalFragments = new AtomicInteger(0);
private final AtomicInteger completedFragments = new AtomicInteger(0);
- public NodeTracker(final DrillbitEndpoint endpoint) {
- this.endpoint = endpoint;
- }
+ public NodeTracker(final DrillbitEndpoint endpoint) { }
/**
* Increments the number of fragment this node is running.
@@ -506,7 +506,6 @@
}
return true;
}
-
}
/**
@@ -556,7 +555,6 @@
}
};
-
public DrillbitStatusListener getDrillbitStatusListener() {
return drillbitStatusListener;
}
@@ -601,7 +599,6 @@
new ForemanException(String.format("One more more nodes lost connectivity during query. Identified nodes were [%s].",
failedNodeList)));
}
-
}
};
}
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 9e306e6..46ae53a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -541,6 +541,7 @@
exec.java_compiler_janino_maxsize: 262144,
exec.max_hash_table_size: 1073741824,
exec.min_hash_table_size: 65536,
+ exec.partition.mem_throttle: 0,
exec.persistent_table.umask: "002",
exec.query.progress.update: true,
exec.query_profile.debug_mode: false,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 80e2a26..c8b64a4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -80,7 +80,6 @@
{
// Properties here mimic those in drill-root/pom.xml, Surefire plugin
// configuration. They allow tests to run successfully in Eclipse.
-
put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false);
// The CTTAS function requires that the default temporary workspace be
@@ -88,36 +87,30 @@
// dfs.tmp. But, the test setup marks dfs.tmp as read-only. To work
// around this, tests are supposed to use dfs. So, we need to
// set the default temporary workspace to dfs.tmp.
-
put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, DFS_TMP_SCHEMA);
put(ExecConstants.HTTP_ENABLE, false);
put("drill.catastrophic_to_standard_out", true);
// Verbose errors.
-
put(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY, true);
// See Drillbit.close. The Drillbit normally waits a specified amount
// of time for ZK registration to drop. But, embedded Drillbits normally
// don't use ZK, so no need to wait.
-
put(ExecConstants.ZK_REFRESH, 0);
// This is just a test, no need to be heavy-duty on threads.
// This is the number of server and client RPC threads. The
// production default is DEFAULT_SERVER_RPC_THREADS.
-
put(ExecConstants.BIT_SERVER_RPC_THREADS, 2);
// No need for many scanners except when explicitly testing that
// behavior. Production default is DEFAULT_SCAN_THREADS
-
put(ExecConstants.SCAN_THREADPOOL_SIZE, 4);
// Define a useful root location for the ZK persistent
// storage. Profiles will go here when running in distributed
// mode.
-
put(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT,
"/tmp/drill/tests");
}
@@ -147,9 +140,9 @@
startDrillbits();
applyOptions();
} catch (Exception e) {
+
// Translate exceptions to unchecked to avoid cluttering
// tests. Failures will simply fail the test itself.
-
throw new IllegalStateException("Cluster fixture setup failed", e);
}
}
@@ -158,7 +151,7 @@
* Set the client properties to be used by client fixture.
*/
private void setClientProps() {
- clientProps = builder.clientProps;
+ clientProps = builder.clientProps;
}
public Properties getClientProps() {
@@ -166,17 +159,17 @@
}
private void configureZk() {
- // Start ZK if requested.
+ // Start ZK if requested.
String zkConnect;
if (builder.zkHelper != null) {
- // Case where the test itself started ZK and we're only using it.
+ // Case where the test itself started ZK and we're only using it.
zkHelper = builder.zkHelper;
ownsZK = false;
} else if (builder.localZkCount > 0) {
- // Case where we need a local ZK just for this test cluster.
+ // Case where we need a local ZK just for this test cluster.
zkHelper = new ZookeeperHelper();
zkHelper.startZookeeper(builder.localZkCount);
ownsZK = true;
@@ -189,7 +182,6 @@
// in config properties defined at run time. Drill does not allow
// combining locally-set properties and a config file: it is one
// or the other.
-
if (builder.configBuilder().hasResource()) {
throw new IllegalArgumentException("Cannot specify a local ZK while using an external config file.");
}
@@ -202,27 +194,27 @@
}
private void createConfig() throws Exception {
+
// Create a config
// Because of the way DrillConfig works, we can set the ZK
// connection string only if a property set is provided.
-
config = builder.configBuilder.build();
if (builder.usingZk) {
- // Distribute drillbit using ZK (in-process or external)
+ // Distribute drillbit using ZK (in-process or external)
serviceSet = null;
usesZk = true;
} else {
- // Embedded Drillbit.
+ // Embedded Drillbit.
serviceSet = RemoteServiceSet.getLocalServiceSet();
}
}
private void startDrillbits() throws Exception {
- // Start the Drillbits.
+ // Start the Drillbits.
Preconditions.checkArgument(builder.bitCount > 0);
int bitCount = builder.bitCount;
for (int i = 0; i < bitCount; i++) {
@@ -230,7 +222,6 @@
bit.run();
// Bit name and registration.
-
String name;
if (builder.bitNames != null && i < builder.bitNames.length) {
name = builder.bitNames[i];
@@ -239,7 +230,6 @@
// Name the Drillbit by default. Most tests use one Drillbit,
// so make the name simple: "drillbit." Only add a numeric suffix
// when the test creates multiple bits.
-
if (bitCount == 1) {
name = DEFAULT_BIT_NAME;
} else {
@@ -250,7 +240,6 @@
// Remember the first Drillbit, this is the default one returned from
// drillbit().
-
if (i == 0) {
defaultDrillbit = bit;
}
@@ -330,7 +319,6 @@
* @return a test client. Client will be closed when this cluster
* fixture closes, or can be closed early
*/
-
public ClientFixture client(String host, int port) {
return clientBuilder()
.property(DrillProperties.DRILLBIT_CONNECTION, String.format("%s:%d", host, port))
@@ -573,6 +561,8 @@
props.setProperty(ExecConstants.UDF_DIRECTORY_ROOT, dirTestWatcher.getHomeDir().getAbsolutePath());
props.setProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, dirTestWatcher.getStoreDir().getAbsolutePath());
props.setProperty(ExecConstants.UDF_DIRECTORY_FS, FileSystem.DEFAULT_FS);
+ // ALTER SESSION profiles are seldom interesting
+ props.setProperty(ExecConstants.SKIP_ALTER_SESSION_QUERY_PROFILE, Boolean.TRUE.toString());
builder.configBuilder.configProps(props);
return builder;
@@ -640,7 +630,6 @@
* @param value the value to encode
* @return the SQL-acceptable string equivalent
*/
-
public static String stringify(Object value) {
if (value == null) {
return null;
@@ -654,8 +643,8 @@
}
public static String getResource(String resource) throws IOException {
- // Unlike the Java routines, Guava does not like a leading slash.
+ // Unlike the Java routines, Guava does not like a leading slash.
final URL url = Resources.getResource(trimSlash(resource));
if (url == null) {
throw new IOException(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
index 24beb3a..9bc5312 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
@@ -214,7 +214,6 @@
usingZk = true;
// Using ZK. Turn refresh wait back on.
-
return configProperty(ExecConstants.ZK_REFRESH, DEFAULT_ZK_REFRESH);
}
@@ -236,7 +235,6 @@
usingZk = true;
// Using ZK. Turn refresh wait back on.
-
configProperty(ExecConstants.ZK_REFRESH, DEFAULT_ZK_REFRESH);
return this;
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index e1b5d99..40fe40e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -23,6 +23,7 @@
import java.util.LinkedHashSet;
import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -393,27 +394,9 @@
*/
public String toString(boolean includeChildren) {
final int maxLen = 10;
- final StringBuilder builder = new StringBuilder();
- builder
- .append("[`")
- .append(name)
- .append("` (")
- .append(type.getMinorType().name());
-
- if (type.hasPrecision() && (type.getPrecision() > 0 || Types.isDecimalType(type))) {
- builder.append("(");
- builder.append(type.getPrecision());
- if (type.hasScale() && type.getScale() > 0) {
- builder.append(", ");
- builder.append(type.getScale());
- }
- builder.append(")");
- }
-
- builder
- .append(":")
- .append(type.getMode().name())
- .append(")");
+ final StringBuilder builder = new StringBuilder()
+ .append("[`");
+ prefix(builder);
if (includeChildren) {
if (type.getSubTypeCount() > 0) {
@@ -436,17 +419,63 @@
.toString();
}
+ private void prefix(StringBuilder builder) {
+ builder
+ .append(name)
+ .append("` (")
+ .append(type.getMinorType().name());
+
+ if (type.hasPrecision() && (type.getPrecision() > 0 || Types.isDecimalType(type))) {
+ builder.append("(");
+ builder.append(type.getPrecision());
+ if (type.hasScale() && type.getScale() > 0) {
+ builder.append(", ");
+ builder.append(type.getScale());
+ }
+ builder.append(")");
+ }
+
+ builder
+ .append(":")
+ .append(type.getMode().name())
+ .append(")");
+ }
+
@Override
public String toString() {
return toString(true);
}
+ public String format() {
+ final StringBuilder builder = new StringBuilder();
+ format(builder, 0);
+ return builder.toString();
+ }
+
+ /**
+ * Format the field in a multi-line format, with children (but not subtypes)
+ * indented. Useful for wide rows where the single-line format is too hard
+ * to read.
+ */
+ public void format(StringBuilder builder, int level) {
+ builder.append(StringUtils.repeat(' ', level));
+ prefix(builder);
+ if (children != null && ! children.isEmpty()) {
+ builder.append(":\n");
+ for (MaterializedField child : children) {
+ child.format(builder, level + 1);
+ }
+ } else {
+ builder.append("\n");
+ }
+ }
+
/**
* Return true if two fields have identical MinorType and Mode.
*/
public boolean hasSameTypeAndMode(MaterializedField that) {
- return (getType().getMinorType() == that.getType().getMinorType())
- && (getType().getMode() == that.getType().getMode());
+ return getType().getMinorType() == that.getType().getMinorType()
+ && getType().getMode() == that.getType().getMode();
}
private String toString(Collection<?> collection, int maxLen) {