blob: b817adb281405e13c48bf205767ebddcab271d02 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.queries;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
import org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.operator.query.GroupByOperator;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
/**
 * The <code>FastHllQueriesTest</code> class sets up the index segment and creates fastHll on 'column17' and
 * 'column18'.
 * <p>There are 18 columns and 30000 records in total inside the original Avro file, of which 11 columns are selected
 * to build the index segment. The selected columns' information is as follows:
 * <ul>
 * ColumnName, FieldType, DataType, Cardinality, IsSorted, HasInvertedIndex
 * <li>column1, METRIC, INT, 6582, F, F</li>
 * <li>column3, METRIC, INT, 21910, F, F</li>
 * <li>column5, DIMENSION, STRING, 1, T, F</li>
 * <li>column6, DIMENSION, INT, 608, F, T</li>
 * <li>column7, DIMENSION, INT, 146, F, T</li>
 * <li>column9, DIMENSION, INT, 1737, F, F</li>
 * <li>column11, DIMENSION, STRING, 5, F, T</li>
 * <li>column12, DIMENSION, STRING, 5, F, F</li>
 * <li>column17, METRIC, INT, 24, F, T</li>
 * <li>column18, METRIC, INT, 1440, F, T</li>
 * <li>daysSinceEpoch, TIME, INT, 2, T, F</li>
 * </ul>
 */
