DRILL-7324: Final set of "batch count" fixes
Final set of fixes for batch count/record count issues. Enables
vector checking for all operators.
closes #1912
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 3e658cb..f464b27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -575,7 +575,6 @@
}
}
-
@Override
public Iterator<VectorWrapper<?>> iterator() {
return container.iterator();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 5ae6e76..baef314 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -559,12 +559,8 @@
// Transfers count number of records from hyperBatch to simple container
final int copiedRecords = copier.copyRecords(0, count);
assert copiedRecords == count;
- for (VectorWrapper<?> v : newContainer) {
- ValueVector.Mutator m = v.getValueVector().getMutator();
- m.setValueCount(count);
- }
newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- newContainer.setRecordCount(count);
+ newContainer.setValueCount(count);
// Store all the batches containing limit number of records
batchBuilder.add(newBatch);
} while (queueSv4.next());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 45c670b..38fb14e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -269,7 +269,7 @@
for (VectorWrapper<?> w : container) {
AllocationHelper.allocatePrecomputedChildCount(w.getValueVector(), 0, 0, 0);
}
- container.setValueCount(0);
+ container.setEmpty();
if (incoming.getRecordCount() > 0) {
hashAggMemoryManager.update();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index c3b504a..586fa32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -186,9 +186,7 @@
if (!createAggregator()) {
state = BatchState.DONE;
}
- for (VectorWrapper<?> w : container) {
- w.getValueVector().allocateNew();
- }
+ container.allocateNew();
if (complexWriters != null) {
container.buildSchema(SelectionVectorMode.NONE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index c189367..e3b6070 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -66,12 +66,7 @@
if (recordCount == 0) {
outgoingSelectionVector.setRecordCount(0);
outgoingSelectionVector.setBatchActualRecordCount(0);
-
- // Must allocate vectors, then set count to zero. Allocation
- // is needed since offset vectors must contain at least one
- // item (the required value of 0 in index location 0.)
- outgoing.getContainer().allocateNew();
- outgoing.getContainer().setValueCount(0);
+ outgoing.getContainer().setEmpty();
return;
}
if (! outgoingSelectionVector.allocateNewSafe(recordCount)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index eab38ec..cded844 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -77,6 +77,7 @@
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
@@ -703,9 +704,7 @@
private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
batch.kill(true);
while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
- for (VectorWrapper<?> wrapper : batch) {
- wrapper.getValueVector().clear();
- }
+ VectorAccessibleUtilities.clear(batch);
upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 6b7edd2..c84f954 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -87,10 +87,10 @@
private int outputRecords;
// We accumulate all the batches on the right side in a hyper container.
- private ExpandableHyperContainer rightContainer = new ExpandableHyperContainer();
+ private final ExpandableHyperContainer rightContainer = new ExpandableHyperContainer();
// Record count of the individual batches in the right hyper container
- private LinkedList<Integer> rightCounts = new LinkedList<>();
+ private final LinkedList<Integer> rightCounts = new LinkedList<>();
// Generator mapping for the right side
@@ -372,9 +372,7 @@
if (leftUpstream != IterOutcome.NONE) {
leftSchema = left.getSchema();
- for (final VectorWrapper<?> vw : left) {
- container.addOrGet(vw.getField());
- }
+ container.copySchemaFrom(left);
}
if (rightUpstream != IterOutcome.NONE) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 9ccae49..ab82769 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -17,6 +17,17 @@
*/
package org.apache.drill.exec.physical.impl.metadata;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
@@ -24,6 +35,7 @@
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.metastore.ColumnNamesOptions;
import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils;
+import org.apache.drill.exec.metastore.analyze.MetadataIdentifierUtils;
import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MetadataControllerPOP;
@@ -32,7 +44,6 @@
import org.apache.drill.exec.planner.common.DrillStatsTable;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.WriterPrel;
-import org.apache.drill.exec.metastore.analyze.MetadataIdentifierUtils;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
@@ -80,17 +91,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
/**
* Terminal operator for producing ANALYZE statement. This operator is responsible for converting
* obtained metadata, fetching absent metadata from the Metastore and storing resulting metadata into the Metastore.
@@ -109,9 +109,9 @@
private boolean firstLeft = true;
private boolean firstRight = true;
- private boolean finished = false;
- private boolean finishedRight = false;
- private int recordCount = 0;
+ private boolean finished;
+ private boolean finishedRight;
+ private int recordCount;
protected MetadataControllerBatch(MetadataControllerPOP popConfig,
FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
@@ -129,13 +129,10 @@
protected boolean setupNewSchema() {
container.clear();
-
container.addOrGet(MetastoreAnalyzeConstants.OK_FIELD_NAME, Types.required(TypeProtos.MinorType.BIT), null);
container.addOrGet(MetastoreAnalyzeConstants.SUMMARY_FIELD_NAME, Types.required(TypeProtos.MinorType.VARCHAR), null);
-
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
container.setEmpty();
-
return true;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
index 11d307b..7a61489 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
@@ -184,5 +184,4 @@
logger.error("RangePartitionRecordBatch[container={}, numPartitions={}, recordCount={}, partitionIdVector={}]",
container, numPartitions, recordCount, partitionIdVector);
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
index 15962ad..921c92b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
+
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.ValueExpressions;
@@ -46,11 +47,12 @@
import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.metastore.statistics.Statistic;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- *
* Example input and output:
- * Schema of incoming batch:
+ * Schema of incoming batch:<pre>
* "columns" : MAP - Column names
* "region_id" : VARCHAR
* "sales_city" : VARCHAR
@@ -65,7 +67,7 @@
* "sales_city" : BIGINT - nonnullstatcount(sales_city)
* "cnt" : BIGINT - nonnullstatcount(cnt)
* .... another map for next stats function ....
- * Schema of outgoing batch:
+ * </pre>Schema of outgoing batch:<pre>
* "schema" : BIGINT - Schema number. For each schema change this number is incremented.
* "computed" : DATE - What time is it computed?
* "columns" : MAP - Column names
@@ -82,17 +84,19 @@
* "sales_city" : BIGINT - nonnullstatcount(sales_city)
* "cnt" : BIGINT - nonnullstatcount(cnt)
* .... another map for next stats function ....
+ * </pre>
*/
+
public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMerge> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatisticsMergeBatch.class);
- private Map<String, String> functions;
+ private static final Logger logger = LoggerFactory.getLogger(StatisticsMergeBatch.class);
+
+ private final Map<String, String> functions;
private boolean first = true;
- private boolean finished = false;
- private int schema = 0;
- private int recordCount = 0;
- private List<String> columnsList = null;
+ private boolean finished;
+ private int schema;
+ private List<String> columnsList;
private double samplePercent = 100.0;
- private List<MergedStatistic> mergedStatisticList = null;
+ private final List<MergedStatistic> mergedStatisticList;
public StatisticsMergeBatch(StatisticsMerge popConfig, RecordBatch incoming,
FragmentContext context) throws OutOfMemoryException {
@@ -115,20 +119,6 @@
}
/*
- * Adds the `name` column value vector in the `parent` map vector. These `name` columns are
- * table columns for which statistics will be computed.
- */
- private ValueVector addMapVector(String name, MapVector parent, LogicalExpression expr)
- throws SchemaChangeException {
- LogicalExpression mle = PhysicalOperatorUtil.materializeExpression(expr, incoming, context);
- Class<? extends ValueVector> vvc =
- TypeHelper.getValueVectorClass(mle.getMajorType().getMinorType(),
- mle.getMajorType().getMode());
- ValueVector vector = parent.addOrGet(name, mle.getMajorType(), vvc);
- return vector;
- }
-
- /*
* Identify the list of fields within a map which are generated by StatisticsMerge. Perform
* basic sanity check i.e. all maps have the same number of columns and those columns are
* the same in each map
@@ -229,8 +219,7 @@
}
}
}
- container.setRecordCount(0);
- recordCount = 0;
+ container.setEmpty();
container.buildSchema(incoming.getSchema().getSelectionVectorMode());
}
@@ -238,7 +227,7 @@
* Determines the MajorType based on the incoming value vector. Please look at the
* comments above the class definition which describes the incoming/outgoing batch schema
*/
- private void addVectorToOutgoingContainer(String outStatName, VectorWrapper vw)
+ private void addVectorToOutgoingContainer(String outStatName, VectorWrapper<?> vw)
throws SchemaChangeException {
// Input map vector
MapVector inputVector = (MapVector) vw.getValueVector();
@@ -306,9 +295,8 @@
}
}
}
- ++recordCount;
// Populate the number of records (1) inside the outgoing batch.
- container.setRecordCount(1);
+ container.setValueCount(1);
return IterOutcome.OK;
}
@@ -343,9 +331,7 @@
}
@Override
- public void dump() {
-
- }
+ public void dump() { }
@Override
public IterOutcome innerNext() {
@@ -404,6 +390,6 @@
@Override
public int getRecordCount() {
- return recordCount;
+ return container.getRecordCount();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 4471248..a9584bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -25,13 +25,16 @@
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.WritableBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(RemovingRecordBatch.class);
private Copier copier;
- public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+ public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context,
+ RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context, incoming);
logger.debug("Created.");
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 25dae80..ed2b66e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.union;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@@ -68,8 +69,8 @@
private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private UnionAller unionall;
- private final List<TransferPair> transfers = Lists.newArrayList();
- private final List<ValueVector> allocationVectors = Lists.newArrayList();
+ private final List<TransferPair> transfers = new ArrayList<>();
+ private final List<ValueVector> allocationVectors = new ArrayList<>();
private int recordCount;
private UnionInputIterator unionInputIterator;
@@ -341,7 +342,7 @@
}
private class UnionInputIterator implements Iterator<Pair<IterOutcome, BatchStatusWrappper>> {
- private Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
+ private final Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
UnionInputIterator(IterOutcome leftOutCome, RecordBatch left, IterOutcome rightOutCome, RecordBatch right) {
if (rightOutCome == IterOutcome.OK_NEW_SCHEMA) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 1715c99..85eceea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -295,6 +295,7 @@
remainderIndex = 0;
logger.debug("IterOutcome: EMIT.");
}
+ rowIdVector.getMutator().setValueCount(outputRecords);
container.setValueCount(outputRecords);
memoryManager.updateOutgoingStats(outputRecords);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
index 99bd6d1..72a337a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
@@ -64,8 +64,8 @@
* "sales_city" : BIGINT - nonnullstatcount(sales_city)
* "cnt" : BIGINT - nonnullstatcount(cnt)
* .... another map for next stats function ....
- *
- * Schema of output:
+ * </pre>
+ * Schema of output: <pre>
* "schema" : BIGINT - Schema number. For each schema change this number is incremented.
* "computed" : BIGINT - What time is this computed?
* "column" : column name
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 8793a65..e1ffd7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -17,41 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.validate;
-import java.util.IdentityHashMap;
-import java.util.Map;
-
import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.physical.impl.WriterRecordBatch;
-import org.apache.drill.exec.physical.impl.TopN.TopNBatch;
-import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
-import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
-import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch;
-import org.apache.drill.exec.physical.impl.filter.RuntimeFilterRecordBatch;
-import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
-import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
-import org.apache.drill.exec.physical.impl.join.MergeJoinBatch;
-import org.apache.drill.exec.physical.impl.join.NestedLoopJoinBatch;
-import org.apache.drill.exec.physical.impl.limit.LimitRecordBatch;
-import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
-import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
-import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch;
-import org.apache.drill.exec.physical.impl.metadata.MetadataHashAggBatch;
-import org.apache.drill.exec.physical.impl.metadata.MetadataStreamAggBatch;
-import org.apache.drill.exec.physical.impl.metadata.MetadataControllerBatch;
-import org.apache.drill.exec.physical.impl.metadata.MetadataHandlerBatch;
-import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
-import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
-import org.apache.drill.exec.physical.impl.rangepartitioner.RangePartitionRecordBatch;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
-import org.apache.drill.exec.physical.impl.trace.TraceRecordBatch;
-import org.apache.drill.exec.physical.impl.union.UnionAllRecordBatch;
-import org.apache.drill.exec.physical.impl.unnest.UnnestRecordBatch;
-import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
-import org.apache.drill.exec.physical.impl.unpivot.UnpivotMapsRecordBatch;
-import org.apache.drill.exec.physical.impl.window.WindowFrameRecordBatch;
-import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch;
-import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SimpleVectorWrapper;
import org.apache.drill.exec.record.VectorAccessible;
@@ -200,89 +166,13 @@
}
}
- private enum CheckMode {
- /** No checking. */
- NONE,
- /** Check only batch, container counts. */
- COUNTS,
- /** Check vector value counts. */
- VECTORS
- };
-
- private static final Map<Class<? extends CloseableRecordBatch>, CheckMode> checkRules = buildRules();
-
private final ErrorReporter errorReporter;
public BatchValidator(ErrorReporter errorReporter) {
this.errorReporter = errorReporter;
}
- /**
- * At present, most operators will not pass the checks here. The following
- * table identifies those that should be checked, and the degree of check.
- * Over time, this table should include all operators, and thus become
- * unnecessary.
- */
- private static Map<Class<? extends CloseableRecordBatch>, CheckMode> buildRules() {
- Map<Class<? extends CloseableRecordBatch>, CheckMode> rules = new IdentityHashMap<>();
- rules.put(OperatorRecordBatch.class, CheckMode.VECTORS);
- rules.put(ScanBatch.class, CheckMode.VECTORS);
- rules.put(ProjectRecordBatch.class, CheckMode.VECTORS);
- rules.put(FilterRecordBatch.class, CheckMode.VECTORS);
- rules.put(PartitionLimitRecordBatch.class, CheckMode.VECTORS);
- rules.put(UnnestRecordBatch.class, CheckMode.VECTORS);
- rules.put(HashAggBatch.class, CheckMode.VECTORS);
- rules.put(RemovingRecordBatch.class, CheckMode.VECTORS);
- rules.put(StreamingAggBatch.class, CheckMode.VECTORS);
- rules.put(RuntimeFilterRecordBatch.class, CheckMode.VECTORS);
- rules.put(FlattenRecordBatch.class, CheckMode.VECTORS);
- rules.put(MergeJoinBatch.class, CheckMode.VECTORS);
- rules.put(NestedLoopJoinBatch.class, CheckMode.VECTORS);
- rules.put(LimitRecordBatch.class, CheckMode.VECTORS);
- rules.put(MergingRecordBatch.class, CheckMode.VECTORS);
- rules.put(OrderedPartitionRecordBatch.class, CheckMode.VECTORS);
- rules.put(RangePartitionRecordBatch.class, CheckMode.VECTORS);
- rules.put(TraceRecordBatch.class, CheckMode.VECTORS);
- rules.put(UnionAllRecordBatch.class, CheckMode.VECTORS);
- rules.put(UnorderedReceiverBatch.class, CheckMode.VECTORS);
- rules.put(UnpivotMapsRecordBatch.class, CheckMode.VECTORS);
- rules.put(WindowFrameRecordBatch.class, CheckMode.VECTORS);
- rules.put(TopNBatch.class, CheckMode.VECTORS);
- rules.put(HashJoinBatch.class, CheckMode.VECTORS);
- rules.put(ExternalSortBatch.class, CheckMode.VECTORS);
- rules.put(WriterRecordBatch.class, CheckMode.VECTORS);
- rules.put(MetadataStreamAggBatch.class, CheckMode.VECTORS);
- rules.put(MetadataHashAggBatch.class, CheckMode.VECTORS);
- rules.put(MetadataHandlerBatch.class, CheckMode.VECTORS);
- rules.put(MetadataControllerBatch.class, CheckMode.VECTORS);
- return rules;
- }
-
- private static CheckMode lookup(Object subject) {
- CheckMode checkMode = checkRules.get(subject.getClass());
- return checkMode == null ? CheckMode.NONE : checkMode;
- }
-
public static boolean validate(RecordBatch batch) {
- // This is a handy place to trace batches as they flow up
- // the DAG. Works best for single-threaded runs with few records.
- // System.out.println(batch.getClass().getSimpleName());
- // RowSetFormatter.print(batch);
-
- CheckMode checkMode = lookup(batch);
-
- // If no rule, don't check this batch.
-
- if (checkMode == CheckMode.NONE) {
-
- // As work proceeds, might want to log those batches not checked.
- // For now, there are too many.
-
- return true;
- }
-
- // All batches that do any checks will at least check counts.
-
ErrorReporter reporter = errorReporter(batch);
int rowCount = batch.getRecordCount();
int valueCount = rowCount;
@@ -340,9 +230,7 @@
break;
}
}
- if (checkMode == CheckMode.VECTORS) {
- new BatchValidator(reporter).validateBatch(batch, valueCount);
- }
+ new BatchValidator(reporter).validateBatch(batch, valueCount);
return reporter.errorCount() == 0;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 6ed004f..07a5c76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.window;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -47,7 +48,6 @@
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +64,7 @@
private List<WindowDataBatch> batches;
private WindowFramer[] framers;
- private final List<WindowFunction> functions = Lists.newArrayList();
+ private final List<WindowFunction> functions = new ArrayList<>();
private boolean noMoreBatches; // true when downstream returns NONE
private BatchSchema schema;
@@ -75,7 +75,7 @@
RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
this.incoming = incoming;
- batches = Lists.newArrayList();
+ batches = new ArrayList<>();
}
/**
@@ -260,17 +260,15 @@
logger.trace("creating framer(s)");
- List<LogicalExpression> keyExprs = Lists.newArrayList();
- List<LogicalExpression> orderExprs = Lists.newArrayList();
+ List<LogicalExpression> keyExprs = new ArrayList<>();
+ List<LogicalExpression> orderExprs = new ArrayList<>();
boolean requireFullPartition = false;
boolean useDefaultFrame = false; // at least one window function uses the DefaultFrameTemplate
boolean useCustomFrame = false; // at least one window function uses the CustomFrameTemplate
// all existing vectors will be transferred to the outgoing container in framer.doWork()
- for (VectorWrapper<?> wrapper : batch) {
- container.addOrGet(wrapper.getField());
- }
+ container.copySchemaFrom(batch);
// add aggregation vectors to the container, and materialize corresponding expressions
for (NamedExpression ne : popConfig.getAggregations()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
index f51f521..03e8ffa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
@@ -21,10 +21,8 @@
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
-// TODO javadoc
public interface VectorAccessible extends Iterable<VectorWrapper<?>> {
// TODO are these <?> related in any way? Should they be the same one?
- // TODO javadoc
VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds);
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 1cfc61d..3796e5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -553,4 +553,10 @@
// in the offset vectors that need it.
setValueCount(0);
}
+
+ public void copySchemaFrom(VectorAccessible other) {
+ for (VectorWrapper<?> wrapper : other) {
+ addOrGet(wrapper.getField());
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index da42b27..0ab4181 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -79,8 +79,8 @@
* @param columns pathnames of columns/subfields to read
* @throws OutOfMemoryException
*/
- public JSONRecordReader(final FragmentContext fragmentContext, final Path inputPath, final DrillFileSystem fileSystem,
- final List<SchemaPath> columns) throws OutOfMemoryException {
+ public JSONRecordReader(FragmentContext fragmentContext, Path inputPath, DrillFileSystem fileSystem,
+ List<SchemaPath> columns) throws OutOfMemoryException {
this(fragmentContext, inputPath, null, fileSystem, columns);
}
@@ -137,15 +137,15 @@
}
@Override
- public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+ public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
try{
if (hadoopPath != null) {
- this.stream = fileSystem.openPossiblyCompressedStream(hadoopPath);
+ stream = fileSystem.openPossiblyCompressedStream(hadoopPath);
}
- this.writer = new VectorContainerWriter(output, unionEnabled);
+ writer = new VectorContainerWriter(output, unionEnabled);
if (isSkipQuery()) {
- this.jsonReader = new CountingJsonReader(fragmentContext.getManagedBuffer(), enableNanInf, enableEscapeAnyChar);
+ jsonReader = new CountingJsonReader(fragmentContext.getManagedBuffer(), enableNanInf, enableEscapeAnyChar);
} else {
this.jsonReader = new JsonReader.Builder(fragmentContext.getManagedBuffer())
.schemaPathColumns(ImmutableList.copyOf(getColumns()))
@@ -157,7 +157,7 @@
.build();
}
setupParser();
- } catch (final Exception e){
+ } catch (Exception e){
handleAndRaise("Failure reading JSON file", e);
}
}
@@ -182,7 +182,7 @@
int columnNr = -1;
if (e instanceof JsonParseException) {
- final JsonParseException ex = (JsonParseException) e;
+ JsonParseException ex = (JsonParseException) e;
message = ex.getOriginalMessage();
columnNr = ex.getLocation().getColumnNr();
}
@@ -226,7 +226,8 @@
}
++parseErrorCount;
if (printSkippedMalformedJSONRecordLineNumber) {
- logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : line nos :" + (recordCount + parseErrorCount));
+ logger.debug("Error parsing JSON in {}: line: {}",
+ hadoopPath.getName(), recordCount + parseErrorCount);
}
if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
break;
@@ -254,8 +255,9 @@
@Override
public void close() throws Exception {
- if(stream != null) {
+ if (stream != null) {
stream.close();
+ stream = null;
}
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index 79aa1d3..04bc67d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -81,6 +81,9 @@
@Test
public void schemaChange() throws Exception {
+ // Verifies that the schema change does not cause a
+ // crash. A pretty minimal test.
+ // TODO: Verify actual results.
test("select b from dfs.`vector/complex/writer/schemaChange/`");
}
@@ -267,12 +270,15 @@
@Test
public void testAllTextMode() throws Exception {
- test("alter system set `store.json.all_text_mode` = true");
- String[] queries = {"select * from cp.`store/json/schema_change_int_to_string.json`"};
- long[] rowCounts = {3};
- String filename = "/store/json/schema_change_int_to_string.json";
- runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
- test("alter system set `store.json.all_text_mode` = false");
+ try {
+ alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true);
+ String[] queries = {"select * from cp.`store/json/schema_change_int_to_string.json`"};
+ long[] rowCounts = {3};
+ String filename = "/store/json/schema_change_int_to_string.json";
+ runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
+ } finally {
+ resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+ }
}
@Test
@@ -293,58 +299,87 @@
@Test
public void testNullWhereListExpected() throws Exception {
- test("alter system set `store.json.all_text_mode` = true");
- String[] queries = {"select * from cp.`store/json/null_where_list_expected.json`"};
- long[] rowCounts = {3};
- String filename = "/store/json/null_where_list_expected.json";
- runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
- test("alter system set `store.json.all_text_mode` = false");
+ try {
+ alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true);
+ String[] queries = {"select * from cp.`store/json/null_where_list_expected.json`"};
+ long[] rowCounts = {3};
+ String filename = "/store/json/null_where_list_expected.json";
+ runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
+ }
+ finally {
+ resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+ }
}
@Test
public void testNullWhereMapExpected() throws Exception {
- test("alter system set `store.json.all_text_mode` = true");
- String[] queries = {"select * from cp.`store/json/null_where_map_expected.json`"};
- long[] rowCounts = {3};
- String filename = "/store/json/null_where_map_expected.json";
- runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
- test("alter system set `store.json.all_text_mode` = false");
+ try {
+ alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true);
+ String[] queries = {"select * from cp.`store/json/null_where_map_expected.json`"};
+ long[] rowCounts = {3};
+ String filename = "/store/json/null_where_map_expected.json";
+ runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
+ }
+ finally {
+ resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+ }
}
@Test
public void ensureProjectionPushdown() throws Exception {
- // Tests to make sure that we are correctly eliminating schema changing columns. If completes, means that the projection pushdown was successful.
- test("alter system set `store.json.all_text_mode` = false; "
- + "select t.field_1, t.field_3.inner_1, t.field_3.inner_2, t.field_4.inner_1 "
- + "from cp.`store/json/schema_change_int_to_string.json` t");
+ try {
+ // Tests to make sure that we are correctly eliminating schema changing
+ // columns. If completes, means that the projection pushdown was
+ // successful.
+ test("alter system set `store.json.all_text_mode` = false; "
+ + "select t.field_1, t.field_3.inner_1, t.field_3.inner_2, t.field_4.inner_1 "
+ + "from cp.`store/json/schema_change_int_to_string.json` t");
+ } finally {
+ resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+ }
}
- // The project pushdown rule is correctly adding the projected columns to the scan, however it is not removing
- // the redundant project operator after the scan, this tests runs a physical plan generated from one of the tests to
- // ensure that the project is filtering out the correct data in the scan alone
+ // The project pushdown rule is correctly adding the projected columns to the
+ // scan, however it is not removing the redundant project operator after the
+ // scan, this tests runs a physical plan generated from one of the tests to
+ // ensure that the project is filtering out the correct data in the scan alone.
@Test
public void testProjectPushdown() throws Exception {
- String[] queries = {Files.asCharSource(DrillFileUtils.getResourceAsFile("/store/json/project_pushdown_json_physical_plan.json"), Charsets.UTF_8).read()};
- long[] rowCounts = {3};
- String filename = "/store/json/schema_change_int_to_string.json";
- test("alter system set `store.json.all_text_mode` = false");
- runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, rowCounts);
+ try {
+ String[] queries = {Files.asCharSource(DrillFileUtils.getResourceAsFile(
+ "/store/json/project_pushdown_json_physical_plan.json"), Charsets.UTF_8).read()};
+ String filename = "/store/json/schema_change_int_to_string.json";
+ alterSession(ExecConstants.JSON_ALL_TEXT_MODE, false);
+ long[] rowCounts = {3};
+ runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, rowCounts);
- List<QueryDataBatch> results = testPhysicalWithResults(queries[0]);
- assertEquals(1, results.size());
- // "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", "`field_4`.`inner_1`"
+ List<QueryDataBatch> results = testPhysicalWithResults(queries[0]);
+ assertEquals(1, results.size());
+ // "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", "`field_4`.`inner_1`"
- RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
- QueryDataBatch batch = results.get(0);
- assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+ RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
+ QueryDataBatch batch = results.get(0);
+ assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
- // this used to be five. It is now three. This is because the plan doesn't have a project.
- // Scanners are not responsible for projecting non-existent columns (as long as they project one column)
- assertEquals(3, batchLoader.getSchema().getFieldCount());
- testExistentColumns(batchLoader);
+ // this used to be five. It is now four. This is because the plan doesn't
+ // have a project. Scanners are not responsible for projecting non-existent
+ // columns (as long as they project one column)
+ //
+ // That said, the JSON format plugin does claim it can do project
+ // push-down, which means it will ensure columns for any column
+ // mentioned in the project list, in a form consistent with the schema
+ // path. In this case, `non_existent`.`nested`.`field` appears in
+ // the query. But, even more oddly, the missing field is inserted only
+ // if all text mode is true, omitted if all text mode is false.
+ // Seems overly complex.
+ assertEquals(3, batchLoader.getSchema().getFieldCount());
+ testExistentColumns(batchLoader);
- batch.release();
- batchLoader.clear();
+ batch.release();
+ batchLoader.clear();
+ } finally {
+ resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+ }
}
@Test
@@ -360,32 +395,32 @@
private void testExistentColumns(RecordBatchLoader batchLoader) throws SchemaChangeException {
VectorWrapper<?> vw = batchLoader.getValueAccessorById(
- RepeatedBigIntVector.class, //
- batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_1")).getFieldIds() //
+ RepeatedBigIntVector.class,
+ batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_1")).getFieldIds()
);
assertEquals("[1]", vw.getValueVector().getAccessor().getObject(0).toString());
assertEquals("[5]", vw.getValueVector().getAccessor().getObject(1).toString());
assertEquals("[5,10,15]", vw.getValueVector().getAccessor().getObject(2).toString());
vw = batchLoader.getValueAccessorById(
- IntVector.class, //
- batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", "inner_1")).getFieldIds() //
+ IntVector.class,
+ batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", "inner_1")).getFieldIds()
);
assertNull(vw.getValueVector().getAccessor().getObject(0));
assertEquals(2l, vw.getValueVector().getAccessor().getObject(1));
assertEquals(5l, vw.getValueVector().getAccessor().getObject(2));
vw = batchLoader.getValueAccessorById(
- IntVector.class, //
- batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", "inner_2")).getFieldIds() //
+ IntVector.class,
+ batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", "inner_2")).getFieldIds()
);
assertNull(vw.getValueVector().getAccessor().getObject(0));
assertNull(vw.getValueVector().getAccessor().getObject(1));
assertEquals(3l, vw.getValueVector().getAccessor().getObject(2));
vw = batchLoader.getValueAccessorById(
- RepeatedBigIntVector.class, //
- batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_4", "inner_1")).getFieldIds() //
+ RepeatedBigIntVector.class,
+ batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_4", "inner_1")).getFieldIds()
);
assertEquals("[]", vw.getValueVector().getAccessor().getObject(0).toString());
assertEquals("[1,2,3]", vw.getValueVector().getAccessor().getObject(1).toString());
@@ -440,7 +475,7 @@
)
).go();
} finally {
- testNoResult("alter session set `exec.enable_union_type` = false");
+ resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
}
}
@@ -457,7 +492,7 @@
.baselineValues(13L, "BIGINT")
.go();
} finally {
- testNoResult("alter session set `exec.enable_union_type` = false");
+ resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
}
}
@@ -477,7 +512,7 @@
.baselineValues(3L)
.go();
} finally {
- testNoResult("alter session set `exec.enable_union_type` = false");
+ resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
}
}
@@ -495,7 +530,7 @@
.baselineValues(9L)
.go();
} finally {
- testNoResult("alter session set `exec.enable_union_type` = false");
+ resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
}
}
@@ -512,7 +547,7 @@
.baselineValues(11.0)
.go();
} finally {
- testNoResult("alter session set `exec.enable_union_type` = false");
+ resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
}
}
@@ -536,7 +571,7 @@
.baselineValues(20000L)
.go();
} finally {
- testNoResult("alter session set `exec.enable_union_type` = false");
+ resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
}
}
@@ -565,7 +600,7 @@
.baselineValues(20000L)
.go();
} finally {
- testNoResult("alter session set `exec.enable_union_type` = false");
+ resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
}
}
@@ -628,7 +663,7 @@
.go();
} finally {
- testNoResult("alter session set `store.json.all_text_mode` = false");
+ resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
}
}