blob: 2fa8c756f540c195ad79596b3223bf7cb951e4aa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.test;
import java.math.BigDecimal;
import java.math.BigInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.HyperVectorValueIterator;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.HyperVectorWrapper;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.util.Text;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An object to encapsulate the options for a Drill unit test, as well as the execution methods to perform the tests and
* validation of results.
*
* To construct an instance easily, look at the TestBuilder class. From an implementation of
* the BaseTestQuery class, and instance of the builder is accessible through the testBuilder() method.
*/
public class DrillTestWrapper {
private static final Logger logger = LoggerFactory.getLogger(DrillTestWrapper.class);
public interface TestServices {
BufferAllocator allocator();
void test(String query) throws Exception;
List<QueryDataBatch> testRunAndReturn(QueryType type, Object query) throws Exception;
}
// TODO - when in JSON, read baseline in all text mode to avoid precision loss for decimal values
// This flag will enable all of the values that are validated to be logged. For large validations this is time consuming
// so this is not exposed in a way that it can be enabled for an individual test. It can be changed here while debugging
// a test to see all of the output, but as this framework is doing full validation, there is no reason to keep it on as
// it will only make the test slower.
private static boolean VERBOSE_DEBUG = false;
// Unit test doesn't expect any specific batch count
public static final int EXPECTED_BATCH_COUNT_NOT_SET = -1;
public static final int EXPECTED_NUM_RECORDS_NOT_SET = - 1;
// The motivation behind the TestBuilder was to provide a clean API for test writers. The model is mostly designed to
// prepare all of the components necessary for running the tests, before the TestWrapper is initialized. There is however
// one case where the setup for the baseline is driven by the test query results, and this is implicit type enforcement
// for the baseline data. In this case there needs to be a call back into the TestBuilder once we know the type information
// from the test query.
private TestBuilder testBuilder;
/**
* Test query to run. Type of object depends on the {@link #queryType}
*/
private Object query;
// The type of query provided
private UserBitShared.QueryType queryType;
// The type of query provided for the baseline
private UserBitShared.QueryType baselineQueryType;
// should ordering be enforced in the baseline check
private boolean ordered;
private TestServices services;
// queries to run before the baseline or test queries, can be used to set options
private String baselineOptionSettingQueries;
private String testOptionSettingQueries;
// allow approximate equality tests for number types
private boolean approximateEquality;
// tolerance for approximate equality tests defined as |Expected - Actual|/|Expected| <= Tolerance
private double tolerance;
// two different methods are available for comparing ordered results, the default reads all of the records
// into giant lists of objects, like one giant on-heap batch of 'vectors'
// this flag enables the other approach which iterates through a hyper batch for the test query results and baseline
// while this does work faster and use less memory, it can be harder to debug as all of the elements are not in a
// single list
private boolean highPerformanceComparison;
// if the baseline is a single option test writers can provide the baseline values and columns
// without creating a file, these are provided to the builder in the baselineValues() and baselineColumns() methods
// and translated into a map in the builder
private String[] baselineColumns;
private List<Map<String, Object>> baselineRecords;
private int expectedNumBatches;
private int expectedNumRecords;
public DrillTestWrapper(TestBuilder testBuilder, TestServices services, Object query, QueryType queryType,
String baselineOptionSettingQueries, String testOptionSettingQueries,
QueryType baselineQueryType, boolean ordered, boolean approximateEquality, double tolerance,
boolean highPerformanceComparison,
String[] baselineColumns, List<Map<String, Object>> baselineRecords, int expectedNumBatches,
int expectedNumRecords) {
this.testBuilder = testBuilder;
this.services = services;
this.query = query;
this.queryType = queryType;
this.baselineQueryType = baselineQueryType;
this.ordered = ordered;
this.approximateEquality = approximateEquality;
this.tolerance = tolerance;
this.baselineOptionSettingQueries = baselineOptionSettingQueries;
this.testOptionSettingQueries = testOptionSettingQueries;
this.highPerformanceComparison = highPerformanceComparison;
this.baselineColumns = baselineColumns;
this.baselineRecords = baselineRecords;
this.expectedNumBatches = expectedNumBatches;
this.expectedNumRecords = expectedNumRecords;
Preconditions.checkArgument(!(baselineRecords != null && !ordered && highPerformanceComparison));
Preconditions.checkArgument((baselineRecords != null && expectedNumRecords == DrillTestWrapper.EXPECTED_NUM_RECORDS_NOT_SET) || baselineRecords == null,
"Cannot define both baselineRecords and the expectedNumRecords.");
Preconditions.checkArgument((baselineQueryType != null && expectedNumRecords == DrillTestWrapper.EXPECTED_NUM_RECORDS_NOT_SET) || baselineQueryType == null,
"Cannot define both a baselineQueryType and the expectedNumRecords.");
}
public void run() throws Exception {
if (testBuilder.getExpectedSchema() != null) {
compareSchemaOnly();
} else {
if (ordered) {
compareOrderedResults();
} else {
compareUnorderedResults();
}
}
}
private BufferAllocator getAllocator() {
return services.allocator();
}
private void compareHyperVectors(Map<String, HyperVectorValueIterator> expectedRecords,
Map<String, HyperVectorValueIterator> actualRecords) throws Exception {
for (String s : expectedRecords.keySet()) {
assertNotNull("Expected column '" + s + "' not found.", actualRecords.get(s));
assertEquals(expectedRecords.get(s).getTotalRecords(), actualRecords.get(s).getTotalRecords());
HyperVectorValueIterator expectedValues = expectedRecords.get(s);
HyperVectorValueIterator actualValues = actualRecords.get(s);
int i = 0;
while (expectedValues.hasNext()) {
compareValuesErrorOnMismatch(expectedValues.next(), actualValues.next(), i, s);
i++;
}
}
cleanupHyperValueIterators(expectedRecords.values());
cleanupHyperValueIterators(actualRecords.values());
}
private void cleanupHyperValueIterators(Collection<HyperVectorValueIterator> hyperBatches) {
for (HyperVectorValueIterator hvi : hyperBatches) {
for (ValueVector vv : hvi.getHyperVector().getValueVectors()) {
vv.clear();
}
}
}
public static void compareMergedVectors(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords) throws Exception {
for (String s : actualRecords.keySet()) {
assertNotNull("Unexpected extra column " + s + " returned by query.", expectedRecords.get(s));
assertEquals("Incorrect number of rows returned by query.", expectedRecords.get(s).size(), actualRecords.get(s).size());
List<?> expectedValues = expectedRecords.get(s);
List<?> actualValues = actualRecords.get(s);
assertEquals("Different number of records returned", expectedValues.size(), actualValues.size());
for (int i = 0; i < expectedValues.size(); i++) {
try {
compareValuesErrorOnMismatch(expectedValues.get(i), actualValues.get(i), i, s);
} catch (Exception ex) {
throw new Exception(ex.getMessage() + "\n\n" + printNearbyRecords(expectedRecords, actualRecords, i), ex);
}
}
}
if (actualRecords.size() < expectedRecords.size()) {
throw new Exception(findMissingColumns(expectedRecords.keySet(), actualRecords.keySet()));
}
}
private static String printNearbyRecords(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords, int offset) {
StringBuilder expected = new StringBuilder();
StringBuilder actual = new StringBuilder();
expected.append("Expected Records near verification failure:\n");
actual.append("Actual Records near verification failure:\n");
int firstRecordToPrint = Math.max(0, offset - 5);
List<?> expectedValuesInFirstColumn = expectedRecords.get(expectedRecords.keySet().iterator().next());
List<?> actualValuesInFirstColumn = expectedRecords.get(expectedRecords.keySet().iterator().next());
int numberOfRecordsToPrint = Math.min(Math.min(10, expectedValuesInFirstColumn.size()), actualValuesInFirstColumn.size());
for (int i = firstRecordToPrint; i < numberOfRecordsToPrint; i++) {
expected.append("Record Number: ").append(i).append(" { ");
actual.append("Record Number: ").append(i).append(" { ");
for (String s : actualRecords.keySet()) {
List<?> actualValues = actualRecords.get(s);
actual.append(s).append(" : ").append(actualValues.get(i)).append(",");
}
for (String s : expectedRecords.keySet()) {
List<?> expectedValues = expectedRecords.get(s);
expected.append(s).append(" : ").append(expectedValues.get(i)).append(",");
}
expected.append(" }\n");
actual.append(" }\n");
}
return expected.append("\n\n").append(actual).toString();
}
private Map<String, HyperVectorValueIterator> addToHyperVectorMap(final List<QueryDataBatch> records,
final RecordBatchLoader loader)
throws SchemaChangeException, UnsupportedEncodingException {
// TODO - this does not handle schema changes
Map<String, HyperVectorValueIterator> combinedVectors = new TreeMap<>();
long totalRecords = 0;
QueryDataBatch batch;
int size = records.size();
for (int i = 0; i < size; i++) {
batch = records.get(i);
loader.load(batch.getHeader().getDef(), batch.getData());
logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
totalRecords += loader.getRecordCount();
for (VectorWrapper<?> w : loader) {
String field = SchemaPath.getSimplePath(w.getField().getName()).toExpr();
if (!combinedVectors.containsKey(field)) {
MaterializedField mf = w.getField();
ValueVector[] vvList = (ValueVector[]) Array.newInstance(mf.getValueClass(), 1);
vvList[0] = w.getValueVector();
combinedVectors.put(field, new HyperVectorValueIterator(mf, new HyperVectorWrapper<>(mf, vvList)));
} else {
combinedVectors.get(field).getHyperVector().addVector(w.getValueVector());
}
}
}
for (HyperVectorValueIterator hvi : combinedVectors.values()) {
hvi.determineTotalSize();
}
return combinedVectors;
}
private static class BatchIterator implements Iterable<VectorAccessible>, AutoCloseable {
private final List<QueryDataBatch> dataBatches;
private final RecordBatchLoader batchLoader;
public BatchIterator(List<QueryDataBatch> dataBatches, RecordBatchLoader batchLoader) {
this.dataBatches = dataBatches;
this.batchLoader = batchLoader;
}
@Override
public Iterator<VectorAccessible> iterator() {
return new Iterator<VectorAccessible>() {
int index = -1;
@Override
public boolean hasNext() {
return index < dataBatches.size() - 1;
}
@Override
public VectorAccessible next() {
index++;
if (index == dataBatches.size()) {
throw new RuntimeException("Tried to call next when iterator had no more items.");
}
batchLoader.clear();
QueryDataBatch batch = dataBatches.get(index);
try {
batchLoader.load(batch.getHeader().getDef(), batch.getData());
} catch (SchemaChangeException e) {
throw new RuntimeException(e);
}
return batchLoader;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Removing is not supported");
}
};
}
@Override
public void close() throws Exception {
batchLoader.clear();
}
}
/**
* Iterate over batches, and combine the batches into a map, where key is schema path, and value is
* the list of column values across all the batches.
* @param batches
* @param expectedTotalRecords
* @return
* @throws SchemaChangeException
* @throws UnsupportedEncodingException
*/
public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches,
Long expectedBatchSize, Integer expectedNumBatches, Integer expectedTotalRecords)
throws SchemaChangeException, UnsupportedEncodingException {
Map<String, List<Object>> combinedVectors = new TreeMap<>();
addToCombinedVectorResults(batches, null, expectedBatchSize, expectedNumBatches, combinedVectors, expectedTotalRecords);
return combinedVectors;
}
/**
* Add to result vectors and compare batch schema against expected schema while iterating batches.
* @param batches
* @param expectedSchema: the expected schema the batches should contain. Through SchemaChangeException
* if encounter different batch schema.
* @param combinedVectors: the vectors to hold the values when iterate the batches.
*
* @return number of batches
* @throws SchemaChangeException
* @throws UnsupportedEncodingException
*/
public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema,
Long expectedBatchSize, Integer expectedNumBatches,
Map<String, List<Object>> combinedVectors, Integer expectedTotalRecords)
throws SchemaChangeException, UnsupportedEncodingException {
// TODO - this does not handle schema changes
int numBatch = 0;
long totalRecords = 0;
BatchSchema schema = null;
for (VectorAccessible loader : batches) {
numBatch++;
if (expectedSchema != null) {
if (! expectedSchema.isEquivalent(loader.getSchema())) {
throw new SchemaChangeException(String.format("Batch schema does not match expected schema\n" +
"Actual schema: %s. Expected schema : %s",
loader.getSchema(), expectedSchema));
}
}
if (expectedBatchSize != null) {
RecordBatchSizer sizer = new RecordBatchSizer(loader);
// Not checking actualSize as accounting is not correct when we do
// split and transfer ownership across operators.
Assert.assertTrue(sizer.getNetBatchSize() <= expectedBatchSize);
}
// TODO: Clean: DRILL-2933: That load(...) no longer throws
// SchemaChangeException, so check/clean throws clause above.
if (schema == null) {
schema = loader.getSchema();
for (MaterializedField mf : schema) {
combinedVectors.put(SchemaPath.getSimplePath(mf.getName()).toExpr(), new ArrayList<>());
}
} else {
// TODO - actually handle schema changes, this is just to get access to the SelectionVectorMode
// of the current batch, the check for a null schema is used to only mutate the schema once
// need to add new vectors and null fill for previous batches? distinction between null and non-existence important?
schema = loader.getSchema();
}
logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
totalRecords += loader.getRecordCount();
for (VectorWrapper<?> w : loader) {
String field = SchemaPath.getSimplePath(w.getField().getName()).toExpr();
ValueVector[] vectors;
if (w.isHyper()) {
vectors = w.getValueVectors();
} else {
vectors = new ValueVector[] {w.getValueVector()};
}
SelectionVector2 sv2 = null;
SelectionVector4 sv4 = null;
switch(schema.getSelectionVectorMode()) {
case TWO_BYTE:
sv2 = loader.getSelectionVector2();
break;
case FOUR_BYTE:
sv4 = loader.getSelectionVector4();
break;
}
if (sv4 != null) {
for (int j = 0; j < sv4.getCount(); j++) {
int complexIndex = sv4.get(j);
int batchIndex = complexIndex >> 16;
int recordIndexInBatch = complexIndex & 65535;
Object obj = vectors[batchIndex].getAccessor().getObject(recordIndexInBatch);
if (obj != null) {
if (obj instanceof Text) {
obj = obj.toString();
}
}
combinedVectors.get(field).add(obj);
}
}
else {
for (ValueVector vv : vectors) {
for (int j = 0; j < loader.getRecordCount(); j++) {
int index;
if (sv2 != null) {
index = sv2.getIndex(j);
} else {
index = j;
}
Object obj = vv.getAccessor().getObject(index);
if (obj != null) {
if (obj instanceof Text) {
obj = obj.toString();
}
}
combinedVectors.get(field).add(obj);
}
}
}
}
}
if (expectedNumBatches != null) {
// Based on how much memory is actually taken by value vectors (because of doubling stuff),
// we have to do complex math for predicting exact number of batches.
// Instead, check that number of batches is at least the minimum that is expected
// and no more than twice of that.
Assert.assertTrue(numBatch >= expectedNumBatches);
Assert.assertTrue(numBatch <= (2*expectedNumBatches));
}
if ( expectedTotalRecords != null ) {
Assert.assertEquals(expectedTotalRecords.longValue(), totalRecords);
}
return numBatch;
}
protected void compareSchemaOnly() throws Exception {
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
List<QueryDataBatch> actual = null;
QueryDataBatch batch = null;
try {
test(testOptionSettingQueries);
actual = testRunAndReturn(queryType, query);
batch = actual.get(0);
loader.load(batch.getHeader().getDef(), batch.getData());
final BatchSchema schema = loader.getSchema();
final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = testBuilder.getExpectedSchema();
if (schema.getFieldCount() != expectedSchema.size()) {
throw new Exception("Expected and actual numbers of columns do not match.");
}
for (int i = 0; i < schema.getFieldCount(); ++i) {
final String actualSchemaPath = schema.getColumn(i).getName();
final TypeProtos.MajorType actualMajorType = schema.getColumn(i).getType();
final String expectedSchemaPath = expectedSchema.get(i).getLeft().getRootSegmentPath();
final TypeProtos.MajorType expectedMajorType = expectedSchema.get(i).getValue();
if (! actualSchemaPath.equals(expectedSchemaPath) ||
! Types.isEquivalent(actualMajorType, expectedMajorType)) {
throw new Exception(String.format("Schema path or type mismatch for column #%d:\n" +
"Expected schema path: %s\nActual schema path: %s\nExpected type: %s\nActual type: %s",
i, expectedSchemaPath, actualSchemaPath, Types.toString(expectedMajorType),
Types.toString(actualMajorType)));
}
}
} finally {
if (actual != null) {
for (QueryDataBatch b : actual) {
b.release();
}
}
loader.clear();
}
}
/**
* Use this method only if necessary to validate one query against another. If
* you are just validating against a baseline file use one of the simpler
* interfaces that will write the validation query for you.
*
* @throws Exception
*/
protected void compareUnorderedResults() throws Exception {
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
List<QueryDataBatch> actual = Collections.emptyList();
List<QueryDataBatch> expected = Collections.emptyList();
List<Map<String, Object>> expectedRecords = new ArrayList<>();
List<Map<String, Object>> actualRecords = new ArrayList<>();
try {
test(testOptionSettingQueries);
actual = testRunAndReturn(queryType, query);
checkNumBatches(actual);
addTypeInfoIfMissing(actual.get(0), testBuilder);
addToMaterializedResults(actualRecords, actual, loader);
// If actual result record number is 0,
// and the baseline records does not exist, and baselineColumns provided,
// compare actual column number/names with expected columns
if (actualRecords.size() == 0
&& (baselineRecords == null || baselineRecords.size()==0)
&& baselineColumns != null) {
checkColumnDef(loader.getSchema());
}
// If baseline data was not provided to the test builder directly, we must
// run a query for the baseline, this includes
// the cases where the baseline is stored in a file.
if (baselineRecords == null) {
if (expectedNumRecords != DrillTestWrapper.EXPECTED_NUM_RECORDS_NOT_SET) {
Assert.assertEquals(expectedNumRecords, actualRecords.size());
return;
} else {
test(baselineOptionSettingQueries);
expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
addToMaterializedResults(expectedRecords, expected, loader);
}
} else {
expectedRecords = baselineRecords;
}
compareResults(expectedRecords, actualRecords);
} finally {
cleanupBatches(actual, expected);
}
}
public void checkColumnDef(BatchSchema batchSchema) throws Exception{
assert (batchSchema != null && batchSchema.getFieldCount()==baselineColumns.length);
for (int i=0; i<baselineColumns.length; ++i) {
if (!SchemaPath.parseFromString(baselineColumns[i]).equals(
SchemaPath.parseFromString(batchSchema.getColumn(i).getName()))) {
throw new Exception(i + "the expected column name is not matching: "
+ baselineColumns[i] + " is not " +
batchSchema.getColumn(i).getName());
}
}
}
/**
* Use this method only if necessary to validate one query against another. If you are just validating against a
* baseline file use one of the simpler interfaces that will write the validation query for you.
*
* @throws Exception
*/
protected void compareOrderedResults() throws Exception {
if (highPerformanceComparison) {
if (baselineQueryType == null) {
throw new Exception("Cannot do a high performance comparison without using a baseline file");
}
compareResultsHyperVector();
} else {
compareMergedOnHeapVectors();
}
}
public void compareMergedOnHeapVectors() throws Exception {
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
List<QueryDataBatch> actual = Collections.emptyList();
List<QueryDataBatch> expected = Collections.emptyList();
Map<String, List<Object>> actualSuperVectors;
Map<String, List<Object>> expectedSuperVectors = null;
try {
test(testOptionSettingQueries);
actual = testRunAndReturn(queryType, query);
checkNumBatches(actual);
// To avoid extra work for test writers, types can optionally be inferred from the test query
addTypeInfoIfMissing(actual.get(0), testBuilder);
BatchIterator batchIter = new BatchIterator(actual, loader);
actualSuperVectors = addToCombinedVectorResults(batchIter, null, null, null);
batchIter.close();
// If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
// the cases where the baseline is stored in a file.
if (baselineRecords == null) {
if (baselineQueryType == null && baselineColumns != null) {
checkAscendingOrdering(actualSuperVectors);
return;
} else {
test(baselineOptionSettingQueries);
expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
BatchIterator exBatchIter = new BatchIterator(expected, loader);
expectedSuperVectors = addToCombinedVectorResults(exBatchIter, null, null, null);
exBatchIter.close();
}
} else {
// data is built in the TestBuilder in a row major format as it is provided by the user
// translate it here to vectorized, the representation expected by the ordered comparison
expectedSuperVectors = translateRecordListToHeapVectors(baselineRecords);
}
compareMergedVectors(expectedSuperVectors, actualSuperVectors);
} catch (Exception e) {
throw new Exception(e.getMessage() + "\nFor query: " + query, e);
} finally {
cleanupBatches(expected, actual);
}
}
private void checkAscendingOrdering(Map<String, List<Object>> results) {
int numRecords = results.get(baselineColumns[0]).size();
for (int index = 1; index < numRecords; index++) {
int prevIndex = index - 1;
for (String column: baselineColumns) {
List<Object> objects = results.get(column);
Object prevObject = objects.get(prevIndex);
Object currentObject = objects.get(index);
Assert.assertTrue(RowSetComparison.ObjectComparator.INSTANCE.compare(prevObject, currentObject) <= 0);
}
}
}
public static Map<String, List<Object>> translateRecordListToHeapVectors(List<Map<String, Object>> records) {
Map<String, List<Object>> ret = new TreeMap<>();
if (records == null) {
return ret;
}
for (String s : records.get(0).keySet()) {
ret.put(s, new ArrayList<>());
}
for (Map<String, Object> m : records) {
for (String s : m.keySet()) {
ret.get(s).add(m.get(s));
}
}
return ret;
}
public void compareResultsHyperVector() throws Exception {
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
test(testOptionSettingQueries);
List<QueryDataBatch> results = testRunAndReturn(queryType, query);
checkNumBatches(results);
// To avoid extra work for test writers, types can optionally be inferred from the test query
addTypeInfoIfMissing(results.get(0), testBuilder);
Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader);
test(baselineOptionSettingQueries);
List<QueryDataBatch> expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader);
compareHyperVectors(expectedSuperVectors, actualSuperVectors);
cleanupBatches(results, expected);
}
private void checkNumBatches(final List<QueryDataBatch> results) {
if (expectedNumBatches != EXPECTED_BATCH_COUNT_NOT_SET) {
final int actualNumBatches = results.size();
assertEquals(String.format("Expected %d batches but query returned %d non empty batch(es)%n", expectedNumBatches,
actualNumBatches), expectedNumBatches, actualNumBatches);
}
}
private void addTypeInfoIfMissing(QueryDataBatch batch, TestBuilder testBuilder) {
if (! testBuilder.typeInfoSet()) {
Map<SchemaPath, TypeProtos.MajorType> typeMap = getTypeMapFromBatch(batch);
testBuilder.baselineTypes(typeMap);
}
}
private Map<SchemaPath, TypeProtos.MajorType> getTypeMapFromBatch(QueryDataBatch batch) {
Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
for (int i = 0; i < batch.getHeader().getDef().getFieldCount(); i++) {
typeMap.put(SchemaPath.getSimplePath(MaterializedField.create(batch.getHeader().getDef().getField(i)).getName()),
batch.getHeader().getDef().getField(i).getMajorType());
}
return typeMap;
}
@SafeVarargs
private final void cleanupBatches(List<QueryDataBatch>... results) {
for (List<QueryDataBatch> resultList : results ) {
for (QueryDataBatch result : resultList) {
result.release();
}
}
}
public static void addToMaterializedResults(List<Map<String, Object>> materializedRecords,
List<QueryDataBatch> records,
RecordBatchLoader loader)
throws SchemaChangeException, UnsupportedEncodingException {
long totalRecords = 0;
QueryDataBatch batch;
int size = records.size();
for (int i = 0; i < size; i++) {
batch = records.get(0);
loader.load(batch.getHeader().getDef(), batch.getData());
// TODO: Clean: DRILL-2933: That load(...) no longer throws
// SchemaChangeException, so check/clean throws clause above.
logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
totalRecords += loader.getRecordCount();
for (int j = 0; j < loader.getRecordCount(); j++) {
Map<String, Object> record = new TreeMap<>();
for (VectorWrapper<?> w : loader) {
Object obj = w.getValueVector().getAccessor().getObject(j);
if (obj != null) {
if (obj instanceof Text) {
obj = obj.toString();
}
record.put(SchemaPath.getSimplePath(w.getField().getName()).toExpr(), obj);
}
record.put(SchemaPath.getSimplePath(w.getField().getName()).toExpr(), obj);
}
materializedRecords.add(record);
}
records.remove(0);
batch.release();
loader.clear();
}
}
public static boolean compareValuesErrorOnMismatch(Object expected, Object actual, int counter, String column) throws Exception {
if (compareValues(expected, actual, counter, column)) {
return true;
}
if (expected == null) {
throw new Exception("at position " + counter + " column '" + column + "' mismatched values, expected: null " +
"but received " + actual + "(" + actual.getClass().getSimpleName() + ")");
}
if (actual == null) {
throw new Exception("unexpected null at position " + counter + " column '" + column + "' should have been: " + expected);
}
if (actual instanceof byte[]) {
throw new Exception("at position " + counter + " column '" + column + "' mismatched values, expected: "
+ new String((byte[])expected, "UTF-8") + " but received " + new String((byte[])actual, "UTF-8"));
}
if (!expected.equals(actual)) {
throw new Exception("at position " + counter + " column '" + column + "' mismatched values, expected: "
+ expected + "(" + expected.getClass().getSimpleName() + ") but received " + actual + "(" + actual.getClass().getSimpleName() + ")");
}
return true;
}
public static boolean compareValues(Object expected, Object actual, int counter, String column) throws Exception {
return compareValues(expected, actual, counter, column, false, 0);
}
public static boolean compareValues(Object expected, Object actual, int counter, String column,
boolean approximateEquality, double tolerance) throws Exception {
if (expected == null) {
if (actual == null) {
if (VERBOSE_DEBUG) {
logger.debug("(1) at position " + counter + " column '" + column + "' matched value: " + expected );
}
return true;
} else {
return false;
}
}
if (actual == null) {
return false;
}
if (actual instanceof byte[]) {
if (!Arrays.equals((byte[]) expected, (byte[]) actual)) {
return false;
} else {
if (VERBOSE_DEBUG) {
logger.debug("at position " + counter + " column '" + column + "' matched value " + new String((byte[])expected, "UTF-8"));
}
return true;
}
}
if (!expected.equals(actual)) {
if (approximateEquality && expected instanceof Number && actual instanceof Number) {
if (expected instanceof BigDecimal && actual instanceof BigDecimal) {
if (((((BigDecimal) expected).subtract((BigDecimal) actual)).abs().divide((BigDecimal) expected).abs()).compareTo(BigDecimal.valueOf(tolerance)) <= 0) {
return true;
}
} else if (expected instanceof BigInteger && actual instanceof BigInteger) {
BigDecimal expBD = new BigDecimal((BigInteger)expected);
BigDecimal actBD = new BigDecimal((BigInteger)actual);
if ((expBD.subtract(actBD)).abs().divide(expBD.abs()).compareTo(BigDecimal.valueOf(tolerance)) <= 0) {
return true;
}
} else if (!(expected instanceof BigDecimal || expected instanceof BigInteger) && !(actual instanceof BigDecimal || actual instanceof BigInteger)) {
// For all other types cast to double and compare
if (Math.abs((double) expected - (double) actual) / Math.abs((double) expected) <= tolerance) {
return true;
}
}
}
return false;
} else {
if (VERBOSE_DEBUG) {
logger.debug("at position " + counter + " column '" + column + "' matched value: " + expected );
}
}
return true;
}
/**
* Compare two result sets, ignoring ordering.
*
* @param expectedRecords - list of records from baseline
* @param actualRecords - list of records from test query, WARNING - this list is destroyed in this method
* @throws Exception
*/
private void compareResults(List<Map<String, Object>> expectedRecords, List<Map<String, Object>> actualRecords) throws Exception {
assertEquals("Different number of records returned", expectedRecords.size(), actualRecords.size());
int i = 0;
int counter = 0;
boolean found;
for (Map<String, Object> expectedRecord : expectedRecords) {
i = 0;
found = false;
StringBuilder mismatchHistory = new StringBuilder();
findMatch:
for (Map<String, Object> actualRecord : actualRecords) {
for (String s : actualRecord.keySet()) {
if (!expectedRecord.containsKey(s)) {
throw new Exception("Unexpected column '" + s + "' returned by query.");
}
if (! compareValues(expectedRecord.get(s), actualRecord.get(s), counter, s, approximateEquality, tolerance)) {
i++;
mismatchHistory.append("column: ").append(s)
.append(" exp: |").append(expectedRecord.get(s))
.append("| act: |").append(actualRecord.get(s)).append("|\n");
continue findMatch;
}
}
if (actualRecord.size() < expectedRecord.size()) {
throw new Exception(findMissingColumns(expectedRecord.keySet(), actualRecord.keySet()));
}
found = true;
break;
}
if (!found) {
StringBuilder sb = new StringBuilder();
for (int expectedRecordDisplayCount = 0;
expectedRecordDisplayCount < 10 && expectedRecordDisplayCount < expectedRecords.size();
expectedRecordDisplayCount++) {
sb.append(printRecord(expectedRecords.get(expectedRecordDisplayCount)));
}
String expectedRecordExamples = sb.toString();
sb.setLength(0);
for (int actualRecordDisplayCount = 0;
actualRecordDisplayCount < 10 && actualRecordDisplayCount < actualRecords.size();
actualRecordDisplayCount++) {
sb.append(printRecord(actualRecords.get(actualRecordDisplayCount)));
}
String actualRecordExamples = sb.toString();
throw new Exception(String.format("After matching %d records, did not find expected record in result set:\n %s\n\n" +
"Mismatch column: \n" + mismatchHistory + "\n" +
"Some examples of expected records:\n%s\n\n Some examples of records returned by the test query:\n%s",
counter, printRecord(expectedRecord), expectedRecordExamples, actualRecordExamples));
} else {
actualRecords.remove(i);
counter++;
}
}
assertEquals(0, actualRecords.size());
}
private static String findMissingColumns(Set<String> expected, Set<String> actual) {
String missingCols = "";
for (String colName : expected) {
if (!actual.contains(colName)) {
missingCols += colName + ", ";
}
}
return "Expected column(s) " + missingCols + " not found in result set: " + actual + ".";
}
private String printRecord(Map<String, ?> record) {
StringBuilder sb = new StringBuilder();
record.keySet().stream().sorted()
.forEach(key -> sb.append(key).append(" : ").append(record.get(key)).append(", "));
return sb.append(System.lineSeparator()).toString();
}
private void test(String query) throws Exception {
services.test(query);
}
private List<QueryDataBatch> testRunAndReturn(QueryType type, Object query) throws Exception {
return services.testRunAndReturn(type, query);
}
}