/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.helix.model.IdealState;
import org.apache.http.Header;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
import org.apache.pinot.client.PinotConnection;
import org.apache.pinot.client.PinotDriver;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.apache.pinot.common.function.scalar.StringFunctions.*;
import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_CHECK_INTERVAL_MS;
import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
import static org.testng.Assert.*;
/**
* Integration test that converts Avro data for 12 segments and runs queries against it.
*/
public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet {
private static final int NUM_BROKERS = 1;
private static final int NUM_SERVERS = 1;
private static final int NUM_SEGMENTS = 12;
private static final long ONE_HOUR_IN_MS = TimeUnit.HOURS.toMillis(1);
private static final String SEGMENT_UPLOAD_TEST_TABLE = "segmentUploadTestTable";
// For the table config refresh test, use an expensive query to ensure it won't finish within the 5ms timeout
private static final String TEST_TIMEOUT_QUERY =
"SELECT DISTINCTCOUNT(AirlineID) FROM mytable GROUP BY Carrier LIMIT 10000";
// For inverted index triggering test
private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
Arrays.asList("FlightNum", "Origin", "Quarter", "DivActualElapsedTime");
private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
"SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
// For range index triggering test
private static final List<String> UPDATED_RANGE_INDEX_COLUMNS = Collections.singletonList("DivActualElapsedTime");
private static final String TEST_UPDATED_RANGE_INDEX_QUERY =
"SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime > 305";
// For bloom filter triggering test
private static final List<String> UPDATED_BLOOM_FILTER_COLUMNS = Collections.singletonList("Carrier");
private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'";
// For star-tree triggering test
private static final StarTreeIndexConfig STAR_TREE_INDEX_CONFIG_1 =
new StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), 100);
private static final String TEST_STAR_TREE_QUERY_1 = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'UA'";
private static final StarTreeIndexConfig STAR_TREE_INDEX_CONFIG_2 =
new StarTreeIndexConfig(Collections.singletonList("DestState"), null,
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), 100);
private static final String TEST_STAR_TREE_QUERY_2 = "SELECT COUNT(*) FROM mytable WHERE DestState = 'CA'";
// For default columns test
private static final String TEST_EXTRA_COLUMNS_QUERY = "SELECT COUNT(*) FROM mytable WHERE NewAddedIntMetric = 1";
private static final String TEST_REGULAR_COLUMNS_QUERY = "SELECT COUNT(*) FROM mytable WHERE AirlineID > 0";
private static final String SELECT_STAR_QUERY = "SELECT * FROM mytable";
private static final String DISK_SIZE_IN_BYTES_KEY = "diskSizeInBytes";
private static final String NUM_SEGMENTS_KEY = "numSegments";
private static final String NUM_ROWS_KEY = "numRows";
private static final String COLUMN_LENGTH_MAP_KEY = "columnLengthMap";
private static final String COLUMN_CARDINALITY_MAP_KEY = "columnCardinalityMap";
private static final String MAX_NUM_MULTI_VALUES_MAP_KEY = "maxNumMultiValuesMap";
// TODO: This might lead to a flaky test, as this disk size is not deterministic
// (it depends on the iteration order of a HashSet).
private static final int DISK_SIZE_IN_BYTES = 20797128;
private static final int NUM_ROWS = 115545;
private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks =
new ArrayList<>(getNumBrokers() + getNumServers());
private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
// Cache the table size after removing an index via reloading. Once this value
// is set, assert that table size always gets back to this value after removing
// any other kind of index.
private long _tableSizeAfterRemovingIndex;
protected int getNumBrokers() {
return NUM_BROKERS;
}
protected int getNumServers() {
return NUM_SERVERS;
}
@Override
protected String getSchemaFileName() {
return _schemaFileName;
}
@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
// Start the Pinot cluster
startZk();
startController();
startBrokers();
startServers();
// Create and upload the schema and table config
Schema schema = createSchema();
addSchema(schema);
TableConfig tableConfig = createOfflineTableConfig();
addTableConfig(tableConfig);
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
// Create and upload segments. For exhaustive testing, concurrently upload multiple segments with the same name
// and validate correctness with parallel push protection enabled.
ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
// Create a copy of _tarDir to create multiple segments with the same name.
File tarDir2 = new File(_tempDir, "tarDir2");
FileUtils.copyDirectory(_tarDir, tarDir2);
List<File> tarDirs = new ArrayList<>();
tarDirs.add(_tarDir);
tarDirs.add(tarDir2);
try {
uploadSegments(getTableName(), TableType.OFFLINE, tarDirs);
} catch (Exception e) {
// If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one of
// two exceptions: a 409 conflict if the second call enters ProcessExistingSegment, or a segment ZK metadata
// creation failure if both calls entered ProcessNewSegment. In such cases, upload all the segments again to
// ensure that the data is set up correctly.
assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage()
.contains("Failed to create ZK metadata for segment"), e.getMessage());
uploadSegments(getTableName(), _tarDir);
}
// Set up the H2 connection
setUpH2Connection(avroFiles);
// Initialize the query generator
setUpQueryGenerator(avroFiles);
// Set up service status callbacks
// NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find the
// resources to monitor
registerCallbackHandlers();
// Wait for all documents to be loaded
waitForAllDocsLoaded(600_000L);
}
protected void startBrokers()
throws Exception {
startBrokers(getNumBrokers());
}
protected void startServers()
throws Exception {
startServers(getNumServers());
}
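/**
* Registers service status callbacks for every broker and server instance, monitoring the table resources and the
* broker resource that the instance hosts according to the ideal state.
*/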
private void registerCallbackHandlers() {
List<String> instances = _helixAdmin.getInstancesInCluster(getHelixClusterName());
instances.removeIf(
instanceId -> !InstanceTypeUtils.isBroker(instanceId) && !InstanceTypeUtils.isServer(instanceId));
List<String> resourcesInCluster = _helixAdmin.getResourcesInCluster(getHelixClusterName());
resourcesInCluster.removeIf(resource -> (!TableNameBuilder.isTableResource(resource)
&& !CommonConstants.Helix.BROKER_RESOURCE_INSTANCE.equals(resource)));
for (String instance : instances) {
List<String> resourcesToMonitor = new ArrayList<>();
for (String resourceName : resourcesInCluster) {
IdealState idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), resourceName);
for (String partitionName : idealState.getPartitionSet()) {
if (idealState.getInstanceSet(partitionName).contains(instance)) {
resourcesToMonitor.add(resourceName);
break;
}
}
}
_serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList.of(
new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, getHelixClusterName(),
instance, resourcesToMonitor, 100.0),
new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, getHelixClusterName(),
instance, resourcesToMonitor, 100.0))));
}
}
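// With a single server, it is safe to have the server return the final aggregation result, since there are no
// partial results to merge across servers; this also exercises the 'serverReturnFinalResult' code path.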
@Override
protected void testQuery(String pinotQuery, String h2Query)
throws Exception {
if (getNumServers() == 1) {
pinotQuery = "SET serverReturnFinalResult = true;" + pinotQuery;
}
super.testQuery(pinotQuery, h2Query);
}
@Test
public void testInstancesStarted() {
assertEquals(_serviceStatusCallbacks.size(), getNumBrokers() + getNumServers());
for (ServiceStatus.ServiceStatusCallback serviceStatusCallback : _serviceStatusCallbacks) {
assertEquals(serviceStatusCallback.getServiceStatus(), ServiceStatus.Status.GOOD);
}
}
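/**
* Verifies that creating a table config without the mandatory validation config section is rejected with HTTP 400
* (BAD_REQUEST).
*/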
@Test
public void testInvalidTableConfig() {
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("badTable").build();
ObjectNode tableConfigJson = (ObjectNode) tableConfig.toJsonNode();
// Remove a mandatory field
tableConfigJson.remove(TableConfig.VALIDATION_CONFIG_KEY);
try {
sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfigJson.toString());
fail();
} catch (IOException e) {
// Should get response code 400 (BAD_REQUEST)
assertTrue(e.getMessage().contains("Got error status code: 400"));
}
}
@Test
public void testRefreshTableConfigAndQueryTimeout()
throws Exception {
// Set timeout as 5ms so that query will timeout
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setQueryConfig(new QueryConfig(5L, null, null, null));
updateTableConfig(tableConfig);
// Wait for at most 1 minute for broker to receive and process the table config refresh message
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_TIMEOUT_QUERY);
JsonNode exceptions = queryResponse.get("exceptions");
if (exceptions.isEmpty()) {
return false;
}
int errorCode = exceptions.get(0).get("errorCode").asInt();
if (errorCode == QueryException.BROKER_TIMEOUT_ERROR_CODE) {
// Timed out on broker side
return true;
}
if (errorCode == QueryException.SERVER_NOT_RESPONDING_ERROR_CODE) {
// Timed out on server side
int numServersQueried = queryResponse.get("numServersQueried").asInt();
int numServersResponded = queryResponse.get("numServersResponded").asInt();
int numDocsScanned = queryResponse.get("numDocsScanned").asInt();
return numServersQueried == getNumServers() && numServersResponded == 0 && numDocsScanned == 0;
}
return false;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 60_000L, "Failed to refresh table config");
// Remove timeout so that query will finish
tableConfig.setQueryConfig(null);
updateTableConfig(tableConfig);
// Wait for at most 1 minute for broker to receive and process the table config refresh message
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_TIMEOUT_QUERY);
JsonNode exceptions = queryResponse.get("exceptions");
if (!exceptions.isEmpty()) {
return false;
}
int numServersQueried = queryResponse.get("numServersQueried").asInt();
int numServersResponded = queryResponse.get("numServersResponded").asInt();
int numDocsScanned = queryResponse.get("numDocsScanned").asInt();
return numServersQueried == getNumServers() && numServersResponded == getNumServers()
&& numDocsScanned == getCountStarResult();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 60_000L, "Failed to refresh table config");
}
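/**
* Verifies that re-uploading an identical segment keeps its CRC, creation time and push time, and only bumps the
* refresh time.
*/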
@Test
public void testUploadSameSegments()
throws Exception {
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentsZKMetadata(offlineTableName).get(0);
String segmentName = segmentZKMetadata.getSegmentName();
long crc = segmentZKMetadata.getCrc();
// Creation time is when the segment gets created
long creationTime = segmentZKMetadata.getCreationTime();
// Push time is when the segment gets first pushed (new segment)
long pushTime = segmentZKMetadata.getPushTime();
// Refresh time is when the segment gets refreshed (existing segment)
long refreshTime = segmentZKMetadata.getRefreshTime();
uploadSegments(offlineTableName, _tarDir);
for (SegmentZKMetadata segmentZKMetadataAfterUpload : _helixResourceManager.getSegmentsZKMetadata(
offlineTableName)) {
// Only check one segment
if (segmentZKMetadataAfterUpload.getSegmentName().equals(segmentName)) {
assertEquals(segmentZKMetadataAfterUpload.getCrc(), crc);
assertEquals(segmentZKMetadataAfterUpload.getCreationTime(), creationTime);
assertEquals(segmentZKMetadataAfterUpload.getPushTime(), pushTime);
// Refresh time should change
assertTrue(segmentZKMetadataAfterUpload.getRefreshTime() > refreshTime);
return;
}
}
}
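/**
* Verifies the refresh-only upload mode: refreshing a non-existing segment should fail with HTTP 410 (GONE), while
* a regular upload followed by a refresh of the now-existing segment should succeed and set the refresh time.
*/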
@Test
public void testUploadSegmentRefreshOnly()
throws Exception {
TableConfig segmentUploadTestTableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(SEGMENT_UPLOAD_TEST_TABLE).setSchemaName(getSchemaName())
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
.setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
.setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
.setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
addTableConfig(segmentUploadTestTableConfig);
String offlineTableName = segmentUploadTestTableConfig.getTableName();
File[] segmentTarFiles = _tarDir.listFiles();
assertNotNull(segmentTarFiles);
int numSegments = segmentTarFiles.length;
assertTrue(numSegments > 0);
List<Header> headers = new ArrayList<>();
headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY, "true"));
List<NameValuePair> parameters = new ArrayList<>();
NameValuePair tableNameParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
TableNameBuilder.extractRawTableName(offlineTableName));
parameters.add(tableNameParameter);
URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
// Refresh non-existing segment
File segmentTarFile = segmentTarFiles[0];
try {
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, headers,
parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
fail();
} catch (HttpErrorStatusException e) {
assertEquals(e.getStatusCode(), HttpStatus.SC_GONE);
assertTrue(_helixResourceManager.getSegmentsZKMetadata(SEGMENT_UPLOAD_TEST_TABLE).isEmpty());
}
// Upload segment
SimpleHttpResponse response =
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, null,
parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
assertEquals(response.getStatusCode(), HttpStatus.SC_OK);
List<SegmentZKMetadata> segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(offlineTableName);
assertEquals(segmentsZKMetadata.size(), 1);
// Refresh existing segment
response = fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile,
headers, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
assertEquals(response.getStatusCode(), HttpStatus.SC_OK);
segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(offlineTableName);
assertEquals(segmentsZKMetadata.size(), 1);
assertNotEquals(segmentsZKMetadata.get(0).getRefreshTime(), Long.MIN_VALUE);
}
waitForNumOfSegmentsBecomeOnline(offlineTableName, 1);
dropOfflineTable(SEGMENT_UPLOAD_TEST_TABLE);
cleanupTestTableDataManager(offlineTableName);
}
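/**
* Polls the external view until the given number of segments are ONLINE, throwing a TimeoutException after the max
* wait time.
*/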
private void waitForNumOfSegmentsBecomeOnline(String tableNameWithType, int numSegments)
throws InterruptedException, TimeoutException {
long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
do {
Set<String> onlineSegments = _helixResourceManager.getOnlineSegmentsFromExternalView(tableNameWithType);
if (onlineSegments.size() == numSegments) {
return;
}
Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
} while (System.currentTimeMillis() < endTimeMs);
throw new TimeoutException(
String.format("Time out while waiting segments become ONLINE. (tableNameWithType = %s)", tableNameWithType));
}
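/**
* Verifies that adding an inverted index via table config update and reload brings numEntriesScannedInFilter down
* to 0, and that removing it (via plain reload, single-segment force download, and whole-table force download)
* restores the scan count and the expected disk usage.
*/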
@Test(dependsOnMethods = "testRangeIndexTriggering")
public void testInvertedIndexTriggering()
throws Exception {
long numTotalDocs = getCountStarResult();
// Without an index on DivActualElapsedTime, all docs are scanned at the filtering stage.
assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);
addInvertedIndex();
long tableSizeWithNewIndex = getTableSize(getTableName());
// Update table config to remove the new inverted index, and
// reload the table to physically clean up the new inverted indices.
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns());
updateTableConfig(tableConfig);
String reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), false);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
// Total docs should not change during reload, but num entries scanned
// gets back to total number of documents as the index is removed.
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
return queryResponse.get("numEntriesScannedInFilter").asLong() == numTotalDocs;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to cleanup obsolete index");
validateReloadJobSuccess(reloadJobId);
assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex);
// Add the inverted index back to test index removal via force download.
addInvertedIndex();
assertEquals(getTableSize(getTableName()), tableSizeWithNewIndex);
// Update table config to remove the new inverted index.
tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns());
updateTableConfig(tableConfig);
// Force-download a single segment; the disk usage should drop a bit.
SegmentZKMetadata segmentZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(TableNameBuilder.OFFLINE.tableNameWithType(getTableName())).get(0);
String segmentName = segmentZKMetadata.getSegmentName();
reloadOfflineSegment(getTableName(), segmentName, true);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
// Total docs should not change during reload
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
// If the segment got reloaded, the query should scan its docs.
return queryResponse.get("numEntriesScannedInFilter").asLong() > 0;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to clean up obsolete index in segment");
// As the query behavior changed, the segment reload must have completed. The new table size should fall within the
// bounds asserted below, with only one segment force-downloaded and its new inverted index dropped.
long tableSizeAfterReloadSegment = getTableSize(getTableName());
assertTrue(tableSizeAfterReloadSegment > DISK_SIZE_IN_BYTES && tableSizeAfterReloadSegment < tableSizeWithNewIndex,
String.format("Table size: %d should be between %d and %d after dropping inverted index from segment: %s",
tableSizeAfterReloadSegment, DISK_SIZE_IN_BYTES, tableSizeWithNewIndex, segmentName));
// Add the inverted index back to check whether reloading the whole table with force download works.
// Note that because we have force-downloaded a segment above, it's important to reset the table state by adding
// the inverted index back before checking whether reloading the whole table with force download works. Otherwise,
// the query's numEntriesScannedInFilter can become numTotalDocs sooner than expected, while the segment (reloaded
// by the test above) is being reloaded again, causing the table size to be smaller than expected.
//
// As to why the table size could be smaller than expected: when reloading a segment with force download, the
// original segment dir is deleted and then replaced with the newly downloaded segment, leaving a small chance of a
// race condition between the table size check and the replacement of the segment dir, i.e. a flaky test.
addInvertedIndex();
// The table size gets larger for sure, but it does not necessarily equal tableSizeWithNewIndex, because the order
// of entries in the index_map file can change when the raw segment adds/deletes indices back and forth.
long tableSizeAfterAddIndex = getTableSize(getTableName());
assertTrue(tableSizeAfterAddIndex > tableSizeAfterReloadSegment,
String.format("Table size: %d should increase after adding inverted index on segment: %s, as compared with %d",
tableSizeAfterAddIndex, segmentName, tableSizeAfterReloadSegment));
// Force-download the whole table and use the original table config, so the disk usage should get back to the
// initial value.
tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns());
updateTableConfig(tableConfig);
// Reassign reloadJobId so that the validation below checks this reload job, not the earlier one.
reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), true);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
// Total docs should not change during reload, but num entries scanned
// gets back to total number of documents as the index is removed.
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
long numEntriesScannedInFilter = queryResponse.get("numEntriesScannedInFilter").asLong();
return numEntriesScannedInFilter == numTotalDocs;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to cleanup obsolete index in table");
validateReloadJobSuccess(reloadJobId);
// With force download, the table size gets back to the initial value.
assertEquals(getTableSize(getTableName()), DISK_SIZE_IN_BYTES);
}
private void addInvertedIndex()
throws IOException {
// Update table config to add an inverted index on the DivActualElapsedTime column, and
// reload the table to put the config change into effect and build the inverted index.
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
updateTableConfig(tableConfig);
String reloadJobId =
reloadOfflineTableAndValidateResponse(TableNameBuilder.OFFLINE.tableNameWithType(getTableName()), false);
// It takes a while to reload multiple segments, thus we retry the query for some time.
// After all segments are reloaded, the inverted index is added on DivActualElapsedTime.
// It's expected to have numEntriesScannedInFilter equal to 0, i.e. no docs are scanned
// at the filtering stage when the inverted index can answer the predicate directly.
long numTotalDocs = getCountStarResult();
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
// Total docs should not change during reload
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
return queryResponse.get("numEntriesScannedInFilter").asLong() == 0L;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to generate inverted index");
validateReloadJobSuccess(reloadJobId);
}
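// Verifies the now() and ago() scalar functions by formatting them with toDateTime and comparing against values
// computed via java.time. (This check may rarely flake if the query and the expectation straddle a day boundary.)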
@Test
public void testTimeFunc()
throws Exception {
String sqlQuery = "SELECT toDateTime(now(), 'yyyy-MM-dd z'), toDateTime(ago('PT1H'), 'yyyy-MM-dd z') FROM mytable";
JsonNode response = postQuery(sqlQuery, _brokerBaseApiUrl);
String todayStr = response.get("resultTable").get("rows").get(0).get(0).asText();
String expectedTodayStr =
Instant.now().atZone(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd z"));
assertEquals(todayStr, expectedTodayStr);
String oneHourAgoTodayStr = response.get("resultTable").get("rows").get(0).get(1).asText();
String expectedOneHourAgoTodayStr = Instant.now().minus(Duration.parse("PT1H")).atZone(ZoneId.of("UTC"))
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd z"));
assertEquals(oneHourAgoTodayStr, expectedOneHourAgoTodayStr);
}
@Test
public void testRegexpReplace()
throws Exception {
// Correctness tests of regexpReplace.
// Test replace all.
String sqlQuery = "SELECT regexpReplace('CA', 'C', 'TEST')";
JsonNode response = postQuery(sqlQuery, _brokerBaseApiUrl);
String result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "TESTA");
sqlQuery = "SELECT regexpReplace('foobarbaz', 'b', 'X')";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "fooXarXaz");
sqlQuery = "SELECT regexpReplace('foobarbaz', 'b', 'XY')";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "fooXYarXYaz");
sqlQuery = "SELECT regexpReplace('Argentina', '(.)', '$1 ')";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "A r g e n t i n a ");
sqlQuery = "SELECT regexpReplace('Pinot is blazing fast', '( ){2,}', ' ')";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "Pinot is blazing fast");
sqlQuery = "SELECT regexpReplace('healthy, wealthy, and wise','\\w+thy', 'something')";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "something, something, and wise");
sqlQuery = "SELECT regexpReplace('11234567898','(\\d)(\\d{3})(\\d{3})(\\d{4})', '$1-($2) $3-$4')";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "1-(123) 456-7898");
// Test replace starting at index.
sqlQuery = "SELECT regexpReplace('healthy, wealthy, stealthy and wise','\\w+thy', 'something', 4)";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "healthy, something, something and wise");
sqlQuery = "SELECT regexpReplace('healthy, wealthy, stealthy and wise','\\w+thy', 'something', 1)";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "hsomething, something, something and wise");
// Test occurrence
sqlQuery = "SELECT regexpReplace('healthy, wealthy, stealthy and wise','\\w+thy', 'something', 0, 2)";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "healthy, wealthy, something and wise");
sqlQuery = "SELECT regexpReplace('healthy, wealthy, stealthy and wise','\\w+thy', 'something', 0, 0)";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "something, wealthy, stealthy and wise");
// Test flags
sqlQuery = "SELECT regexpReplace('healthy, wealthy, stealthy and wise','\\w+tHy', 'something', 0, 0, 'i')";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "something, wealthy, stealthy and wise");
// Negative test. Pattern match not found.
sqlQuery = "SELECT regexpReplace('healthy, wealthy, stealthy and wise','\\w+tHy', 'something')";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "healthy, wealthy, stealthy and wise");
// Negative test. Pattern match not found.
sqlQuery = "SELECT regexpReplace('healthy, wealthy, stealthy and wise','\\w+tHy', 'something', 3, 21, 'i')";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "healthy, wealthy, stealthy and wise");
// Negative test - incorrect flag
sqlQuery = "SELECT regexpReplace('healthy, wealthy, stealthy and wise','\\w+tHy', 'something', 3, 12, 'xyz')";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "healthy, wealthy, stealthy and wise");
// Test in select clause with column values
sqlQuery = "SELECT regexpReplace(DestCityName, ' ', '', 0, -1, 'i') from myTable where OriginState = 'CA'";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
JsonNode rows = response.get("resultTable").get("rows");
for (int i = 0; i < rows.size(); i++) {
JsonNode row = rows.get(i);
assertFalse(row.get(0).asText().contains(" "));
}
// Test in where clause
sqlQuery = "SELECT count(*) from myTable where regexpReplace(originState, '[VC]A', 'TEST') = 'TEST'";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
int count1 = response.get("resultTable").get("rows").get(0).get(0).asInt();
sqlQuery = "SELECT count(*) from myTable where originState='CA' or originState='VA'";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
int count2 = response.get("resultTable").get("rows").get(0).get(0).asInt();
assertEquals(count1, count2);
// Test nested transform
sqlQuery =
"SELECT count(*) from myTable where contains(regexpReplace(originState, '(C)(A)', '$1TEST$2'), 'CTESTA')";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
count1 = response.get("resultTable").get("rows").get(0).get(0).asInt();
sqlQuery = "SELECT count(*) from myTable where originState='CA'";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
count2 = response.get("resultTable").get("rows").get(0).get(0).asInt();
assertEquals(count1, count2);
}
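/**
* Verifies CAST on multi-value columns, including nested casts, by checking both the result schema and the
* per-element values against the original columns.
*/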
@Test
public void testCastMV()
throws Exception {
// simple cast
String sqlQuery = "SELECT DivLongestGTimes, CAST(DivLongestGTimes as DOUBLE) from myTable LIMIT 100";
JsonNode response = postQuery(sqlQuery, _brokerBaseApiUrl);
JsonNode resultTable = response.get("resultTable");
JsonNode dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"FLOAT_ARRAY\",\"DOUBLE_ARRAY\"]");
JsonNode rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 100);
for (int i = 0; i < 100; i++) {
JsonNode row = rows.get(i).get(0);
JsonNode rowCast = rows.get(i).get(1);
assertTrue(rowCast.isArray());
assertTrue(row.isArray());
assertEquals(row.size(), rowCast.size());
for (int j = 0; j < rowCast.size(); j++) {
float original = row.get(j).floatValue();
assertTrue(rowCast.get(j).isDouble());
double resultCast = rowCast.get(j).asDouble();
assertEquals(resultCast, (double) original);
}
}
// nested cast
sqlQuery = "SELECT DivAirportIDs, CAST(CAST(CAST(DivAirportIDs AS FLOAT) as INT) as STRING),"
+ " DivTotalGTimes, CAST(CAST(DivTotalGTimes AS STRING) AS LONG) from myTable ORDER BY CARRIER LIMIT 100";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnDataTypes").toString(),
"[\"INT_ARRAY\",\"STRING_ARRAY\",\"LONG_ARRAY\",\"LONG_ARRAY\"]");
rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 100);
for (int i = 0; i < 100; i++) {
JsonNode col1 = rows.get(i).get(0);
JsonNode col1Cast = rows.get(i).get(1);
assertTrue(col1Cast.isArray());
assertTrue(col1.isArray());
assertEquals(col1.size(), col1Cast.size());
for (int j = 0; j < col1Cast.size(); j++) {
int original = col1.get(j).asInt();
assertTrue(col1Cast.get(j).isTextual());
String resultCast = col1Cast.get(j).asText();
assertEquals(resultCast, String.valueOf((int) ((float) original)));
}
JsonNode col2 = rows.get(i).get(2);
JsonNode col2Cast = rows.get(i).get(3);
assertTrue(col2Cast.isArray());
assertTrue(col2.isArray());
assertEquals(col2.size(), col2Cast.size());
for (int j = 0; j < col2Cast.size(); j++) {
long original = col2.get(j).asLong();
long resultCast = col2Cast.get(j).asLong();
assertEquals(resultCast, Long.parseLong(String.valueOf(original)));
}
}
}
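/**
* Verifies the encodeUrl/decodeUrl scalar functions against the statically imported StringFunctions
* implementations.
*/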
@Test
public void testUrlFunc()
throws Exception {
String sqlQuery = "SELECT encodeUrl('key1=value 1&key2=value@!$2&key3=value%3'), "
+ "decodeUrl('key1%3Dvalue+1%26key2%3Dvalue%40%21%242%26key3%3Dvalue%253') FROM myTable";
JsonNode response = postQuery(sqlQuery, _brokerBaseApiUrl);
String encodedString = response.get("resultTable").get("rows").get(0).get(0).asText();
String expectedUrlStr = encodeUrl("key1=value 1&key2=value@!$2&key3=value%3");
assertEquals(encodedString, expectedUrlStr);
String decodedString = response.get("resultTable").get("rows").get(0).get(1).asText();
expectedUrlStr = decodeUrl("key1%3Dvalue+1%26key2%3Dvalue%40%21%242%26key3%3Dvalue%253");
assertEquals(decodedString, expectedUrlStr);
}
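/**
* Exercises toBase64/fromBase64 on literals and identifiers, in filters and in group-by/order-by, comparing against
* the statically imported StringFunctions implementations, and checks the error handling of invalid arguments.
*/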
@Test
public void testBase64Func()
throws Exception {
// string literal
String sqlQuery = "SELECT toBase64(toUtf8('hello!')), " + "fromUtf8(fromBase64('aGVsbG8h')) FROM myTable";
JsonNode response = postQuery(sqlQuery, _brokerBaseApiUrl);
JsonNode resultTable = response.get("resultTable");
JsonNode dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"STRING\",\"STRING\"]");
JsonNode rows = response.get("resultTable").get("rows");
String encodedString = rows.get(0).get(0).asText();
String expectedEncodedStr = toBase64(toUtf8("hello!"));
assertEquals(encodedString, expectedEncodedStr);
String decodedString = rows.get(0).get(1).asText();
String expectedDecodedStr = fromUtf8(fromBase64("aGVsbG8h"));
assertEquals(decodedString, expectedDecodedStr);
// long string literal encode
sqlQuery =
"SELECT toBase64(toUtf8('this is a long string that will encode to more than 76 characters using base64')) "
+ "FROM myTable";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
rows = resultTable.get("rows");
encodedString = rows.get(0).get(0).asText();
assertEquals(encodedString,
toBase64(toUtf8("this is a long string that will encode to more than 76 characters using base64")));
// long string literal decode
sqlQuery = "SELECT fromUtf8(fromBase64"
+ "('dGhpcyBpcyBhIGxvbmcgc3RyaW5nIHRoYXQgd2lsbCBlbmNvZGUgdG8gbW9yZSB0aGFuIDc2IGNoYXJhY3RlcnMgdXNpbmcgYmFzZTY0"
+ "')) FROM myTable";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
rows = resultTable.get("rows");
decodedString = rows.get(0).get(0).asText();
assertEquals(decodedString, fromUtf8(fromBase64(
"dGhpcyBpcyBhIGxvbmcgc3RyaW5nIHRoYXQgd2lsbCBlbmNvZGUgdG8gbW9yZSB0aGFuIDc2IGNoYXJhY3RlcnMgdXNpbmcgYmFzZTY0")));
// non-string literal
sqlQuery = "SELECT toBase64(toUtf8(123)), fromUtf8(fromBase64(toBase64(toUtf8(123)))), 123 FROM myTable";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
rows = resultTable.get("rows");
encodedString = rows.get(0).get(0).asText();
decodedString = rows.get(0).get(1).asText();
String originalCol = rows.get(0).get(2).asText();
assertEquals(decodedString, originalCol);
assertEquals(encodedString, toBase64(toUtf8("123")));
// identifier
sqlQuery = "SELECT Carrier, toBase64(toUtf8(Carrier)), fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))), "
+ "fromBase64(toBase64(toUtf8(Carrier))) FROM myTable LIMIT 100";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"STRING\",\"STRING\",\"STRING\",\"BYTES\"]");
rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 100);
for (int i = 0; i < 100; i++) {
String original = rows.get(i).get(0).asText();
String encoded = rows.get(i).get(1).asText();
String decoded = rows.get(i).get(2).asText();
assertEquals(original, decoded);
assertEquals(encoded, toBase64(toUtf8(original)));
assertEquals(decoded, fromUtf8(fromBase64(toBase64(toUtf8(original)))));
}
// invalid argument
sqlQuery = "SELECT toBase64() FROM myTable";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
// invalid argument
sqlQuery = "SELECT fromBase64() FROM myTable";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
// invalid argument
sqlQuery = "SELECT toBase64('hello!') FROM myTable";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
assertTrue(response.get("exceptions").get(0).get("message").toString().contains("SqlCompilationException"));
// invalid argument
sqlQuery = "SELECT fromBase64('hello!') FROM myTable";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
assertTrue(response.get("exceptions").get(0).get("message").toString().contains("IllegalArgumentException"));
// string literal used in a filter
sqlQuery = "SELECT * FROM myTable WHERE fromUtf8(fromBase64('aGVsbG8h')) != Carrier AND "
+ "toBase64(toUtf8('hello!')) != Carrier LIMIT 10";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
rows = resultTable.get("rows");
assertEquals(rows.size(), 10);
// non-string literal used in a filter
sqlQuery = "SELECT * FROM myTable WHERE fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) != Carrier LIMIT 10";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
rows = resultTable.get("rows");
assertEquals(rows.size(), 10);
// string identifier used in a filter
sqlQuery = "SELECT * FROM myTable WHERE fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))) = Carrier LIMIT 10";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
rows = resultTable.get("rows");
assertEquals(rows.size(), 10);
// non-string identifier used in a filter
sqlQuery = "SELECT fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))), AirlineID FROM myTable WHERE "
+ "fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) = AirlineID LIMIT 10";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"STRING\",\"LONG\"]");
rows = resultTable.get("rows");
assertEquals(rows.size(), 10);
// string identifier used in group by order by
sqlQuery = "SELECT Carrier as originalCol, toBase64(toUtf8(Carrier)) as encoded, "
+ "fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))) as decoded FROM myTable "
+ "GROUP BY Carrier, toBase64(toUtf8(Carrier)), fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))) "
+ "ORDER BY toBase64(toUtf8(Carrier)) LIMIT 10";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"STRING\",\"STRING\",\"STRING\"]");
rows = resultTable.get("rows");
assertEquals(rows.size(), 10);
for (int i = 0; i < 10; i++) {
String original = rows.get(i).get(0).asText();
String encoded = rows.get(i).get(1).asText();
String decoded = rows.get(i).get(2).asText();
assertEquals(original, decoded);
assertEquals(encoded, toBase64(toUtf8(original)));
assertEquals(decoded, fromUtf8(fromBase64(toBase64(toUtf8(original)))));
}
// non-string identifier used in group by order by
sqlQuery = "SELECT AirlineID as originalCol, toBase64(toUtf8(AirlineID)) as encoded, "
+ "fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) as decoded FROM myTable "
+ "GROUP BY AirlineID, toBase64(toUtf8(AirlineID)), fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) "
+ "ORDER BY fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) LIMIT 10";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"LONG\",\"STRING\",\"STRING\"]");
rows = resultTable.get("rows");
assertEquals(rows.size(), 10);
for (int i = 0; i < 10; i++) {
String original = rows.get(i).get(0).asText();
String encoded = rows.get(i).get(1).asText();
String decoded = rows.get(i).get(2).asText();
assertEquals(original, decoded);
assertEquals(encoded, toBase64(toUtf8(original)));
assertEquals(decoded, fromUtf8(fromBase64(toBase64(toUtf8(original)))));
}
}
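/**
* Verifies that a literal-only query (no identifiers involved) is evaluated directly on the broker, checking the
* derived column names, column data types and values.
*/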
@Test
public void testLiteralOnlyFunc()
throws Exception {
long currentTsMin = System.currentTimeMillis();
long oneHourAgoTsMin = currentTsMin - ONE_HOUR_IN_MS;
String sqlQuery =
"SELECT 1, now() as currentTs, ago('PT1H') as oneHourAgoTs, 'abc', toDateTime(now(), 'yyyy-MM-dd z') as "
+ "today, now(), ago('PT1H'), encodeUrl('key1=value 1&key2=value@!$2&key3=value%3') as encodedUrl, "
+ "decodeUrl('key1%3Dvalue+1%26key2%3Dvalue%40%21%242%26key3%3Dvalue%253') as decodedUrl, toBase64"
+ "(toUtf8('hello!')) as toBase64, fromUtf8(fromBase64('aGVsbG8h')) as fromBase64";
JsonNode response = postQuery(sqlQuery, _brokerBaseApiUrl);
long currentTsMax = System.currentTimeMillis();
long oneHourAgoTsMax = currentTsMax - ONE_HOUR_IN_MS;
assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(0).asText(), "1");
assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(1).asText(), "currentTs");
assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(2).asText(), "oneHourAgoTs");
assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(3).asText(), "abc");
assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(4).asText(), "today");
String nowColumnName = response.get("resultTable").get("dataSchema").get("columnNames").get(5).asText();
String oneHourAgoColumnName = response.get("resultTable").get("dataSchema").get("columnNames").get(6).asText();
assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(7).asText(), "encodedUrl");
assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(8).asText(), "decodedUrl");
assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(9).asText(), "toBase64");
assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(10).asText(), "fromBase64");
assertTrue(Long.parseLong(nowColumnName) > currentTsMin);
assertTrue(Long.parseLong(nowColumnName) < currentTsMax);
assertTrue(Long.parseLong(oneHourAgoColumnName) > oneHourAgoTsMin);
assertTrue(Long.parseLong(oneHourAgoColumnName) < oneHourAgoTsMax);
assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "LONG");
assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(1).asText(), "LONG");
assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(2).asText(), "LONG");
assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(3).asText(), "STRING");
assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(4).asText(), "STRING");
assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(5).asText(), "LONG");
assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(6).asText(), "LONG");
assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(7).asText(), "STRING");
assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(8).asText(), "STRING");
assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(9).asText(), "STRING");
assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(10).asText(), "STRING");
int first = response.get("resultTable").get("rows").get(0).get(0).asInt();
long second = response.get("resultTable").get("rows").get(0).get(1).asLong();
long third = response.get("resultTable").get("rows").get(0).get(2).asLong();
String fourth = response.get("resultTable").get("rows").get(0).get(3).asText();
assertEquals(first, 1);
assertTrue(second > currentTsMin);
assertTrue(second < currentTsMax);
assertTrue(third > oneHourAgoTsMin);
assertTrue(third < oneHourAgoTsMax);
assertEquals(fourth, "abc");
String todayStr = response.get("resultTable").get("rows").get(0).get(4).asText();
String expectedTodayStr =
Instant.now().atZone(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd z"));
assertEquals(todayStr, expectedTodayStr);
long nowValue = response.get("resultTable").get("rows").get(0).get(5).asLong();
assertEquals(nowValue, Long.parseLong(nowColumnName));
long oneHourAgoValue = response.get("resultTable").get("rows").get(0).get(6).asLong();
assertEquals(oneHourAgoValue, Long.parseLong(oneHourAgoColumnName));
assertEquals(response.get("resultTable").get("rows").get(0).get(7).asText(),
"key1%3Dvalue+1%26key2%3Dvalue%40%21%242%26key3%3Dvalue%253");
assertEquals(response.get("resultTable").get("rows").get(0).get(8).asText(),
"key1=value 1&key2=value@!$2&key3=value%3");
assertEquals(response.get("resultTable").get("rows").get(0).get(9).asText(), "aGVsbG8h");
assertEquals(response.get("resultTable").get("rows").get(0).get(10).asText(), "hello!");
}
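/**
* Verifies that adding a range index via table config update and reload reduces numEntriesScannedInFilter, and that
* removing it restores the full scan count and the original table size.
*/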
@Test(dependsOnMethods = "testBloomFilterTriggering")
public void testRangeIndexTriggering()
throws Exception {
long numTotalDocs = getCountStarResult();
assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);
// Update table config and trigger reload
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setRangeIndexColumns(UPDATED_RANGE_INDEX_COLUMNS);
updateTableConfig(tableConfig);
String reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), false);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
// Total docs should not change during reload
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
return queryResponse.get("numEntriesScannedInFilter").asLong() < numTotalDocs;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to generate range index");
validateReloadJobSuccess(reloadJobId);
// Update table config to remove the new range index, and
// reload the table to physically clean up the new range index.
tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setRangeIndexColumns(getRangeIndexColumns());
updateTableConfig(tableConfig);
reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), false);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
// Total docs should not change during reload, but num entries scanned
// gets back to total number of documents as the index is removed.
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
return queryResponse.get("numEntriesScannedInFilter").asLong() == numTotalDocs;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to cleanup obsolete index");
validateReloadJobSuccess(reloadJobId);
assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex);
}
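/**
* Verifies that adding a bloom filter on Carrier prunes all segments for a value ('CA') that matches no rows
* (numSegmentsProcessed drops to 0), and that removing it restores segment processing and the original table size.
*/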
@Test(dependsOnMethods = "testDefaultColumns")
public void testBloomFilterTriggering()
throws Exception {
long numTotalDocs = getCountStarResult();
assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("numSegmentsProcessed").asLong(), NUM_SEGMENTS);
// Update table config and trigger reload
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setBloomFilterColumns(UPDATED_BLOOM_FILTER_COLUMNS);
updateTableConfig(tableConfig);
String reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), false);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
// Total docs should not change during reload
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
return queryResponse.get("numSegmentsProcessed").asLong() == 0L;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to generate bloom filter");
validateReloadJobSuccess(reloadJobId);
// Update table config to remove the new bloom filter, and
// reload the table to physically clean up the new bloom filter.
tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setBloomFilterColumns(getBloomFilterColumns());
updateTableConfig(tableConfig);
reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), false);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
// Total docs should not change during reload, but num entries scanned
// gets back to total number of documents as bloom filter is removed.
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
return queryResponse.get("numSegmentsProcessed").asLong() == NUM_SEGMENTS;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to cleanup obsolete index");
validateReloadJobSuccess(reloadJobId);
assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex);
}
/**
* Check if server returns error response quickly without timing out Broker.
*/
@Test
public void testServerErrorWithBrokerTimeout()
throws Exception {
long startTimeMs = System.currentTimeMillis();
// The query below will fail execution due to JSON_MATCH on a column without a JSON index
JsonNode queryResponse = postQuery("SELECT count(*) FROM mytable WHERE JSON_MATCH(Dest, '$=123')");
// NOTE: Broker timeout is 60s
assertTrue(System.currentTimeMillis() - startTimeMs < 60_000L);
assertTrue(queryResponse.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
}
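/**
* Verifies dynamic star-tree creation: with a star-tree index, the filtered COUNT(*) queries should scan only one
* pre-aggregated doc per segment; swapping in a second config should disable the first one, and removing the
* configs should restore full scans and the default table size.
*/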
@Test
public void testStarTreeTriggering()
throws Exception {
long numTotalDocs = getCountStarResult();
long tableSizeWithDefaultIndex = getTableSize(getTableName());
// Test the first query
JsonNode firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
int firstQueryResult = firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt();
assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
// Initially 'numDocsScanned' should be the same as 'COUNT(*)' result
assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), firstQueryResult);
// Update table config and trigger reload
TableConfig tableConfig = getOfflineTableConfig();
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
indexingConfig.setStarTreeIndexConfigs(Collections.singletonList(STAR_TREE_INDEX_CONFIG_1));
indexingConfig.setEnableDynamicStarTreeCreation(true);
updateTableConfig(tableConfig);
String reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), false);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
// Result should not change during reload
assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), firstQueryResult);
// Total docs should not change during reload
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
// With star-tree, 'numDocsScanned' should be the same as number of segments (1 per segment)
return queryResponse.get("numDocsScanned").asInt() == NUM_SEGMENTS;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to add first star-tree index");
validateReloadJobSuccess(reloadJobId);
// Reload again should have no effect
reloadOfflineTableAndValidateResponse(getTableName(), false);
firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
assertEquals(firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), firstQueryResult);
assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS);
// Enforce a sleep here since segment reload is async and there is another back-to-back reload below.
// Otherwise, there is no way to tell whether the 1st reload on the server side has finished, which may hit the
// race condition where the 1st reload finishes after the 2nd reload is fully done.
// Sleeping 10 seconds is still better than hitting the race condition, which would time out after 10 minutes.
Thread.sleep(10_000L);
// Should be able to use the star-tree with an additional match-all predicate on another dimension
firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1 + " AND DaysSinceEpoch > 16070");
assertEquals(firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), firstQueryResult);
assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS);
// Test the second query
JsonNode secondQueryResponse = postQuery(TEST_STAR_TREE_QUERY_2);
int secondQueryResult = secondQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt();
assertEquals(secondQueryResponse.get("totalDocs").asLong(), numTotalDocs);
// Initially 'numDocsScanned' should be the same as 'COUNT(*)' result
assertEquals(secondQueryResponse.get("numDocsScanned").asInt(), secondQueryResult);
// Update table config with a different star-tree index config and trigger reload
indexingConfig.setStarTreeIndexConfigs(Collections.singletonList(STAR_TREE_INDEX_CONFIG_2));
updateTableConfig(tableConfig);
// Reassign reloadJobId so that the validation below checks this reload job, not the earlier one.
reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), false);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_STAR_TREE_QUERY_2);
// Result should not change during reload
assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), secondQueryResult);
// Total docs should not change during reload
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
// With the star-tree, 'numDocsScanned' should equal the number of segments (one doc scanned per segment)
return queryResponse.get("numDocsScanned").asInt() == NUM_SEGMENTS;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to change to second star-tree index");
validateReloadJobSuccess(reloadJobId);
// First query should not be able to use the star-tree
firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), firstQueryResult);
// Reload again should have no effect
reloadOfflineTableAndValidateResponse(getTableName(), false);
firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
assertEquals(firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), firstQueryResult);
assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), firstQueryResult);
secondQueryResponse = postQuery(TEST_STAR_TREE_QUERY_2);
assertEquals(secondQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), secondQueryResult);
assertEquals(secondQueryResponse.get("totalDocs").asLong(), numTotalDocs);
assertEquals(secondQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS);
// Should be able to use the star-tree with an additional match-all predicate on another dimension
secondQueryResponse = postQuery(TEST_STAR_TREE_QUERY_2 + " AND DaysSinceEpoch > 16070");
assertEquals(secondQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), secondQueryResult);
assertEquals(secondQueryResponse.get("totalDocs").asLong(), numTotalDocs);
assertEquals(secondQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS);
// Enforce a sleep here because segment reload is async and another back-to-back reload follows
// below. Without the wait there is no way to tell whether the first reload has finished on the
// server side, so we could hit a race where the first reload finishes after the second one is
// fully done. A 10-second sleep is much cheaper than hitting that race, which would only time
// out after 10 minutes.
Thread.sleep(10_000L);
// Remove the star-tree index config and trigger reload
indexingConfig.setStarTreeIndexConfigs(null);
updateTableConfig(tableConfig);
reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), false);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_STAR_TREE_QUERY_2);
// Result should not change during reload
assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), secondQueryResult);
// Total docs should not change during reload
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
// Without star-tree, 'numDocsScanned' should be the same as the 'COUNT(*)' result
return queryResponse.get("numDocsScanned").asInt() == secondQueryResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to remove star-tree index");
validateReloadJobSuccess(reloadJobId);
assertEquals(getTableSize(getTableName()), tableSizeWithDefaultIndex);
// First query should not be able to use the star-tree
firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), firstQueryResult);
// Reload again should have no effect
reloadOfflineTableAndValidateResponse(getTableName(), false);
firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
assertEquals(firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), firstQueryResult);
assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), firstQueryResult);
secondQueryResponse = postQuery(TEST_STAR_TREE_QUERY_2);
assertEquals(secondQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), secondQueryResult);
assertEquals(secondQueryResponse.get("totalDocs").asLong(), numTotalDocs);
assertEquals(secondQueryResponse.get("numDocsScanned").asInt(), secondQueryResult);
}
/**
 * Adds new columns to the schema to test adding columns with default values or transform
 * functions to the offline segments.
* <p>New columns are: (name, field type, data type, single/multi value, default null value)
* <ul>
* <li>"NewAddedIntMetric", METRIC, INT, single-value, 1</li>
* <li>"NewAddedLongMetric", METRIC, LONG, single-value, 1</li>
* <li>"NewAddedFloatMetric", METRIC, FLOAT, single-value, default (0.0)</li>
* <li>"NewAddedDoubleMetric", METRIC, DOUBLE, single-value, default (0.0)</li>
* <li>"NewAddedBigDecimalMetric", METRIC, BIG_DECIMAL, single-value, default (0)</li>
* <li>"NewAddedBytesMetric", METRIC, BYTES, single-value, default (byte[0])</li>
* <li>"NewAddedMVIntDimension", DIMENSION, INT, multi-value, default (Integer.MIN_VALUE)</li>
* <li>"NewAddedMVLongDimension", DIMENSION, LONG, multi-value, default (Long.MIN_VALUE)</li>
* <li>"NewAddedMVFloatDimension", DIMENSION, FLOAT, multi-value, default (Float.NEGATIVE_INFINITY)</li>
* <li>"NewAddedMVDoubleDimension", DIMENSION, DOUBLE, multi-value, default (Double.NEGATIVE_INFINITY)</li>
* <li>"NewAddedMVBooleanDimension", DIMENSION, BOOLEAN, multi-value, default (false)</li>
* <li>"NewAddedMVTimestampDimension", DIMENSION, TIMESTAMP, multi-value, default (EPOCH)</li>
* <li>"NewAddedMVStringDimension", DIMENSION, STRING, multi-value, default ("null")</li>
* <li>"NewAddedSVJSONDimension", DIMENSION, JSON, single-value, default ("null")</li>
* <li>"NewAddedSVBytesDimension", DIMENSION, BYTES, single-value, default (byte[0])</li>
* <li>"NewAddedDerivedHoursSinceEpoch", DATE_TIME, INT, single-value, default (Integer.MIN_VALUE)</li>
* <li>"NewAddedDerivedTimestamp", DATE_TIME, TIMESTAMP, single-value, default (EPOCH)</li>
* <li>"NewAddedDerivedSVBooleanDimension", DIMENSION, BOOLEAN, single-value, default (false)</li>
* <li>"NewAddedDerivedMVStringDimension", DATE_TIME, STRING, multi-value</li>
* </ul>
*/
@Test(dependsOnMethods = "testAggregateMetadataAPI")
public void testDefaultColumns()
throws Exception {
long numTotalDocs = getCountStarResult();
reloadWithExtraColumns();
JsonNode queryResponse = postQuery(SELECT_STAR_QUERY);
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), 98);
testNewAddedColumns();
testExpressionOverride();
reloadWithMissingColumns();
queryResponse = postQuery(SELECT_STAR_QUERY);
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), 75);
reloadWithRegularColumns();
queryResponse = postQuery(SELECT_STAR_QUERY);
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), 79);
_tableSizeAfterRemovingIndex = getTableSize(getTableName());
}
@Test
public void testDisableGroovyQueryTableConfigOverride()
throws Exception {
String groovyQuery = "SELECT GROOVY('{\"returnType\":\"STRING\",\"isSingleValue\":true}', "
+ "'arg0 + arg1', FlightNum, Origin) FROM myTable";
TableConfig tableConfig = getOfflineTableConfig();
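// QueryConfig arguments: (timeoutMs, disableGroovy, useApproximateFunction, expressionOverrideMap).
// Explicitly setting 'disableGroovy' to false overrides the cluster-level default that rejects
// Groovy queries, so the query above should be accepted once the config change propagates.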
tableConfig.setQueryConfig(new QueryConfig(null, false, null, null));
updateTableConfig(tableConfig);
TestUtils.waitForCondition(aVoid -> {
try {
// Query should not throw exception
postQuery(groovyQuery);
return true;
} catch (Exception e) {
return false;
}
}, 60_000L, "Failed to accept Groovy query with table override");
// Remove query config
tableConfig.setQueryConfig(null);
updateTableConfig(tableConfig);
TestUtils.waitForCondition(aVoid -> {
try {
postQuery(groovyQuery);
return false;
} catch (Exception e) {
// expected
return true;
}
}, 60_000L, "Failed to reject Groovy query without query table config override");
}
private void reloadWithExtraColumns()
throws Exception {
long numTotalDocs = getCountStarResult();
// Add the new columns to the schema first so that the table config referencing them passes validation
Schema schema = createSchema();
schema.addField(new MetricFieldSpec("NewAddedIntMetric", DataType.INT, 1));
schema.addField(new MetricFieldSpec("NewAddedLongMetric", DataType.LONG, 1));
schema.addField(new MetricFieldSpec("NewAddedFloatMetric", DataType.FLOAT));
schema.addField(new MetricFieldSpec("NewAddedDoubleMetric", DataType.DOUBLE));
schema.addField(new MetricFieldSpec("NewAddedBigDecimalMetric", DataType.BIG_DECIMAL));
schema.addField(new MetricFieldSpec("NewAddedBytesMetric", DataType.BYTES));
schema.addField(new DimensionFieldSpec("NewAddedMVIntDimension", DataType.INT, false));
schema.addField(new DimensionFieldSpec("NewAddedMVLongDimension", DataType.LONG, false));
schema.addField(new DimensionFieldSpec("NewAddedMVFloatDimension", DataType.FLOAT, false));
schema.addField(new DimensionFieldSpec("NewAddedMVDoubleDimension", DataType.DOUBLE, false));
schema.addField(new DimensionFieldSpec("NewAddedMVBooleanDimension", DataType.BOOLEAN, false));
schema.addField(new DimensionFieldSpec("NewAddedMVTimestampDimension", DataType.TIMESTAMP, false));
schema.addField(new DimensionFieldSpec("NewAddedMVStringDimension", DataType.STRING, false));
// NOTE: MV JSON and BYTES are not supported
schema.addField(new DimensionFieldSpec("NewAddedSVJSONDimension", DataType.JSON, true));
schema.addField(new DimensionFieldSpec("NewAddedSVBytesDimension", DataType.BYTES, true));
schema.addField(new DateTimeFieldSpec("NewAddedDerivedHoursSinceEpoch", DataType.INT, "EPOCH|HOURS", "1:DAYS"));
schema.addField(new DateTimeFieldSpec("NewAddedDerivedTimestamp", DataType.TIMESTAMP, "TIMESTAMP", "1:DAYS"));
schema.addField(new DimensionFieldSpec("NewAddedDerivedSVBooleanDimension", DataType.BOOLEAN, true));
schema.addField(new DimensionFieldSpec("NewAddedDerivedMVStringDimension", DataType.STRING, false));
addSchema(schema);
TableConfig tableConfig = getOfflineTableConfig();
List<TransformConfig> transformConfigs =
Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", "DaysSinceEpoch * 24"),
new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch * 24 * 3600 * 1000"),
new TransformConfig("NewAddedDerivedSVBooleanDimension", "ActualElapsedTime > 0"),
new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')"));
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(transformConfigs);
tableConfig.setIngestionConfig(ingestionConfig);
updateTableConfig(tableConfig);
// Trigger reload
String reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), false);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_EXTRA_COLUMNS_QUERY);
if (!queryResponse.get("exceptions").isEmpty()) {
// Schema is not refreshed on the broker side yet
return false;
}
// Total docs should not change during reload
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
long count = queryResponse.get("resultTable").get("rows").get(0).get(0).asLong();
return count == numTotalDocs;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to add default columns");
validateReloadJobSuccess(reloadJobId);
}
private void reloadWithMissingColumns()
throws Exception {
long numTotalDocs = getCountStarResult();
// Remove columns from the table config first to pass the validation of the table config
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setIngestionConfig(null);
updateTableConfig(tableConfig);
// Need to delete and then re-add the schema because removing columns is a backward-incompatible change
deleteSchema(getSchemaName());
Schema schema = createSchema();
schema.removeField("AirlineID");
schema.removeField("ArrTime");
schema.removeField("AirTime");
schema.removeField("ArrDel15");
addSchema(schema);
// Trigger reload
String reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), false);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(SELECT_STAR_QUERY);
// Total docs should not change during reload
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
JsonNode segmentsMetadata = JsonUtils.stringToJsonNode(
sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName(), "*")));
assertEquals(segmentsMetadata.size(), 12);
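// The base schema has 79 columns; dropping the 4 removed ones leaves 75 per segment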
for (JsonNode segmentMetadata : segmentsMetadata) {
if (segmentMetadata.get("columns").size() != 75) {
return false;
}
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to skip missing columns");
validateReloadJobSuccess(reloadJobId);
}
private void reloadWithRegularColumns()
throws Exception {
long numTotalDocs = getCountStarResult();
_schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
addSchema(createSchema());
// Trigger reload
String reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), false);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse = postQuery(TEST_REGULAR_COLUMNS_QUERY);
if (!queryResponse.get("exceptions").isEmpty()) {
// Schema is not refreshed on the broker side yet
return false;
}
// Total docs should not change during reload
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
long count = queryResponse.get("resultTable").get("rows").get(0).get(0).asLong();
return count == numTotalDocs;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to reload regular columns");
validateReloadJobSuccess(reloadJobId);
}
private void testNewAddedColumns()
throws Exception {
long numTotalDocs = getCountStarResult();
double numTotalDocsInDouble = (double) numTotalDocs;
// Test queries against each newly added column
String h2Query = "SELECT COUNT(*) FROM mytable";
String pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedIntMetric = 1";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedLongMetric = 1";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedFloatMetric = 0";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedDoubleMetric = 0";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedBigDecimalMetric = 0";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedBytesMetric = ''";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedMVIntDimension < 0";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedMVLongDimension < 0";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedMVFloatDimension < 0.0";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedMVDoubleDimension < 0.0";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedMVBooleanDimension = false";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedMVTimestampDimension = 0";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedMVStringDimension = 'null'";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedSVJSONDimension = 'null'";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedSVBytesDimension = ''";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedDerivedHoursSinceEpoch = 392232";
h2Query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch = 16343";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedDerivedTimestamp = 1411862400000";
h2Query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch = 16341";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedDerivedSVBooleanDimension = true";
h2Query = "SELECT COUNT(*) FROM mytable WHERE ActualElapsedTime > 0";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedDerivedMVStringDimension = 'CA'";
h2Query = "SELECT COUNT(*) FROM mytable WHERE DestState = 'CA'";
testQuery(pinotQuery, h2Query);
// Test the newly added metric columns in aggregation functions
pinotQuery = "SELECT SUM(NewAddedIntMetric) FROM mytable WHERE DaysSinceEpoch <= 16312";
h2Query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch <= 16312";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT SUM(NewAddedIntMetric) FROM mytable WHERE DaysSinceEpoch > 16312";
h2Query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch > 16312";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT SUM(NewAddedLongMetric) FROM mytable WHERE DaysSinceEpoch <= 16312";
h2Query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch <= 16312";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT SUM(NewAddedLongMetric) FROM mytable WHERE DaysSinceEpoch > 16312";
h2Query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch > 16312";
testQuery(pinotQuery, h2Query);
// Test other query forms with the newly added columns
pinotQuery =
"SELECT NewAddedMVStringDimension, SUM(NewAddedFloatMetric) FROM mytable GROUP BY NewAddedMVStringDimension";
JsonNode response = postQuery(pinotQuery);
JsonNode rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 1);
JsonNode row = rows.get(0);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asText(), "null");
assertEquals(row.get(1).asDouble(), 0.0);
pinotQuery =
"SELECT NewAddedSVBytesDimension, SUM(NewAddedBigDecimalMetric) FROM mytable GROUP BY NewAddedSVBytesDimension";
response = postQuery(pinotQuery);
rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 1);
row = rows.get(0);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asText(), "");
assertEquals(row.get(1).asDouble(), 0.0);
pinotQuery = "SELECT NewAddedMVLongDimension, SUM(NewAddedIntMetric) FROM mytable GROUP BY NewAddedMVLongDimension";
response = postQuery(pinotQuery);
rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 1);
row = rows.get(0);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asLong(), Long.MIN_VALUE);
assertEquals(row.get(1).asDouble(), numTotalDocsInDouble);
String newAddedDimensions =
"NewAddedMVIntDimension, NewAddedMVLongDimension, NewAddedMVFloatDimension, NewAddedMVDoubleDimension, "
+ "NewAddedMVBooleanDimension, NewAddedMVTimestampDimension, NewAddedMVStringDimension, "
+ "NewAddedSVJSONDimension, NewAddedSVBytesDimension";
pinotQuery = "SELECT " + newAddedDimensions + ", SUM(NewAddedIntMetric), SUM(NewAddedLongMetric), "
+ "SUM(NewAddedFloatMetric), SUM(NewAddedDoubleMetric) FROM mytable GROUP BY " + newAddedDimensions;
response = postQuery(pinotQuery);
rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 1);
row = rows.get(0);
assertEquals(row.size(), 13);
assertEquals(row.get(0).asInt(), Integer.MIN_VALUE);
assertEquals(row.get(1).asLong(), Long.MIN_VALUE);
assertEquals(row.get(2).asDouble(), (double) Float.NEGATIVE_INFINITY);
assertEquals(row.get(3).asDouble(), Double.NEGATIVE_INFINITY);
assertFalse(row.get(4).asBoolean());
assertEquals(row.get(5).asText(), new Timestamp(0).toString());
assertEquals(row.get(6).asText(), "null");
assertEquals(row.get(7).asText(), "null");
assertEquals(row.get(8).asText(), "");
assertEquals(row.get(9).asDouble(), numTotalDocsInDouble);
assertEquals(row.get(10).asDouble(), numTotalDocsInDouble);
assertEquals(row.get(11).asDouble(), 0.0);
assertEquals(row.get(12).asDouble(), 0.0);
}
private void testExpressionOverride()
throws Exception {
String query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch * 24 = 392184";
// Initially there is no expression override
{
JsonNode response = postQuery(query);
assertEquals(response.get("numSegmentsProcessed").asInt(), 12);
assertEquals(response.get("numEntriesScannedInFilter").asInt(), getCountStarResult());
}
// Add expression override
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setQueryConfig(new QueryConfig(null, null, null,
Collections.singletonMap("DaysSinceEpoch * 24", "NewAddedDerivedHoursSinceEpoch")));
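// With the override in place, the broker rewrites 'DaysSinceEpoch * 24' to the materialized
// column 'NewAddedDerivedHoursSinceEpoch', so segments can be pruned on that column's metadata
// and the filter answered from its index instead of evaluating the expression per row. This is
// what drives 'numSegmentsProcessed' down to 1 and 'numEntriesScannedInFilter' down to 0 below.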
updateTableConfig(tableConfig);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode response = postQuery(query);
return response.get("numSegmentsProcessed").asInt() == 1
&& response.get("numEntriesScannedInFilter").asInt() == 0;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 60_000L, "Failed to add expression override");
// Remove expression override
tableConfig.setQueryConfig(null);
updateTableConfig(tableConfig);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode response = postQuery(query);
return response.get("numSegmentsProcessed").asInt() == 12
&& response.get("numEntriesScannedInFilter").asInt() == getCountStarResult();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 60_000L, "Failed to remove expression override");
}
@Test
@Override
public void testBrokerResponseMetadata()
throws Exception {
super.testBrokerResponseMetadata();
}
@Test
public void testInBuiltVirtualColumns()
throws Exception {
String query = "SELECT $docId, $HOSTNAME, $segmentname FROM mytable";
JsonNode response = postQuery(query);
JsonNode resultTable = response.get("resultTable");
JsonNode dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(), "[\"$docId\",\"$hostName\",\"$segmentName\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"INT\",\"STRING\",\"STRING\"]");
JsonNode rows = resultTable.get("rows");
assertEquals(rows.size(), 10);
String expectedHostName = NetUtils.getHostnameOrAddress();
String expectedSegmentNamePrefix = "mytable_";
for (int i = 0; i < 10; i++) {
JsonNode row = rows.get(i);
assertEquals(row.get(0).asInt(), i);
assertEquals(row.get(1).asText(), expectedHostName);
assertTrue(row.get(2).asText().startsWith(expectedSegmentNamePrefix));
}
}
@Test
public void testGroupByUDF()
throws Exception {
String query = "SELECT timeConvert(DaysSinceEpoch,'DAYS','SECONDS'), COUNT(*) FROM mytable "
+ "GROUP BY timeConvert(DaysSinceEpoch,'DAYS','SECONDS') ORDER BY COUNT(*) DESC";
JsonNode response = postQuery(query);
JsonNode resultTable = response.get("resultTable");
JsonNode dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(),
"[\"timeconvert(DaysSinceEpoch,'DAYS','SECONDS')\",\"count(*)\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"LONG\",\"LONG\"]");
JsonNode rows = resultTable.get("rows");
assertEquals(rows.size(), 10);
JsonNode row = rows.get(0);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asLong(), 16138 * 24 * 3600);
assertEquals(row.get(1).asLong(), 605);
query = "SELECT dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS'), COUNT(*) FROM mytable "
+ "GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS') ORDER BY COUNT(*) DESC";
response = postQuery(query);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(),
"[\"datetimeconvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')\",\"count(*)\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"LONG\",\"LONG\"]");
rows = resultTable.get("rows");
assertEquals(rows.size(), 10);
row = rows.get(0);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asLong(), 16138 * 24);
assertEquals(row.get(1).asLong(), 605);
query = "SELECT add(DaysSinceEpoch,DaysSinceEpoch,15), COUNT(*) FROM mytable "
+ "GROUP BY add(DaysSinceEpoch,DaysSinceEpoch,15) ORDER BY COUNT(*) DESC";
response = postQuery(query);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(),
"[\"add(DaysSinceEpoch,DaysSinceEpoch,'15')\",\"count(*)\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"DOUBLE\",\"LONG\"]");
rows = resultTable.get("rows");
assertEquals(rows.size(), 10);
row = rows.get(0);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asDouble(), 16138.0 + 16138 + 15);
assertEquals(row.get(1).asLong(), 605);
query = "SELECT sub(DaysSinceEpoch,25), COUNT(*) FROM mytable "
+ "GROUP BY sub(DaysSinceEpoch,25) ORDER BY COUNT(*) DESC";
response = postQuery(query);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(), "[\"sub(DaysSinceEpoch,'25')\",\"count(*)\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"DOUBLE\",\"LONG\"]");
rows = resultTable.get("rows");
assertEquals(rows.size(), 10);
row = rows.get(0);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asDouble(), 16138.0 - 25);
assertEquals(row.get(1).asLong(), 605);
query = "SELECT mult(DaysSinceEpoch,24,3600), COUNT(*) FROM mytable "
+ "GROUP BY mult(DaysSinceEpoch,24,3600) ORDER BY COUNT(*) DESC";
response = postQuery(query);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(), "[\"mult(DaysSinceEpoch,'24','3600')\",\"count(*)\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"DOUBLE\",\"LONG\"]");
rows = resultTable.get("rows");
assertEquals(rows.size(), 10);
row = rows.get(0);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asDouble(), 16138.0 * 24 * 3600);
assertEquals(row.get(1).asLong(), 605);
query = "SELECT div(DaysSinceEpoch,2), COUNT(*) FROM mytable "
+ "GROUP BY div(DaysSinceEpoch,2) ORDER BY COUNT(*) DESC";
response = postQuery(query);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(), "[\"div(DaysSinceEpoch,'2')\",\"count(*)\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"DOUBLE\",\"LONG\"]");
rows = resultTable.get("rows");
assertEquals(rows.size(), 10);
row = rows.get(0);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asDouble(), 16138.0 / 2);
assertEquals(row.get(1).asLong(), 605);
query = "SELECT arrayLength(DivAirports), COUNT(*) FROM mytable "
+ "GROUP BY arrayLength(DivAirports) ORDER BY COUNT(*) DESC";
response = postQuery(query);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(), "[\"arraylength(DivAirports)\",\"count(*)\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"INT\",\"LONG\"]");
rows = resultTable.get("rows");
assertEquals(rows.size(), 1);
row = rows.get(0);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asInt(), 5);
assertEquals(row.get(1).asLong(), 115545);
query = "SELECT arrayLength(valueIn(DivAirports,'DFW','ORD')), COUNT(*) FROM mytable GROUP BY "
+ "arrayLength(valueIn(DivAirports,'DFW','ORD')) ORDER BY COUNT(*) DESC";
response = postQuery(query);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(),
"[\"arraylength(valuein(DivAirports,'DFW','ORD'))\",\"count(*)\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"INT\",\"LONG\"]");
rows = resultTable.get("rows");
assertEquals(rows.size(), 3);
row = rows.get(0);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asInt(), 0);
assertEquals(row.get(1).asLong(), 114895);
row = rows.get(1);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asInt(), 1);
assertEquals(row.get(1).asLong(), 648);
row = rows.get(2);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asInt(), 2);
assertEquals(row.get(1).asLong(), 2);
query = "SELECT valueIn(DivAirports,'DFW','ORD'), COUNT(*) FROM mytable "
+ "GROUP BY valueIn(DivAirports,'DFW','ORD') ORDER BY COUNT(*) DESC";
response = postQuery(query);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(), "[\"valuein(DivAirports,'DFW','ORD')\",\"count(*)\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"STRING\",\"LONG\"]");
rows = resultTable.get("rows");
assertEquals(rows.size(), 2);
row = rows.get(0);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asText(), "ORD");
assertEquals(row.get(1).asLong(), 336);
row = rows.get(1);
assertEquals(row.size(), 2);
assertEquals(row.get(0).asText(), "DFW");
assertEquals(row.get(1).asLong(), 316);
}
@Test
public void testAggregationUDF()
throws Exception {
String query = "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable";
JsonNode response = postQuery(query);
JsonNode resultTable = response.get("resultTable");
JsonNode dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(), "[\"max(timeconvert(DaysSinceEpoch,'DAYS','SECONDS'))\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"DOUBLE\"]");
JsonNode rows = resultTable.get("rows");
assertEquals(rows.size(), 1);
JsonNode row = rows.get(0);
assertEquals(row.size(), 1);
assertEquals(row.get(0).asDouble(), 16435.0 * 24 * 3600);
query = "SELECT MIN(div(DaysSinceEpoch,2)) FROM mytable";
response = postQuery(query);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(), "[\"min(div(DaysSinceEpoch,'2'))\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"DOUBLE\"]");
rows = resultTable.get("rows");
assertEquals(rows.size(), 1);
row = rows.get(0);
assertEquals(row.size(), 1);
assertEquals(row.get(0).asDouble(), 16071.0 / 2);
}
@Test
public void testSelectionUDF()
throws Exception {
String query = "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable";
JsonNode response = postQuery(query);
JsonNode resultTable = response.get("resultTable");
JsonNode dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(),
"[\"DaysSinceEpoch\",\"timeconvert(DaysSinceEpoch,'DAYS','SECONDS')\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"INT\",\"LONG\"]");
JsonNode rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 10);
for (int i = 0; i < 10; i++) {
long daysSinceEpoch = rows.get(i).get(0).asInt();
long secondsSinceEpoch = rows.get(i).get(1).asLong();
assertEquals(daysSinceEpoch * 24 * 60 * 60, secondsSinceEpoch);
}
query = "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable "
+ "ORDER BY DaysSinceEpoch LIMIT 10000";
response = postQuery(query);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(),
"[\"DaysSinceEpoch\",\"timeconvert(DaysSinceEpoch,'DAYS','SECONDS')\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"INT\",\"LONG\"]");
rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 10000);
long prevValue = -1;
for (int i = 0; i < 10000; i++) {
long daysSinceEpoch = rows.get(i).get(0).asInt();
long secondsSinceEpoch = rows.get(i).get(1).asLong();
assertEquals(daysSinceEpoch * 24 * 60 * 60, secondsSinceEpoch);
assertTrue(daysSinceEpoch >= prevValue);
prevValue = daysSinceEpoch;
}
query = "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable "
+ "ORDER BY timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC LIMIT 10000";
response = postQuery(query);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
assertEquals(dataSchema.get("columnNames").toString(),
"[\"DaysSinceEpoch\",\"timeconvert(DaysSinceEpoch,'DAYS','SECONDS')\"]");
assertEquals(dataSchema.get("columnDataTypes").toString(), "[\"INT\",\"LONG\"]");
rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 10000);
prevValue = Long.MAX_VALUE;
for (int i = 0; i < 10000; i++) {
long daysSinceEpoch = rows.get(i).get(0).asInt();
long secondsSinceEpoch = rows.get(i).get(1).asLong();
assertEquals(daysSinceEpoch * 24 * 60 * 60, secondsSinceEpoch);
assertTrue(secondsSinceEpoch <= prevValue);
prevValue = secondsSinceEpoch;
}
}
@Test
public void testFilterUDF()
throws Exception {
int daysSinceEpoch = 16138;
long secondsSinceEpoch = 16138 * 24 * 60 * 60;
String query;
query = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch;
long expectedResult = postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong();
query = "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResult);
query = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch
+ " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResult);
query = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch
+ " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResult);
query =
"SELECT count(*) FROM mytable WHERE DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch;
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResult);
query = String.format("SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') IN (%d, %d)",
secondsSinceEpoch - 100, secondsSinceEpoch);
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResult);
query = String.format(
"SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') BETWEEN %d AND %d",
secondsSinceEpoch - 100, secondsSinceEpoch);
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResult);
}
@Test
public void testCaseStatementInSelection()
throws Exception {
List<String> origins =
Arrays.asList("ATL", "ORD", "DFW", "DEN", "LAX", "IAH", "SFO", "PHX", "LAS", "EWR", "MCO", "BOS", "SLC", "SEA",
"MSP", "CLT", "LGA", "DTW", "JFK", "BWI");
StringBuilder caseStatementBuilder = new StringBuilder("CASE ");
for (int i = 0; i < origins.size(); i++) {
// WHEN origin = 'ATL' THEN 1
// WHEN origin = 'ORD' THEN 2
// WHEN origin = 'DFW' THEN 3
// ....
caseStatementBuilder.append(String.format("WHEN origin = '%s' THEN %d ", origins.get(i), i + 1));
}
caseStatementBuilder.append("ELSE 0 END");
String sqlQuery = "SELECT origin, " + caseStatementBuilder + " AS origin_code FROM mytable LIMIT 1000";
JsonNode response = postQuery(sqlQuery, _brokerBaseApiUrl);
JsonNode rows = response.get("resultTable").get("rows");
assertTrue(response.get("exceptions").isEmpty());
for (int i = 0; i < rows.size(); i++) {
String origin = rows.get(i).get(0).asText();
int originCode = rows.get(i).get(1).asInt();
if (originCode > 0) {
assertEquals(origin, origins.get(originCode - 1));
} else {
assertFalse(origins.contains(origin));
}
}
}
@Test
public void testCaseStatementInSelectionWithTransformFunctionInThen()
throws Exception {
String sqlQuery =
"SELECT ArrDelay, CASE WHEN ArrDelay > 0 THEN ArrDelay WHEN ArrDelay < 0 THEN ArrDelay * -1 ELSE 0 END AS "
+ "ArrTimeDiff FROM mytable LIMIT 1000";
JsonNode response = postQuery(sqlQuery, _brokerBaseApiUrl);
JsonNode rows = response.get("resultTable").get("rows");
assertTrue(response.get("exceptions").isEmpty());
for (int i = 0; i < rows.size(); i++) {
int arrDelay = rows.get(i).get(0).asInt();
int arrDelayDiff = rows.get(i).get(1).asInt();
if (arrDelay > 0) {
assertEquals(arrDelay, arrDelayDiff);
} else {
assertEquals(arrDelay, arrDelayDiff * -1);
}
}
}
@Test
public void testCaseStatementWithLogicalTransformFunction()
throws Exception {
String sqlQuery = "SELECT ArrDelay" + ", CASE WHEN ArrDelay > 50 OR ArrDelay < 10 THEN 10 ELSE 0 END"
+ ", CASE WHEN ArrDelay < 50 AND ArrDelay >= 10 THEN 10 ELSE 0 END" + " FROM mytable LIMIT 1000";
JsonNode response = postQuery(sqlQuery, _brokerBaseApiUrl);
JsonNode rows = response.get("resultTable").get("rows");
assertTrue(response.get("exceptions").isEmpty());
for (int i = 0; i < rows.size(); i++) {
int row0 = rows.get(i).get(0).asInt();
int row1 = rows.get(i).get(1).asInt();
int row2 = rows.get(i).get(2).asInt();
if (row0 > 50 || row0 < 10) {
assertEquals(row1, 10);
} else {
assertEquals(row1, 0);
}
if (row0 < 50 && row0 >= 10) {
assertEquals(row2, 10);
} else {
assertEquals(row2, 0);
}
}
}
@Test
public void testCaseStatementWithInAggregation()
throws Exception {
testCountVsCaseQuery("origin = 'ATL'");
testCountVsCaseQuery("origin <> 'ATL'");
testCountVsCaseQuery("DaysSinceEpoch > 16312");
testCountVsCaseQuery("DaysSinceEpoch >= 16312");
testCountVsCaseQuery("DaysSinceEpoch < 16312");
testCountVsCaseQuery("DaysSinceEpoch <= 16312");
testCountVsCaseQuery("DaysSinceEpoch = 16312");
testCountVsCaseQuery("DaysSinceEpoch <> 16312");
}
private void testCountVsCaseQuery(String predicate)
throws Exception {
String sqlQuery = String.format("SELECT COUNT(*) FROM mytable WHERE %s", predicate);
JsonNode response = postQuery(sqlQuery, _brokerBaseApiUrl);
long countValue = response.get("resultTable").get("rows").get(0).get(0).asLong();
sqlQuery = String.format("SELECT SUM(CASE WHEN %s THEN 1 ELSE 0 END) as sum1 FROM mytable", predicate);
response = postQuery(sqlQuery, _brokerBaseApiUrl);
long caseSum = response.get("resultTable").get("rows").get(0).get(0).asLong();
assertEquals(caseSum, countValue);
}
@Test
public void testFilterWithInvertedIndexUDF()
throws Exception {
int daysSinceEpoch = 16138;
long secondsSinceEpoch = 16138 * 24 * 60 * 60;
//@formatter:off
String[] origins = new String[]{
"ATL", "ORD", "DFW", "DEN", "LAX", "IAH", "SFO", "PHX", "LAS", "EWR", "MCO", "BOS", "SLC", "SEA", "MSP", "CLT",
"LGA", "DTW", "JFK", "BWI"
};
//@formatter:on
for (String origin : origins) {
String query =
"SELECT COUNT(*) FROM mytable WHERE Origin = '" + origin + "' AND DaysSinceEpoch = " + daysSinceEpoch;
JsonNode response1 = postQuery(query);
query = "SELECT COUNT(*) FROM mytable WHERE Origin = '" + origin
+ "' AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
JsonNode response2 = postQuery(query);
long value1 = response1.get("resultTable").get("rows").get(0).get(0).asLong();
long value2 = response2.get("resultTable").get("rows").get(0).get(0).asLong();
assertEquals(value1, value2);
}
}
@Test
public void testQueryWithRepeatedColumns()
throws Exception {
// Test repeated columns in a selection query
String query = "SELECT ArrTime, ArrTime FROM mytable WHERE DaysSinceEpoch <= 16312 AND Carrier = 'DL'";
testQuery(query);
// Test repeated columns in a selection query with order-by
query = "SELECT ArrTime, ArrTime FROM mytable WHERE DaysSinceEpoch <= 16312 AND Carrier = 'DL' order by ArrTime";
testQuery(query);
// Test repeated columns in an aggregation query
query = "SELECT COUNT(*), COUNT(*) FROM mytable WHERE DaysSinceEpoch <= 16312 AND Carrier = 'DL'";
testQuery(query);
// Test repeated columns in an aggregation group-by query
query = "SELECT ArrTime, ArrTime, COUNT(*), COUNT(*) FROM mytable WHERE DaysSinceEpoch <= 16312 AND Carrier = 'DL' "
+ "GROUP BY ArrTime, ArrTime";
testQuery(query);
}
@Test
public void testQueryWithOrderby()
throws Exception {
// Test order-by on the time column
String query = "SELECT ArrTime, Carrier, DaysSinceEpoch FROM mytable ORDER BY DaysSinceEpoch DESC";
testQuery(query);
// Test order-by on a string column
query = "SELECT ArrTime, DaysSinceEpoch, Carrier FROM mytable ORDER BY Carrier DESC";
testQuery(query);
// Test order-by on multiple columns
query = "SELECT ArrTime, DaysSinceEpoch, Carrier FROM mytable ORDER BY Carrier DESC, ArrTime DESC";
testQuery(query);
}
@Test
public void testQueryWithAlias()
throws Exception {
{
// Test aliases identical to the column names
String query = "SELECT ArrTime AS ArrTime, Carrier AS Carrier, DaysSinceEpoch AS DaysSinceEpoch FROM mytable "
+ "ORDER BY DaysSinceEpoch DESC";
testQuery(query);
query = "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch, Carrier AS Carrier FROM mytable "
+ "ORDER BY Carrier DESC";
testQuery(query);
query = "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch, Carrier AS Carrier FROM mytable "
+ "ORDER BY Carrier DESC, ArrTime DESC";
testQuery(query);
}
{
// Test a single alias
String query = "SELECT ArrTime, Carrier AS CarrierName, DaysSinceEpoch FROM mytable ORDER BY DaysSinceEpoch DESC";
testQuery(query);
query = "SELECT count(*) AS cnt, max(ArrTime) as maxArrTime FROM mytable";
testQuery(query);
query = "SELECT count(*) AS cnt, Carrier AS CarrierName FROM mytable GROUP BY CarrierName ORDER BY cnt";
testQuery(query);
}
{
// Test multiple aliases
String query =
"SELECT ArrTime, Carrier, Carrier AS CarrierName1, Carrier AS CarrierName2, DaysSinceEpoch FROM mytable "
+ "ORDER BY DaysSinceEpoch DESC";
testQuery(query);
query = "SELECT count(*) AS cnt, max(ArrTime) as maxArrTime1, max(ArrTime) as maxArrTime2 FROM mytable";
testQuery(query);
query = "SELECT count(*), count(*) AS cnt1, count(*) AS cnt2, Carrier AS CarrierName FROM mytable "
+ "GROUP BY CarrierName ORDER BY cnt2";
testQuery(query);
}
{
// Test aliases with DISTINCT and GROUP BY
String query =
"SELECT DISTINCT ArrTime, Carrier, Carrier AS CarrierName1, Carrier AS CarrierName2, DaysSinceEpoch "
+ "FROM mytable ORDER BY DaysSinceEpoch DESC";
testQuery(query);
query = "SELECT ArrTime, Carrier, Carrier AS CarrierName1, Carrier AS CarrierName2, DaysSinceEpoch FROM mytable "
+ "GROUP BY ArrTime, Carrier, DaysSinceEpoch ORDER BY DaysSinceEpoch DESC";
testQuery(query);
}
}
@AfterClass
public void tearDown()
throws Exception {
// Test instance decommission before tearing down
testInstanceDecommission();
// Brokers and servers have already been stopped by testInstanceDecommission()
stopController();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}
private void testInstanceDecommission()
throws Exception {
// Fetch all instances
JsonNode response = JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList()));
JsonNode instanceList = response.get("instances");
int numInstances = instanceList.size();
// The total number of instances equals the number of brokers plus the number of servers plus one controller.
assertEquals(numInstances, getNumBrokers() + getNumServers() + 1);
// Try to delete a server that does not exist
String deleteInstanceRequest = _controllerRequestURLBuilder.forInstance("potato");
try {
sendDeleteRequest(deleteInstanceRequest);
fail("Delete should have returned a failure status (404)");
} catch (IOException e) {
// Expected exception on 404 status code
}
// Get the server name
String serverName = null;
String brokerName = null;
for (int i = 0; i < numInstances; i++) {
String instanceId = instanceList.get(i).asText();
InstanceType instanceType = InstanceTypeUtils.getInstanceType(instanceId);
if (instanceType == InstanceType.SERVER) {
serverName = instanceId;
} else if (instanceType == InstanceType.BROKER) {
brokerName = instanceId;
}
}
// Try to delete a live server
deleteInstanceRequest = _controllerRequestURLBuilder.forInstance(serverName);
try {
sendDeleteRequest(deleteInstanceRequest);
fail("Delete should have returned a failure status (409)");
} catch (IOException e) {
// Expected exception on 409 status code
}
// Stop servers
stopServer();
// Try to delete a server whose information is still on the ideal state
try {
sendDeleteRequest(deleteInstanceRequest);
fail("Delete should have returned a failure status (409)");
} catch (IOException e) {
// Expected exception on 409 status code
}
// Delete the table
dropOfflineTable(getTableName());
// Now, delete server should work
response = JsonUtils.stringToJsonNode(sendDeleteRequest(deleteInstanceRequest));
assertTrue(response.has("status"));
// Try to delete a broker whose information is still live
try {
deleteInstanceRequest = _controllerRequestURLBuilder.forInstance(brokerName);
sendDeleteRequest(deleteInstanceRequest);
fail("Delete should have returned a failure status (409)");
} catch (IOException e) {
// Expected exception on 409 status code
}
// Stop brokers
stopBroker();
// TODO: Add test to delete broker instance. Currently, stopBroker() does not work correctly.
// Check if '/INSTANCES/<serverName>' has been erased correctly
String instancePath = "/" + getHelixClusterName() + "/INSTANCES/" + serverName;
assertFalse(_propertyStore.exists(instancePath, 0));
// Check if '/CONFIGS/PARTICIPANT/<serverName>' has been erased correctly
String configPath = "/" + getHelixClusterName() + "/CONFIGS/PARTICIPANT/" + serverName;
assertFalse(_propertyStore.exists(configPath, 0));
}
@Test
public void testDistinctQuery()
throws Exception {
// By default only 10 rows are returned, so use a high limit
String pinotQuery = "SELECT DISTINCT Carrier FROM mytable LIMIT 1000000";
String h2Query = "SELECT DISTINCT Carrier FROM mytable";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT DISTINCT Carrier, DestAirportID FROM mytable LIMIT 1000000";
h2Query = "SELECT DISTINCT Carrier, DestAirportID FROM mytable";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT DISTINCT Carrier, DestAirportID, DestStateName FROM mytable LIMIT 1000000";
h2Query = "SELECT DISTINCT Carrier, DestAirportID, DestStateName FROM mytable";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT DISTINCT Carrier, DestAirportID, DestCityName FROM mytable LIMIT 1000000";
h2Query = "SELECT DISTINCT Carrier, DestAirportID, DestCityName FROM mytable";
testQuery(pinotQuery, h2Query);
}
@Test
public void testNonAggregationGroupByQuery()
throws Exception {
// By default only 10 rows are returned, so use a high limit
String pinotQuery = "SELECT Carrier FROM mytable GROUP BY Carrier LIMIT 1000000";
String h2Query = "SELECT Carrier FROM mytable GROUP BY Carrier";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT Carrier, DestAirportID FROM mytable GROUP BY Carrier, DestAirportID LIMIT 1000000";
h2Query = "SELECT Carrier, DestAirportID FROM mytable GROUP BY Carrier, DestAirportID";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT Carrier, DestAirportID, DestStateName FROM mytable "
+ "GROUP BY Carrier, DestAirportID, DestStateName LIMIT 1000000";
h2Query =
"SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY Carrier, DestAirportID, DestStateName";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT Carrier, DestAirportID, DestCityName FROM mytable "
+ "GROUP BY Carrier, DestAirportID, DestCityName LIMIT 1000000";
h2Query = "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY Carrier, DestAirportID, DestCityName";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT ArrTime-DepTime FROM mytable GROUP BY ArrTime, DepTime LIMIT 1000000";
h2Query = "SELECT ArrTime-DepTime FROM mytable GROUP BY ArrTime, DepTime";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT ArrTime-DepTime,ArrTime/3,DepTime*2 FROM mytable GROUP BY ArrTime, DepTime LIMIT 1000000";
h2Query = "SELECT ArrTime-DepTime,ArrTime/3,DepTime*2 FROM mytable GROUP BY ArrTime, DepTime";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT ArrTime+DepTime FROM mytable GROUP BY ArrTime + DepTime LIMIT 1000000";
h2Query = "SELECT ArrTime+DepTime FROM mytable GROUP BY ArrTime + DepTime";
testQuery(pinotQuery, h2Query);
}
@Test
public void testCaseInsensitivity()
throws Exception {
int daysSinceEpoch = 16138;
int hoursSinceEpoch = 16138 * 24;
int secondsSinceEpoch = 16138 * 24 * 60 * 60;
List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch "
+ "limit 10000",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert"
+ "(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
"SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch,
"SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','HOURS') = " + hoursSinceEpoch,
"SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
"SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
"SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH',"
+ "'1:HOURS')");
List<String> queries = new ArrayList<>();
baseQueries.forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
baseQueries.forEach(
q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
for (String query : queries) {
JsonNode response = postQuery(query);
assertTrue(response.get("numSegmentsProcessed").asLong() >= 1L, "Query: " + query + " failed");
}
}
@Test
public void testColumnNameContainsTableName()
throws Exception {
int daysSinceEpoch = 16138;
int hoursSinceEpoch = 16138 * 24;
int secondsSinceEpoch = 16138 * 24 * 60 * 60;
List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch "
+ "limit 10000",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert"
+ "(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
"SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch,
"SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','HOURS') = " + hoursSinceEpoch,
"SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
"SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
"SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH',"
+ "'1:HOURS')");
List<String> queries = new ArrayList<>();
baseQueries.forEach(q -> queries.add(q.replace("DaysSinceEpoch", "mytable.DAYSSinceEpOch")));
baseQueries.forEach(q -> queries.add(q.replace("DaysSinceEpoch", "mytable.DAYSSinceEpOch")));
for (String query : queries) {
JsonNode response = postQuery(query);
assertTrue(response.get("numSegmentsProcessed").asLong() >= 1L, "Query: " + query + " failed");
}
}
@Test
public void testCaseInsensitivityWithColumnNameContainsTableName()
throws Exception {
int daysSinceEpoch = 16138;
int hoursSinceEpoch = 16138 * 24;
int secondsSinceEpoch = 16138 * 24 * 60 * 60;
List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch "
+ "limit 10000",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert"
+ "(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
"SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch,
"SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','HOURS') = " + hoursSinceEpoch,
"SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
"SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
"SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH',"
+ "'1:HOURS')");
List<String> queries = new ArrayList<>();
baseQueries.forEach(
q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
// something like "SELECT MYDB.MYTABLE.DAYSSinceEpOch from MYDB.MYTABLE where MYDB.MYTABLE.DAYSSinceEpOch = 16138"
baseQueries.forEach(
q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
for (String query : queries) {
JsonNode response = postQuery(query);
assertTrue(response.get("numSegmentsProcessed").asLong() >= 1L, "Query: " + query + " failed");
}
}
@Test
public void testQuerySourceWithDatabaseName()
throws Exception {
// By default only 10 rows are returned, so use a high limit
String pinotQuery = "SELECT DISTINCT(Carrier) FROM mytable LIMIT 1000000";
String h2Query = "SELECT DISTINCT Carrier FROM mytable";
testQuery(pinotQuery, h2Query);
pinotQuery = "SELECT DISTINCT Carrier FROM db.mytable LIMIT 1000000";
testQuery(pinotQuery, h2Query);
}
@Test
public void testDistinctCountHll()
throws Exception {
String query;
// The accurate value is 6538.
query = "SELECT distinctCount(FlightNum) FROM mytable ";
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), 6538);
assertEquals(postQuery(query, _brokerBaseApiUrl).get("resultTable").get("rows").get(0).get(0).asLong(), 6538);
// Expected distinctCountHLL results for log2m values from 2 to 19. The accurate value is 6538.
long[] expectedResults = new long[]{
3504, 6347, 8877, 9729, 9046, 7672, 7538, 6993, 6649, 6651, 6553, 6525, 6459, 6523, 6532, 6544, 6538, 6539
};
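// log2m controls the number of HLL registers (2^log2m), trading memory for accuracy; the
// estimates converge toward the accurate value of 6538 as log2m grows.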
for (int i = 2; i < 20; i++) {
query = String.format("SELECT distinctCountHLL(FlightNum, %d) FROM mytable ", i);
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[i - 2]);
assertEquals(postQuery(query, _brokerBaseApiUrl).get("resultTable").get("rows").get(0).get(0).asLong(),
expectedResults[i - 2]);
}
// The default HLL log2m is 12, so the result should match expectedResults[10]
query = "SELECT distinctCountHLL(FlightNum) FROM mytable ";
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[10]);
assertEquals(postQuery(query, _brokerBaseApiUrl).get("resultTable").get("rows").get(0).get(0).asLong(),
expectedResults[10]);
}
@Test
public void testAggregationFunctionsWithUnderscore()
throws Exception {
String query;
// The accurate value is 6538.
query = "SELECT distinct_count(FlightNum) FROM mytable";
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asInt(), 6538);
// The accurate value is 115545.
query = "SELECT c_o_u_n_t(FlightNum) FROM mytable";
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asInt(), 115545);
}
@Test
public void testExplainPlanQuery()
throws Exception {
String query1 = "EXPLAIN PLAN FOR SELECT count(*) AS count, Carrier AS name FROM mytable GROUP BY name ORDER BY 1";
String response1 = postQuery(query1, _brokerBaseApiUrl).get("resultTable").toString();
// Replace string "docs:[0-9]+" with "docs:*" so that the test doesn't fail when the number of
// documents changes. This is needed because both OfflineClusterIntegrationTest and
// MultiNodesOfflineClusterIntegrationTest run this test case with different numbers of documents
// in the segment.
response1 = response1.replaceAll("docs:[0-9]+", "docs:*");
assertEquals(response1, "{\"dataSchema\":{\"columnNames\":[\"Operator\",\"Operator_Id\",\"Parent_Id\"],"
+ "\"columnDataTypes\":[\"STRING\",\"INT\",\"INT\"]},"
+ "\"rows\":[[\"BROKER_REDUCE(sort:[count(*) ASC],limit:10)\",1,0],[\"COMBINE_GROUP_BY\",2,1],"
+ "[\"PLAN_START(numSegmentsForThisPlan:1)\",-1,-1],"
+ "[\"GROUP_BY(groupKeys:Carrier, aggregations:count(*))\",3,2],"
+ "[\"TRANSFORM_PASSTHROUGH(Carrier)\",4,3],[\"PROJECT(Carrier)\",5,4],[\"DOC_ID_SET\",6,5],"
+ "[\"FILTER_MATCH_ENTIRE_SEGMENT(docs:*)\",7,6]]}");
// In the query below, FlightNum column has an inverted index and there is no data satisfying the predicate
// "FlightNum < 0". Hence, all segments are pruned out before query execution on the server side.
String query2 = "EXPLAIN PLAN FOR SELECT * FROM mytable WHERE FlightNum < 0";
String response2 = postQuery(query2, _brokerBaseApiUrl).get("resultTable").toString();
assertEquals(response2, "{\"dataSchema\":{\"columnNames\":[\"Operator\",\"Operator_Id\",\"Parent_Id\"],"
+ "\"columnDataTypes\":[\"STRING\",\"INT\",\"INT\"]},\"rows\":[[\"BROKER_REDUCE(limit:10)\",1,0],"
+ "[\"PLAN_START(numSegmentsForThisPlan:12)\",-1,-1],[\"ALL_SEGMENTS_PRUNED_ON_SERVER\",2,1]]}");
}
/** Test to make sure we are properly handling string comparisons in predicates. */
@Test
public void testStringComparisonInFilter()
throws Exception {
// compare two string columns.
String query1 = "SELECT count(*) FROM mytable WHERE OriginState = DestState";
String response1 = postQuery(query1, _brokerBaseApiUrl).get("resultTable").toString();
assertEquals(response1,
"{\"dataSchema\":{\"columnNames\":[\"count(*)\"],\"columnDataTypes\":[\"LONG\"]}," + "\"rows\":[[14011]]}");
// compare string function with string column.
String query2 = "SELECT count(*) FROM mytable WHERE trim(OriginState) = DestState";
String response2 = postQuery(query2, _brokerBaseApiUrl).get("resultTable").toString();
assertEquals(response2,
"{\"dataSchema\":{\"columnNames\":[\"count(*)\"],\"columnDataTypes\":[\"LONG\"]}," + "\"rows\":[[14011]]}");
// compare string function with string function.
String query3 = "SELECT count(*) FROM mytable WHERE substr(OriginState, 0, 1) = substr(DestState, 0, 1)";
String response3 = postQuery(query3, _brokerBaseApiUrl).get("resultTable").toString();
assertEquals(response3,
"{\"dataSchema\":{\"columnNames\":[\"count(*)\"],\"columnDataTypes\":[\"LONG\"]}," + "\"rows\":[[19755]]}");
}
/**
* Test queries that can be solved with {@link NonScanBasedAggregationOperator}.
*/
@Test
public void testNonScanAggregationQueries()
throws Exception {
String tableName = getTableName();
// Test queries with COUNT, MIN, MAX, MIN_MAX_RANGE
// Dictionary columns
// INT
String query = "SELECT COUNT(*) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MIN(ArrTime) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MAX(ArrTime) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MIN_MAX_RANGE(ArrTime) FROM " + tableName;
String h2Query = "SELECT MAX(ArrTime)-MIN(ArrTime) FROM " + tableName;
testNonScanAggregationQuery(query, h2Query);
query = "SELECT COUNT(*), MIN(ArrTime), MAX(ArrTime), MIN_MAX_RANGE(ArrTime) FROM " + tableName;
h2Query = "SELECT COUNT(*), MIN(ArrTime), MAX(ArrTime), MAX(ArrTime)-MIN(ArrTime) FROM " + tableName;
testNonScanAggregationQuery(query, h2Query);
// LONG
query = "SELECT MIN(AirlineID) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MAX(AirlineID) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MIN_MAX_RANGE(AirlineID) FROM " + tableName;
h2Query = "SELECT MAX(AirlineID)-MIN(AirlineID) FROM " + tableName;
testNonScanAggregationQuery(query, h2Query);
query = "SELECT COUNT(*), MIN(AirlineID), MAX(AirlineID), MIN_MAX_RANGE(AirlineID) FROM " + tableName;
h2Query = "SELECT COUNT(*), MIN(AirlineID), MAX(AirlineID), MAX(AirlineID)-MIN(AirlineID) FROM " + tableName;
testNonScanAggregationQuery(query, h2Query);
// FLOAT
query = "SELECT MIN(DepDelayMinutes) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MAX(DepDelayMinutes) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MIN_MAX_RANGE(DepDelayMinutes) FROM " + tableName;
h2Query = "SELECT MAX(DepDelayMinutes)-MIN(DepDelayMinutes) FROM " + tableName;
testNonScanAggregationQuery(query, h2Query);
query =
"SELECT COUNT(*), MIN(DepDelayMinutes), MAX(DepDelayMinutes), MIN_MAX_RANGE(DepDelayMinutes) FROM " + tableName;
h2Query =
"SELECT COUNT(*), MIN(DepDelayMinutes), MAX(DepDelayMinutes), MAX(DepDelayMinutes)-MIN(DepDelayMinutes) FROM "
+ tableName;
testNonScanAggregationQuery(query, h2Query);
// DOUBLE
query = "SELECT MIN(ArrDelayMinutes) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MAX(ArrDelayMinutes) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MIN_MAX_RANGE(ArrDelayMinutes) FROM " + tableName;
h2Query = "SELECT MAX(ArrDelayMinutes)-MIN(ArrDelayMinutes) FROM " + tableName;
testNonScanAggregationQuery(query, h2Query);
query =
"SELECT COUNT(*), MIN(ArrDelayMinutes), MAX(ArrDelayMinutes), MIN_MAX_RANGE(ArrDelayMinutes) FROM " + tableName;
h2Query =
"SELECT COUNT(*), MIN(ArrDelayMinutes), MAX(ArrDelayMinutes), MAX(ArrDelayMinutes)-MIN(ArrDelayMinutes) FROM "
+ tableName;
testNonScanAggregationQuery(query, h2Query);
// STRING
// TODO: Add test cases for string columns once MIN and MAX are supported on STRING data type columns.
// Non dictionary columns
// INT
query = "SELECT MIN(ActualElapsedTime) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MAX(ActualElapsedTime) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MIN_MAX_RANGE(ActualElapsedTime) FROM " + tableName;
h2Query = "SELECT MAX(ActualElapsedTime)-MIN(ActualElapsedTime) FROM " + tableName;
testNonScanAggregationQuery(query, h2Query);
query = "SELECT COUNT(*), MIN(ActualElapsedTime), MAX(ActualElapsedTime), MIN_MAX_RANGE(ActualElapsedTime) FROM "
+ tableName;
h2Query = "SELECT COUNT(*), MIN(ActualElapsedTime), MAX(ActualElapsedTime), "
+ "MAX(ActualElapsedTime)-MIN(ActualElapsedTime) FROM " + tableName;
testNonScanAggregationQuery(query, h2Query);
// FLOAT
query = "SELECT MIN(ArrDelay) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MAX(ArrDelay) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MIN_MAX_RANGE(ArrDelay) FROM " + tableName;
h2Query = "SELECT MAX(ArrDelay)-MIN(ArrDelay) FROM " + tableName;
testNonScanAggregationQuery(query, h2Query);
query = "SELECT COUNT(*), MIN(ArrDelay), MAX(ArrDelay), MIN_MAX_RANGE(ArrDelay) FROM " + tableName;
h2Query = "SELECT COUNT(*), MIN(ArrDelay), MAX(ArrDelay), MAX(ArrDelay)-MIN(ArrDelay) FROM " + tableName;
testNonScanAggregationQuery(query, h2Query);
// DOUBLE
query = "SELECT MIN(DepDelay) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MAX(DepDelay) FROM " + tableName;
testNonScanAggregationQuery(query);
query = "SELECT MIN_MAX_RANGE(DepDelay) FROM " + tableName;
h2Query = "SELECT MAX(DepDelay)-MIN(DepDelay) FROM " + tableName;
testNonScanAggregationQuery(query, h2Query);
query = "SELECT COUNT(*), MIN(DepDelay), MAX(DepDelay), MIN_MAX_RANGE(DepDelay) FROM " + tableName;
h2Query = "SELECT COUNT(*), MIN(DepDelay), MAX(DepDelay), MAX(DepDelay)-MIN(DepDelay) FROM " + tableName;
testNonScanAggregationQuery(query, h2Query);
// STRING
// TODO: Add test cases for string columns once MIN and MAX are supported on STRING data type columns.
// Multiple aggregation functions, a mix of dictionary-based and metadata-based: answered by
// NonScanBasedAggregationOperator
query = "SELECT MIN(ArrTime), MAX(ActualElapsedTime) FROM " + tableName;
testNonScanAggregationQuery(query);
// Group-by in query: not answered by NonScanBasedAggregationOperator
query = "SELECT MAX(ArrTime) FROM " + tableName + " GROUP BY DaysSinceEpoch";
JsonNode response = postQuery(query);
assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
// Filter in query: not answered by NonScanBasedAggregationOperator
query = "SELECT MAX(ArrTime) FROM " + tableName + " WHERE DepDelay > 100";
response = postQuery(query);
assertTrue(response.get("numEntriesScannedInFilter").asLong() > 0);
assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
}
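/** Shorthand for {@link #testNonScanAggregationQuery(String, String)} that runs the same query on Pinot and H2. */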
private void testNonScanAggregationQuery(String query)
throws Exception {
testNonScanAggregationQuery(query, null);
}
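/**
* Asserts that {@code pinotQuery} returns the same result as {@code h2Query} (or as itself on H2 when
* {@code h2Query} is null), and that it was answered without any scan: zero entries scanned in and after the
* filter phase, with numDocsScanned equal to totalDocs as reported by the metadata/dictionary-based operator.
*/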
private void testNonScanAggregationQuery(String pinotQuery, @Nullable String h2Query)
throws Exception {
testQuery(pinotQuery, h2Query != null ? h2Query : pinotQuery);
JsonNode response = postQuery(pinotQuery);
assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
assertEquals(response.get("numDocsScanned").asLong(), response.get("totalDocs").asLong());
}
@Test
@Override
public void testHardcodedServerPartitionedSqlQueries()
throws Exception {
super.testHardcodedServerPartitionedSqlQueries();
}
@Test
public void testAggregateMetadataAPI()
throws IOException {
JsonNode oneSVColumnResponse = JsonUtils.stringToJsonNode(
sendGetRequest(_controllerBaseApiUrl + "/tables/mytable/metadata?columns=DestCityMarketID"));
// DestCityMarketID is an SV (single-value) column
validateMetadataResponse(oneSVColumnResponse, 1, 0);
JsonNode oneMVColumnResponse = JsonUtils.stringToJsonNode(
sendGetRequest(_controllerBaseApiUrl + "/tables/mytable/metadata?columns=DivLongestGTimes"));
// DivLongestGTimes is an MV (multi-value) column
validateMetadataResponse(oneMVColumnResponse, 1, 1);
JsonNode threeSVColumnsResponse = JsonUtils.stringToJsonNode(sendGetRequest(_controllerBaseApiUrl
+ "/tables/mytable/metadata?columns=DivActualElapsedTime&columns=CRSElapsedTime&columns=OriginStateName"));
validateMetadataResponse(threeSVColumnsResponse, 3, 0);
JsonNode threeSVColumnsWholeEncodedResponse = JsonUtils.stringToJsonNode(sendGetRequest(
_controllerBaseApiUrl + "/tables/mytable/metadata?columns="
+ "DivActualElapsedTime%26columns%3DCRSElapsedTime%26columns%3DOriginStateName"));
validateMetadataResponse(threeSVColumnsWholeEncodedResponse, 3, 0);
JsonNode threeMVColumnsResponse = JsonUtils.stringToJsonNode(sendGetRequest(_controllerBaseApiUrl
+ "/tables/mytable/metadata?columns=DivLongestGTimes&columns=DivWheelsOns&columns=DivAirports"));
validateMetadataResponse(threeMVColumnsResponse, 3, 3);
JsonNode threeMVColumnsWholeEncodedResponse = JsonUtils.stringToJsonNode(sendGetRequest(
_controllerBaseApiUrl + "/tables/mytable/metadata?columns="
+ "DivLongestGTimes%26columns%3DDivWheelsOns%26columns%3DDivAirports"));
validateMetadataResponse(threeMVColumnsWholeEncodedResponse, 3, 3);
JsonNode zeroColumnResponse =
JsonUtils.stringToJsonNode(sendGetRequest(_controllerBaseApiUrl + "/tables/mytable/metadata"));
validateMetadataResponse(zeroColumnResponse, 0, 0);
JsonNode starColumnResponse =
JsonUtils.stringToJsonNode(sendGetRequest(_controllerBaseApiUrl + "/tables/mytable/metadata?columns=*"));
validateMetadataResponse(starColumnResponse, 82, 9);
JsonNode starEncodedColumnResponse =
JsonUtils.stringToJsonNode(sendGetRequest(_controllerBaseApiUrl + "/tables/mytable/metadata?columns=%2A"));
validateMetadataResponse(starEncodedColumnResponse, 82, 9);
JsonNode starWithExtraColumnResponse = JsonUtils.stringToJsonNode(sendGetRequest(
_controllerBaseApiUrl + "/tables/mytable/metadata?columns="
+ "CRSElapsedTime&columns=*&columns=OriginStateName"));
validateMetadataResponse(starWithExtraColumnResponse, 82, 9);
JsonNode starWithExtraEncodedColumnResponse = JsonUtils.stringToJsonNode(sendGetRequest(
_controllerBaseApiUrl + "/tables/mytable/metadata?columns="
+ "CRSElapsedTime&columns=%2A&columns=OriginStateName"));
validateMetadataResponse(starWithExtraEncodedColumnResponse, 82, 9);
JsonNode starWithExtraColumnWholeEncodedResponse = JsonUtils.stringToJsonNode(sendGetRequest(
_controllerBaseApiUrl + "/tables/mytable/metadata?columns="
+ "CRSElapsedTime%26columns%3D%2A%26columns%3DOriginStateName"));
validateMetadataResponse(starWithExtraColumnWholeEncodedResponse, 82, 9);
}
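/**
* Validates the aggregate metadata response: table-level disk size, segment count and row count must match the
* known constants; the column length and cardinality maps must each contain numTotalColumn entries; and the
* max-num-multi-values map must contain numMVColumn entries (only MV columns appear there).
*/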
private void validateMetadataResponse(JsonNode response, int numTotalColumn, int numMVColumn) {
assertEquals(response.get(DISK_SIZE_IN_BYTES_KEY).asInt(), DISK_SIZE_IN_BYTES);
assertEquals(response.get(NUM_SEGMENTS_KEY).asInt(), NUM_SEGMENTS);
assertEquals(response.get(NUM_ROWS_KEY).asInt(), NUM_ROWS);
assertEquals(response.get(COLUMN_LENGTH_MAP_KEY).size(), numTotalColumn);
assertEquals(response.get(COLUMN_CARDINALITY_MAP_KEY).size(), numTotalColumn);
assertEquals(response.get(MAX_NUM_MULTI_VALUES_MAP_KEY).size(), numMVColumn);
}
@Test
public void testReset()
throws Exception {
super.testReset(TableType.OFFLINE);
}
@Test
public void testJDBCClient()
throws Exception {
String query = "SELECT count(*) FROM " + getTableName();
java.sql.Connection connection = getJDBCConnectionFromController(DEFAULT_CONTROLLER_PORT);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(query);
resultSet.first();
assertTrue(resultSet.getLong(1) > 0);
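// Connect again with an explicit broker list; a random controller port is passed to verify that the driver
// queries the configured broker directly rather than discovering brokers through the controller.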
connection = getJDBCConnectionFromBrokers(RANDOM.nextInt(), DEFAULT_BROKER_PORT);
statement = connection.createStatement();
resultSet = statement.executeQuery(query);
resultSet.first();
assertTrue(resultSet.getLong(1) > 0);
}
private java.sql.Connection getJDBCConnectionFromController(int controllerPort)
throws Exception {
PinotDriver pinotDriver = new PinotDriver();
Properties jdbcProps = new Properties();
return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
}
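/**
* Opens a JDBC connection with an explicit broker list supplied via {@link PinotConnection#BROKER_LIST}, so the
* controller address in the JDBC URL does not need to be reachable.
*/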
private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort)
throws Exception {
PinotDriver pinotDriver = new PinotDriver();
Properties jdbcProps = new Properties();
jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort);
return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
}
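/**
* Triggers a reload of all segments of the OFFLINE table, asserts that the reload job metadata was persisted to
* ZK and that the controller job status API echoes the job id, type and table name, then returns the job id for
* subsequent polling.
*/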
private String reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload)
throws IOException {
String response =
sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload), null);
String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
JsonNode tableLevelDetails =
JsonUtils.stringToJsonNode(StringEscapeUtils.unescapeJava(response.split(": ")[1])).get(tableNameWithType);
// "reloadJobMetaZKStorageStatus" reports whether the reload job metadata was persisted to ZK.
String zkWriteStatus = tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText();
assertEquals(zkWriteStatus, "SUCCESS");
String jobId = tableLevelDetails.get("reloadJobId").asText();
String jobStatusResponse = sendGetRequest(_controllerRequestURLBuilder.forControllerJobStatus(jobId));
JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
// Validate all fields are present
assertEquals(jobStatus.get("metadata").get("jobId").asText(), jobId);
assertEquals(jobStatus.get("metadata").get("jobType").asText(), "RELOAD_ALL_SEGMENTS");
assertEquals(jobStatus.get("metadata").get("tableName").asText(), tableNameWithType);
return jobId;
}
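/**
* Returns true once the reload job reports success for every segment, i.e. successCount has caught up with
* totalSegmentCount. Typically polled until the reload completes.
*/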
private boolean validateReloadJobSuccess(String reloadJobId)
throws IOException {
String jobStatusResponse = sendGetRequest(_controllerRequestURLBuilder.forControllerJobStatus(reloadJobId));
JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
assertEquals(jobStatus.get("metadata").get("jobId").asText(), reloadJobId);
assertEquals(jobStatus.get("metadata").get("jobType").asText(), "RELOAD_ALL_SEGMENTS");
return jobStatus.get("totalSegmentCount").asInt() == jobStatus.get("successCount").asInt();
}
}