/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.io.FileUtils;
import org.apache.http.HttpStatus;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

/**
* Integration test that extends RealtimeClusterIntegrationTest but uses the low-level (LLC) Kafka consumer.
* TODO: Add separate module-level tests and remove the randomness of this test
*/
public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest {
private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
"SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = Collections.singletonList("DivActualElapsedTime");
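
// Randomize several cluster settings; the seed is printed in setUp() so a failing combination can be reproduced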
private static final long RANDOM_SEED = System.currentTimeMillis();
private static final Random RANDOM = new Random(RANDOM_SEED);
private final boolean _isDirectAlloc = RANDOM.nextBoolean();
private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
private final boolean _enableSplitCommit = RANDOM.nextBoolean();
private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
private final long _startTime = System.currentTimeMillis();
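
// Use the low-level (LLC) Kafka consumer, inject tombstone messages into the stream, and load segments in mmap mode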
@Override
protected boolean injectTombstones() {
return true;
}

@Override
protected boolean useLlc() {
return true;
}

@Override
protected String getLoadMode() {
return ReadMode.mmap.name();
}
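
// Start the controller with HLC tables disallowed; split commit and the lead controller resource are toggled randomly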
@Override
public void startController()
throws Exception {
Map<String, Object> properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.ALLOW_HLC_TABLES, false);
properties.put(ControllerConf.ENABLE_SPLIT_COMMIT, _enableSplitCommit);
startController(properties);
enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
}
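
// Always enable off-heap allocation for consuming segments; direct allocation, a custom consumer directory and split commit are toggled randomly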
@Override
protected void overrideServerConf(PinotConfiguration configuration) {
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true);
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc);
if (_isConsumerDirConfigured) {
configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY);
}
if (_enableSplitCommit) {
configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true);
configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
}
}
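
// Build segments from the Avro files and upload them to the controller, then re-upload the first segment to verify the refresh path, both with the same and with a different CRC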
@Override
protected void createSegmentsAndUpload(List<File> avroFiles, Schema schema, TableConfig tableConfig)
throws Exception {
// Create the working directories if they do not exist (forceMkdir fails fast instead of silently ignoring a failed mkdir)
FileUtils.forceMkdir(_tarDir);
FileUtils.forceMkdir(_segmentDir);
// create segments out of the avro files (segments will be placed in _tarDir)
List<File> copyOfAvroFiles = new ArrayList<>(avroFiles);
ClusterIntegrationTestUtils.buildSegmentsFromAvro(copyOfAvroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
// upload segments to controller
uploadSegmentsToController(getTableName(), _tarDir, false, false);
// upload the first segment again to verify refresh
uploadSegmentsToController(getTableName(), _tarDir, true, false);
// upload the first segment again to verify refresh with different segment crc
uploadSegmentsToController(getTableName(), _tarDir, true, true);
// add avro files to the original list so H2 will have the uploaded data as well
avroFiles.addAll(copyOfAvroFiles);
}
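
// Upload either all segment tar files (in parallel) or only the first one; optionally change the CRC in ZK metadata first so the re-upload is treated as a refresh with new content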
private void uploadSegmentsToController(String tableName, File tarDir, boolean onlyFirstSegment, boolean changeCrc)
throws Exception {
File[] segmentTarFiles = tarDir.listFiles();
assertNotNull(segmentTarFiles);
int numSegments = segmentTarFiles.length;
assertTrue(numSegments > 0);
if (onlyFirstSegment) {
numSegments = 1;
}
URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
if (numSegments == 1) {
File segmentTarFile = segmentTarFiles[0];
if (changeCrc) {
changeCrcInSegmentZKMetadata(tableName, segmentTarFile.toString());
}
assertEquals(fileUploadDownloadClient
.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName,
TableType.REALTIME).getStatusCode(), HttpStatus.SC_OK);
} else {
// Upload segments in parallel
ExecutorService executorService = Executors.newFixedThreadPool(numSegments);
List<Future<Integer>> futures = new ArrayList<>(numSegments);
for (File segmentTarFile : segmentTarFiles) {
futures.add(executorService.submit(() -> fileUploadDownloadClient
.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName,
TableType.REALTIME).getStatusCode()));
}
executorService.shutdown();
for (Future<Integer> future : futures) {
assertEquals((int) future.get(), HttpStatus.SC_OK);
}
}
}
}
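
// Extract the segment name from the tar file path and overwrite the CRC in its ZK metadata so the next upload of the same file no longer matches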
private void changeCrcInSegmentZKMetadata(String tableName, String segmentFilePath) {
int startIdx = segmentFilePath.indexOf("mytable_");
int endIdx = segmentFilePath.indexOf(".tar.gz");
String segmentName = segmentFilePath.substring(startIdx, endIdx);
String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
segmentZKMetadata.setCrc(111L);
_helixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata);
}

@Override
protected long getCountStarResult() {
// all the data that was ingested from Kafka also got uploaded via the controller's upload endpoint
return super.getCountStarResult() * 2;
}

@BeforeClass
@Override
public void setUp()
throws Exception {
System.out.println(String.format(
"Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, "
+ "enableLeadControllerResource: %s", RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured,
_enableSplitCommit, _enableLeadControllerResource));
// Remove the consumer directory
FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY));
super.setUp();
}

@AfterClass
@Override
public void tearDown()
throws Exception {
FileUtils.deleteDirectory(new File(CONSUMER_DIRECTORY));
super.tearDown();
}
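
// The consumer directory should exist if and only if it was explicitly configured on the server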
@Test
public void testConsumerDirectoryExists() {
File consumerDirectory = new File(CONSUMER_DIRECTORY, "mytable_REALTIME");
assertEquals(consumerDirectory.exists(), _isConsumerDirConfigured,
"Consumer directory existence does not match whether the consumer directory is configured");
}
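
// Segments created from the stream (i.e. not uploaded) should have a flush threshold equal to the table-level flush size divided evenly across the Kafka partitions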
@Test
public void testSegmentFlushSize() {
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
List<SegmentZKMetadata> segmentsZKMetadata =
ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, realtimeTableName);
for (SegmentZKMetadata segMetadata : segmentsZKMetadata) {
if (segMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.UPLOADED) {
assertEquals(segMetadata.getSizeThresholdToFlushSegment(),
getRealtimeSegmentFlushSize() / getNumKafkaPartitions());
}
}
}
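
// Adding an inverted index via a table config update and reload should eventually drop numEntriesScannedInFilter to 0 while totalDocs stays constant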
@Test
public void testInvertedIndexTriggering()
throws Exception {
long numTotalDocs = getCountStarResult();
JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
assertTrue(queryResponse.get("numEntriesScannedInFilter").asLong() > 0L);
TableConfig tableConfig = getRealtimeTableConfig();
tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
updateTableConfig(tableConfig);
reloadRealtimeTable(getTableName());
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse1 = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
// Total docs should not change during reload
assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
assertEquals(queryResponse1.get("numConsumingSegmentsQueried").asLong(), 2);
assertTrue(queryResponse1.get("minConsumingFreshnessTimeMs").asLong() > _startTime);
assertTrue(queryResponse1.get("minConsumingFreshnessTimeMs").asLong() < System.currentTimeMillis());
return queryResponse1.get("numEntriesScannedInFilter").asLong() == 0;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 600_000L, "Failed to generate inverted index");
}
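
// Creating a high-level consumer (HLC) table should fail because the controller is started with ALLOW_HLC_TABLES set to false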
@Test(expectedExceptions = IOException.class)
public void testAddHLCTableShouldFail()
throws IOException {
TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
.setStreamConfigs(Collections.singletonMap("stream.kafka.consumer.type", "HIGHLEVEL")).build();
sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
}

@Test
public void testReload()
throws Exception {
testReload(false);
}

@Test
public void testReset()
throws Exception {
super.testReset(TableType.REALTIME);
}

@Test
@Override
public void testHardcodedServerPartitionedSqlQueries()
throws Exception {
super.testHardcodedServerPartitionedSqlQueries();
}
}