/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.pinot.client.ResultSet;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.query.utils.idset.IdSets;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
/**
* Shared set of common tests for cluster integration tests.
* <p>To enable a test, override it and add the @Test annotation.
*/
public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrationTest {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseClusterIntegrationTestSet.class);
// Default settings
private static final String DEFAULT_QUERY_FILE_NAME =
"On_Time_On_Time_Performance_2014_100k_subset.test_queries_200.sql";
private static final int DEFAULT_NUM_QUERIES_TO_GENERATE = 100;
/**
* Can be overridden to change the default setting.
*/
protected String getQueryFileName() {
return DEFAULT_QUERY_FILE_NAME;
}
/**
* Can be overridden to change the default setting.
*/
protected int getNumQueriesToGenerate() {
return DEFAULT_NUM_QUERIES_TO_GENERATE;
}
/**
* Wait for the servers to delete the table data manager after the table is dropped.
*/
protected void cleanupTestTableDataManager(String tableNameWithType) {
TestUtils.waitForCondition(aVoid -> {
try {
for (BaseServerStarter serverStarter : _serverStarters) {
if (serverStarter.getServerInstance().getInstanceDataManager().getTableDataManager(tableNameWithType)
!= null) {
return false;
}
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to delete table data managers");
}
/**
* Test features supported in the V2 multi-stage engine.
* <ul>
* <li>Some V1 features will not be supported.</li>
* <li>Some V1 features will be added as V2 engine feature development progresses.</li>
* </ul>
* @throws Exception
*/
public void testHardcodedQueriesMultiStage()
throws Exception {
testHardcodedQueriesCommon();
}
/**
* Test hard-coded queries.
* @throws Exception
*/
public void testHardcodedQueries()
throws Exception {
testHardcodedQueriesCommon();
testHardcodedQueriesV1();
}
/**
* Test hardcoded queries.
* <p>NOTE:
* <p>For queries with <code>LIMIT</code>, remove the limit or add <code>LIMIT 10000</code> to the H2 SQL query
* because the comparison only works on exhausted results with at most 10000 rows.
* <ul>
* <li>
* E.g. <code>SELECT a FROM table LIMIT 15 -> [SELECT a FROM table LIMIT 10000]</code>
* </li>
* </ul>
* <p>For group-by queries, need to add group-by columns to the select clause for H2 queries.
* <ul>
* <li>
* E.g. <code>SELECT SUM(a) FROM table GROUP BY b -> [SELECT b, SUM(a) FROM table GROUP BY b]</code>
* </li>
* </ul>
* TODO: Selection queries, Aggregation Group By queries, Order By, Distinct
* This list is very basic right now (aggregations only) and needs to be enriched
*/
private void testHardcodedQueriesCommon()
throws Exception {
String query;
String h2Query;
query = "SELECT COUNT(*) FROM mytable WHERE CarrierDelay=15 AND ArrDelay > CarrierDelay LIMIT 1";
testQuery(query);
query = "SELECT ArrDelay, CarrierDelay, (ArrDelay - CarrierDelay) AS diff FROM mytable WHERE CarrierDelay=15 AND "
+ "ArrDelay > CarrierDelay ORDER BY diff, ArrDelay, CarrierDelay LIMIT 100000";
testQuery(query);
query = "SELECT COUNT(*) FROM mytable WHERE ArrDelay > CarrierDelay LIMIT 1";
testQuery(query);
query =
"SELECT ArrDelay, CarrierDelay, (ArrDelay - CarrierDelay) AS diff FROM mytable WHERE ArrDelay > CarrierDelay "
+ "ORDER BY diff, ArrDelay, CarrierDelay LIMIT 100000";
testQuery(query);
query = "SELECT count(*) FROM mytable WHERE AirlineID > 20355 AND OriginState BETWEEN 'PA' AND 'DE' AND DepTime <> "
+ "2202 LIMIT 21";
testQuery(query);
query = "SELECT MAX(Quarter), MAX(FlightNum) FROM mytable LIMIT 8";
h2Query = "SELECT MAX(Quarter),MAX(FlightNum) FROM mytable LIMIT 10000";
testQuery(query, h2Query);
query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch = 16312 AND Carrier = 'DL'";
testQuery(query);
query = "SELECT SUM(ArrTime) FROM mytable WHERE DaysSinceEpoch <> 16312 AND Carrier = 'DL'";
testQuery(query);
query = "SELECT MAX(ArrTime) FROM mytable WHERE DaysSinceEpoch > 16312 AND Carrier = 'DL'";
testQuery(query);
query = "SELECT MIN(ArrTime) FROM mytable WHERE DaysSinceEpoch >= 16312 AND Carrier = 'DL'";
testQuery(query);
query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch < 16312 AND Carrier = 'DL'";
testQuery(query);
query = "SELECT MAX(ArrTime), MIN(ArrTime) FROM mytable WHERE DaysSinceEpoch <= 16312 AND Carrier = 'DL'";
testQuery(query);
query = "SELECT COUNT(*), MAX(ArrTime), MIN(ArrTime) FROM mytable WHERE DaysSinceEpoch >= 16312";
testQuery(query);
query = "SELECT COUNT(*), MAX(ArrTime), MIN(ArrTime), DaysSinceEpoch FROM mytable GROUP BY DaysSinceEpoch";
testQuery(query);
query = "SELECT DaysSinceEpoch, COUNT(*), MAX(ArrTime), MIN(ArrTime) FROM mytable GROUP BY DaysSinceEpoch";
testQuery(query);
query = "SELECT ArrTime, ArrTime * 10 FROM mytable WHERE DaysSinceEpoch >= 16312";
testQuery(query);
query = "SELECT ArrTime, ArrTime - ArrTime % 10 FROM mytable WHERE DaysSinceEpoch >= 16312";
testQuery(query);
query = "SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10 FROM mytable WHERE DaysSinceEpoch >= 16312";
testQuery(query);
query = "SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10 FROM mytable WHERE ArrTime - 100 > 0";
testQuery(query);
query = "SELECT COUNT(*) AS \"date\", MAX(ArrTime) AS \"group\", MIN(ArrTime) AS \"min\" FROM mytable";
testQuery(query);
// NOT
query = "SELECT count(*) FROM mytable WHERE OriginState NOT BETWEEN 'DE' AND 'PA'";
testQuery(query);
query = "SELECT count(*) FROM mytable WHERE OriginState NOT LIKE 'A_'";
testQuery(query);
query = "SELECT count(*) FROM mytable WHERE NOT (DaysSinceEpoch = 16312 AND Carrier = 'DL')";
testQuery(query);
query = "SELECT count(*) FROM mytable WHERE (NOT DaysSinceEpoch = 16312) AND Carrier = 'DL'";
testQuery(query);
// Post-aggregation in ORDER BY
query = "SELECT MAX(ArrTime) FROM mytable GROUP BY DaysSinceEpoch ORDER BY MAX(ArrTime) - MIN(ArrTime)";
testQuery(query);
query = "SELECT MAX(ArrDelay), Month FROM mytable GROUP BY Month ORDER BY ABS(Month - 6) + MAX(ArrDelay)";
testQuery(query);
// Post-aggregation in SELECT
query = "SELECT MAX(ArrDelay) + MAX(AirTime) FROM mytable";
testQuery(query);
query = "SELECT MAX(ArrDelay) - MAX(AirTime), DaysSinceEpoch FROM mytable GROUP BY DaysSinceEpoch ORDER BY MAX"
+ "(ArrDelay) - MIN(AirTime) DESC";
testQuery(query);
query = "SELECT DaysSinceEpoch, MAX(ArrDelay) * 2 - MAX(AirTime) - 3 FROM mytable GROUP BY DaysSinceEpoch ORDER BY "
+ "MAX(ArrDelay) - MIN(AirTime) DESC";
testQuery(query);
// Having
query = "SELECT COUNT(*) AS Count, DaysSinceEpoch FROM mytable GROUP BY DaysSinceEpoch HAVING Count > 350";
testQuery(query);
query =
"SELECT MAX(ArrDelay) - MAX(AirTime) AS Diff, DaysSinceEpoch FROM mytable GROUP BY DaysSinceEpoch HAVING Diff"
+ " * 2 > 1000 ORDER BY Diff ASC";
testQuery(query);
query = "SELECT DaysSinceEpoch, MAX(ArrDelay) - MAX(AirTime) AS Diff FROM mytable GROUP BY DaysSinceEpoch HAVING "
+ "(Diff >= 300 AND Diff < 500) OR Diff < -500 ORDER BY Diff DESC";
testQuery(query);
}
private void testHardcodedQueriesV1()
throws Exception {
String query;
String h2Query;
// Escape quotes
// TODO: move to common when the multi-stage engine supports the correct escaping strategy.
query = "SELECT DistanceGroup FROM mytable WHERE DATE_TIME_CONVERT(DaysSinceEpoch, '1:DAYS:EPOCH', "
+ "'1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd''T''HH:mm:ss.SSS''Z''', '1:DAYS') = '2014-09-05T00:00:00.000Z'";
h2Query = "SELECT DistanceGroup FROM mytable WHERE DaysSinceEpoch = 16318 LIMIT 10000";
testQuery(query, h2Query);
// LIKE
// TODO: move to common when the multi-stage engine supports LIKE.
query = "SELECT count(*) FROM mytable WHERE OriginState LIKE 'A_'";
testQuery(query);
query = "SELECT count(*) FROM mytable WHERE DestCityName LIKE 'C%'";
testQuery(query);
query = "SELECT count(*) FROM mytable WHERE DestCityName LIKE '_h%'";
testQuery(query);
// Non-standard functions
// mult is not a standard function; the equivalent H2 query uses the * operator instead.
query =
"SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10, ADD(ArrTime + 5, ArrDelay), ADD(ArrTime * 5, ArrDelay)"
+ " FROM mytable WHERE mult((ArrTime - 100), (5 + ArrDelay))> 0";
h2Query =
"SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10, ArrTime + 5 + ArrDelay, ArrTime * 5 + ArrDelay FROM "
+ "mytable WHERE (ArrTime - 100) * (5 + ArrDelay)> 0";
testQuery(query, h2Query);
// TODO: move to common when the multi-stage engine supports CAST AS 'LONG'; for now it must use CAST AS BIGINT.
query =
"SELECT SUM(CAST(CAST(ArrTime AS varchar) AS LONG)) FROM mytable WHERE DaysSinceEpoch <> 16312 AND Carrier = "
+ "'DL'";
testQuery(query);
query =
"SELECT CAST(CAST(ArrTime AS varchar) AS LONG) FROM mytable WHERE DaysSinceEpoch <> 16312 AND Carrier = 'DL' "
+ "ORDER BY ArrTime DESC";
testQuery(query);
// TODO: move to common when the multi-stage engine supports MV columns.
query =
"SELECT DistanceGroup FROM mytable WHERE \"Month\" BETWEEN 1 AND 1 AND DivAirportSeqIDs IN (1078102, 1142303,"
+ " 1530402, 1172102, 1291503) OR SecurityDelay IN (1, 0, 14, -9999) LIMIT 10";
h2Query =
"SELECT DistanceGroup FROM mytable WHERE Month BETWEEN 1 AND 1 AND (DivAirportSeqIDs__MV0 IN (1078102, "
+ "1142303, 1530402, 1172102, 1291503) OR DivAirportSeqIDs__MV1 IN (1078102, 1142303, 1530402, 1172102, "
+ "1291503) OR DivAirportSeqIDs__MV2 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR "
+ "DivAirportSeqIDs__MV3 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR DivAirportSeqIDs__MV4 IN "
+ "(1078102, 1142303, 1530402, 1172102, 1291503)) OR SecurityDelay IN (1, 0, 14, -9999) LIMIT 10000";
testQuery(query, h2Query);
// Non-standard SQL syntax:
// IN_ID_SET
{
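// INIDSET(column, serializedIdSet) returns 1 when the column value is contained in the base64-serialized IdSet
// and 0 otherwise, so comparing against 1/0 mirrors IN / NOT IN.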
IdSet idSet = IdSets.create(FieldSpec.DataType.LONG);
idSet.add(19690L);
idSet.add(20355L);
idSet.add(21171L);
// Also include a non-existing id
idSet.add(0L);
String serializedIdSet = idSet.toBase64String();
String inIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 1";
String inQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID IN (19690, 20355, 21171, 0)";
testQuery(inIdSetQuery, inQuery);
String notInIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 0";
String notInQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID NOT IN (19690, 20355, 21171, 0)";
testQuery(notInIdSetQuery, notInQuery);
}
// IN_SUBQUERY
{
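// INSUBQUERY evaluates the inner IDSET query once (at the broker level) to build an IdSet, then filters the
// outer query against it, so it should match the result of the equivalent IN subquery.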
String inSubqueryQuery =
"SELECT COUNT(*) FROM mytable WHERE INSUBQUERY(DestAirportID, 'SELECT IDSET(DestAirportID) FROM mytable "
+ "WHERE DaysSinceEpoch = 16430') = 1";
String inQuery = "SELECT COUNT(*) FROM mytable WHERE DestAirportID IN (SELECT DestAirportID FROM mytable WHERE "
+ "DaysSinceEpoch = 16430)";
testQuery(inSubqueryQuery, inQuery);
String notInSubqueryQuery =
"SELECT COUNT(*) FROM mytable WHERE INSUBQUERY(DestAirportID, 'SELECT IDSET(DestAirportID) FROM mytable "
+ "WHERE DaysSinceEpoch = 16430') = 0";
String notInQuery =
"SELECT COUNT(*) FROM mytable WHERE DestAirportID NOT IN (SELECT DestAirportID FROM mytable WHERE "
+ "DaysSinceEpoch = 16430)";
testQuery(notInSubqueryQuery, notInQuery);
}
}
/**
* Test hardcoded queries on server-partitioned data (all the segments for a partition are served by a single server).
*/
public void testHardcodedServerPartitionedSqlQueries()
throws Exception {
// IN_PARTITIONED_SUBQUERY
{
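// INPARTITIONEDSUBQUERY evaluates the inner IDSET query on each server against its local segments, which is
// only correct when all the segments for a partition are served by a single server (see Javadoc above).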
String inPartitionedSubqueryQuery =
"SELECT COUNT(*) FROM mytable WHERE INPARTITIONEDSUBQUERY(DestAirportID, 'SELECT IDSET(DestAirportID) FROM "
+ "mytable WHERE DaysSinceEpoch = 16430') = 1";
String inQuery = "SELECT COUNT(*) FROM mytable WHERE DestAirportID IN (SELECT DestAirportID FROM mytable WHERE "
+ "DaysSinceEpoch = 16430)";
testQuery(inPartitionedSubqueryQuery, inQuery);
String notInPartitionedSubqueryQuery =
"SELECT COUNT(*) FROM mytable WHERE INPARTITIONEDSUBQUERY(DestAirportID, 'SELECT IDSET(DestAirportID) FROM "
+ "mytable WHERE DaysSinceEpoch = 16430') = 0";
String notInQuery =
"SELECT COUNT(*) FROM mytable WHERE DestAirportID NOT IN (SELECT DestAirportID FROM mytable WHERE "
+ "DaysSinceEpoch = 16430)";
testQuery(notInPartitionedSubqueryQuery, notInQuery);
}
}
/**
* Test to ensure that the broker response contains the expected stats.
*
* @throws Exception
*/
public void testBrokerResponseMetadata()
throws Exception {
String[] queries = new String[]{
// matching query
"SELECT count(*) FROM mytable",
// query that does not match any row
"SELECT count(*) FROM mytable where non_existing_column='non_existing_value'",
// query against a non-existing table
"SELECT count(*) FROM mytable_foo"
};
String[] statNames = new String[]{
"totalDocs", "numServersQueried", "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed",
"numSegmentsMatched", "numDocsScanned", "timeUsedMs", "numEntriesScannedInFilter",
"numEntriesScannedPostFilter"
};
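// The stats should be present even when the query fails, e.g. when the table does not exist.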
for (String query : queries) {
JsonNode response = postQuery(query);
for (String statName : statNames) {
assertTrue(response.has(statName));
}
}
}
public void testVirtualColumnQueries() {
// Check that there are no virtual columns in the query results
ResultSetGroup resultSetGroup = getPinotConnection().execute("select * from mytable");
ResultSet resultSet = resultSetGroup.getResultSet(0);
for (int i = 0; i < resultSet.getColumnCount(); i++) {
assertFalse(resultSet.getColumnName(i).startsWith("$"),
"Virtual column " + resultSet.getColumnName(i) + " is present in the results!");
}
// Check that the virtual columns work as expected (these queries should throw no exceptions)
getPinotConnection().execute("select $docId, $segmentName, $hostName from mytable");
getPinotConnection().execute("select $docId, $segmentName, $hostName from mytable where $docId < 5 limit 50");
getPinotConnection().execute("select $docId, $segmentName, $hostName from mytable where $docId = 5 limit 50");
getPinotConnection().execute("select $docId, $segmentName, $hostName from mytable where $docId > 19998 limit 50");
getPinotConnection().execute("select max($docId) from mytable group by $segmentName");
}
/**
* Test queries from the query file.
*/
public void testQueriesFromQueryFile()
throws Exception {
InputStream inputStream =
BaseClusterIntegrationTestSet.class.getClassLoader().getResourceAsStream(getQueryFileName());
assertNotNull(inputStream);
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String queryString;
while ((queryString = reader.readLine()) != null) {
// Skip commented lines and empty lines.
queryString = queryString.trim();
if (queryString.startsWith("#") || queryString.isEmpty()) {
continue;
}
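// Each line is expected to be a JSON object, e.g. {"sql": "SELECT ...", "hsqls": ["SELECT ..."]}; when
// "hsqls" is absent, the Pinot query doubles as the H2 query.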
JsonNode query = JsonUtils.stringToJsonNode(queryString);
String pinotQuery = query.get("sql").asText();
JsonNode hsqls = query.get("hsqls");
String h2Query;
if (hsqls == null || hsqls.isEmpty()) {
h2Query = pinotQuery;
} else {
h2Query = hsqls.get(0).asText();
}
try {
testQuery(pinotQuery, h2Query);
} catch (Exception e) {
LOGGER.error("Failed to test Pinot query: {} with H2 query: {}.", pinotQuery, h2Query, e);
throw e;
}
}
}
}
/**
* Test queries generated by query generator.
*
* @throws Exception
*/
public void testGeneratedQueries()
throws Exception {
// Default test: with MV columns, without using the multi-stage engine
testGeneratedQueries(true, false);
}
protected void testGeneratedQueries(boolean withMultiValues, boolean useMultistageEngine)
throws Exception {
QueryGenerator queryGenerator = getQueryGenerator();
queryGenerator.setSkipMultiValuePredicates(!withMultiValues);
queryGenerator.setUseMultistageEngine(useMultistageEngine);
int numQueriesToGenerate = getNumQueriesToGenerate();
for (int i = 0; i < numQueriesToGenerate; i++) {
QueryGenerator.Query query = queryGenerator.generateQuery();
if (useMultistageEngine) {
// The multi-stage engine follows standard SQL, so use the H2 query string on both sides.
testQuery(query.generateH2Query(), query.generateH2Query());
} else {
testQuery(query.generatePinotQuery(), query.generateH2Query());
}
}
}
/**
* Test invalid queries which should cause query exceptions.
*
* @throws Exception
*/
public void testQueryExceptions()
throws Exception {
testQueryException("POTATO", QueryException.SQL_PARSING_ERROR_CODE);
testQueryException("SELECT COUNT(*) FROM potato", QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE);
testQueryException("SELECT POTATO(ArrTime) FROM mytable", QueryException.QUERY_EXECUTION_ERROR_CODE);
testQueryException("SELECT COUNT(*) FROM mytable where ArrTime = 'potato'",
QueryException.QUERY_EXECUTION_ERROR_CODE);
}
private void testQueryException(String query, int errorCode)
throws Exception {
JsonNode jsonObject = postQuery(query);
assertEquals(jsonObject.get("exceptions").get(0).get("errorCode").asInt(), errorCode);
}
/**
* Test that the routing table gets updated when an instance is shutting down.
*
* @throws Exception
*/
public void testInstanceShutdown()
throws Exception {
List<String> instances = _helixAdmin.getInstancesInCluster(getHelixClusterName());
assertFalse(instances.isEmpty(), "List of instances should not be empty");
// Mark all instances in the cluster as shutting down
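// The shutdown-in-progress flag should cause the brokers to exclude an instance from the routing table.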
for (String instance : instances) {
InstanceConfig instanceConfig = _helixAdmin.getInstanceConfig(getHelixClusterName(), instance);
instanceConfig.getRecord().setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, true);
_helixAdmin.setInstanceConfig(getHelixClusterName(), instance, instanceConfig);
}
// Check that the routing table is empty
checkForEmptyRoutingTable(true);
// Mark all instances as not shutting down
for (String instance : instances) {
InstanceConfig instanceConfig = _helixAdmin.getInstanceConfig(getHelixClusterName(), instance);
instanceConfig.getRecord().setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, false);
_helixAdmin.setInstanceConfig(getHelixClusterName(), instance, instanceConfig);
}
// Check that the routing table is not empty
checkForEmptyRoutingTable(false);
// Check on each server instance
for (String instance : instances) {
if (!InstanceTypeUtils.isServer(instance)) {
continue;
}
// Ensure that the server instance is in the routing table
checkForInstanceInRoutingTable(instance, true);
// Mark the server instance as shutting down
InstanceConfig instanceConfig = _helixAdmin.getInstanceConfig(getHelixClusterName(), instance);
instanceConfig.getRecord().setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, true);
_helixAdmin.setInstanceConfig(getHelixClusterName(), instance, instanceConfig);
// Check that it is not in the routing table
checkForInstanceInRoutingTable(instance, false);
// Re-enable the server instance
instanceConfig.getRecord().setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, false);
_helixAdmin.setInstanceConfig(getHelixClusterName(), instance, instanceConfig);
// Check that it is in the routing table
checkForInstanceInRoutingTable(instance, true);
}
}
private void checkForInstanceInRoutingTable(String instance, boolean shouldExist) {
String errorMessage;
if (shouldExist) {
errorMessage = "Routing table does not contain expected instance: " + instance;
} else {
errorMessage = "Routing table contains unexpected instance: " + instance;
}
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode routingTables = getDebugInfo("debug/routingTable/" + getTableName());
for (JsonNode routingTable : routingTables) {
if (routingTable.has(instance)) {
return shouldExist;
}
}
return !shouldExist;
} catch (Exception e) {
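// An exception likely means the routing debug endpoint is not ready yet; returning null keeps the
// condition unsatisfied so it gets retried.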
return null;
}
}, 60_000L, errorMessage);
}
private void checkForEmptyRoutingTable(boolean shouldBeEmpty) {
String errorMessage;
if (shouldBeEmpty) {
errorMessage = "Routing table is not empty";
} else {
errorMessage = "Routing table is empty";
}
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode routingTables = getDebugInfo("debug/routingTable/" + getTableName());
for (JsonNode routingTable : routingTables) {
if ((routingTable.isEmpty()) != shouldBeEmpty) {
return false;
}
}
return true;
} catch (Exception e) {
return null;
}
}, 60_000L, errorMessage);
}
public void testReset(TableType tableType)
throws Exception {
String rawTableName = getTableName();
// Reset the table.
resetTable(rawTableName, tableType, null);
// Wait for all live messages to clear the queue.
List<String> instances = _helixResourceManager.getServerInstancesForTable(rawTableName, tableType);
PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder();
TestUtils.waitForCondition(aVoid -> {
int liveMessageCount = 0;
for (String instanceName : instances) {
List<Message> messages = _helixDataAccessor.getChildValues(keyBuilder.messages(instanceName), true);
liveMessageCount += messages.size();
}
return liveMessageCount == 0;
}, 30_000L, "Failed to wait for all segment reset messages clear helix state transition!");
// Check that all segment states come back to ONLINE (or CONSUMING for realtime segments).
TestUtils.waitForCondition(aVoid -> {
// check external view and wait for everything to come back online
ExternalView externalView = _helixAdmin.getResourceExternalView(getHelixClusterName(),
TableNameBuilder.forType(tableType).tableNameWithType(rawTableName));
for (Map<String, String> externalViewStateMap : externalView.getRecord().getMapFields().values()) {
for (String state : externalViewStateMap.values()) {
if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)
&& !CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING.equals(state)) {
return false;
}
}
}
return true;
}, 30_000L, "Failed to wait for all segments come back online");
}
/**
* TODO: Support removing newly added columns for MutableSegment and remove the newly added columns before running
* the next test. Use this to replace {@link OfflineClusterIntegrationTest#testDefaultColumns()}.
*/
public void testReload(boolean includeOfflineTable)
throws Exception {
String rawTableName = getTableName();
Schema schema = getSchema();
String selectStarQuery = "SELECT * FROM " + rawTableName;
JsonNode queryResponse = postQuery(selectStarQuery);
assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), schema.size());
long numTotalDocs = queryResponse.get("totalDocs").asLong();
schema.addField(constructNewDimension(FieldSpec.DataType.INT, true));
schema.addField(constructNewDimension(FieldSpec.DataType.LONG, true));
schema.addField(constructNewDimension(FieldSpec.DataType.FLOAT, true));
schema.addField(constructNewDimension(FieldSpec.DataType.DOUBLE, true));
schema.addField(constructNewDimension(FieldSpec.DataType.STRING, true));
schema.addField(constructNewDimension(FieldSpec.DataType.INT, false));
schema.addField(constructNewDimension(FieldSpec.DataType.LONG, false));
schema.addField(constructNewDimension(FieldSpec.DataType.FLOAT, false));
schema.addField(constructNewDimension(FieldSpec.DataType.DOUBLE, false));
schema.addField(constructNewDimension(FieldSpec.DataType.STRING, false));
schema.addField(constructNewMetric(FieldSpec.DataType.INT));
schema.addField(constructNewMetric(FieldSpec.DataType.LONG));
schema.addField(constructNewMetric(FieldSpec.DataType.FLOAT));
schema.addField(constructNewMetric(FieldSpec.DataType.DOUBLE));
schema.addField(constructNewMetric(FieldSpec.DataType.BYTES));
// Upload the schema with extra columns
addSchema(schema);
// Reload the table
if (includeOfflineTable) {
reloadOfflineTable(rawTableName);
}
reloadRealtimeTable(rawTableName);
// Wait for all segments to finish reloading, and test querying the new columns
// NOTE: Use count query to prevent schema inconsistency error
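// The default null value for a new INT dimension is negative (Integer.MIN_VALUE), so this predicate should
// match every row once the reload finishes.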
String testQuery = "SELECT COUNT(*) FROM " + rawTableName + " WHERE NewIntSVDimension < 0";
long countStarResult = getCountStarResult();
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode testQueryResponse = postQuery(testQuery);
// Should not throw exception during reload
assertEquals(testQueryResponse.get("exceptions").size(), 0);
// Total docs should not change during reload
assertEquals(testQueryResponse.get("totalDocs").asLong(), numTotalDocs);
return testQueryResponse.get("resultTable").get("rows").get(0).get(0).asLong() == countStarResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to generate default values for new columns");
// Select star query should return all the columns
queryResponse = postQuery(selectStarQuery);
assertEquals(queryResponse.get("exceptions").size(), 0);
JsonNode resultTable = queryResponse.get("resultTable");
assertEquals(resultTable.get("dataSchema").get("columnNames").size(), schema.size());
assertEquals(resultTable.get("rows").size(), 10);
// Test aggregation query to cover querying all segments (including realtime)
String aggregationQuery = "SELECT SUMMV(NewIntMVDimension) FROM " + rawTableName;
queryResponse = postQuery(aggregationQuery);
assertEquals(queryResponse.get("exceptions").size(), 0);
// Test filtering on all newly added columns
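// Newly added columns are filled with default null values: negative sentinel values for numeric dimensions,
// 'null' for STRING dimensions, 0 for numeric metrics and empty bytes for the BYTES metric, so this filter
// should match every row.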
String countStarQuery = "SELECT COUNT(*) FROM " + rawTableName
+ " WHERE NewIntSVDimension < 0 AND NewLongSVDimension < 0 AND NewFloatSVDimension < 0 AND "
+ "NewDoubleSVDimension < 0 AND NewStringSVDimension = 'null' AND NewIntMVDimension < 0 AND "
+ "NewLongMVDimension < 0 AND NewFloatMVDimension < 0 AND NewDoubleMVDimension < 0 AND "
+ "NewStringMVDimension = 'null' AND NewIntMetric = 0 AND NewLongMetric = 0 AND NewFloatMetric = 0 "
+ "AND NewDoubleMetric = 0 AND NewBytesMetric = ''";
queryResponse = postQuery(countStarQuery);
assertEquals(queryResponse.get("exceptions").size(), 0);
assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asLong(), countStarResult);
}
private DimensionFieldSpec constructNewDimension(FieldSpec.DataType dataType, boolean singleValue) {
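// E.g. (INT, true) -> "NewIntSVDimension", (STRING, false) -> "NewStringMVDimension"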
String column =
"New" + StringUtils.capitalize(dataType.toString().toLowerCase()) + (singleValue ? "SV" : "MV") + "Dimension";
return new DimensionFieldSpec(column, dataType, singleValue);
}
private MetricFieldSpec constructNewMetric(FieldSpec.DataType dataType) {
String column = "New" + StringUtils.capitalize(dataType.toString().toLowerCase()) + "Metric";
return new MetricFieldSpec(column, dataType);
}
}