blob: 5cbe1b51b01dbdc5f5dccf999934e6ff0cab6265 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.queries;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.geospatial.transform.function.ScalarFunctions;
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
import org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.operator.query.GroupByOperator;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.local.utils.GeometryUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.Point;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertNotNull;
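/**
* Tests for queries that use the ST_UNION aggregation function: aggregation only, post-aggregation transforms,
* aggregation over an empty result set, and aggregation with GROUP BY.
*/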
@SuppressWarnings("rawtypes")
public class StUnionQueriesTest extends BaseQueriesTest {
// Scratch directory under the system temp dir where the test segment is built; deleted in setUp and tearDown.
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "StUnionQueriesTest");
private static final String RAW_TABLE_NAME = "testTable";
private static final String SEGMENT_NAME = "testSegment";
// Unseeded, so the generated rows differ from run to run.
private static final Random RANDOM = new Random();
// Number of rows written to the test segment.
private static final int NUM_RECORDS = 200;
// Exclusive upper bound for the random point coordinates and for the random intColumn values.
private static final int MAX_VALUE = 100000;
private static final String POINT_COLUMN = "pointColumn";
private static final String INT_COLUMN = "intColumn";
// Two single-value dimensions: the serialized point (BYTES) and the group-by key (INT).
private static final Schema SCHEMA =
new Schema.SchemaBuilder().addSingleValueDimension(POINT_COLUMN, FieldSpec.DataType.BYTES)
.addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT).build();
private static final TableConfig TABLE_CONFIG =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
// Expected group-by results: intColumn value -> union of the points of all rows carrying that value.
private Map<Integer, Geometry> _values;
// Union of every generated point; compared against the single-segment aggregation result.
private Geometry _intermediateResult;
// Serialized form of _intermediateResult; basis for the broker-level expectations.
private byte[] _expectedResults;
// The segment built and loaded by setUp.
private IndexSegment _indexSegment;
// Holds the same loaded segment twice (see setUp).
private List<IndexSegment> _indexSegments;
/**
* Filter hook overridden from the base test class; this test supplies an empty string.
*/
@Override
protected String getFilter() {
return "";
}
/**
* Returns the single segment that setUp built and loaded.
*/
@Override
protected IndexSegment getIndexSegment() {
return _indexSegment;
}
/**
* Returns the segment list assembled by setUp, which contains the same loaded segment twice.
*/
@Override
protected List<IndexSegment> getIndexSegments() {
return _indexSegments;
}
/**
 * Generates {@link #NUM_RECORDS} rows of random points and int values, records the expected ST_UNION results
 * (overall and per int value), then builds the segment under {@link #INDEX_DIR} and loads it memory-mapped.
 *
 * @throws Exception if the index directory cannot be cleared or the segment cannot be built or loaded
 */
@BeforeClass
public void setUp()
    throws Exception {
  FileUtils.deleteDirectory(INDEX_DIR);

  // At most NUM_RECORDS distinct keys can be inserted, so size the map for that rather than for MAX_VALUE.
  _values = new HashMap<>(HashUtil.getHashMapCapacity(NUM_RECORDS));
  List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
  for (int i = 0; i < NUM_RECORDS; i++) {
    GenericRow record = new GenericRow();

    int x = RANDOM.nextInt(MAX_VALUE);
    int y = RANDOM.nextInt(MAX_VALUE);
    Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new Coordinate(x, y));
    record.putValue(POINT_COLUMN, GeometrySerializer.serialize(point));
    // Running union of every point: the expected result of the aggregation-only query.
    _intermediateResult = _intermediateResult == null ? point : point.union(_intermediateResult);

    int value = RANDOM.nextInt(MAX_VALUE);
    record.putValue(INT_COLUMN, value);
    // Per-key union: the expected result of the group-by query for this intColumn value.
    _values.merge(value, point, (existing, added) -> existing.union(added));

    records.add(record);
  }
  _expectedResults = GeometrySerializer.serialize(_intermediateResult);

  SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
  segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
  segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
  segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());

  SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
  driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
  driver.build();

  ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
  _indexSegment = immutableSegment;
  // The same segment is listed twice for the multi-segment (broker) code path.
  _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
}
/**
 * Checks a bare ST_UNION aggregation: first the single-segment operator result and its execution statistics, then
 * the broker response over all segments.
 */
@Test
public void testAggregationOnly() {
  final String sql = "SELECT ST_UNION(pointColumn) FROM testTable";

  // Inner segment
  Operator segmentOperator = getOperator(sql);
  assertTrue(segmentOperator instanceof AggregationOperator);
  AggregationOperator aggregationOperator = (AggregationOperator) segmentOperator;
  AggregationResultsBlock block = aggregationOperator.nextBlock();
  QueriesTestUtils.testInnerSegmentExecutionStatistics(segmentOperator.getExecutionStatistics(), NUM_RECORDS, 0,
      NUM_RECORDS, NUM_RECORDS);
  List<Object> segmentResults = block.getResults();
  assertNotNull(segmentResults);
  assertEquals(segmentResults.size(), 1);
  assertEquals(segmentResults.get(0), _intermediateResult);

  // Inter segments: the broker-level counters are expected to be 4x the single-segment ones.
  final int docsAcrossSegments = 4 * NUM_RECORDS;
  Object[] expectedRow = {BytesUtils.toHexString(_expectedResults)};
  QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(sql), docsAcrossSegments, 0, docsAcrossSegments,
      docsAcrossSegments, expectedRow);
}
/**
 * Checks geospatial transform functions applied on top of ST_UNION: each of the eight aggregations must yield the
 * full union on a single segment, and the broker response must match the schema and the row produced by applying
 * the corresponding {@link ScalarFunctions} to the serialized union.
 */
@Test
public void testPostAggregation() {
  final String sql = "SELECT "
      + "ST_AS_TEXT(ST_UNION(pointColumn)), "
      + "ST_AS_BINARY(ST_UNION(pointColumn)), "
      + "TO_GEOMETRY(ST_UNION(pointColumn)), "
      + "TO_SPHERICAL_GEOGRAPHY(ST_UNION(pointColumn)), "
      + "ST_GEOM_FROM_TEXT(ST_AS_TEXT(ST_UNION(pointColumn))), "
      + "ST_GEOG_FROM_TEXT(ST_AS_TEXT(ST_UNION(pointColumn))), "
      + "ST_GEOM_FROM_WKB(ST_AS_BINARY(ST_UNION(pointColumn))), "
      + "ST_GEOG_FROM_WKB(ST_AS_BINARY(ST_UNION(pointColumn))) "
      + "FROM testTable";

  // Inner segment
  AggregationOperator operator = getOperator(sql);
  AggregationResultsBlock block = operator.nextBlock();
  QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0,
      NUM_RECORDS, NUM_RECORDS);
  List<Object> segmentResults = block.getResults();
  assertNotNull(segmentResults);
  assertEquals(segmentResults.size(), 8);
  // Every select expression wraps the same ST_UNION, so each aggregation result is the full union.
  segmentResults.forEach(result -> assertEquals(result, _intermediateResult));

  // Inter segment
  String[] columnNames = {
      "stastext(stunion(pointColumn))",
      "stasbinary(stunion(pointColumn))",
      "togeometry(stunion(pointColumn))",
      "tosphericalgeography(stunion(pointColumn))",
      "stgeomfromtext(stastext(stunion(pointColumn)))",
      "stgeogfromtext(stastext(stunion(pointColumn)))",
      "stgeomfromwkb(stasbinary(stunion(pointColumn)))",
      "stgeogfromwkb(stasbinary(stunion(pointColumn)))"
  };
  // Only the first column (text output) is a STRING; all remaining columns are BYTES.
  ColumnDataType[] columnTypes = new ColumnDataType[columnNames.length];
  Arrays.fill(columnTypes, ColumnDataType.BYTES);
  columnTypes[0] = ColumnDataType.STRING;
  DataSchema expectedSchema = new DataSchema(columnNames, columnTypes);

  Object[] expectedRow = {
      ScalarFunctions.stAsText(_expectedResults),
      BytesUtils.toHexString(ScalarFunctions.stAsBinary(_expectedResults)),
      BytesUtils.toHexString(ScalarFunctions.toGeometry(_expectedResults)),
      BytesUtils.toHexString(ScalarFunctions.toSphericalGeography(_expectedResults)),
      BytesUtils.toHexString(ScalarFunctions.toGeometry(_expectedResults)),
      BytesUtils.toHexString(ScalarFunctions.toSphericalGeography(_expectedResults)),
      BytesUtils.toHexString(ScalarFunctions.toGeometry(_expectedResults)),
      BytesUtils.toHexString(ScalarFunctions.toSphericalGeography(_expectedResults))
  };
  List<Object[]> expectedRows = Collections.singletonList(expectedRow);

  ResultTable expectedTable = new ResultTable(expectedSchema, expectedRows);
  QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(sql), expectedTable);
}
/**
 * Checks ST_UNION when the filter matches no rows: the result must be {@link GeometryUtils#EMPTY_POINT}, both on a
 * single segment and in the broker response.
 */
@Test
public void testAggregationOnlyOnEmptyResultSet() {
  final String sql = "SELECT ST_UNION(pointColumn) FROM testTable where intColumn=-1";

  // Inner segment
  Operator segmentOperator = getOperator(sql);
  assertTrue(segmentOperator instanceof AggregationOperator);
  AggregationOperator aggregationOperator = (AggregationOperator) segmentOperator;
  AggregationResultsBlock block = aggregationOperator.nextBlock();
  // Nothing is scanned; only the total doc count is non-zero.
  QueriesTestUtils.testInnerSegmentExecutionStatistics(segmentOperator.getExecutionStatistics(), 0, 0, 0,
      NUM_RECORDS);
  List<Object> segmentResults = block.getResults();
  assertNotNull(segmentResults);
  assertEquals(segmentResults.size(), 1);
  assertEquals(segmentResults.get(0), GeometryUtils.EMPTY_POINT);

  // Inter segments
  byte[] emptyPointBytes = GeometrySerializer.serialize(GeometryUtils.EMPTY_POINT);
  Object[] expectedRow = {BytesUtils.toHexString(emptyPointBytes)};
  QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(sql), 0, 0, 0, 4 * NUM_RECORDS, expectedRow);
}
/**
 * Checks ST_UNION grouped by intColumn: the single-segment result must contain exactly the keys recorded in
 * {@code _values}, and every row of the broker response must carry the serialized union expected for its key.
 */
@Test
public void testAggregationGroupBy() {
  final String sql = "SELECT intColumn, ST_UNION(pointColumn) FROM testTable GROUP BY intColumn";

  // Inner segment
  GroupByOperator operator = getOperator(sql);
  GroupByResultsBlock block = operator.nextBlock();
  QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0,
      2 * NUM_RECORDS, NUM_RECORDS);
  AggregationGroupByResult groupByResult = block.getAggregationGroupByResult();
  assertNotNull(groupByResult);

  int groupCount = 0;
  for (Iterator<GroupKeyGenerator.GroupKey> keys = groupByResult.getGroupKeyIterator(); keys.hasNext(); ) {
    groupCount++;
    Integer key = (Integer) keys.next()._keys[0];
    assertTrue(_values.containsKey(key));
  }
  assertEquals(groupCount, _values.size());

  // Inter segments: the broker-level counters are expected to be 4x the single-segment ones.
  final int docsAcrossSegments = 4 * NUM_RECORDS;
  BrokerResponseNative response = getBrokerResponse(sql);
  assertEquals(response.getNumDocsScanned(), docsAcrossSegments);
  assertEquals(response.getNumEntriesScannedInFilter(), 0);
  assertEquals(response.getNumEntriesScannedPostFilter(), 2 * docsAcrossSegments);
  assertEquals(response.getTotalDocs(), docsAcrossSegments);

  List<Object[]> rows = response.getResultTable().getRows();
  // NOTE(review): 10 is presumably the default result limit applied when the query has no LIMIT - confirm.
  assertEquals(rows.size(), 10);
  for (Object[] row : rows) {
    assertEquals(row.length, 2);
    Geometry expectedUnion = _values.get((Integer) row[0]);
    assertNotNull(expectedUnion);
    String expectedHex = BytesUtils.toHexString(GeometrySerializer.serialize(expectedUnion));
    assertEquals(row[1], expectedHex);
  }
}
/**
 * Releases the loaded segment and then removes the on-disk index directory.
 *
 * @throws IOException if the index directory cannot be deleted
 */
@AfterClass
public void tearDown()
    throws IOException {
  try {
    // The segment was loaded with ReadMode.mmap, so release it before touching its files: deleting files that are
    // still memory-mapped can fail on some platforms.
    _indexSegment.destroy();
  } finally {
    // Always clean up the scratch directory, even if releasing the segment throws.
    FileUtils.deleteDirectory(INDEX_DIR);
  }
}
}