/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.queries;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.operator.query.GroupByOperator;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.segment.local.customobject.AvgPair;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
/**
 * Queries test for transform functions.
 *
 * <p>Builds a single offline segment of {@code NUM_ROWS} near-identical rows and verifies:
 * <ul>
 *   <li>Arithmetic transforms (ADD / SUB / DIV / MULT) inside AVG, both at the inner-segment
 *       (operator) level and at the inter-segment (broker) level</li>
 *   <li>DATETRUNC transforms used as GROUP BY expressions</li>
 *   <li>A Groovy ingestion transform that derives {@code INT_COL1_V2} from
 *       {@code INT_COL1} / {@code INT_COL1_V3} at segment-build time</li>
 * </ul>
 */
public class TransformQueriesTest extends BaseQueriesTest {
  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "TransformQueriesTest");
  private static final String TABLE_NAME = "testTable";
  private static final String SEGMENT_NAME = "testSegment";
  // Column names. Every row stores the same values (see buildSegment()):
  // STRING_COL="Pinot", INT_COL1=1000, INT_COL2=2000, LONG_COL1=500000, LONG_COL2=1000000.
  private static final String D1 = "STRING_COL";
  private static final String M1 = "INT_COL1";
  // Populated by the Groovy ingestion transform configured in buildSegment().
  private static final String M1_V2 = "INT_COL1_V2";
  // Exists only in the raw rows, not in the table schema; feeds the Groovy transform.
  private static final String M1_V3 = "INT_COL1_V3";
  private static final String M2 = "INT_COL2";
  private static final String M3 = "LONG_COL1";
  private static final String M4 = "LONG_COL2";
  private static final String TIME = "T";
  private static final int NUM_ROWS = 10;
  private IndexSegment _indexSegment;
  private List<IndexSegment> _indexSegments;

  @Override
  protected String getFilter() {
    // No filter: every query in this class runs over all rows.
    return "";
  }

  @Override
  protected IndexSegment getIndexSegment() {
    return _indexSegment;
  }

  @Override
  protected List<IndexSegment> getIndexSegments() {
    return _indexSegments;
  }

  @BeforeClass
  public void setUp()
      throws Exception {
    FileUtils.deleteQuietly(INDEX_DIR);
    buildSegment();
    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
    _indexSegment = immutableSegment;
    // The same segment is listed twice, so inter-segment queries aggregate over 2 * NUM_ROWS rows
    // (the broker-level expected averages are unaffected since all rows are identical).
    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
  }

  /**
   * Builds the test segment under {@code INDEX_DIR}: {@code NUM_ROWS} rows with identical column
   * values and a fixed UTC timestamp. The last row additionally carries explicit nulls for
   * {@code INT_COL1_V2}/{@code INT_COL1_V3} to exercise the Groovy ingestion transform's null
   * fallback (null or {@code Integer.MIN_VALUE} in {@code INT_COL1_V3} falls back to
   * {@code INT_COL1}).
   */
  protected void buildSegment()
      throws Exception {
    GenericRow row = new GenericRow();
    row.putValue(D1, "Pinot");
    row.putValue(M1, 1000);
    row.putValue(M2, 2000);
    row.putValue(M3, 500000);
    row.putValue(M4, 1000000);
    // Fixed timestamp: 1973-01-08T14:06:04.003 UTC.
    row.putValue(TIME, new DateTime(1973, 1, 8, 14, 6, 4, 3, DateTimeZone.UTC).getMillis());
    List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
    // The first NUM_ROWS - 1 entries share the same GenericRow instance; that is fine here
    // because the row is never mutated after this point.
    for (int i = 0; i < NUM_ROWS - 1; i++) {
      rows.add(row);
    }
    // modifying the last row
    row = new GenericRow();
    row.putValue(D1, "Pinot");
    row.putValue(M1, 1000);
    row.putValue(M1_V2, null); // column for adding the groovy transformed value
    row.putValue(M1_V3, null); // M1_V3 doesn't exist in table schema
    row.putValue(M2, 2000);
    row.putValue(M3, 500000);
    row.putValue(M4, 1000000);
    row.putValue(TIME, new DateTime(1973, 1, 8, 14, 6, 4, 3, DateTimeZone.UTC).getMillis());
    rows.add(row);
    // Ingestion config with a single Groovy transform:
    // INT_COL1_V2 = (INT_COL1_V3 null or Integer.MIN_VALUE) ? INT_COL1 : INT_COL1_V3
    TableConfig tableConfig =
        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME)
            .setIngestionConfig(new IngestionConfig(null, null, null,
                Arrays.asList(new TransformConfig(M1_V2, "Groovy({INT_COL1_V3 == null || "
                    + "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 }, INT_COL1, INT_COL1_V3)")),
                null, null))
            .build();
    // Schema includes INT_COL1_V2 (transform target) but deliberately omits INT_COL1_V3.
    Schema schema =
        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, FieldSpec.DataType.STRING)
            .addSingleValueDimension(M1, FieldSpec.DataType.INT).addSingleValueDimension(M2, FieldSpec.DataType.INT)
            .addSingleValueDimension(M3, FieldSpec.DataType.LONG).addSingleValueDimension(M4, FieldSpec.DataType.LONG)
            .addSingleValueDimension(M1_V2, FieldSpec.DataType.INT)
            .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, TIME), null).build();
    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
    config.setOutDir(INDEX_DIR.getPath());
    config.setTableName(TABLE_NAME);
    config.setSegmentName(SEGMENT_NAME);
    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
      driver.init(config, recordReader);
      driver.build();
    }
  }

  @AfterClass
  public void tearDown() {
    _indexSegment.destroy();
    FileUtils.deleteQuietly(INDEX_DIR);
  }

  /**
   * Verifies arithmetic transforms inside AVG at the inner-segment level. With identical rows
   * (INT_COL1=1000, INT_COL2=2000, LONG_COL1=500000, LONG_COL2=1000000), the expected AvgPair sum
   * is the per-row transform value times NUM_ROWS. Transforms mixing a numeric column with
   * STRING_COL are expected to fail.
   */
  @Test
  public void testTransformWithAvgInnerSegment() {
    // Per row: 1000 - 2000 = -1000; sum over 10 rows = -10000.
    String query = "SELECT AVG(SUB(INT_COL1, INT_COL2)) FROM testTable";
    runAndVerifyInnerSegmentQuery(query, -10000.0, 10);
    // Per row: 500000 - 1000 = 499000; sum = 4990000.
    query = "SELECT AVG(SUB(LONG_COL1, INT_COL1)) FROM testTable";
    runAndVerifyInnerSegmentQuery(query, 4990000.0, 10);
    // Per row: 1000000 - 500000 = 500000; sum = 5000000.
    query = "SELECT AVG(SUB(LONG_COL2, LONG_COL1)) FROM testTable";
    runAndVerifyInnerSegmentQuery(query, 5000000.0, 10);
    // Per row: 1000 + 2000 = 3000; sum = 30000.
    query = "SELECT AVG(ADD(INT_COL1, INT_COL2)) FROM testTable";
    runAndVerifyInnerSegmentQuery(query, 30000.0, 10);
    // Per row: 1000 + 500000 = 501000; sum = 5010000.
    query = "SELECT AVG(ADD(INT_COL1, LONG_COL1)) FROM testTable";
    runAndVerifyInnerSegmentQuery(query, 5010000.0, 10);
    // Per row: 500000 + 1000000 = 1500000; sum = 15000000.
    query = "SELECT AVG(ADD(LONG_COL1, LONG_COL2)) FROM testTable";
    runAndVerifyInnerSegmentQuery(query, 15000000.0, 10);
    // Per row: 1000/2000 + 500000/1000000 = 0.5 + 0.5 = 1.0; sum = 10.
    query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1, LONG_COL2))) FROM testTable";
    runAndVerifyInnerSegmentQuery(query, 10.0, 10);
    // Arithmetic on a STRING column must be rejected.
    try {
      query = "SELECT AVG(SUB(INT_COL1, STRING_COL)) FROM testTable";
      runAndVerifyInnerSegmentQuery(query, -10000.0, 10);
      Assert.fail("Query should have failed");
    } catch (Exception e) {
      // Expected
    }
    try {
      query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1, STRING_COL))) FROM testTable";
      runAndVerifyInnerSegmentQuery(query, 10.00, 10);
      Assert.fail("Query should have failed");
    } catch (Exception e) {
      // Expected
    }
  }

  /**
   * Runs an aggregation query against the single segment and checks the intermediate AvgPair
   * (running sum and row count, i.e. the state before the final sum/count division).
   *
   * @param query aggregation query with a single AVG expression
   * @param expectedSum expected AvgPair sum over all rows
   * @param expectedCount expected AvgPair row count
   */
  private void runAndVerifyInnerSegmentQuery(String query, double expectedSum, int expectedCount) {
    AggregationOperator aggregationOperator = getOperator(query);
    List<Object> aggregationResult = aggregationOperator.nextBlock().getResults();
    assertNotNull(aggregationResult);
    assertEquals(aggregationResult.size(), 1);
    AvgPair avgPair = (AvgPair) aggregationResult.get(0);
    assertEquals(avgPair.getSum(), expectedSum);
    assertEquals(avgPair.getCount(), expectedCount);
  }

  /**
   * Verifies DATETRUNC as a GROUP BY expression. All rows share the same timestamp, so each query
   * must produce exactly one group containing all NUM_ROWS rows; the expected group keys are the
   * truncated epoch values in the requested output unit (hard-coded here — presumably the
   * week/quarter boundaries of 1973-01-08 in the given zone; derived from a reference run).
   */
  @Test
  public void testTransformWithDateTruncInnerSegment() {
    // ADD(SUB(x, c), c) is an identity wrapper around DIV(T, 1000) (seconds since epoch).
    String query =
        "SELECT COUNT(*) FROM testTable GROUP BY DATETRUNC('week', ADD(SUB(DIV(T, 1000), INT_COL2), INT_COL2), "
            + "'SECONDS', 'Europe/Berlin')";
    verifyDateTruncationResult(query, new Object[]{95295600L});
    // Same identity trick, but with 'MILLISECONDS' as the output granularity.
    query =
        "SELECT COUNT(*) FROM testTable GROUP BY DATETRUNC('week', DIV(MULT(DIV(ADD(SUB(T, 5), 5), 1000), INT_COL2), "
            + "INT_COL2), 'SECONDS', 'Europe/Berlin', 'MILLISECONDS')";
    verifyDateTruncationResult(query, new Object[]{95295600000L});
    // Quarter truncation directly on the millisecond time column (default UTC zone).
    query = "SELECT COUNT(*) FROM testTable GROUP BY DATETRUNC('quarter', T, 'MILLISECONDS')";
    verifyDateTruncationResult(query, new Object[]{94694400000L});
  }

  /**
   * Runs a group-by query and asserts there is exactly one group with the expected key whose
   * COUNT(*) result equals NUM_ROWS.
   *
   * @param query COUNT(*) group-by query
   * @param expectedGroupKey the single expected group key
   */
  private void verifyDateTruncationResult(String query, Object[] expectedGroupKey) {
    GroupByOperator groupByOperator = getOperator(query);
    AggregationGroupByResult aggregationGroupByResult = groupByOperator.nextBlock().getAggregationGroupByResult();
    assertNotNull(aggregationGroupByResult);
    List<GroupKeyGenerator.GroupKey> groupKeys = ImmutableList.copyOf(aggregationGroupByResult.getGroupKeyIterator());
    assertEquals(groupKeys.size(), 1);
    assertEquals(groupKeys.get(0)._keys, expectedGroupKey);
    Object resultForKey = aggregationGroupByResult.getResultForGroupId(groupKeys.get(0)._groupId, 0);
    assertEquals(resultForKey, (long) NUM_ROWS);
  }

  /**
   * Verifies the same AVG transforms at the broker level. Here the expected value is the final
   * average (sum / count); since every row is identical it equals the per-row transform value,
   * regardless of the duplicated segment in {@code _indexSegments}.
   */
  @Test
  public void testTransformWithAvgInterSegmentInterServer() {
    String query = "SELECT AVG(SUB(INT_COL1, INT_COL2)) FROM testTable";
    runAndVerifyInterSegmentQuery(query, -1000.0);
    query = "SELECT AVG(SUB(LONG_COL1, INT_COL1)) FROM testTable";
    runAndVerifyInterSegmentQuery(query, 499000.0);
    query = "SELECT AVG(SUB(LONG_COL2, LONG_COL1)) FROM testTable";
    runAndVerifyInterSegmentQuery(query, 500000.0);
    query = "SELECT AVG(ADD(INT_COL1, INT_COL2)) FROM testTable";
    runAndVerifyInterSegmentQuery(query, 3000.0);
    query = "SELECT AVG(ADD(INT_COL1, LONG_COL1)) FROM testTable";
    runAndVerifyInterSegmentQuery(query, 501000.0);
    query = "SELECT AVG(ADD(LONG_COL1, LONG_COL2)) FROM testTable";
    runAndVerifyInterSegmentQuery(query, 1500000.0);
    query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1, LONG_COL2))) FROM testTable";
    runAndVerifyInterSegmentQuery(query, 1.0);
  }

  /**
   * Verifies the Groovy ingestion transform: INT_COL1_V2 is derived from raw-data columns at
   * ingestion time and, since INT_COL1_V3 is always null/absent in the test rows, must equal
   * INT_COL1 in every returned row.
   */
  @Test
  public void testGroovyTransformQuery() {
    String query = "SELECT INT_COL1, INT_COL1_V2 FROM testTable";
    List<Object[]> result = getBrokerResponse(query).getResultTable().getRows();
    for (Object[] obj : result) {
      assertEquals(obj[0], obj[1]);
    }
  }

  /**
   * Runs a broker-level query and checks the single value in the first result row.
   *
   * @param query single-aggregation query
   * @param expectedResult expected final aggregation value
   */
  private void runAndVerifyInterSegmentQuery(String query, double expectedResult) {
    assertEquals(getBrokerResponse(query).getResultTable().getRows().get(0)[0], expectedResult);
  }
}