blob: 35fc8127bcf91b5001fd434cb30912e110a5a1b8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.queries;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.UpdateSketch;
import org.apache.datasketches.theta.UpdateSketchBuilder;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
import org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.operator.query.GroupByOperator;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
/**
* Queries test for DISTINCT_COUNT_THETA_SKETCH queries.
*/
@SuppressWarnings("unchecked")
public class DistinctCountThetaSketchQueriesTest extends BaseQueriesTest {
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "DistinctCountThetaSketchQueriesTest");
private static final String RAW_TABLE_NAME = "testTable";
private static final String SEGMENT_NAME = "testSegment";
private static final int NUM_RECORDS = 1000;
private static final String INT_SV_COLUMN = "intSVColumn";
private static final String LONG_SV_COLUMN = "longSVColumn";
private static final String FLOAT_SV_COLUMN = "floatSVColumn";
private static final String DOUBLE_SV_COLUMN = "doubleSVColumn";
private static final String STRING_SV_COLUMN = "stringSVColumn";
private static final String INT_MV_COLUMN = "intMVColumn";
private static final String LONG_MV_COLUMN = "longMVColumn";
private static final String FLOAT_MV_COLUMN = "floatMVColumn";
private static final String DOUBLE_MV_COLUMN = "doubleMVColumn";
private static final String STRING_MV_COLUMN = "stringMVColumn";
private static final String BYTES_COLUMN = "bytesColumn";
private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_SV_COLUMN, DataType.INT)
.addSingleValueDimension(LONG_SV_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_SV_COLUMN, DataType.FLOAT)
.addSingleValueDimension(DOUBLE_SV_COLUMN, DataType.DOUBLE)
.addSingleValueDimension(STRING_SV_COLUMN, DataType.STRING).addMultiValueDimension(INT_MV_COLUMN, DataType.INT)
.addMultiValueDimension(LONG_MV_COLUMN, DataType.LONG).addMultiValueDimension(FLOAT_MV_COLUMN, DataType.FLOAT)
.addMultiValueDimension(DOUBLE_MV_COLUMN, DataType.DOUBLE)
.addMultiValueDimension(STRING_MV_COLUMN, DataType.STRING).addMetric(BYTES_COLUMN, DataType.BYTES).build();
private static final TableConfig TABLE_CONFIG =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
private IndexSegment _indexSegment;
private List<IndexSegment> _indexSegments;
@Override
protected String getFilter() {
return "";
}
@Override
protected IndexSegment getIndexSegment() {
return _indexSegment;
}
@Override
protected List<IndexSegment> getIndexSegments() {
return _indexSegments;
}
@BeforeClass
public void setUp()
throws Exception {
FileUtils.deleteDirectory(INDEX_DIR);
UpdateSketchBuilder sketchBuilder = new UpdateSketchBuilder();
List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
for (int i = 0; i < NUM_RECORDS; i++) {
GenericRow record = new GenericRow();
record.putValue(INT_SV_COLUMN, i);
record.putValue(LONG_SV_COLUMN, i);
record.putValue(FLOAT_SV_COLUMN, i);
record.putValue(DOUBLE_SV_COLUMN, i);
record.putValue(STRING_SV_COLUMN, i);
Integer[] mvEntry = new Integer[]{i, i + NUM_RECORDS, i + 2 * NUM_RECORDS};
record.putValue(INT_MV_COLUMN, mvEntry);
record.putValue(LONG_MV_COLUMN, mvEntry);
record.putValue(FLOAT_MV_COLUMN, mvEntry);
record.putValue(DOUBLE_MV_COLUMN, mvEntry);
record.putValue(STRING_MV_COLUMN, mvEntry);
// Store serialized sketches in the BYTES column
UpdateSketch sketch = sketchBuilder.build();
sketch.update(i);
sketch.update(i + NUM_RECORDS);
sketch.update(i + 2 * NUM_RECORDS);
record.putValue(BYTES_COLUMN, sketch.compact().toByteArray());
records.add(record);
}
SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
driver.build();
ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
_indexSegment = immutableSegment;
_indexSegments = Arrays.asList(immutableSegment, immutableSegment);
}
@Test
public void testAggregationOnly() {
String query = "SELECT DISTINCT_COUNT_THETA_SKETCH(intSVColumn), DISTINCT_COUNT_THETA_SKETCH(longSVColumn), "
+ "DISTINCT_COUNT_THETA_SKETCH(floatSVColumn), DISTINCT_COUNT_THETA_SKETCH(doubleSVColumn), "
+ "DISTINCT_COUNT_THETA_SKETCH(stringSVColumn), DISTINCT_COUNT_THETA_SKETCH(intMVColumn), "
+ "DISTINCT_COUNT_THETA_SKETCH(longMVColumn), DISTINCT_COUNT_THETA_SKETCH(floatMVColumn), "
+ "DISTINCT_COUNT_THETA_SKETCH(doubleMVColumn), DISTINCT_COUNT_THETA_SKETCH(stringMVColumn), "
+ "DISTINCT_COUNT_THETA_SKETCH(bytesColumn) FROM testTable";
// Inner segment
AggregationOperator aggregationOperator = getOperator(query);
AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
11 * NUM_RECORDS, NUM_RECORDS);
List<Object> aggregationResult = resultsBlock.getResults();
assertNotNull(aggregationResult);
assertEquals(aggregationResult.size(), 11);
for (int i = 0; i < 11; i++) {
List<Sketch> sketches = (List<Sketch>) aggregationResult.get(i);
assertEquals(sketches.size(), 1);
Sketch sketch = sketches.get(0);
if (i < 5) {
assertEquals(Math.round(sketch.getEstimate()), NUM_RECORDS);
} else {
assertEquals(Math.round(sketch.getEstimate()), 3 * NUM_RECORDS);
}
}
// Inter segments
Object[] expectedResults = new Object[11];
for (int i = 0; i < 11; i++) {
if (i < 5) {
expectedResults[i] = (long) NUM_RECORDS;
} else {
expectedResults[i] = (long) (3 * NUM_RECORDS);
}
}
BrokerResponseNative brokerResponse = getBrokerResponse(query);
QueriesTestUtils.testInterSegmentsResult(brokerResponse, 4 * NUM_RECORDS, 0, 4 * 11 * NUM_RECORDS, 4 * NUM_RECORDS,
expectedResults);
}
@Test
public void testAggregationGroupBy() {
String baseQuery = "SELECT DISTINCT_COUNT_THETA_SKETCH(intSVColumn), DISTINCT_COUNT_THETA_SKETCH(longSVColumn), "
+ "DISTINCT_COUNT_THETA_SKETCH(floatSVColumn), DISTINCT_COUNT_THETA_SKETCH(doubleSVColumn), "
+ "DISTINCT_COUNT_THETA_SKETCH(stringSVColumn), DISTINCT_COUNT_THETA_SKETCH(intMVColumn), "
+ "DISTINCT_COUNT_THETA_SKETCH(longMVColumn), DISTINCT_COUNT_THETA_SKETCH(floatMVColumn), "
+ "DISTINCT_COUNT_THETA_SKETCH(doubleMVColumn), DISTINCT_COUNT_THETA_SKETCH(stringMVColumn), "
+ "DISTINCT_COUNT_THETA_SKETCH(bytesColumn) FROM testTable GROUP BY ";
for (boolean groupBySV : new boolean[]{true, false}) {
String query = baseQuery + (groupBySV ? "intSVColumn" : "intMVColumn");
// Inner segment
GroupByOperator groupByOperator = getOperator(query);
GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
11 * NUM_RECORDS, NUM_RECORDS);
AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
assertNotNull(aggregationGroupByResult);
int numGroups = 0;
Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
while (groupKeyIterator.hasNext()) {
numGroups++;
GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
for (int i = 0; i < 6; i++) {
List<Sketch> sketches = (List<Sketch>) aggregationGroupByResult.getResultForGroupId(i, groupKey._groupId);
assertEquals(sketches.size(), 1);
Sketch sketch = sketches.get(0);
if (i < 5) {
assertEquals(Math.round(sketch.getEstimate()), 1);
} else {
assertEquals(Math.round(sketch.getEstimate()), 3);
}
}
}
if (groupBySV) {
assertEquals(numGroups, NUM_RECORDS);
} else {
assertEquals(numGroups, 3 * NUM_RECORDS);
}
// Inter segments
BrokerResponseNative brokerResponse = getBrokerResponse(query);
assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 11 * NUM_RECORDS);
assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
List<Object[]> rows = brokerResponse.getResultTable().getRows();
assertEquals(rows.size(), 10);
for (Object[] row : rows) {
assertEquals(row.length, 11);
for (int i = 0; i < 11; i++) {
if (i < 5) {
assertEquals(row[i], 1L);
} else {
assertEquals(row[i], 3L);
}
}
}
}
}
@Test
public void testPostAggregation() {
String query = "SELECT DISTINCT_COUNT_THETA_SKETCH(intSVColumn, '', "
// [300, 500), [800, 900)
+ "'longSVColumn >= 300 AND (floatSVColumn < 500 OR doubleSVColumn BETWEEN 800 AND 899)', "
// [400, 850)
+ "'intMVColumn >= 2400 AND longMVColumn < 850', "
// [825, 1000)
+ "'floatMVColumn >= 2825', "
// [0, 100)
+ "'doubleMVColumn < 100', "
// Expected: [0, 100), [400, 500), [800, 825)
+ "'SET_UNION($4,SET_DIFF(SET_INTERSECT($1,$2),$3))') FROM testTable";
// Inner segment
AggregationOperator aggregationOperator = getOperator(query);
AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
8 * NUM_RECORDS, NUM_RECORDS);
List<Object> aggregationResult = resultsBlock.getResults();
assertNotNull(aggregationResult);
assertEquals(aggregationResult.size(), 1);
List<Sketch> sketches = (List<Sketch>) aggregationResult.get(0);
assertEquals(sketches.size(), 5);
assertTrue(sketches.get(0).isEmpty());
assertEquals(Math.round(sketches.get(1).getEstimate()), 300);
assertEquals(Math.round(sketches.get(2).getEstimate()), 450);
assertEquals(Math.round(sketches.get(3).getEstimate()), 175);
assertEquals(Math.round(sketches.get(4).getEstimate()), 100);
// Inter segments
Object[] expectedResults = new Object[]{225L};
BrokerResponseNative brokerResponse = getBrokerResponse(query);
QueriesTestUtils.testInterSegmentsResult(brokerResponse, 4 * NUM_RECORDS, 0, 4 * 8 * NUM_RECORDS, 4 * NUM_RECORDS,
expectedResults);
}
@Test
public void testDistinctCountRawThetaSketch() {
String query = "SELECT DISTINCT_COUNT_RAW_THETA_SKETCH(intSVColumn) FROM testTable";
BrokerResponseNative brokerResponse = getBrokerResponse(query);
String serializedSketch = (String) brokerResponse.getResultTable().getRows().get(0)[0];
Sketch sketch = ObjectSerDeUtils.DATA_SKETCH_SER_DE.deserialize(Base64.getDecoder().decode(serializedSketch));
assertEquals(Math.round(sketch.getEstimate()), NUM_RECORDS);
}
@Test
public void testInvalidQueries() {
testInvalidQuery("select DISTINCT_COUNT_THETA_SKETCH(intSVColumn, '', 'longSVColumn < 100', '$2') from testTable");
testInvalidQuery("select DISTINCT_COUNT_THETA_SKETCH(intSVColumn, '', 'longSVColumn < 100', 'foo') from testTable");
testInvalidQuery(
"select DISTINCT_COUNT_THETA_SKETCH(intSVColumn, '', 'longSVColumn < 100', 'SET_UNION($1)') from testTable");
testInvalidQuery(
"select DISTINCT_COUNT_THETA_SKETCH(intSVColumn, '', 'longSVColumn < 100', 'SET_INTERSECT($1)') from "
+ "testTable");
testInvalidQuery(
"select DISTINCT_COUNT_THETA_SKETCH(intSVColumn, '', 'longSVColumn < 100', 'SET_DIFF($1)') from testTable");
testInvalidQuery(
"select DISTINCT_COUNT_THETA_SKETCH(intSVColumn, '', 'longSVColumn < 100', 'floatSVColumn > 500', 'SET_DIFF"
+ "($0,$1,$2)') from testTable");
}
private void testInvalidQuery(String query) {
try {
getBrokerResponse(query);
fail();
} catch (BadQueryRequestException e) {
// Expected
}
}
@AfterClass
public void tearDown()
throws IOException {
_indexSegment.destroy();
FileUtils.deleteDirectory(INDEX_DIR);
}
}