/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.MergeRollupTaskMetadata;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.utils.SqlResultComparator;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
| |
| |
| /** |
| * Integration test for minion task of type "MergeRollupTask" |
| */ |
| public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrationTest { |
| private static final String SINGLE_LEVEL_CONCAT_TEST_TABLE = "myTable1"; |
| private static final String SINGLE_LEVEL_ROLLUP_TEST_TABLE = "myTable2"; |
| private static final String MULTI_LEVEL_CONCAT_TEST_TABLE = "myTable3"; |
| private static final long TIMEOUT_IN_MS = 10_000L; |
| |
| protected PinotHelixTaskResourceManager _helixTaskResourceManager; |
| protected PinotTaskManager _taskManager; |
| protected PinotHelixResourceManager _pinotHelixResourceManager; |
| |
| protected final File _segmentDir1 = new File(_tempDir, "segmentDir1"); |
| protected final File _segmentDir2 = new File(_tempDir, "segmentDir2"); |
| protected final File _segmentDir3 = new File(_tempDir, "segmentDir3"); |
| protected final File _tarDir1 = new File(_tempDir, "tarDir1"); |
| protected final File _tarDir2 = new File(_tempDir, "tarDir2"); |
| protected final File _tarDir3 = new File(_tempDir, "tarDir3"); |
| |
| @BeforeClass |
| public void setUp() |
| throws Exception { |
| TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _segmentDir2, _segmentDir3, _tarDir1, _tarDir2, |
| _tarDir3); |
| |
| // Start the Pinot cluster |
| startZk(); |
| startController(); |
| startBrokers(1); |
| startServers(1); |
| |
| // Create and upload the schema and table config |
| Schema schema = createSchema(); |
| addSchema(schema); |
| TableConfig singleLevelConcatTableConfig = |
| createOfflineTableConfig(SINGLE_LEVEL_CONCAT_TEST_TABLE, getSingleLevelConcatTaskConfig()); |
| TableConfig singleLevelRollupTableConfig = |
| createOfflineTableConfig(SINGLE_LEVEL_ROLLUP_TEST_TABLE, getSingleLevelRollupTaskConfig(), |
| getMultiColumnsSegmentPartitionConfig()); |
| TableConfig multiLevelConcatTableConfig = |
| createOfflineTableConfig(MULTI_LEVEL_CONCAT_TEST_TABLE, getMultiLevelConcatTaskConfig()); |
| addTableConfig(singleLevelConcatTableConfig); |
| addTableConfig(singleLevelRollupTableConfig); |
| addTableConfig(multiLevelConcatTableConfig); |
| |
| // Unpack the Avro files |
| List<File> avroFiles = unpackAvroData(_tempDir); |
| |
| // Create and upload segments |
| ClusterIntegrationTestUtils |
| .buildSegmentsFromAvro(avroFiles, singleLevelConcatTableConfig, schema, 0, _segmentDir1, _tarDir1); |
| buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig, schema, 0, _segmentDir2, _tarDir2, "1"); |
| buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig, schema, 0, _segmentDir2, _tarDir2, "2"); |
| ClusterIntegrationTestUtils |
| .buildSegmentsFromAvro(avroFiles, multiLevelConcatTableConfig, schema, 0, _segmentDir3, _tarDir3); |
| uploadSegments(SINGLE_LEVEL_CONCAT_TEST_TABLE, _tarDir1); |
| uploadSegments(SINGLE_LEVEL_ROLLUP_TEST_TABLE, _tarDir2); |
| uploadSegments(MULTI_LEVEL_CONCAT_TEST_TABLE, _tarDir3); |
| |
| // Set up the H2 connection |
| setUpH2Connection(avroFiles); |
| |
| // Initialize the query generator |
| setUpQueryGenerator(avroFiles); |
| |
| startMinion(); |
| _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager(); |
| _taskManager = _controllerStarter.getTaskManager(); |
| _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager(); |
| } |
| |
| private TableConfig createOfflineTableConfig(String tableName, TableTaskConfig taskConfig) { |
| return createOfflineTableConfig(tableName, taskConfig, null); |
| } |
| |
| private TableConfig createOfflineTableConfig(String tableName, TableTaskConfig taskConfig, |
| @Nullable SegmentPartitionConfig partitionConfig) { |
| return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setSchemaName(getSchemaName()) |
| .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn()) |
| .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns()) |
| .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns()) |
| .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()) |
| .setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()) |
| .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()) |
| .setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(partitionConfig).build(); |
| } |
| |
| private TableTaskConfig getSingleLevelConcatTaskConfig() { |
| Map<String, String> tableTaskConfigs = new HashMap<>(); |
| tableTaskConfigs.put("100days.mergeType", "concat"); |
| tableTaskConfigs.put("100days.bufferTimePeriod", "1d"); |
| tableTaskConfigs.put("100days.bucketTimePeriod", "100d"); |
| tableTaskConfigs.put("100days.maxNumRecordsPerSegment", "15000"); |
| tableTaskConfigs.put("100days.maxNumRecordsPerTask", "15000"); |
| tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min"); |
| tableTaskConfigs.put("WeatherDelay.aggregationType", "sum"); |
| return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs)); |
| } |
| |
| private TableTaskConfig getSingleLevelRollupTaskConfig() { |
| Map<String, String> tableTaskConfigs = new HashMap<>(); |
| tableTaskConfigs.put("150days.mergeType", "rollup"); |
| tableTaskConfigs.put("150days.bufferTimePeriod", "1d"); |
| tableTaskConfigs.put("150days.bucketTimePeriod", "150d"); |
| tableTaskConfigs.put("150days.roundBucketTimePeriod", "7d"); |
| return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs)); |
| } |
| |
| private TableTaskConfig getMultiLevelConcatTaskConfig() { |
| Map<String, String> tableTaskConfigs = new HashMap<>(); |
| tableTaskConfigs.put("45days.mergeType", "concat"); |
| tableTaskConfigs.put("45days.bufferTimePeriod", "1d"); |
| tableTaskConfigs.put("45days.bucketTimePeriod", "45d"); |
| tableTaskConfigs.put("45days.maxNumRecordsPerSegment", "100000"); |
| tableTaskConfigs.put("45days.maxNumRecordsPerTask", "100000"); |
| |
| tableTaskConfigs.put("90days.mergeType", "concat"); |
| tableTaskConfigs.put("90days.bufferTimePeriod", "1d"); |
| tableTaskConfigs.put("90days.bucketTimePeriod", "90d"); |
| tableTaskConfigs.put("90days.maxNumRecordsPerSegment", "100000"); |
| tableTaskConfigs.put("90days.maxNumRecordsPerTask", "100000"); |
| return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs)); |
| } |
| |
| private SegmentPartitionConfig getMultiColumnsSegmentPartitionConfig() { |
| Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>(); |
| ColumnPartitionConfig columnOneConfig = new ColumnPartitionConfig("murmur", 1); |
| columnPartitionConfigMap.put("AirlineID", columnOneConfig); |
| ColumnPartitionConfig columnTwoConfig = new ColumnPartitionConfig("murmur", 1); |
| columnPartitionConfigMap.put("Month", columnTwoConfig); |
| return new SegmentPartitionConfig(columnPartitionConfigMap); |
| } |
| |
| private static void buildSegmentsFromAvroWithPostfix(List<File> avroFiles, TableConfig tableConfig, |
| org.apache.pinot.spi.data.Schema schema, int baseSegmentIndex, File segmentDir, File tarDir, String postfix) |
| throws Exception { |
| int numAvroFiles = avroFiles.size(); |
| ExecutorService executorService = Executors.newFixedThreadPool(numAvroFiles); |
| List<Future<Void>> futures = new ArrayList<>(numAvroFiles); |
| for (int i = 0; i < numAvroFiles; i++) { |
| File avroFile = avroFiles.get(i); |
| int segmentIndex = i + baseSegmentIndex; |
| futures.add(executorService.submit(() -> { |
| SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema); |
| segmentGeneratorConfig.setInputFilePath(avroFile.getPath()); |
| segmentGeneratorConfig.setOutDir(segmentDir.getPath()); |
| segmentGeneratorConfig.setTableName(tableConfig.getTableName()); |
| // Test segment with space and special character in the file name |
| segmentGeneratorConfig.setSegmentNamePostfix(segmentIndex + "_" + postfix); |
| |
| // Build the segment |
| SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl(); |
| driver.init(segmentGeneratorConfig); |
| driver.build(); |
| |
| // Tar the segment |
| String segmentName = driver.getSegmentName(); |
| File indexDir = new File(segmentDir, segmentName); |
| File segmentTarFile = new File(tarDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); |
| TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile); |
| return null; |
| })); |
| } |
| executorService.shutdown(); |
| for (Future<Void> future : futures) { |
| future.get(); |
| } |
| } |
| |
| /** |
| * Test single level concat task with maxNumRecordPerTask, maxNumRecordPerSegment constraints |
| */ |
| @Test |
| public void testSingleLevelConcat() |
| throws Exception { |
| // The original segments are time partitioned by month: |
| // segmentName (totalDocs) |
| // myTable1_16071_16101_3 (9746) |
| // myTable1_16102_16129_4 (8690) |
| // myTable1_16130_16159_5 (9621) |
| // myTable1_16160_16189_6 (9454) |
| // myTable1_16190_16220_7 (10329) |
| // myTable1_16221_16250_8 (10468) |
| // myTable1_16251_16281_9 (10499) |
| // myTable1_16282_16312_10 (10196) |
| // myTable1_16313_16342_11 (9136) |
| // myTable1_16343_16373_0 (9292) |
| // myTable1_16374_16404_1 (8736) |
| // myTable1_16405_16435_2 (9378) |
| |
| // Expected merge tasks and result segments: |
| // 1. |
| // {myTable1_16071_16101_3} |
| // -> {merged_100days_T1_0_myTable1_16071_16099_0, merged_100days_T1_0_myTable1_16100_16101_1} |
| // 2. |
| // {merged_100days_T1_0_myTable1_16100_16101_1, myTable1_16102_16129_4, myTable1_16130_16159_5} |
| // -> {merged_100days_T2_0_myTable1_16100_???_0(15000), merged_100days_T2_0_myTable1_???_16159_1} |
| // {myTable1_16160_16189_6, myTable1_16190_16220_7} |
| // -> {merged_100days_T2_1_myTable1_16160_16199_0, merged_100days_T2_1_myTable1_16200_16220_1} |
| // 3. |
| // {merged_100days_T2_1_myTable1_16200_16220_1, myTable1_16221_16250_8} |
| // -> {merged_100days_T3_0_myTable1_16200_???_0(15000), merged_100days_T3_0_myTable1_???_16250_1} |
| // {myTable1_16251_16281_9, myTable1_16282_16312_10} |
| // -> {merged_100days_T3_1_myTable1_16251_???_0(15000), merged_100days_T3_1_myTable1_???_16299_1, |
| // merged_100days_T3_1_myTable1_16300_16312_2} |
| // 4. |
| // {merged_100days_T3_1_myTable1_16300_16312_2, myTable1_16313_16342_11, myTable1_16343_16373_0} |
| // -> {merged_100days_T4_0_myTable1_16300_???_0(15000), merged_100days_T4_0_myTable1_???_16373_1} |
| // {myTable1_16374_16404_1} |
| // -> {merged_100days_T4_1_16374_16399_0, merged_100days_T4_1_16400_16404_1} |
| // 5. |
| // {merged_100days_T4_1_16400_16404_1, myTable1_16405_16435_2} |
| // -> {merged_100days_T5_0_myTable1_16400_16435_0} |
| |
| String sqlQuery = "SELECT count(*) FROM myTable1"; // 115545 rows for the test table |
| JsonNode expectedJson = postQuery(sqlQuery, _brokerBaseApiUrl); |
| int[] expectedNumSubTasks = {1, 2, 2, 2, 1}; |
| int[] expectedNumSegmentsQueried = {13, 12, 13, 13, 12}; |
| long expectedWatermark = 16000 * 86_400_000L; |
| String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_TEST_TABLE); |
| int numTasks = 0; |
| for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE); |
| tasks != null; tasks = |
| _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) { |
| assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); |
| assertTrue(_helixTaskResourceManager.getTaskQueues() |
| .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); |
| // Will not schedule task if there's incomplete task |
| assertNull( |
| _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); |
| waitForTaskToComplete(); |
| |
| // Check watermark |
| MergeRollupTaskMetadata minionTaskMetadata = MergeRollupTaskMetadata.fromZNRecord( |
| _taskManager.getClusterInfoAccessor() |
| .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, offlineTableName)); |
| assertNotNull(minionTaskMetadata); |
| assertEquals((long) minionTaskMetadata.getWatermarkMap().get("100days"), expectedWatermark); |
| expectedWatermark += 100 * 86_400_000L; |
| |
| // Check metadata of merged segments |
| for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { |
| if (metadata.getSegmentName().startsWith("merged")) { |
| // Check merged segment zk metadata |
| assertNotNull(metadata.getCustomMap()); |
| assertEquals("100days", |
| metadata.getCustomMap().get(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY)); |
| // Check merged segments are time partitioned |
| assertEquals(metadata.getEndTimeMs() / (86_400_000L * 100), metadata.getStartTimeMs() / (86_400_000L * 100)); |
| } |
| } |
| |
| final int finalNumTasks = numTasks; |
| TestUtils.waitForCondition(aVoid -> { |
| try { |
| // Check num total doc of merged segments are the same as the original segments |
| JsonNode actualJson = postQuery(sqlQuery, _brokerBaseApiUrl); |
| if (!SqlResultComparator.areEqual(actualJson, expectedJson, sqlQuery)) { |
| return false; |
| } |
| // Check query routing |
| int numSegmentsQueried = actualJson.get("numSegmentsQueried").asInt(); |
| return numSegmentsQueried == expectedNumSegmentsQueried[finalNumTasks]; |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }, TIMEOUT_IN_MS, "Timeout while validating segments"); |
| } |
| // Check total tasks |
| assertEquals(numTasks, 5); |
| |
| assertTrue(_controllerStarter.getControllerMetrics() |
| .containsGauge("mergeRollupTaskDelayInNumBuckets.myTable1_OFFLINE.100days")); |
| |
| // Drop the table |
| dropOfflineTable(SINGLE_LEVEL_CONCAT_TEST_TABLE); |
| |
| // Check if the task metadata is cleaned up on table deletion |
| verifyTableDelete(offlineTableName); |
| } |
| |
| /** |
| * Test single level rollup task with duplicate data (original segments * 2) |
| */ |
| @Test |
| public void testSingleLevelRollup() |
| throws Exception { |
| // The original segments are time partitioned by month: |
| // segmentName (totalDocs) |
| // myTable2_16071_16101_3_1, myTable2_16071_16101_3_2 (9746) |
| // myTable2_16102_16129_4_1, myTable2_16102_16129_4_2 (8690) |
| // myTable2_16130_16159_5_1, myTable2_16130_16159_5_2 (9621) |
| // myTable2_16160_16189_6_1, myTable2_16160_16189_6_2 (9454) |
| // myTable2_16190_16220_7_1, myTable2_16190_16220_7_2 (10329) |
| // myTable2_16221_16250_8_1, myTable2_16221_16250_8_2 (10468) |
| // myTable2_16251_16281_9_1, myTable2_16251_16281_9_2 (10499) |
| // myTable2_16282_16312_10_1, myTable2_16282_16312_10_2 (10196) |
| // myTable2_16313_16342_11_1, myTable2_16313_16342_11_2 (9136) |
| // myTable2_16343_16373_0_1, myTable2_16343_16373_0_2 (9292) |
| // myTable2_16374_16404_1_1, myTable2_16374_16404_1_2 (8736) |
| // myTable2_16405_16435_2_1, myTable2_16405_16435_2_2 (9378) |
| |
| // Expected merge tasks and result segments: |
| // 1. |
| // {myTable2_16071_16101_3_1, myTable2_16071_16101_3_2, myTable2_16102_16129_4_1, myTable2_16102_16129_4_2, |
| // myTable2_16130_16159_5_1, myTable2_16130_16159_5_2, myTable2_16160_16189_6_1, myTable2_16160_16189_6_2 |
| // myTable2_16190_16220_7} |
| // -> {merged_150days_T1_0_myTable2_16065_16198_0, merged_150days_T1_0_myTable2_16205_16219_1} |
| // 2. |
| // {merged_150days_T1_0_myTable2_16205_16219_1, myTable2_16221_16250_8_1, myTable2_16221_16250_8_2, |
| // myTable2_16251_16281_9_1, myTable2_16251_16281_9_2, myTable2_16282_16312_10_1 |
| // myTable2_16282_16312_10_2, myTable2_16313_16342_11_1, myTable2_16313_16342_11_2, |
| // myTable2_16343_16373_0_1, myTable2_16343_16373_0_2} |
| // -> {merged_150days_1628644088146_0_myTable2_16205_16345_0, |
| // merged_150days_1628644088146_0_myTable2_16352_16373_1} |
| // 3. |
| // {merged_150days_1628644088146_0_myTable2_16352_16373_1, myTable2_16374_16404_1_1, myTable2_16374_16404_1_2 |
| // myTable2_16405_16435_2_1, myTable2_16405_16435_2_2} |
| // -> {merged_150days_1628644105127_0_myTable2_16352_16429_0} |
| |
| String sqlQuery = "SELECT count(*) FROM myTable2"; // 115545 rows for the test table |
| JsonNode expectedJson = postQuery(sqlQuery, _brokerBaseApiUrl); |
| int[] expectedNumSegmentsQueried = {16, 7, 3}; |
| long expectedWatermark = 16050 * 86_400_000L; |
| String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_ROLLUP_TEST_TABLE); |
| int numTasks = 0; |
| for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE); |
| tasks != null; tasks = |
| _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) { |
| assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), 1); |
| assertTrue(_helixTaskResourceManager.getTaskQueues() |
| .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); |
| // Will not schedule task if there's incomplete task |
| assertNull( |
| _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); |
| waitForTaskToComplete(); |
| |
| // Check watermark |
| MergeRollupTaskMetadata minionTaskMetadata = MergeRollupTaskMetadata.fromZNRecord( |
| _taskManager.getClusterInfoAccessor() |
| .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, offlineTableName)); |
| assertNotNull(minionTaskMetadata); |
| assertEquals((long) minionTaskMetadata.getWatermarkMap().get("150days"), expectedWatermark); |
| expectedWatermark += 150 * 86_400_000L; |
| |
| // Check metadata of merged segments |
| for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { |
| if (metadata.getSegmentName().startsWith("merged")) { |
| // Check merged segment zk metadata |
| assertNotNull(metadata.getCustomMap()); |
| assertEquals("150days", |
| metadata.getCustomMap().get(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY)); |
| // Check merged segments are time partitioned |
| assertEquals(metadata.getEndTimeMs() / (86_400_000L * 150), metadata.getStartTimeMs() / (86_400_000L * 150)); |
| } |
| } |
| |
| final int finalNumTasks = numTasks; |
| TestUtils.waitForCondition(aVoid -> { |
| try { |
| // Check total doc of merged segments are less than the original segments |
| JsonNode actualJson = postQuery(sqlQuery, _brokerBaseApiUrl); |
| if (actualJson.get("resultTable").get("rows").get(0).get(0).asInt() >= expectedJson.get("resultTable") |
| .get("rows").get(0).get(0).asInt()) { |
| return false; |
| } |
| // Check query routing |
| int numSegmentsQueried = actualJson.get("numSegmentsQueried").asInt(); |
| return numSegmentsQueried == expectedNumSegmentsQueried[finalNumTasks]; |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }, TIMEOUT_IN_MS, "Timeout while validating segments"); |
| } |
| |
| // Check total doc is half of the original after all merge tasks are finished |
| JsonNode actualJson = postQuery(sqlQuery, _brokerBaseApiUrl); |
| assertEquals(actualJson.get("resultTable").get("rows").get(0).get(0).asInt(), |
| expectedJson.get("resultTable").get("rows").get(0).get(0).asInt() / 2); |
| // Check time column is rounded |
| JsonNode responseJson = |
| postQuery("SELECT count(*), DaysSinceEpoch FROM myTable2 GROUP BY DaysSinceEpoch ORDER BY DaysSinceEpoch"); |
| for (int i = 0; i < responseJson.get("resultTable").get("rows").size(); i++) { |
| int daysSinceEpoch = responseJson.get("resultTable").get("rows").get(i).get(1).asInt(); |
| assertTrue(daysSinceEpoch % 7 == 0); |
| } |
| // Check total tasks |
| assertEquals(numTasks, 3); |
| |
| assertTrue(_controllerStarter.getControllerMetrics() |
| .containsGauge("mergeRollupTaskDelayInNumBuckets.myTable2_OFFLINE.150days")); |
| } |
| |
| /** |
| * Test multi level concat task |
| */ |
| @Test |
| public void testMultiLevelConcat() |
| throws Exception { |
| // The original segments are time partitioned by month: |
| // segmentName (totalDocs) |
| // myTable3_16071_16101_3 (9746) |
| // myTable3_16102_16129_4 (8690) |
| // myTable3_16130_16159_5 (9621) |
| // myTable3_16160_16189_6 (9454) |
| // myTable3_16190_16220_7 (10329) |
| // myTable3_16221_16250_8 (10468) |
| // myTable3_16251_16281_9 (10499) |
| // myTable3_16282_16312_10 (10196) |
| // myTable3_16313_16342_11 (9136) |
| // myTable3_16343_16373_0 (9292) |
| // myTable3_16374_16404_1 (8736) |
| // myTable3_16405_16435_2 (9378) |
| |
| // Expected merge tasks and results: |
| // 1. |
| // 45days: {myTable3_16071_16101_3, myTable3_16102_16129_4} |
| // -> {merged_45days_T1_0_myTable3_16071_16109_0, merged_45days_T1_0_myTable3_16110_16129_1} |
| // watermark: {45days: 16065, 90days: null} |
| // 2. |
| // 45days: {merged_45days_T1_0_myTable3_16110_16129_1, myTable3_16130_16159_5} |
| // -> {merged_45days_T2_0_myTable3_16110_16154_0, merged_45days_T2_0_myTable3_16155_16159_1} |
| // 90days: {merged_45days_T1_0_myTable3_16071_16109_0} |
| // -> {merged_90days_T2_0_myTable3_16071_16109_0} |
| // watermark: {45days: 16110, 90days: 16020} |
| // 3. |
| // 45days: {merged_45days_T2_0_myTable3_16155_16159_1, myTable3_16160_16189_6, myTable3_16190_16220_7} |
| // -> {merged_45days_T3_0_myTable3_16155_16199_0, merged_45days_T3_0_myTable3_16200_16220_1} |
| // watermark: {45days: 16155, 90days: 16020} |
| // 4. |
| // 45days: {merged_45days_T3_-_myTable3_16200_16220_1, myTable3_16221_16250_8} |
| // -> {merged_45days_T4_0_myTable3_16200_16244_0, merged_45days_T4_0_myTable3_16245_16250_1} |
| // 90days: {merged_45days_T2_0_myTable3_16110_16154_0, merged_45days_T3_0_myTable3_16155_16199_0} |
| // -> {merged_90days_T4_0_myTable3_16110_16199_0} |
| // watermark: {45days: 16200, 90days: 16110} |
| // 5. |
| // 45days: {merged_45days_T4_0_myTable3_16245_16250_1, myTable3_16251_16281_9, myTable3_16282_16312_10} |
| // -> {merged_45days_T5_0_myTable3_16245_16289_0, merged_45days_T5_0_myTable3_16290_16312_1} |
| // watermark: {45days: 16245, 90days: 16110} |
| // 6. |
| // 45days: {merged_45days_T5_0_myTable3_16290_16312_1, myTable3_16313_16342_11} |
| // -> {merged_45days_T6_0_myTable3_16290_16334_0, merged_45days_T6_0_myTable3_16335_16342_1} |
| // 90days: {merged_45days_T4_0_myTable3_16200_16244_0, merged_45days_T5_0_myTable3_16245_16289_0} |
| // -> {merged_90days_T6_0_myTable3_16200_16289_0} |
| // watermark: {45days: 16290, 90days: 16200} |
| // 7. |
| // 45days: {merged_45days_T6_0_myTable3_16335_16342_1, myTable_16343_16373_0, myTable_16374_16404_1} |
| // -> {merged_45days_T7_0_myTable3_16335_16379_0, merged_45days_T7_0_myTable3_16380_16404_1} |
| // watermark: {45days: 16335, 90days: 16200} |
| // 8. |
| // 45days: {merged_45days_T7_0_myTable3_16380_16404_1, myTable3_16405_16435_2} |
| // -> {merged_45days_T8_0_myTable3_16380_16424_0, merged_45days_T8_1_myTable3_16425_16435_1} |
| // 90days: {merged_45days_T6_0_myTable3_16290_16334_0, merged_45days_T7_0_myTable3_16335_16379_0} |
| // -> {merged_90days_T8_0_myTable3_16290_16379_0} |
| // watermark: {45days:16380, 90days: 16290} |
| // 9. |
| // 45days: no segment left, not scheduling |
| // 90days: [16380, 16470) is not a valid merge window because windowEndTime > 45days watermark, not scheduling |
| |
| String sqlQuery = "SELECT count(*) FROM myTable3"; // 115545 rows for the test table |
| JsonNode expectedJson = postQuery(sqlQuery, _brokerBaseApiUrl); |
| int[] expectedNumSubTasks = {1, 2, 1, 2, 1, 2, 1, 2, 1}; |
| int[] expectedNumSegmentsQueried = {12, 12, 11, 10, 9, 8, 7, 6, 5}; |
| Long[] expectedWatermarks45Days = {16065L, 16110L, 16155L, 16200L, 16245L, 16290L, 16335L, 16380L}; |
| Long[] expectedWatermarks90Days = {null, 16020L, 16020L, 16110L, 16110L, 16200L, 16200L, 16290L}; |
| for (int i = 0; i < expectedWatermarks45Days.length; i++) { |
| expectedWatermarks45Days[i] *= 86_400_000L; |
| } |
| for (int i = 1; i < expectedWatermarks90Days.length; i++) { |
| expectedWatermarks90Days[i] *= 86_400_000L; |
| } |
| |
| String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(MULTI_LEVEL_CONCAT_TEST_TABLE); |
| int numTasks = 0; |
| for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE); |
| tasks != null; tasks = |
| _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) { |
| assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); |
| assertTrue(_helixTaskResourceManager.getTaskQueues() |
| .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); |
| // Will not schedule task if there's incomplete task |
| assertNull( |
| _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); |
| waitForTaskToComplete(); |
| |
| // Check watermark |
| MergeRollupTaskMetadata minionTaskMetadata = MergeRollupTaskMetadata.fromZNRecord( |
| _taskManager.getClusterInfoAccessor() |
| .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, offlineTableName)); |
| assertNotNull(minionTaskMetadata); |
| assertEquals(minionTaskMetadata.getWatermarkMap().get("45days"), expectedWatermarks45Days[numTasks]); |
| assertEquals(minionTaskMetadata.getWatermarkMap().get("90days"), expectedWatermarks90Days[numTasks]); |
| |
| // Check metadata of merged segments |
| for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { |
| if (metadata.getSegmentName().startsWith("merged")) { |
| // Check merged segment zk metadata |
| assertNotNull(metadata.getCustomMap()); |
| if (metadata.getSegmentName().startsWith("merged_45days")) { |
| assertEquals("45days", |
| metadata.getCustomMap().get(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY)); |
| assertEquals(metadata.getEndTimeMs() / (86_400_000L * 45), metadata.getStartTimeMs() / (86_400_000L * 45)); |
| } |
| if (metadata.getSegmentName().startsWith("merged_90days")) { |
| assertEquals("90days", |
| metadata.getCustomMap().get(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY)); |
| assertEquals(metadata.getEndTimeMs() / (86_400_000L * 90), metadata.getStartTimeMs() / (86_400_000L * 90)); |
| } |
| } |
| } |
| |
| final int finalNumTasks = numTasks; |
| TestUtils.waitForCondition(aVoid -> { |
| try { |
| // Check total doc of merged segments are the same as the original segments |
| JsonNode actualJson = postQuery(sqlQuery, _brokerBaseApiUrl); |
| if (!SqlResultComparator.areEqual(actualJson, expectedJson, sqlQuery)) { |
| return false; |
| } |
| // Check query routing |
| int numSegmentsQueried = actualJson.get("numSegmentsQueried").asInt(); |
| return numSegmentsQueried == expectedNumSegmentsQueried[finalNumTasks]; |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }, TIMEOUT_IN_MS, "Timeout while validating segments"); |
| } |
| // Check total tasks |
| assertEquals(numTasks, 8); |
| |
| assertTrue(_controllerStarter.getControllerMetrics() |
| .containsGauge("mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.45days")); |
| assertTrue(_controllerStarter.getControllerMetrics() |
| .containsGauge("mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.90days")); |
| } |
| |
| protected void verifyTableDelete(String tableNameWithType) { |
| TestUtils.waitForCondition(input -> { |
| // Check if the segment lineage is cleaned up |
| if (SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, tableNameWithType) != null) { |
| return false; |
| } |
| // Check if the task metadata is cleaned up |
| if (MinionTaskMetadataUtils |
| .fetchTaskMetadata(_propertyStore, MinionConstants.MergeRollupTask.TASK_TYPE, tableNameWithType) != null) { |
| return false; |
| } |
| return true; |
| }, 1_000L, 60_000L, "Failed to delete table"); |
| } |
| |
| protected void waitForTaskToComplete() { |
| TestUtils.waitForCondition(input -> { |
| // Check task state |
| for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.MergeRollupTask.TASK_TYPE) |
| .values()) { |
| if (taskState != TaskState.COMPLETED) { |
| return false; |
| } |
| } |
| return true; |
| }, 600_000L, "Failed to complete task"); |
| } |
| |
| @AfterClass |
| public void tearDown() |
| throws Exception { |
| stopMinion(); |
| stopServer(); |
| stopBroker(); |
| stopController(); |
| stopZk(); |
| FileUtils.deleteDirectory(_tempDir); |
| } |
| } |