@SuppressWarnings("ConstantConditions")
public class FastHllQueriesTest extends BaseQueriesTest {
  private static final String AVRO_DATA_WITH_PRE_GENERATED_HLL_COLUMNS =
      "data" + File.separator + "test_data-sv_hll.avro";
  private static final String SEGMENT_NAME = "testTable_126164076_167572854";
  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "FastHllQueriesTest");

  private static final String BASE_QUERY = "SELECT FASTHLL(column17_HLL), FASTHLL(column18_HLL) FROM testTable";
  private static final String GROUP_BY = " GROUP BY column11 ORDER BY column11";
  private static final String QUERY_FILTER =
      " WHERE column1 > 100000000 " + "AND column3 BETWEEN 20000000 AND 1000000000 " + "AND column5 = 'gFuH' "
          + "AND (column6 < 500000000 OR column11 NOT IN ('t', 'P')) " + "AND daysSinceEpoch = 126164076";

  // The single loaded segment; also registered twice in _indexSegments for inter-segment tests.
  private IndexSegment _indexSegment;
  // Contains 2 identical index segments (the same instance twice), so inter-segment results are doubled counts.
  private List<IndexSegment> _indexSegments;

  @Override
  protected String getFilter() {
    return QUERY_FILTER;
  }

  @Override
  protected IndexSegment getIndexSegment() {
    return _indexSegment;
  }

  @Override
  protected List<IndexSegment> getIndexSegments() {
    return _indexSegments;
  }

  /**
   * Tests FASTHLL aggregation over pre-generated serialized HLL columns, for base, filtered and group-by queries,
   * on both the inner-segment and inter-segment execution paths.
   *
   * <p>The segment is built and loaded up front; cleanup runs in a {@code finally} block so the loaded segment and
   * the temp index directory are released even when an assertion fails mid-test.
   */
  @Test
  public void testFastHllWithPreGeneratedHllColumns()
      throws Exception {
    buildAndLoadSegment();
    try {
      // Test inner segment queries

      // Test base query: full scan of 30000 docs, 2 projected columns -> 60000 entries scanned post-filter
      AggregationOperator aggregationOperator = getOperator(BASE_QUERY);
      AggregationResultsBlock aggregationResultsBlock = aggregationOperator.nextBlock();
      ExecutionStatistics executionStatistics = aggregationOperator.getExecutionStatistics();
      QueriesTestUtils.testInnerSegmentExecutionStatistics(executionStatistics, 30000L, 0L, 60000L, 30000L);
      List<Object> aggregationResult = aggregationResultsBlock.getResults();
      assertEquals(((HyperLogLog) aggregationResult.get(0)).cardinality(), 21L);
      assertEquals(((HyperLogLog) aggregationResult.get(1)).cardinality(), 1762L);

      // Test query with filter: QUERY_FILTER narrows to 6129 matching docs
      aggregationOperator = getOperatorWithFilter(BASE_QUERY);
      aggregationResultsBlock = aggregationOperator.nextBlock();
      executionStatistics = aggregationOperator.getExecutionStatistics();
      QueriesTestUtils.testInnerSegmentExecutionStatistics(executionStatistics, 6129L, 84134L, 12258L, 30000L);
      aggregationResult = aggregationResultsBlock.getResults();
      assertEquals(((HyperLogLog) aggregationResult.get(0)).cardinality(), 17L);
      assertEquals(((HyperLogLog) aggregationResult.get(1)).cardinality(), 1197L);

      // Test query with group-by: group-by column adds a third projected column -> 90000 entries scanned
      GroupByOperator groupByOperator = getOperator(BASE_QUERY + GROUP_BY);
      GroupByResultsBlock groupByResultsBlock = groupByOperator.nextBlock();
      executionStatistics = groupByOperator.getExecutionStatistics();
      QueriesTestUtils.testInnerSegmentExecutionStatistics(executionStatistics, 30000L, 0L, 90000L, 30000L);
      AggregationGroupByResult aggregationGroupByResult = groupByResultsBlock.getAggregationGroupByResult();
      GroupKeyGenerator.GroupKey firstGroupKey = aggregationGroupByResult.getGroupKeyIterator().next();
      // First group key is the empty-string value of column11
      assertEquals(firstGroupKey._keys[0], "");
      assertEquals(
          ((HyperLogLog) aggregationGroupByResult.getResultForGroupId(0, firstGroupKey._groupId)).cardinality(), 21L);
      assertEquals(
          ((HyperLogLog) aggregationGroupByResult.getResultForGroupId(1, firstGroupKey._groupId)).cardinality(),
          691L);

      // Test inter segments base query (4 copies of the segment across 2 servers -> 4x the doc counts)
      QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(BASE_QUERY), 120000L, 0L, 240000L, 120000L,
          new Object[]{21L, 1762L});

      // Test inter segments query with filter
      QueriesTestUtils.testInterSegmentsResult(getBrokerResponseWithFilter(BASE_QUERY), 24516L, 336536L, 49032L,
          120000L, new Object[]{17L, 1197L});

      // Test inter segments query with group-by (one expected row per column11 value, ordered by column11)
      List<Object[]> expectedRows =
          Arrays.asList(new Object[]{21L, 691L}, new Object[]{21L, 1762L}, new Object[]{11L, 27L},
              new Object[]{21L, 1397L}, new Object[]{21L, 1532L});
      QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(BASE_QUERY + GROUP_BY), 120000L, 0L, 360000L,
          120000L, expectedRows);
    } finally {
      // Always release the segment and temp directory, even if an assertion above failed
      deleteSegment();
    }
  }

  /**
   * Builds the test segment from the bundled Avro resource and loads it on heap.
   *
   * <p>Populates {@code _indexSegment} and {@code _indexSegments} (the same segment instance twice).
   */
  private void buildAndLoadSegment()
      throws Exception {
    FileUtils.deleteQuietly(INDEX_DIR);

    // Get resource file path
    URL resource = getClass().getClassLoader().getResource(AVRO_DATA_WITH_PRE_GENERATED_HLL_COLUMNS);
    assertNotNull(resource);
    String filePath = resource.getFile();

    // Build the segment schema, including the two pre-generated serialized HLL string columns
    Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable").addMetric("column1", FieldSpec.DataType.INT)
        .addMetric("column3", FieldSpec.DataType.INT).addSingleValueDimension("column5", FieldSpec.DataType.STRING)
        .addSingleValueDimension("column6", FieldSpec.DataType.INT)
        .addSingleValueDimension("column7", FieldSpec.DataType.INT)
        .addSingleValueDimension("column9", FieldSpec.DataType.INT)
        .addSingleValueDimension("column11", FieldSpec.DataType.STRING)
        .addSingleValueDimension("column12", FieldSpec.DataType.STRING).addMetric("column17", FieldSpec.DataType.INT)
        .addMetric("column18", FieldSpec.DataType.INT)
        .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null)
        .addSingleValueDimension("column17_HLL", FieldSpec.DataType.STRING)
        .addSingleValueDimension("column18_HLL", FieldSpec.DataType.STRING).build();

    // The segment generation code in SegmentColumnarIndexCreator will throw
    // exception if start and end time in time column are not in acceptable
    // range. For this test, we first need to fix the input avro data
    // to have the time column values in allowed range. Until then, the check
    // is explicitly disabled
    IngestionConfig ingestionConfig = new IngestionConfig();
    ingestionConfig.setSegmentTimeValueCheck(false);
    ingestionConfig.setRowTimeValueCheck(false);
    TableConfig tableConfig =
        new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
            .setIngestionConfig(ingestionConfig).build();

    // Create the segment generator config
    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
    segmentGeneratorConfig.setInputFilePath(filePath);
    segmentGeneratorConfig.setTableName("testTable");
    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
    segmentGeneratorConfig.setInvertedIndexCreationColumns(
        Arrays.asList("column6", "column7", "column11", "column17", "column18"));

    // Build the index segment
    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
    driver.init(segmentGeneratorConfig);
    driver.build();

    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
    _indexSegment = immutableSegment;
    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
  }

  /**
   * Destroys the loaded segment and removes the temp index directory.
   * Safe to call once per {@link #buildAndLoadSegment()}; both entries in {@code _indexSegments} are the same
   * instance, so a single destroy suffices.
   */
  private void deleteSegment() {
    _indexSegment.destroy();
    FileUtils.deleteQuietly(INDEX_DIR);
  }
}