blob: 24cd42bfacfc231dc8a886c2f6e257dde4d8d8a4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.integration.tests.startree.SegmentInfoProvider;
import org.apache.pinot.integration.tests.startree.StarTreeQueryGenerator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
* Integration test for Star-Tree based indexes.
* <ul>
* <li>
* Set up the Pinot cluster and create two tables, one with default indexes, one with star tree indexes
* </li>
* <li>
* Send queries to both the tables and assert that results match
* </li>
* <li>
* Query to reference table is sent with TOP 10000, and the comparator ensures that response from star tree is
* contained within the reference response. This is to avoid false failures when groups with same value are
* truncated due to TOP N
* </li>
* </ul>
*/
public class StarTreeClusterIntegrationTest extends BaseClusterIntegrationTest {
protected static final String DEFAULT_TABLE_NAME = "myTable";
protected static final String STAR_TREE_TABLE_NAME = "myStarTable";
private static final String SCHEMA_FILE_NAME =
"On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema";
private static final int NUM_STAR_TREE_DIMENSIONS = 5;
private static final int NUM_STAR_TREE_METRICS = 5;
private static final List<AggregationFunctionType> AGGREGATION_FUNCTION_TYPES =
Arrays.asList(AggregationFunctionType.COUNT, AggregationFunctionType.MIN, AggregationFunctionType.MAX,
AggregationFunctionType.SUM, AggregationFunctionType.AVG, AggregationFunctionType.MINMAXRANGE,
AggregationFunctionType.DISTINCTCOUNTBITMAP);
private static final int NUM_QUERIES_TO_GENERATE = 100;
private String _currentTable;
private StarTreeQueryGenerator _starTree1QueryGenerator;
private StarTreeQueryGenerator _starTree2QueryGenerator;
@Override
protected String getTableName() {
return _currentTable;
}
@Override
protected String getSchemaFileName() {
return SCHEMA_FILE_NAME;
}
// NOTE: Star-Tree and SegmentInfoProvider does not work on no-dictionary dimensions
@Override
protected List<String> getNoDictionaryColumns() {
return Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay");
}
@BeforeClass
public void setUp()
throws Exception {
File defaultTableSegmentDir = new File(_segmentDir, DEFAULT_TABLE_NAME);
File defaultTableTarDir = new File(_tarDir, DEFAULT_TABLE_NAME);
File starTreeTableSegmentDir = new File(_segmentDir, STAR_TREE_TABLE_NAME);
File starTreeTableTarDir = new File(_tarDir, STAR_TREE_TABLE_NAME);
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir, defaultTableSegmentDir, defaultTableTarDir,
starTreeTableSegmentDir, starTreeTableTarDir);
// Start the Pinot cluster
startZk();
startController();
startBroker();
startServers(2);
// Create and upload the schema and table config
Schema schema = createSchema();
addSchema(schema);
_currentTable = DEFAULT_TABLE_NAME;
TableConfig defaultTableConfig = createOfflineTableConfig();
addTableConfig(defaultTableConfig);
// Randomly pick some dimensions and metrics for star-trees
List<String> starTree1Dimensions = new ArrayList<>(NUM_STAR_TREE_DIMENSIONS);
List<String> starTree2Dimensions = new ArrayList<>(NUM_STAR_TREE_DIMENSIONS);
List<String> allDimensions = new ArrayList<>(schema.getDimensionNames());
Collections.shuffle(allDimensions);
for (int i = 0; i < NUM_STAR_TREE_DIMENSIONS; i++) {
starTree1Dimensions.add(allDimensions.get(2 * i));
starTree2Dimensions.add(allDimensions.get(2 * i + 1));
}
List<String> starTree1Metrics = new ArrayList<>(NUM_STAR_TREE_METRICS);
List<String> starTree2Metrics = new ArrayList<>(NUM_STAR_TREE_METRICS);
List<String> allMetrics = new ArrayList<>(schema.getMetricNames());
Collections.shuffle(allMetrics);
for (int i = 0; i < NUM_STAR_TREE_METRICS; i++) {
starTree1Metrics.add(allMetrics.get(2 * i));
starTree2Metrics.add(allMetrics.get(2 * i + 1));
}
_currentTable = STAR_TREE_TABLE_NAME;
TableConfig starTreeTableConfig = createOfflineTableConfig();
starTreeTableConfig.getIndexingConfig().setStarTreeIndexConfigs(
Arrays.asList(getStarTreeIndexConfig(starTree1Dimensions, starTree1Metrics),
getStarTreeIndexConfig(starTree2Dimensions, starTree2Metrics)));
addTableConfig(starTreeTableConfig);
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
// Create and upload segments
ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, defaultTableConfig, schema, 0, defaultTableSegmentDir,
defaultTableTarDir);
uploadSegments(DEFAULT_TABLE_NAME, defaultTableTarDir);
ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, starTreeTableConfig, schema, 0,
starTreeTableSegmentDir, starTreeTableTarDir);
uploadSegments(STAR_TREE_TABLE_NAME, starTreeTableTarDir);
// Set up the query generators
SegmentInfoProvider segmentInfoProvider = new SegmentInfoProvider(defaultTableTarDir.getPath());
List<String> aggregationFunctions = new ArrayList<>(AGGREGATION_FUNCTION_TYPES.size());
for (AggregationFunctionType functionType : AGGREGATION_FUNCTION_TYPES) {
aggregationFunctions.add(functionType.getName());
}
_starTree1QueryGenerator = new StarTreeQueryGenerator(STAR_TREE_TABLE_NAME, starTree1Dimensions, starTree1Metrics,
segmentInfoProvider.getSingleValueDimensionValuesMap(), aggregationFunctions);
_starTree2QueryGenerator = new StarTreeQueryGenerator(STAR_TREE_TABLE_NAME, starTree2Dimensions, starTree2Metrics,
segmentInfoProvider.getSingleValueDimensionValuesMap(), aggregationFunctions);
// Wait for all documents loaded
_currentTable = DEFAULT_TABLE_NAME;
waitForAllDocsLoaded(600_000L);
_currentTable = STAR_TREE_TABLE_NAME;
waitForAllDocsLoaded(600_000L);
}
private static StarTreeIndexConfig getStarTreeIndexConfig(List<String> dimensions, List<String> metrics) {
List<String> functionColumnPairs = new ArrayList<>();
for (AggregationFunctionType functionType : AGGREGATION_FUNCTION_TYPES) {
for (String metric : metrics) {
functionColumnPairs.add(new AggregationFunctionColumnPair(functionType, metric).toColumnName());
}
}
return new StarTreeIndexConfig(dimensions, null, functionColumnPairs, 100);
}
@Test
public void testGeneratedQueries()
throws Exception {
for (int i = 0; i < NUM_QUERIES_TO_GENERATE; i += 2) {
testStarQuery(_starTree1QueryGenerator.nextQuery());
testStarQuery(_starTree2QueryGenerator.nextQuery());
}
}
@Test
public void testPredicateOnMetrics()
throws Exception {
String starQuery;
// Query containing predicate on one metric only
starQuery = "SELECT SUM(DepDelayMinutes) FROM myStarTable WHERE DepDelay > 0";
testStarQuery(starQuery);
starQuery = "SELECT SUM(DepDelayMinutes) FROM myStarTable WHERE DepDelay BETWEEN 0 and 10000";
testStarQuery(starQuery);
// Query containing predicate on multiple metrics
starQuery = "SELECT SUM(DepDelayMinutes) FROM myStarTable WHERE DepDelay > 0 AND ArrDelay > 0";
testStarQuery(starQuery);
// Query containing predicate on multiple metrics and dimensions
starQuery =
"SELECT SUM(DepDelayMinutes) FROM myStarTable WHERE DepDelay > 0 AND ArrDelay > 0 AND OriginStateName = "
+ "'Massachusetts'";
testStarQuery(starQuery);
}
private void testStarQuery(String starQuery)
throws Exception {
String referenceQuery = starQuery.replace(STAR_TREE_TABLE_NAME, DEFAULT_TABLE_NAME);
JsonNode starResponse = postQuery(starQuery);
JsonNode referenceResponse = postQuery(referenceQuery);
Assert.assertEquals(starResponse.get("resultTable"), referenceResponse.get("resultTable"),
"Query comparison failed for: \nStar Query: " + starQuery + "\nStar Response: " + starResponse
+ "\nReference Query: " + referenceQuery + "\nReference Response: " + referenceResponse);
}
@AfterClass
public void tearDown()
throws Exception {
dropOfflineTable(DEFAULT_TABLE_NAME);
dropOfflineTable(STAR_TREE_TABLE_NAME);
stopServer();
stopBroker();
stopController();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}
}