/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet;
import static org.apache.drill.exec.store.parquet.TestFileGenerator.populateFieldInfoMap;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import io.netty.buffer.DrillBuf;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.drill.exec.ops.FragmentContextImpl;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.util.DrillFileUtils;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.CachedSingleFileSystem;
import org.apache.drill.exec.store.TestOutputMutator;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.util.Text;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.NullableBigIntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.Footer;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
@Ignore
public class ParquetRecordReaderTest extends BaseTestQuery {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
static final boolean VERBOSE_DEBUG = false;
private static final int numberRowGroups = 1;
private static final int recordsPerRowGroup = 300;
private static final int DEFAULT_BYTES_PER_PAGE = 1024 * 1024;
private static final String fileName = "tmp/parquet_test_file_many_types";
@BeforeClass
public static void generateFile() throws Exception {
final File f = new File(fileName);
final ParquetTestProperties props =
new ParquetTestProperties(numberRowGroups, recordsPerRowGroup, DEFAULT_BYTES_PER_PAGE, new HashMap<String, FieldInfo>());
populateFieldInfoMap(props);
if (!f.exists()) {
TestFileGenerator.generateParquetFile(fileName, props);
}
}
@Test
public void testMultipleRowGroupsAndReads3() throws Exception {
final String planName = "parquet/parquet_scan_screen.json";
testParquetFullEngineLocalPath(planName, fileName, 2, numberRowGroups, recordsPerRowGroup);
}
public String getPlanForFile(String pathFileName, String parquetFileName) throws IOException {
return Files.asCharSource(DrillFileUtils.getResourceAsFile(pathFileName), StandardCharsets.UTF_8).read()
.replaceFirst("&REPLACED_IN_PARQUET_TEST&", parquetFileName);
}
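// Illustrative only: getPlanForFile("parquet/parquet_scan_screen_read_entry_replace.json", "\"/tmp/test.parquet\"")
// would return that plan resource with its first &REPLACED_IN_PARQUET_TEST& token swapped for the supplied string.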
@Test
public void testMultipleRowGroupsAndReads2() throws Exception {
final StringBuilder readEntries = new StringBuilder();
// number of times to read the file
int i = 3;
for (int j = 0; j < i; j++) {
readEntries.append('"');
readEntries.append(fileName);
readEntries.append('"');
if (j < i - 1) {
readEntries.append(',');
}
}
final String planText = Files.asCharSource(DrillFileUtils.getResourceAsFile(
"parquet/parquet_scan_screen_read_entry_replace.json"), StandardCharsets.UTF_8).read().replaceFirst(
"&REPLACED_IN_PARQUET_TEST&", readEntries.toString());
testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, recordsPerRowGroup, true);
}
@Test
public void testDictionaryError() throws Exception {
testFull(QueryType.SQL, "select L_RECEIPTDATE from dfs.`tmp/lineitem_null_dict.parquet`", "", 1, 1, 100000, false);
}
@Test
public void testNullableAgg() throws Exception {
final List<QueryDataBatch> result = testSqlWithResults(
"select sum(a) as total_sum from dfs.`tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`");
assertEquals("Only expected one batch with data, and then the empty finishing batch.", 2, result.size());
final RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
final QueryDataBatch b = result.get(0);
loader.load(b.getHeader().getDef(), b.getData());
final VectorWrapper vw = loader.getValueAccessorById(
NullableBigIntVector.class,
loader.getValueVectorId(SchemaPath.getCompoundPath("total_sum")).getFieldIds()
);
assertEquals(4999950000L, vw.getValueVector().getAccessor().getObject(0));
b.release();
loader.clear();
}
@Test
public void testNullableFilter() throws Exception {
final List<QueryDataBatch> result = testSqlWithResults(
"select count(wr_return_quantity) as row_count from dfs.`tmp/web_returns` where wr_return_quantity = 1");
assertEquals("Only expected one batch with data, and then the empty finishing batch.", 2, result.size());
final RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
final QueryDataBatch b = result.get(0);
loader.load(b.getHeader().getDef(), b.getData());
final VectorWrapper vw = loader.getValueAccessorById(
BigIntVector.class,
loader.getValueVectorId(SchemaPath.getCompoundPath("row_count")).getFieldIds()
);
assertEquals(3573L, vw.getValueVector().getAccessor().getObject(0));
b.release();
loader.clear();
}
@Test
public void testFixedBinary() throws Exception {
final String readEntries = "\"tmp/drilltest/fixed_binary.parquet\"";
final String planText = Files.asCharSource(DrillFileUtils.getResourceAsFile(
"parquet/parquet_scan_screen_read_entry_replace.json"), StandardCharsets.UTF_8).read()
.replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries);
testParquetFullEngineLocalText(planText, fileName, 1, 1, 1000000, false);
}
@Test
public void testNonNullableDictionaries() throws Exception {
testFull(QueryType.SQL,
"select * from dfs.`tmp/drilltest/non_nullable_dictionary.parquet`", "", 1, 1, 30000000, false);
}
@Test
public void testNullableVarCharMemory() throws Exception {
testFull(QueryType.SQL,
"select s_comment,s_suppkey from dfs.`tmp/sf100_supplier.parquet`", "", 1, 1, 1000, false);
}
@Test
public void testReadVoter() throws Exception {
testFull(QueryType.SQL, "select * from dfs.`tmp/voter.parquet`", "", 1, 1, 1000, false);
}
@Test
public void testDrill_1314() throws Exception {
testFull(QueryType.SQL, "select l_partkey " +
"from dfs.`tmp/drill_1314.parquet`", "", 1, 1, 10000, false);
}
@Test
public void testDrill_1314_all_columns() throws Exception {
testFull(QueryType.SQL, "select * from dfs.`tmp/drill_1314.parquet`", "", 1, 1, 10000, false);
}
@Test
public void testDictionaryError_419() throws Exception {
testFull(QueryType.SQL,
"select c_address from dfs.`tmp/customer_snappyimpala_drill_419.parquet`", "", 1, 1, 150000, false);
}
@Test
public void testNonExistentColumn() throws Exception {
testFull(QueryType.SQL,
"select non_existent_column from cp.`tpch/nation.parquet`", "", 1, 1, 150000, false);
}
@Test
public void testNonExistentColumnLargeFile() throws Exception {
testFull(QueryType.SQL,
"select non_existent_column, non_existent_col_2 from dfs.`tmp/customer.dict.parquet`", "", 1, 1, 150000, false);
}
@Test
public void testNonExistentColumnsSomePresentColumnsLargeFile() throws Exception {
testFull(QueryType.SQL,
"select cust_key, address, non_existent_column, non_existent_col_2 from dfs.`tmp/customer.dict.parquet`",
"", 1, 1, 150000, false);
}
@Ignore // ignored for now for performance
@Test
public void testTPCHPerformance_SF1() throws Exception {
testFull(QueryType.SQL,
"select * from dfs.`tmp/orders_part-m-00001.parquet`", "", 1, 1, 150000, false);
}
@Test
public void testLocalDistributed() throws Exception {
final String planName = "parquet/parquet_scan_union_screen_physical.json";
testParquetFullEngineLocalTextDistributed(planName, fileName, 1, numberRowGroups, recordsPerRowGroup);
}
@Test
@Ignore
public void testRemoteDistributed() throws Exception {
final String planName = "parquet/parquet_scan_union_screen_physical.json";
testParquetFullEngineRemote(planName, fileName, 1, numberRowGroups, recordsPerRowGroup);
}
public void testParquetFullEngineLocalPath(String planFileName, String filename,
int numberOfTimesRead /* specified in json plan */,
int numberOfRowGroups, int recordsPerRowGroup) throws Exception {
testParquetFullEngineLocalText(Files.asCharSource(DrillFileUtils.getResourceAsFile(planFileName), StandardCharsets.UTF_8).read(), filename,
numberOfTimesRead, numberOfRowGroups, recordsPerRowGroup, true);
}
// specific tests should call this method, but it is not marked as a test itself intentionally
public void testParquetFullEngineLocalText(String planText, String filename,
int numberOfTimesRead /* specified in json plan */,
int numberOfRowGroups, int recordsPerRowGroup, boolean testValues) throws Exception {
testFull(QueryType.LOGICAL, planText, filename, numberOfTimesRead, numberOfRowGroups, recordsPerRowGroup, testValues);
}
private void testFull(QueryType type, String planText, String filename,
int numberOfTimesRead /* specified in json plan */,
int numberOfRowGroups, int recordsPerRowGroup, boolean testValues) throws Exception {
// final RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
final HashMap<String, FieldInfo> fields = new HashMap<>();
final ParquetTestProperties props =
new ParquetTestProperties(numberRowGroups, recordsPerRowGroup, DEFAULT_BYTES_PER_PAGE, fields);
TestFileGenerator.populateFieldInfoMap(props);
final ParquetResultListener resultListener =
new ParquetResultListener(getAllocator(), props, numberOfTimesRead, testValues);
final Stopwatch watch = Stopwatch.createStarted();
testWithListener(type, planText, resultListener);
resultListener.getResults();
}
// use this method to submit physical plan
public void testParquetFullEngineLocalTextDistributed(String planName, String filename,
int numberOfTimesRead /* specified in json plan */,
int numberOfRowGroups, int recordsPerRowGroup) throws Exception {
String planText = Files.asCharSource(DrillFileUtils.getResourceAsFile(planName), StandardCharsets.UTF_8).read();
testFull(QueryType.PHYSICAL, planText, filename, numberOfTimesRead, numberOfRowGroups, recordsPerRowGroup, true);
}
public String pad(String value, int length) {
return pad(value, length, " ");
}
public String pad(String value, int length, String with) {
final StringBuilder result = new StringBuilder(length);
result.append(value);
while (result.length() < length) {
result.insert(0, with);
}
return result.toString();
}
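// Illustrative examples of the padding helpers above:
//   pad("7", 3)       returns "  7"   (left-pads with spaces)
//   pad("42", 5, "0") returns "00042" (left-pads with the supplied string)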
public void testParquetFullEngineRemote(String plan, String filename,
int numberOfTimesRead /* specified in json plan */,
int numberOfRowGroups, int recordsPerRowGroup) throws Exception {
final HashMap<String, FieldInfo> fields = new HashMap<>();
final ParquetTestProperties props =
new ParquetTestProperties(numberRowGroups, recordsPerRowGroup, DEFAULT_BYTES_PER_PAGE, fields);
TestFileGenerator.populateFieldInfoMap(props);
final ParquetResultListener resultListener =
new ParquetResultListener(getAllocator(), props, numberOfTimesRead, true);
testWithListener(QueryType.PHYSICAL, Files.asCharSource(DrillFileUtils.getResourceAsFile(plan), StandardCharsets.UTF_8).read(), resultListener);
resultListener.getResults();
}
private static class MockOutputMutator implements OutputMutator {
// backing allocator for getManagedBuffer(); a fresh root allocator is assumed here
private final BufferAllocator allocator = RootAllocatorFactory.newRoot(DrillConfig.create());
private final List<MaterializedField> removedFields = Lists.newArrayList();
private final List<ValueVector> addFields = Lists.newArrayList();
List<MaterializedField> getRemovedFields() {
return removedFields;
}
List<ValueVector> getAddFields() {
return addFields;
}
@Override
public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
return null;
}
@Override
public void allocate(int recordCount) {
}
@Override
public boolean isNewSchema() {
return false;
}
@Override
public DrillBuf getManagedBuffer() {
return allocator.buffer(255);
}
@Override
public CallBack getCallBack() {
return null;
}
@Override
public void clear() {
// Nothing to do!
}
}
private void validateFooters(final List<Footer> metadata) {
logger.debug("{}", metadata);
assertEquals(3, metadata.size());
for (Footer footer : metadata) {
final File file = new File(footer.getFile().toUri());
assertTrue(file.getName(), file.getName().startsWith("part"));
assertTrue(file.getPath(), file.exists());
final ParquetMetadata parquetMetadata = footer.getParquetMetadata();
assertEquals(2, parquetMetadata.getBlocks().size());
final Map<String, String> keyValueMetaData = parquetMetadata.getFileMetaData().getKeyValueMetaData();
assertEquals("bar", keyValueMetaData.get("foo"));
assertEquals(footer.getFile().getName(), keyValueMetaData.get(footer.getFile().getName()));
}
}
private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes)
throws IOException {
PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
DataPageV1 page = (DataPageV1) pageReader.readPage();
assertEquals(values, page.getValueCount());
assertArrayEquals(bytes.toByteArray(), page.getBytes().toByteArray());
}
@Test
public void testMultipleRowGroups() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(2, 300, DEFAULT_BYTES_PER_PAGE, fields);
populateFieldInfoMap(props);
testParquetFullEngineEventBased(true, "parquet/parquet_scan_screen.json", "/tmp/test.parquet", 1, props);
}
// TODO - Test currently marked ignore to prevent breaking of the build process, requires a binary file that was
// generated using pig. Will need to find a good place to keep files like this.
// For now I will upload it to the JIRA as an attachment.
@Test
public void testNullableColumns() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(1, 1500000, DEFAULT_BYTES_PER_PAGE, fields);
Object[] boolVals = {true, null, null};
props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals, TypeProtos.MinorType.BIT, props));
testParquetFullEngineEventBased(false, "parquet/parquet_nullable.json", "/tmp/nullable_test.parquet", 1, props);
}
/**
 * Tests the reading of nullable variable-length columns. The test runs twice: once on a file of
 * plain byte-array values and once on a file whose column has a converted type of UTF-8, to make
 * sure both can be read.
 */
@Test
public void testNullableColumnsVarLen() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(1, 300000, DEFAULT_BYTES_PER_PAGE, fields);
byte[] val = {'b'};
byte[] val2 = {'b', '2'};
byte[] val3 = {'b', '3'};
byte[] val4 = { 'l','o','n','g','e','r',' ','s','t','r','i','n','g'};
Object[] byteArrayVals = { val, val2, val4};
props.fields.put("a", new FieldInfo("boolean", "a", 1, byteArrayVals, TypeProtos.MinorType.BIT, props));
testParquetFullEngineEventBased(false, "parquet/parquet_nullable_varlen.json", "/tmp/nullable_varlen.parquet", 1, props);
HashMap<String, FieldInfo> fields2 = new HashMap<>();
// pass strings instead of byte arrays
Object[] textVals = { new Text("b"), new Text("b2"), new Text("b3") };
ParquetTestProperties props2 = new ParquetTestProperties(1, 30000, DEFAULT_BYTES_PER_PAGE, fields2);
props2.fields.put("a", new FieldInfo("boolean", "a", 1, textVals, TypeProtos.MinorType.BIT, props2));
testParquetFullEngineEventBased(false, "parquet/parquet_scan_screen_read_entry_replace.json",
"\"/tmp/varLen.parquet/a\"", "unused", 1, props2);
}
@Test
public void testFileWithNulls() throws Exception {
HashMap<String, FieldInfo> fields3 = new HashMap<>();
ParquetTestProperties props3 = new ParquetTestProperties(1, 3000, DEFAULT_BYTES_PER_PAGE, fields3);
// actually include null values
Object[] valuesWithNull = {new Text(""), new Text("longer string"), null};
props3.fields.put("a", new FieldInfo("boolean", "a", 1, valuesWithNull, TypeProtos.MinorType.BIT, props3));
testParquetFullEngineEventBased(false, "parquet/parquet_scan_screen_read_entry_replace.json",
"\"/tmp/nullable_with_nulls.parquet\"", "unused", 1, props3);
}
@Test
public void testDictionaryEncoding() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(1, 25, DEFAULT_BYTES_PER_PAGE, fields);
Object[] boolVals = null;
props.fields.put("n_name", null);
props.fields.put("n_nationkey", null);
props.fields.put("n_regionkey", null);
props.fields.put("n_comment", null);
testParquetFullEngineEventBased(false, false, "parquet/parquet_scan_screen_read_entry_replace.json",
"\"/tmp/nation_dictionary_fail.parquet\"", "unused", 1, props, QueryType.LOGICAL);
fields = new HashMap<>();
props = new ParquetTestProperties(1, 5, DEFAULT_BYTES_PER_PAGE, fields);
props.fields.put("employee_id", null);
props.fields.put("name", null);
props.fields.put("role", null);
props.fields.put("phone", null);
props.fields.put("password_hash", null);
props.fields.put("gender_male", null);
props.fields.put("height", null);
props.fields.put("hair_thickness", null);
testParquetFullEngineEventBased(false, false, "parquet/parquet_scan_screen_read_entry_replace.json",
"\"/tmp/employees_5_16_14.parquet\"", "unused", 1, props, QueryType.LOGICAL);
}
@Test
public void testMultipleRowGroupsAndReads() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(4, 3000, DEFAULT_BYTES_PER_PAGE, fields);
populateFieldInfoMap(props);
String readEntries = "";
// number of times to read the file
int i = 3;
for (int j = 0; j < i; j++) {
readEntries += "\"/tmp/test.parquet\"";
if (j < i - 1) {
readEntries += ",";
}
}
testParquetFullEngineEventBased(true, "parquet/parquet_scan_screen_read_entry_replace.json", readEntries,
"/tmp/test.parquet", i, props);
}
@Test
public void testReadError_Drill_901() throws Exception {
// select cast( L_COMMENT as varchar) from dfs.`/tmp/drilltest/employee_parquet`
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(1, 60175, DEFAULT_BYTES_PER_PAGE, fields);
testParquetFullEngineEventBased(false, false, "parquet/par_writer_test.json", null,
"unused, no file is generated", 1, props, QueryType.PHYSICAL);
}
@Test
public void testReadError_Drill_839() throws Exception {
// select cast( L_COMMENT as varchar) from dfs.`/tmp/drilltest/employee_parquet`
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(1, 150000, DEFAULT_BYTES_PER_PAGE, fields);
String readEntries = "\"/tmp/customer_nonull.parquet\"";
testParquetFullEngineEventBased(false, false, "parquet/parquet_scan_screen_read_entry_replace.json", readEntries,
"unused, no file is generated", 1, props, QueryType.LOGICAL);
}
@Test
public void testReadBug_Drill_418() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(1, 150000, DEFAULT_BYTES_PER_PAGE, fields);
TestFileGenerator.populateDrill_418_fields(props);
String readEntries = "\"/tmp/customer.plain.parquet\"";
testParquetFullEngineEventBased(false, false, "parquet/parquet_scan_screen_read_entry_replace.json", readEntries,
"unused, no file is generated", 1, props, QueryType.LOGICAL);
}
// requires a binary file generated by pig from TPC-H data; the assertion on incoming data also has to be disabled
@Test
public void testMultipleRowGroupsAndReadsPigError() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(1, 1500000, DEFAULT_BYTES_PER_PAGE, fields);
TestFileGenerator.populatePigTPCHCustomerFields(props);
String readEntries = "\"/tmp/tpc-h/customer\"";
testParquetFullEngineEventBased(false, false, "parquet/parquet_scan_screen_read_entry_replace.json", readEntries,
"unused, no file is generated", 1, props, QueryType.LOGICAL);
fields = new HashMap<>();
props = new ParquetTestProperties(1, 100000, DEFAULT_BYTES_PER_PAGE, fields);
TestFileGenerator.populatePigTPCHSupplierFields(props);
readEntries = "\"tmp/tpc-h/supplier\"";
testParquetFullEngineEventBased(false, false, "parquet/parquet_scan_screen_read_entry_replace.json", readEntries,
"unused, no file is generated", 1, props, QueryType.LOGICAL);
}
@Test
public void test958_sql() throws Exception {
// testFull(QueryType.SQL, "select ss_ext_sales_price from dfs.`/tmp/store_sales`", "", 1, 1, 30000000, false);
testFull(QueryType.SQL, "select * from dfs.`tmp/store_sales`", "", 1, 1, 30000000, false);
}
@Test
public void drill_958bugTest() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(1, 2880404, DEFAULT_BYTES_PER_PAGE, fields);
TestFileGenerator.populatePigTPCHCustomerFields(props);
String readEntries = "\"tmp/store_sales\"";
testParquetFullEngineEventBased(false, false, "parquet/parquet_scan_screen_read_entry_replace.json", readEntries,
"unused, no file is generated", 1, props, QueryType.LOGICAL);
}
@Test
public void testMultipleRowGroupsEvent() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(2, 300, DEFAULT_BYTES_PER_PAGE, fields);
populateFieldInfoMap(props);
testParquetFullEngineEventBased(true, "parquet/parquet_scan_screen.json", "tmp/test.parquet", 1, props);
}
/**
* Tests the attribute in a scan node to limit the columns read by a scan.
*
* The functionality of selecting all columns is tested in all of the other tests that leave out the attribute.
* @throws Exception
*/
@Test
public void testSelectColumnRead() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(4, 3000, DEFAULT_BYTES_PER_PAGE, fields);
// generate metadata for a series of test columns; these columns are all generated in the test file
populateFieldInfoMap(props);
TestFileGenerator.generateParquetFile("/tmp/test.parquet", props);
fields.clear();
// create a new object to describe the dataset expected out of the scan operation
// the fields added below match those requested in the plan specified in parquet_selective_column_read.json
// that is used below in the test query
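// (the plan is assumed to select only integer, bigInt, bin and bin2, which is why the expected
// dataset below deliberately omits the other columns generated into the test file)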
props = new ParquetTestProperties(4, 3000, DEFAULT_BYTES_PER_PAGE, fields);
props.fields.put("integer", new FieldInfo("int32", "integer", 32, TestFileGenerator.intVals, TypeProtos.MinorType.INT, props));
props.fields.put("bigInt", new FieldInfo("int64", "bigInt", 64, TestFileGenerator.longVals, TypeProtos.MinorType.BIGINT, props));
props.fields.put("bin", new FieldInfo("binary", "bin", -1, TestFileGenerator.binVals, TypeProtos.MinorType.VARBINARY, props));
props.fields.put("bin2", new FieldInfo("binary", "bin2", -1, TestFileGenerator.bin2Vals, TypeProtos.MinorType.VARBINARY, props));
testParquetFullEngineEventBased(true, false, "/parquet/parquet_selective_column_read.json", null, "/tmp/test.parquet", 1, props, QueryType.PHYSICAL);
}
@Test
@Ignore
public void testPerformance() throws Exception {
final DrillbitContext bitContext = mock(DrillbitContext.class);
final UserClientConnection connection = mock(UserClientConnection.class);
final DrillConfig c = DrillConfig.create();
final FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
final FragmentContextImpl context = new FragmentContextImpl(bitContext, BitControl.PlanFragment.getDefaultInstance(), connection, registry);
final Path filePath = new Path("/tmp/parquet_test_performance.parquet");
final HashMap<String, FieldInfo> fields = new HashMap<>();
final ParquetTestProperties props = new ParquetTestProperties(1, 20 * 1000 * 1000, DEFAULT_BYTES_PER_PAGE, fields);
populateFieldInfoMap(props);
final Configuration dfsConfig = new Configuration();
final List<Footer> footers = ParquetFileReader.readFooters(dfsConfig, filePath);
final Footer f = footers.iterator().next();
final List<SchemaPath> columns = Lists.newArrayList();
columns.add(new SchemaPath("_MAP.integer", ExpressionPosition.UNKNOWN));
columns.add(new SchemaPath("_MAP.bigInt", ExpressionPosition.UNKNOWN));
columns.add(new SchemaPath("_MAP.f", ExpressionPosition.UNKNOWN));
columns.add(new SchemaPath("_MAP.d", ExpressionPosition.UNKNOWN));
columns.add(new SchemaPath("_MAP.b", ExpressionPosition.UNKNOWN));
columns.add(new SchemaPath("_MAP.bin", ExpressionPosition.UNKNOWN));
columns.add(new SchemaPath("_MAP.bin2", ExpressionPosition.UNKNOWN));
int totalRowCount = 0;
final FileSystem fs = new CachedSingleFileSystem(filePath);
final BufferAllocator allocator = RootAllocatorFactory.newRoot(c);
for(int i = 0; i < 25; i++) {
CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(
dfsConfig,
new ParquetDirectByteBufferAllocator(allocator),
0
);
final ParquetRecordReader rr = new ParquetRecordReader(context, filePath, 0, fs,
ccf,
f.getParquetMetadata(), columns, ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION);
final TestOutputMutator mutator = new TestOutputMutator(allocator);
rr.setup(null, mutator);
final Stopwatch watch = Stopwatch.createStarted();
int rowCount = 0;
while ((rowCount = rr.next()) > 0) {
totalRowCount += rowCount;
}
rr.close();
}
allocator.close();
}
// specific tests should call this method, but it is not marked as a test itself intentionally
public void testParquetFullEngineEventBased(boolean generateNew, String plan, String readEntries, String filename,
int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props) throws Exception {
testParquetFullEngineEventBased(true, generateNew, plan, readEntries,filename,
numberOfTimesRead /* specified in json plan */, props, QueryType.LOGICAL);
}
// specific tests should call this method, but it is not marked as a test itself intentionally
public void testParquetFullEngineEventBased(boolean generateNew, String plan, String filename,
int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props) throws Exception {
testParquetFullEngineEventBased(true, generateNew, plan, null, filename, numberOfTimesRead, props, QueryType.LOGICAL);
}
// specific tests should call this method, but it is not marked as a test itself intentionally
public void testParquetFullEngineEventBased(boolean testValues, boolean generateNew, String plan,
String readEntries, String filename,
int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props,
QueryType queryType) throws Exception {
if (generateNew) {
TestFileGenerator.generateParquetFile(filename, props);
}
final ParquetResultListener resultListener = new ParquetResultListener(getAllocator(), props, numberOfTimesRead, testValues);
final long C = System.nanoTime();
String planText = Files.asCharSource(DrillFileUtils.getResourceAsFile(plan), StandardCharsets.UTF_8).read();
// substitute the read entries into the plan text; this allows the plan file to be reused across several tests
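// e.g. a readEntries value such as "\"/tmp/test.parquet\",\"/tmp/test.parquet\",\"/tmp/test.parquet\""
// (as built in testMultipleRowGroupsAndReads) makes the same plan scan that file several times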
if (readEntries != null) {
planText = planText.replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries);
}
testWithListener(queryType, planText, resultListener);
resultListener.getResults();
}
@Test
public void testLimit() throws Exception {
List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM cp.`parquet/tpch/nation/01.parquet` LIMIT 1");
int recordsInOutput = 0;
for (QueryDataBatch batch : results) {
recordsInOutput += batch.getHeader().getDef().getRecordCount();
batch.release();
}
assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 1, recordsInOutput), 1 == recordsInOutput);
}
@Test
public void testLimitBeyondRowCount() throws Exception {
List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM cp.`parquet/tpch/nation/01.parquet` LIMIT 100");
int recordsInOutput = 0;
for (QueryDataBatch batch : results) {
recordsInOutput += batch.getHeader().getDef().getRecordCount();
batch.release();
}
assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 9, recordsInOutput), 9 == recordsInOutput);
}
@Test
public void testLimitMultipleRowGroups() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(3, 100, 1024 * 1024, fields);
populateFieldInfoMap(props);
TestFileGenerator.generateParquetFile("/tmp/testLimit.parquet", props);
List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM dfs.`/tmp/testLimit.parquet` LIMIT 225");
int recordsInOutput = 0;
for (QueryDataBatch batch : results) {
recordsInOutput += batch.getHeader().getDef().getRecordCount();
batch.release();
}
assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 225, recordsInOutput), 225 == recordsInOutput);
}
@Test
public void testLimitMultipleRowGroupsBeyondRowCount() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(3, 100, 1024 * 1024, fields);
populateFieldInfoMap(props);
TestFileGenerator.generateParquetFile("/tmp/testLimit.parquet", props);
List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM dfs.`/tmp/testLimit.parquet` LIMIT 500");
int recordsInOutput = 0;
for (QueryDataBatch batch : results) {
recordsInOutput += batch.getHeader().getDef().getRecordCount();
batch.release();
}
assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 300, recordsInOutput), 300 == recordsInOutput);
}
}