/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.executor;
import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.IngestionSchemaValidator;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEqualsNoOrder;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
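
/**
 * Tests the exception details returned by the query executor when the queried segments are missing on the server.
 * The generated segments are intentionally never added to the table data manager, so every queried segment is
 * reported as missing.
 */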
public class QueryExecutorExceptionsTest {
private static final String AVRO_DATA_PATH = "data/simpleData200001.avro";
private static final String EMPTY_JSON_DATA_PATH = "data/test_empty_data.json";
private static final String QUERY_EXECUTOR_CONFIG_PATH = "conf/query-executor.properties";
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "QueryExecutorTest");
private static final String TABLE_NAME = "testTable";
private static final int NUM_SEGMENTS_TO_GENERATE = 2;
private static final int NUM_EMPTY_SEGMENTS_TO_GENERATE = 2;
private static final ExecutorService QUERY_RUNNERS = Executors.newFixedThreadPool(20);
private final List<ImmutableSegment> _indexSegments = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
private final List<String> _segmentNames = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
private ServerMetrics _serverMetrics;
private QueryExecutor _queryExecutor;
@BeforeClass
public void setUp()
throws Exception {
// Set up the segments
FileUtils.deleteQuietly(INDEX_DIR);
assertTrue(INDEX_DIR.mkdirs());
URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
assertNotNull(resourceUrl);
File avroFile = new File(resourceUrl.getFile());
Schema schema = SegmentTestUtils.extractSchemaFromAvroWithoutTime(avroFile);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
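// Generate NUM_SEGMENTS_TO_GENERATE segments from the Avro data and load them in mmap read mode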
for (int i = 0; i < NUM_SEGMENTS_TO_GENERATE; i++) {
SegmentGeneratorConfig config =
SegmentTestUtils.getSegmentGeneratorConfig(avroFile, FileFormat.AVRO, INDEX_DIR, TABLE_NAME, tableConfig,
schema);
config.setSegmentNamePostfix(Integer.toString(i));
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
driver.init(config);
driver.build();
IngestionSchemaValidator ingestionSchemaValidator = driver.getIngestionSchemaValidator();
assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
_indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap));
_segmentNames.add(driver.getSegmentName());
}
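// Generate NUM_EMPTY_SEGMENTS_TO_GENERATE empty segments from the empty JSON file to cover segments without documents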
resourceUrl = getClass().getClassLoader().getResource(EMPTY_JSON_DATA_PATH);
assertNotNull(resourceUrl);
File jsonFile = new File(resourceUrl.getFile());
for (int i = NUM_SEGMENTS_TO_GENERATE; i < NUM_SEGMENTS_TO_GENERATE + NUM_EMPTY_SEGMENTS_TO_GENERATE; i++) {
SegmentGeneratorConfig config =
SegmentTestUtils.getSegmentGeneratorConfig(jsonFile, FileFormat.JSON, INDEX_DIR, TABLE_NAME, tableConfig,
schema);
config.setSegmentNamePostfix(Integer.toString(i));
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
driver.init(config);
driver.build();
_indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap));
_segmentNames.add(driver.getSegmentName());
}
// Mock the instance data manager
_serverMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
TableDataManagerConfig tableDataManagerConfig = mock(TableDataManagerConfig.class);
when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME);
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
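// Mock the instance data manager config values used when initializing the TableDataManagerProvider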
InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
TableDataManagerProvider.init(instanceDataManagerConfig);
@SuppressWarnings("unchecked")
TableDataManager tableDataManager =
TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, "testInstance",
mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class), mock(HelixManager.class), null);
tableDataManager.start();
// We don't add the index segments to the table data manager in order to simulate numSegmentsAcquired < numSegmentsQueried
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
when(instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager);
// Set up the query executor
resourceUrl = getClass().getClassLoader().getResource(QUERY_EXECUTOR_CONFIG_PATH);
assertNotNull(resourceUrl);
PropertiesConfiguration queryExecutorConfig = new PropertiesConfiguration();
queryExecutorConfig.setDelimiterParsingDisabled(false);
queryExecutorConfig.load(new File(resourceUrl.getFile()));
_queryExecutor = new ServerQueryExecutorV1Impl();
_queryExecutor.init(new PinotConfiguration(queryExecutorConfig), instanceDataManager, _serverMetrics);
}
/**
 * Given that some of the queried segments are missing on the server, when a query is executed, then the
 * SERVER_SEGMENT_MISSING error code is returned along with the list of missing segments.
 */
@Test
public void testServerSegmentMissingExceptionDetails() {
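// None of the generated segments were added to the table data manager, so every queried segment should be reported as missing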
String query = "SELECT COUNT(*) FROM " + TABLE_NAME;
InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query));
instanceRequest.setSearchSegments(_segmentNames);
DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
Map<Integer, String> exceptions = instanceResponse.getExceptions();
assertTrue(exceptions.containsKey(QueryException.SERVER_SEGMENT_MISSING_ERROR_CODE));
String errorMessage = exceptions.get(QueryException.SERVER_SEGMENT_MISSING_ERROR_CODE);
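// The error message lists the missing segment names inside square brackets, separated by ", "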
String[] actualMissingSegments = StringUtils.splitByWholeSeparator(
errorMessage.substring(1 + errorMessage.indexOf('['), errorMessage.indexOf(']')), ", ");
String[] expectedMissingSegments = new String[]{"testTable_0", "testTable_1", "testTable_2", "testTable_3"};
assertEqualsNoOrder(actualMissingSegments, expectedMissingSegments);
}
@AfterClass
public void tearDown() {
for (IndexSegment segment : _indexSegments) {
segment.destroy();
}
FileUtils.deleteQuietly(INDEX_DIR);
}
private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) {
return new ServerQueryRequest(instanceRequest, _serverMetrics, System.currentTimeMillis());
}
}