blob: 7b1b6ce0e5021e61d573af883b4c7cf8cd85a393 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.SettableFuture;
/**
 * Test results listener that accumulates Parquet query batches as they arrive,
 * optionally verifies every value against the expected values declared in
 * {@link ParquetTestProperties}, and checks per-column record counts when the
 * query completes. Call {@link #getResults()} to block until the query finishes;
 * assertion failures raised during verification are surfaced there as
 * {@link RpcException}s.
 */
public class ParquetResultListener implements UserResultsListener {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetResultListener.class);
  // Completed (or failed) exactly once, when queryCompleted()/submissionFailed() fires.
  private final SettableFuture<Void> future = SettableFuture.create();
  // Running total of rows reported by batch headers (package-private: visible to tests).
  int count = 0;
  // Expected total record count per column, derived from the test properties.
  int totalRecords;
  private final boolean testValues;
  private final BufferAllocator allocator;
  int batchCounter = 1;
  // Number of values seen (and, if enabled, verified) so far, keyed by field name.
  private final Map<String, Integer> valuesChecked = new HashMap<>();
  private final ParquetTestProperties props;

  /**
   * @param allocator         allocator used to load incoming record batches
   * @param props             expected schema/values and row-group geometry
   * @param numberOfTimesRead how many times the data set is scanned by the query
   * @param testValues        when true, every value is compared against the expected data
   */
  ParquetResultListener(BufferAllocator allocator, ParquetTestProperties props,
      int numberOfTimesRead, boolean testValues) {
    this.allocator = allocator;
    this.props = props;
    this.totalRecords = props.recordsPerRowGroup * props.numberRowGroups * numberOfTimesRead;
    this.testValues = testValues;
  }

  @Override
  public void submissionFailed(UserException ex) {
    logger.error("Submission failed.", ex);
    future.setException(ex);
  }

  @Override
  public void queryCompleted(QueryState state) {
    checkLastChunk();
  }

  /**
   * Convenience overload of the value assertion with {@code parentFieldId} defaulted to 0.
   */
  private <T> void assertField(ValueVector valueVector, int index,
      TypeProtos.MinorType expectedMinorType, Object value, String name) {
    assertField(valueVector, index, expectedMinorType, value, name, 0);
  }

  /**
   * Asserts that the value at {@code index} in {@code valueVector} equals the expected
   * {@code value}. MAP vectors are skipped: their contents are verified through the
   * child vectors. Byte arrays are compared element-wise.
   */
  @SuppressWarnings("unchecked")
  private <T> void assertField(ValueVector valueVector, int index,
      TypeProtos.MinorType expectedMinorType, T value, String name, int parentFieldId) {
    if (expectedMinorType == TypeProtos.MinorType.MAP) {
      return;
    }
    final T val = (T) valueVector.getAccessor().getObject(index);
    if (val instanceof byte[]) {
      // assertArrayEquals reports the first differing index on failure,
      // unlike assertTrue(Arrays.equals(...)).
      assertArrayEquals((byte[]) value, (byte[]) val);
    } else {
      assertEquals(value, val);
    }
  }

  /**
   * Loads each arriving batch, verifies values (when enabled) against the expected
   * data, and accumulates per-column value counts for the final check in
   * {@link #checkLastChunk()}.
   */
  @Override
  public synchronized void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
    logger.debug("result arrived in test batch listener.");
    int columnValCounter = 0;
    FieldInfo currentField;
    count += result.getHeader().getRowCount();
    final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);
    // Return value ignored: no action is needed on schema change with the current
    // mock scan use case.
    batchLoader.load(result.getHeader().getDef(), result.getData());
    // Used to make sure each vector in the batch has the same number of records.
    final int valueCount = batchLoader.getRecordCount();
    for (final VectorWrapper<?> vw : batchLoader) {
      final ValueVector vv = vw.getValueVector();
      final String fieldName = vv.getField().getName();
      currentField = props.fields.get(fieldName);
      // Resume this column's counter from previous batches (0 on first sight).
      columnValCounter = valuesChecked.getOrDefault(fieldName, 0);
      printColumnMajor(vv);
      if (testValues) {
        for (int j = 0; j < vv.getAccessor().getValueCount(); j++) {
          // Expected values cycle with period 3 — assumes FieldInfo.values has
          // length 3 in these tests; TODO confirm against ParquetTestProperties.
          assertField(vv, j, currentField.type,
              currentField.values[columnValCounter % 3], currentField.name + "/");
          columnValCounter++;
        }
      } else {
        columnValCounter += vv.getAccessor().getValueCount();
      }
      assertEquals("Mismatched value count for vectors in the same batch.", valueCount, vv.getAccessor().getValueCount());
      valuesChecked.put(fieldName, columnValCounter);
    }
    if (ParquetRecordReaderTest.VERBOSE_DEBUG) {
      printRowMajor(batchLoader);
    }
    batchCounter++;
    batchLoader.clear();
    result.release();
  }

  /**
   * Final verification once the query completes: every expected column was seen,
   * all columns carry the same record count, and that count matches
   * {@link #totalRecords}. Assertion failures are routed through
   * {@link #submissionFailed(UserException)} so {@link #getResults()} reports them.
   */
  private void checkLastChunk() {
    int recordsInBatch = -1;
    // Ensure the right number of columns was returned; especially important to
    // ensure selective column read is working.
    if (testValues) {
      assertEquals("Unexpected number of output columns from parquet scan.",
          props.fields.keySet().size(), valuesChecked.keySet().size());
    }
    for (final Map.Entry<String, Integer> entry : valuesChecked.entrySet()) {
      try {
        if (recordsInBatch == -1) {
          // First column establishes the reference count.
          recordsInBatch = entry.getValue();
        } else {
          assertEquals("Mismatched record counts in vectors.", recordsInBatch, entry.getValue().intValue());
        }
        assertEquals("Record count incorrect for column: " + entry.getKey(), totalRecords, (long) entry.getValue());
      } catch (AssertionError e) {
        // Surface the failure through the future instead of killing the RPC thread.
        submissionFailed(UserException.systemError(e).build(logger));
      }
    }
    assertTrue(valuesChecked.keySet().size() > 0);
    // No-op if submissionFailed() already set an exception above.
    future.set(null);
  }

  /**
   * Debug helper: logs every value of the given vector, one column at a time.
   * Only formats when debug logging is enabled.
   */
  public void printColumnMajor(ValueVector vv) {
    logger.debug("\n{}", vv.getField().getName());
    final StringBuilder sb = new StringBuilder();
    for (int j = 0; j < vv.getAccessor().getValueCount(); j++) {
      if (logger.isDebugEnabled()) {
        Object o = vv.getAccessor().getObject(j);
        if (o instanceof byte[]) {
          // Binary columns hold UTF-8 text in these tests; decode for readable output.
          o = new String((byte[]) o, StandardCharsets.UTF_8);
        }
        sb.append(Strings.padStart(o + "", 20, ' ')).append(' ');
        sb.append(", ").append(j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : "");
      }
    }
    logger.debug(sb.toString());
    logger.debug(Integer.toString(vv.getAccessor().getValueCount()));
  }

  /**
   * Debug helper: logs the batch row by row, re-printing the column headers
   * every 50 rows.
   */
  public void printRowMajor(RecordBatchLoader batchLoader) {
    for (int i = 0; i < batchLoader.getRecordCount(); i++) {
      if (i % 50 == 0) {
        // Repeat the header line periodically so long dumps stay readable.
        final StringBuilder header = new StringBuilder();
        for (final VectorWrapper<?> vw : batchLoader) {
          final ValueVector v = vw.getValueVector();
          header.append(Strings.padStart(v.getField().getName(), 20, ' ')).append(' ');
        }
        logger.debug(header.toString());
      }
      final StringBuilder sb = new StringBuilder();
      for (final VectorWrapper<?> vw : batchLoader) {
        final ValueVector v = vw.getValueVector();
        Object o = v.getAccessor().getObject(i);
        if (o instanceof byte[]) {
          // TODO - in the dictionary read error test there is some data that does not
          // look correct; the output of our reader matches the values of the
          // parquet-mr cat/head tools on a spot check, but the data may have been
          // corrupted upstream (possibly by pig) — check this!
          o = new String((byte[]) o, StandardCharsets.UTF_8);
        }
        sb.append(Strings.padStart(o + "", 20, ' ')).append(' ');
      }
      logger.debug(sb.toString());
    }
  }

  /**
   * Blocks until the query completes.
   *
   * @throws RpcException if submission failed or verification raised an error
   */
  public void getResults() throws RpcException {
    try {
      future.get();
    } catch (Throwable t) {
      throw RpcException.mapException(t);
    }
  }

  @Override
  public void queryIdArrived(UserBitShared.QueryId queryId) {
  }
}