/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.task.TaskState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
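/**
* Integration test for the minion task of type "RealtimeToOfflineSegmentsTask".
* With every task run, one day of data is moved to the offline table: a single new offline segment is created, or one
* segment per combination of partitions when segment partitioning is configured. The watermark advances by one day
* accordingly.
*/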
public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends RealtimeClusterIntegrationTest {
private PinotHelixTaskResourceManager _helixTaskResourceManager;
private PinotTaskManager _taskManager;
private PinotHelixResourceManager _pinotHelixResourceManager;
private long _dataSmallestTimeMs;
private String _realtimeTableName;
private String _offlineTableName;
@Override
protected TableTaskConfig getTaskConfig() {
return new TableTaskConfig(
Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>()));
}
@Override
protected boolean useLlc() {
return true;
}
@BeforeClass
public void setUp()
throws Exception {
// Setup realtime table, and blank offline table
super.setUp();
addTableConfig(createOfflineTableConfig());
startMinion();
_helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
_pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
_realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
_offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
List<SegmentZKMetadata> segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(_realtimeTableName);
long minSegmentTimeMs = Long.MAX_VALUE;
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
minSegmentTimeMs = Math.min(minSegmentTimeMs, segmentZKMetadata.getStartTimeMs());
}
}
_dataSmallestTimeMs = minSegmentTimeMs;
}
@Test
public void testRealtimeToOfflineSegmentsTask()
throws IOException {
List<SegmentZKMetadata> segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(_offlineTableName);
Assert.assertTrue(segmentsZKMetadata.isEmpty());
// The number of offline segments would be equal to the product of number of partitions for all the
// partition columns if segment partitioning is configured.
SegmentPartitionConfig segmentPartitionConfig =
getOfflineTableConfig().getIndexingConfig().getSegmentPartitionConfig();
int numOfflineSegmentsPerTask =
segmentPartitionConfig != null ? segmentPartitionConfig.getColumnPartitionMap().values().stream()
.map(ColumnPartitionConfig::getNumPartitions).reduce((a, b) -> a * b)
.orElseThrow(() -> new RuntimeException("Expected accumulated result but not found.")) : 1;
long expectedWatermark = _dataSmallestTimeMs + 86400000;
for (int i = 0; i < 3; i++) {
// Schedule task
Assert.assertNotNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
Assert.assertTrue(_helixTaskResourceManager.getTaskQueues().contains(
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
// Should not generate more tasks
Assert.assertNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
// Wait at most 600 seconds for all tasks COMPLETED
waitForTaskToComplete(expectedWatermark);
// check segment is in offline
segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(_offlineTableName);
Assert.assertEquals(segmentsZKMetadata.size(), (numOfflineSegmentsPerTask * (i + 1)));
long expectedOfflineSegmentTimeMs = expectedWatermark - 86400000;
for (int j = (numOfflineSegmentsPerTask * i); j < segmentsZKMetadata.size(); j++) {
SegmentZKMetadata segmentZKMetadata = segmentsZKMetadata.get(j);
Assert.assertEquals(segmentZKMetadata.getStartTimeMs(), expectedOfflineSegmentTimeMs);
Assert.assertEquals(segmentZKMetadata.getEndTimeMs(), expectedOfflineSegmentTimeMs);
if (segmentPartitionConfig != null) {
Assert.assertEquals(segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().keySet(),
segmentPartitionConfig.getColumnPartitionMap().keySet());
for (String partitionColumn : segmentPartitionConfig.getColumnPartitionMap().keySet()) {
Assert.assertEquals(segmentZKMetadata.getPartitionMetadata().getPartitions(partitionColumn).size(), 1);
}
}
}
expectedWatermark += 86400000;
}
this.testHardcodedQueries();
// Delete the table
dropRealtimeTable(_realtimeTableName);
// Check if the metadata is cleaned up on table deletion
verifyTableDelete(_realtimeTableName);
}
@Nullable
@Override
protected SegmentPartitionConfig getSegmentPartitionConfig() {
Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
ColumnPartitionConfig columnOneConfig = new ColumnPartitionConfig("murmur", 3);
columnPartitionConfigMap.put("AirlineID", columnOneConfig);
ColumnPartitionConfig columnTwoConfig = new ColumnPartitionConfig("hashcode", 2);
columnPartitionConfigMap.put("OriginAirportID", columnTwoConfig);
return new SegmentPartitionConfig(columnPartitionConfigMap);
}
protected void verifyTableDelete(String tableNameWithType) {
TestUtils.waitForCondition(input -> {
// Check if the task metadata is cleaned up
if (MinionTaskMetadataUtils.fetchTaskMetadata(_propertyStore,
MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, tableNameWithType) != null) {
return false;
}
return true;
}, 1_000L, 60_000L, "Failed to delete table");
}
private void waitForTaskToComplete(long expectedWatermark) {
TestUtils.waitForCondition(input -> {
// Check task state
for (TaskState taskState : _helixTaskResourceManager.getTaskStates(
MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE).values()) {
if (taskState != TaskState.COMPLETED) {
return false;
}
}
return true;
}, 600_000L, "Failed to complete task");
// Check segment ZK metadata
ZNRecord znRecord = _taskManager.getClusterInfoAccessor()
.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, _realtimeTableName);
RealtimeToOfflineSegmentsTaskMetadata minionTaskMetadata =
znRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord) : null;
Assert.assertNotNull(minionTaskMetadata);
Assert.assertEquals(minionTaskMetadata.getWatermarkMs(), expectedWatermark);
}
@Test(enabled = false)
@Override
public void testDictionaryBasedQueries() {
}
@Test(enabled = false)
@Override
public void testHardcodedQueries() {
}
@Test(enabled = false)
@Override
public void testQueriesFromQueryFile() {
}
@Test(enabled = false)
@Override
public void testGeneratedQueries() {
}
@Test(enabled = false)
@Override
public void testQueryExceptions() {
}
@Test(enabled = false)
@Override
public void testInstanceShutdown() {
}
@AfterClass
public void tearDown()
throws Exception {
stopMinion();
stopServer();
stopBroker();
stopController();
stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}
}