/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
* Integration test that creates a Kafka broker and a Pinot cluster that consumes from Kafka, then queries Pinot.
*/
public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSet {
@BeforeClass
public void setUp()
throws Exception {
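// Make sure the temp directory used for the test data exists and is empty.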
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
// Start the Pinot cluster
startZk();
startController();
startBroker();
startServer();
// Start Kafka
startKafka();
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
// Create and upload the schema and table config
Schema schema = createSchema();
addSchema(schema);
TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
addTableConfig(tableConfig);
// Push data into Kafka
pushAvroIntoKafka(avroFiles);
// Create segments and upload them to the controller (no-op here; see createSegmentsAndUpload below)
createSegmentsAndUpload(avroFiles, schema, tableConfig);
// Set up the H2 connection and load the Avro data into H2 for result verification
setUpH2Connection(avroFiles);
// Initialize the query generator
setUpQueryGenerator(avroFiles);
// Wait up to 10 minutes for all documents to be loaded
waitForAllDocsLoaded(600_000L);
}
protected void createSegmentsAndUpload(List<File> avroFiles, Schema schema, TableConfig tableConfig)
throws Exception {
// Do nothing by default. Segment creation and upload is specific to LLC use cases for now.
}
@Override
protected void overrideServerConf(PinotConfiguration configuration) {
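// Disable off-heap allocation so memory for realtime consuming segments is allocated on the JVM heap in this test.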
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, false);
}
@Override
protected List<String> getNoDictionaryColumns() {
// Randomly set the time column (DaysSinceEpoch) as a no-dictionary column so both cases are exercised.
if (new Random().nextBoolean()) {
return Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime", "DaysSinceEpoch");
} else {
return super.getNoDictionaryColumns();
}
}
/**
* In realtime consuming segments the dictionary is not sorted, so dictionary-based operators should not be used.
*
* Runs explicit queries against dictionary-based aggregation functions to ensure the right result is computed,
* i.e. that the dictionary is not read while it is still mutable.
*/
@Test
public void testDictionaryBasedQueries()
throws Exception {
// Dictionary columns
// int
testDictionaryBasedFunctions("NASDelay");
// long
testDictionaryBasedFunctions("AirlineID");
// double
testDictionaryBasedFunctions("ArrDelayMinutes");
// float
testDictionaryBasedFunctions("DepDelayMinutes");
// Non-dictionary columns
// int
testDictionaryBasedFunctions("ActualElapsedTime");
// double
testDictionaryBasedFunctions("DepDelay");
// float
testDictionaryBasedFunctions("ArrDelay");
}
private void testDictionaryBasedFunctions(String column)
throws Exception {
testQuery(String.format("SELECT MIN(%s) FROM %s", column, getTableName()));
testQuery(String.format("SELECT MAX(%s) FROM %s", column, getTableName()));
testQuery(String.format("SELECT MIN_MAX_RANGE(%s) FROM %s", column, getTableName()),
String.format("SELECT MAX(%s)-MIN(%s) FROM %s", column, column, getTableName()));
}
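// The tests below run the shared test implementations from BaseClusterIntegrationTestSet against the realtime table.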
@Test
public void testHardcodedQueries()
throws Exception {
super.testHardcodedQueries();
}
@Test
@Override
public void testQueriesFromQueryFile()
throws Exception {
super.testQueriesFromQueryFile();
}
@Test
@Override
public void testGeneratedQueries()
throws Exception {
testGeneratedQueries(true, false);
}
@Test
@Override
public void testQueryExceptions()
throws Exception {
super.testQueryExceptions();
}
@Test
@Override
public void testInstanceShutdown()
throws Exception {
super.testInstanceShutdown();
}
@AfterClass
public void tearDown()
throws Exception {
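// Stop the cluster components in roughly reverse order of startup, then delete the temp directory.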
dropRealtimeTable(getTableName());
cleanupTestTableDataManager(TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
stopServer();
stopBroker();
stopController();
stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}
}