/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

/**
 * Integration test that enables segment partitioning for an LLC real-time table.
*/
public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClusterIntegrationTest {
private static final String PARTITION_COLUMN = "DestState";
  // Numbers of documents in the first, second and third Avro files
private static final long NUM_DOCS_IN_FIRST_AVRO_FILE = 9292;
private static final long NUM_DOCS_IN_SECOND_AVRO_FILE = 8736;
private static final long NUM_DOCS_IN_THIRD_AVRO_FILE = 9378;
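
  // Unpacked Avro files, pushed into Kafka one file at a time across the test methods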
private List<File> _avroFiles;
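  // Column used to key the pushed Kafka messages; null simulates a non-partitioned stream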
private String _partitionColumn;
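  // Expected COUNT(*) result, incremented as each Avro file is ingested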
  private long _countStarResult;

@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
// Start the Pinot cluster
startZk();
startController();
startBroker();
startServer();
// Start Kafka
startKafka();
// Unpack the Avro files
_avroFiles = unpackAvroData(_tempDir);
    // Create and upload the schema and table config with a reduced number of columns and a partition config
Schema schema = new Schema.SchemaBuilder().setSchemaName(getSchemaName())
.addSingleValueDimension(PARTITION_COLUMN, DataType.STRING)
.addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
addSchema(schema);
TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
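    // Partition DestState into 2 partitions with the murmur hash function; this is the metadata the
    // broker-side pruner checks against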
indexingConfig.setSegmentPartitionConfig(new SegmentPartitionConfig(
Collections.singletonMap(PARTITION_COLUMN, new ColumnPartitionConfig("murmur", 2))));
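    // Enable the partition-based segment pruner so the broker skips segments that cannot match the filter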
tableConfig.setRoutingConfig(
new RoutingConfig(null, Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE), null));
addTableConfig(tableConfig);
// Push data into Kafka (only ingest the first Avro file)
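    // With the partition column set, pushAvroIntoKafka keys the messages by DestState so each Kafka
    // partition only receives records for a single murmur partition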
_partitionColumn = PARTITION_COLUMN;
pushAvroIntoKafka(Collections.singletonList(_avroFiles.get(0)));
// Wait for all documents loaded
_countStarResult = NUM_DOCS_IN_FIRST_AVRO_FILE;
waitForAllDocsLoaded(600_000L);
  }

@Override
protected long getCountStarResult() {
return _countStarResult;
  }

@Override
protected boolean useLlc() {
return true;
  }

@Nullable
@Override
protected String getPartitionColumn() {
return _partitionColumn;
}
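
  // The reduced schema only contains the partition and time columns, so no other index columns are configured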
  @Nullable
  @Override
  protected String getSortedColumn() {
    return null;
  }

  @Nullable
  @Override
  protected List<String> getInvertedIndexColumns() {
    return null;
  }

  @Nullable
  @Override
  protected List<String> getNoDictionaryColumns() {
    return null;
  }

  @Nullable
  @Override
  protected List<String> getRangeIndexColumns() {
    return null;
  }

  @Nullable
  @Override
  protected List<String> getBloomFilterColumns() {
    return null;
  }

@Test
public void testPartitionMetadata() {
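    // Every segment should carry murmur partition metadata for DestState with exactly one partition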
int[] numSegmentsForPartition = new int[2];
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
List<SegmentZKMetadata> segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(realtimeTableName);
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
assertNotNull(segmentPartitionMetadata);
Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
segmentPartitionMetadata.getColumnPartitionMap();
assertEquals(columnPartitionMetadataMap.size(), 1);
ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get(PARTITION_COLUMN);
assertNotNull(columnPartitionMetadata);
assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
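      // The LLC segment name encodes the partition group (Kafka partition) id and the sequence number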
int partitionGroupId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionGroupId();
assertEquals(columnPartitionMetadata.getPartitions(), Collections.singleton(partitionGroupId));
numSegmentsForPartition[partitionGroupId]++;
}
// There should be 2 segments for partition 0, 2 segments for partition 1
assertEquals(numSegmentsForPartition[0], 2);
assertEquals(numSegmentsForPartition[1], 2);
  }

@Test(dependsOnMethods = "testPartitionMetadata")
public void testPartitionRouting()
throws Exception {
// Query partition 0
{
String query = "SELECT COUNT(*) FROM mytable WHERE DestState = 'CA'";
JsonNode response = postQuery(query);
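      // BETWEEN matches the same rows but is a range predicate, which the partition pruner does not
      // handle, so it provides an unpruned baseline for comparison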
String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE DestState BETWEEN 'CA' AND 'CA'";
JsonNode responseToCompare = postQuery(queryToCompare);
// Should only query the segments for partition 0
assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), 2);
assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), 4);
assertEquals(response.get("resultTable").get("rows").get(0).get(0).asInt(),
responseToCompare.get("resultTable").get("rows").get(0).get(0).asInt());
}
// Query partition 1
{
String query = "SELECT COUNT(*) FROM mytable WHERE DestState = 'FL'";
JsonNode response = postQuery(query);
String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE DestState BETWEEN 'FL' AND 'FL'";
JsonNode responseToCompare = postQuery(queryToCompare);
// Should only query the segments for partition 1
assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), 2);
assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), 4);
assertEquals(response.get("resultTable").get("rows").get(0).get(0).asInt(),
responseToCompare.get("resultTable").get("rows").get(0).get(0).asInt());
}
  }

@Test(dependsOnMethods = "testPartitionRouting")
public void testNonPartitionedStream()
throws Exception {
// Push the second Avro file into Kafka without partitioning
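    // Clearing the partition column makes pushAvroIntoKafka stop keying the messages by DestState, so
    // every Kafka partition receives records for both murmur partitions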
_partitionColumn = null;
pushAvroIntoKafka(Collections.singletonList(_avroFiles.get(1)));
// Wait for all documents loaded
_countStarResult += NUM_DOCS_IN_SECOND_AVRO_FILE;
waitForAllDocsLoaded(600_000L);
// Check partition metadata
int[] numSegmentsForPartition = new int[2];
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
List<SegmentZKMetadata> segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(realtimeTableName);
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
assertNotNull(segmentPartitionMetadata);
Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
segmentPartitionMetadata.getColumnPartitionMap();
assertEquals(columnPartitionMetadataMap.size(), 1);
ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get(PARTITION_COLUMN);
assertNotNull(columnPartitionMetadata);
assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
int partitionGroupId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionGroupId();
numSegmentsForPartition[partitionGroupId]++;
if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
        // For the consuming segment, the partition metadata should only contain the stream partition
assertEquals(columnPartitionMetadata.getPartitions(), Collections.singleton(partitionGroupId));
} else {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
int sequenceNumber = llcSegmentName.getSequenceNumber();
if (sequenceNumber == 0) {
// The partition metadata for the first completed segment should only contain the stream partition
assertEquals(columnPartitionMetadata.getPartitions(), Collections.singleton(partitionGroupId));
} else {
// The partition metadata for the new completed segments should contain both partitions
assertEquals(columnPartitionMetadata.getPartitions(), new HashSet<>(Arrays.asList(0, 1)));
}
}
}
// There should be 4 segments for partition 0, 4 segments for partition 1
assertEquals(numSegmentsForPartition[0], 4);
assertEquals(numSegmentsForPartition[1], 4);
// Check partition routing
int numSegments = segmentsZKMetadata.size();
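    // At this point, each partition's first completed segment and consuming segment carry
    // single-partition metadata (prunable), while the other completed segments span both partitions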
// Query partition 0
{
String query = "SELECT COUNT(*) FROM mytable WHERE DestState = 'CA'";
JsonNode response = postQuery(query);
String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE DestState BETWEEN 'CA' AND 'CA'";
JsonNode responseToCompare = postQuery(queryToCompare);
      // Should skip the first completed segment and the consuming segment for partition 1
assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments - 2);
assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments);
// The result won't match because the consuming segment for partition 1 is pruned out
}
// Query partition 1
{
String query = "SELECT COUNT(*) FROM mytable WHERE DestState = 'FL'";
JsonNode response = postQuery(query);
String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE DestState BETWEEN 'FL' AND 'FL'";
JsonNode responseToCompare = postQuery(queryToCompare);
      // Should skip the first completed segment and the consuming segment for partition 0
assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments - 2);
assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments);
// The result won't match because the consuming segment for partition 0 is pruned out
}
// Push the third Avro file into Kafka with partitioning
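    // Restoring the partition column re-keys the messages by DestState, so the segments completed from
    // here on contain records for a single partition again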
_partitionColumn = PARTITION_COLUMN;
pushAvroIntoKafka(Collections.singletonList(_avroFiles.get(2)));
// Wait for all documents loaded
_countStarResult += NUM_DOCS_IN_THIRD_AVRO_FILE;
waitForAllDocsLoaded(600_000L);
// Check partition metadata
numSegmentsForPartition = new int[2];
segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(realtimeTableName);
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
assertNotNull(segmentPartitionMetadata);
Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
segmentPartitionMetadata.getColumnPartitionMap();
assertEquals(columnPartitionMetadataMap.size(), 1);
ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get(PARTITION_COLUMN);
assertNotNull(columnPartitionMetadata);
assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
int partitionGroupId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionGroupId();
numSegmentsForPartition[partitionGroupId]++;
if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
        // For the consuming segment, the partition metadata should only contain the stream partition
assertEquals(columnPartitionMetadata.getPartitions(), Collections.singleton(partitionGroupId));
} else {
        // For completed segments, the partition metadata depends on which Avro files' records they contain
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
int sequenceNumber = llcSegmentName.getSequenceNumber();
if (sequenceNumber == 0 || sequenceNumber >= 4) {
// The partition metadata for the first and new completed segments should only contain the stream partition
assertEquals(columnPartitionMetadata.getPartitions(), Collections.singleton(partitionGroupId));
} else {
// The partition metadata for the completed segments containing records from the second Avro file should
// contain both partitions
assertEquals(columnPartitionMetadata.getPartitions(), new HashSet<>(Arrays.asList(0, 1)));
}
}
}
// There should be 6 segments for partition 0, 6 segments for partition 1
assertEquals(numSegmentsForPartition[0], 6);
assertEquals(numSegmentsForPartition[1], 6);
// Check partition routing
numSegments = segmentsZKMetadata.size();
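    // Per partition, 3 segments are now prunable: the first completed segment, the completed segment
    // containing only records from the third Avro file, and the consuming segment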
// Query partition 0
{
String query = "SELECT COUNT(*) FROM mytable WHERE DestState = 'CA'";
JsonNode response = postQuery(query);
String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE DestState BETWEEN 'CA' AND 'CA'";
JsonNode responseToCompare = postQuery(queryToCompare);
// Should skip 2 completed segments and the consuming segment for partition 1
assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments - 3);
assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments);
      // The results should match again now that all the segments containing non-partitioned records are committed
assertEquals(response.get("resultTable").get("rows").get(0).get(0).asInt(),
responseToCompare.get("resultTable").get("rows").get(0).get(0).asInt());
}
// Query partition 1
{
String query = "SELECT COUNT(*) FROM mytable WHERE DestState = 'FL'";
JsonNode response = postQuery(query);
String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE DestState BETWEEN 'FL' AND 'FL'";
JsonNode responseToCompare = postQuery(queryToCompare);
// Should skip 2 completed segments and the consuming segment for partition 0
assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments - 3);
assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments);
      // The results should match again now that all the segments containing non-partitioned records are committed
assertEquals(response.get("resultTable").get("rows").get(0).get(0).asInt(),
responseToCompare.get("resultTable").get("rows").get(0).get(0).asInt());
}
  }

@AfterClass
public void tearDown()
throws Exception {
dropRealtimeTable(getTableName());
stopServer();
stopBroker();
stopController();
stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}
}