| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pinot.integration.tests; |
| |
| import com.fasterxml.jackson.databind.JsonNode; |
| import java.io.File; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.helix.task.TaskState; |
| import org.apache.pinot.common.lineage.SegmentLineageAccessHelper; |
| import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; |
| import org.apache.pinot.common.minion.MinionTaskMetadataUtils; |
| import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; |
| import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; |
| import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; |
| import org.apache.pinot.core.common.MinionConstants; |
| import org.apache.pinot.minion.MinionContext; |
| import org.apache.pinot.spi.config.table.TableConfig; |
| import org.apache.pinot.spi.config.table.TableTaskConfig; |
| import org.apache.pinot.spi.data.Schema; |
| import org.apache.pinot.spi.utils.builder.TableNameBuilder; |
| import org.apache.pinot.util.TestUtils; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.Test; |
| |
| import static org.testng.Assert.*; |
| |
/**
 * Integration test for the minion task of type "PurgeTask".
 *
 * <p>Three offline tables are created from the same Avro data to exercise the purge task
 * generator under different segment-metadata states:
 * <ul>
 *   <li>{@code myTable1}: segments carry no purge metadata, i.e. the very first purge run</li>
 *   <li>{@code myTable2}: segments whose last purge time is older than the 1-day threshold,
 *       so a new purge task should be generated</li>
 *   <li>{@code myTable3}: segments whose last purge time is within the 1-day threshold,
 *       so no purge task should be generated</li>
 * </ul>
 */
public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
  // Table with no purge metadata on its segments (first ever purge run)
  private static final String PURGE_FIRST_RUN_TABLE = "myTable1";
  // Table whose purge delay has already elapsed
  private static final String PURGE_DELTA_PASSED_TABLE = "myTable2";
  // Table whose purge delay has not elapsed yet
  private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";

  protected PinotHelixTaskResourceManager _helixTaskResourceManager;
  protected PinotTaskManager _taskManager;
  protected PinotHelixResourceManager _pinotHelixResourceManager;
  // Current table name returned by getTableName(); switched via setTableName() so the shared
  // base-class helpers (createOfflineTableConfig, etc.) target the right table
  protected String _tableName;

  // One segment-build directory per table
  protected final File _segmentDir1 = new File(_tempDir, "segmentDir1");
  protected final File _segmentDir2 = new File(_tempDir, "segmentDir2");
  protected final File _segmentDir3 = new File(_tempDir, "segmentDir3");

  // One segment-tar directory per table
  protected final File _tarDir1 = new File(_tempDir, "tarDir1");
  protected final File _tarDir2 = new File(_tempDir, "tarDir2");
  protected final File _tarDir3 = new File(_tempDir, "tarDir3");

  /**
   * Starts a full Pinot cluster (ZK, controller, broker, server, minion), creates the three
   * test tables with the purge task enabled, builds and uploads segments for each, and seeds
   * the segment ZK metadata with purge timestamps for the "delay passed" and "delay not
   * passed" tables.
   */
  @BeforeClass
  public void setUp()
      throws Exception {
    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _tarDir1,
        _segmentDir2, _tarDir2, _segmentDir3, _tarDir3);

    // Start the Pinot cluster
    startZk();
    startController();
    startBrokers(1);
    startServers(1);

    // Create and upload the schema and table configs (one config per test table, all with the
    // purge task enabled)
    Schema schema = createSchema();
    addSchema(schema);
    setTableName(PURGE_DELTA_NOT_PASSED_TABLE);
    TableConfig purgeDeltaNotPassedTableConfig = createOfflineTableConfig();
    purgeDeltaNotPassedTableConfig.setTaskConfig(getPurgeTaskConfig());
    setTableName(PURGE_FIRST_RUN_TABLE);
    TableConfig purgeTableConfig = createOfflineTableConfig();
    purgeTableConfig.setTaskConfig(getPurgeTaskConfig());

    setTableName(PURGE_DELTA_PASSED_TABLE);
    TableConfig purgeDeltaPassedTableConfig = createOfflineTableConfig();
    purgeDeltaPassedTableConfig.setTaskConfig(getPurgeTaskConfig());

    addTableConfig(purgeTableConfig);
    addTableConfig(purgeDeltaPassedTableConfig);
    addTableConfig(purgeDeltaNotPassedTableConfig);

    // Unpack the Avro files
    List<File> avroFiles = unpackAvroData(_tempDir);

    // Create and upload segments
    ClusterIntegrationTestUtils
        .buildSegmentsFromAvro(avroFiles, purgeTableConfig, schema, 0, _segmentDir1, _tarDir1);
    ClusterIntegrationTestUtils
        .buildSegmentsFromAvro(avroFiles, purgeDeltaPassedTableConfig,
            schema, 0, _segmentDir2, _tarDir2);
    ClusterIntegrationTestUtils
        .buildSegmentsFromAvro(avroFiles, purgeDeltaNotPassedTableConfig,
            schema, 0, _segmentDir3, _tarDir3);

    uploadSegments(PURGE_FIRST_RUN_TABLE, _tarDir1);
    uploadSegments(PURGE_DELTA_PASSED_TABLE, _tarDir2);
    uploadSegments(PURGE_DELTA_NOT_PASSED_TABLE, _tarDir3);

    // Initialize the query generator
    setUpQueryGenerator(avroFiles);
    startMinion();
    setRecordPurger();
    _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
    _taskManager = _controllerStarter.getTaskManager();
    _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();

    // Set up purge-time metadata on the segments so the tests can verify how the task
    // generator handles delays that have and have not elapsed
    String tablenameOfflineNotPassed = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE);
    String tablenameOfflinePassed = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE);

    // Elapsed delay: last purge time is 88400000 ms ago, which is older than the 1-day
    // (86400000 ms) threshold configured in getPurgeTaskConfig()
    List<SegmentZKMetadata> segmentsZKMetadataDeltaPassed = _pinotHelixResourceManager
        .getSegmentsZKMetadata(tablenameOfflinePassed);

    Map<String, String> customSegmentMetadataPassed = new HashMap<>();
    customSegmentMetadataPassed.put(MinionConstants.PurgeTask.TASK_TYPE
        + MinionConstants.TASK_TIME_SUFFIX, String.valueOf(System.currentTimeMillis() - 88400000));

    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadataDeltaPassed) {
      segmentZKMetadata.setCustomMap(customSegmentMetadataPassed);
      _pinotHelixResourceManager.updateZkMetadata(tablenameOfflinePassed, segmentZKMetadata);
    }
    // Non-elapsed delay: last purge time is only 4000 ms ago, well within the 1-day threshold
    List<SegmentZKMetadata> segmentsZKMetadataDeltaNotPassed = _pinotHelixResourceManager
        .getSegmentsZKMetadata(tablenameOfflineNotPassed);
    Map<String, String> customSegmentMetadataNotPassed = new HashMap<>();
    customSegmentMetadataNotPassed.put(MinionConstants.PurgeTask.TASK_TYPE
        + MinionConstants.TASK_TIME_SUFFIX, String.valueOf(System.currentTimeMillis() - 4000));
    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadataDeltaNotPassed) {
      segmentZKMetadata.setCustomMap(customSegmentMetadataNotPassed);
      _pinotHelixResourceManager.updateZkMetadata(tablenameOfflineNotPassed, segmentZKMetadata);
    }
  }

  /**
   * Registers a record purger on the minion context that, for the three test tables, drops
   * every row whose "Quarter" column equals 1. Other tables get no purger.
   */
  private void setRecordPurger() {
    MinionContext minionContext = MinionContext.getInstance();
    minionContext.setRecordPurgerFactory(rawTableName -> {
      List<String> tableNames = Arrays.asList(PURGE_FIRST_RUN_TABLE,
          PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE);
      if (tableNames.contains(rawTableName)) {
        return row -> row.getValue("Quarter").equals(1);
      } else {
        return null;
      }
    });
  }

  @Override
  public String getTableName() {
    return _tableName;
  }

  /**
   * Sets the table name returned by {@link #getTableName()}, so the shared base-class helpers
   * operate on the given table.
   */
  public void setTableName(String tableName) {
    _tableName = tableName;
  }

  /**
   * Builds a task config enabling the purge task with a 1-day purge-time threshold.
   * (The constant name "THREESOLD" is misspelled upstream in MinionConstants.)
   */
  private TableTaskConfig getPurgeTaskConfig() {
    Map<String, String> tableTaskConfigs = new HashMap<>();
    tableTaskConfigs.put(MinionConstants.PurgeTask.LAST_PURGE_TIME_THREESOLD_PERIOD, "1d");
    return new TableTaskConfig(Collections.singletonMap(MinionConstants.PurgeTask.TASK_TYPE, tableTaskConfigs));
  }

  /**
   * Test purge with no purge metadata on the segments (checking the null-safe implementation)
   */
  @Test
  public void testFirstRunPurge()
      throws Exception {
    // Expected purge task generation:
    // 1. No previous purge run, so all segments should be processed and purge metadata should
    //    be added to the segments
    // 2. Check that two purge task generations cannot run at the same time, ensuring segments
    //    with a running task are skipped
    // 3. Check the segment metadata to ensure the purge time is recorded in the metadata
    // 4. Check that after the first purge run, re-running the task generation schedules no task
    // 5. Check the purge process itself by asserting the expected number of remaining rows

    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_FIRST_RUN_TABLE);

    _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE);
    assertTrue(_helixTaskResourceManager.getTaskQueues()
        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
    // Will not schedule task if there's incomplete task
    assertNull(
        _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
    waitForTaskToComplete();

    // Check that the purge time has been recorded in each segment's custom metadata
    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
      assertTrue(metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE
          + MinionConstants.TASK_TIME_SUFFIX));
    }
    // Should not schedule a new purge task, as the time since the last purge has not exceeded
    // the 1-day default purge delay
    assertNull(
        _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));

    // 28057 rows with Quarter = 1 out of 115545 total rows;
    // expecting 115545 - 28057 = 87488 rows after the purge
    String sqlQuery = "SELECT count(*) FROM " + PURGE_FIRST_RUN_TABLE;
    JsonNode actualJson = postQuery(sqlQuery, _brokerBaseApiUrl);
    assertTrue(actualJson.toString().contains("\"rows\":[[87488]]"),
        "Expected results to contain: \"rows\":[[87488]] but found: " + actualJson);

    // Drop the table
    dropOfflineTable(PURGE_FIRST_RUN_TABLE);

    // Check if the task metadata is cleaned up on table deletion
    verifyTableDelete(offlineTableName);
  }

  /**
   * Test purge on a table whose purge delay has already elapsed
   */
  @Test
  public void testPassedDelayTimePurge()
      throws Exception {
    // Expected purge task generation:
    // 1. The time since the last purge (88400000 ms, set in setUp) exceeds the 1-day threshold
    //    (86400000 ms), so a purge task should be generated
    // 2. Check that two purge task generations cannot run at the same time, ensuring segments
    //    with a running task are skipped
    // 3. Check the segment metadata to ensure the purge time is updated in the metadata
    // 4. Check that after this purge run, re-running the task generation schedules no task
    // 5. Check the purge process itself by asserting the expected number of remaining rows

    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE);
    _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE);
    assertTrue(_helixTaskResourceManager.getTaskQueues()
        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
    // Will not schedule task if there's incomplete task
    assertNull(
        _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
    waitForTaskToComplete();
    // Check that the metadata contains the expected values
    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
      // The purge time key must be present
      assertTrue(metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE
          + MinionConstants.TASK_TIME_SUFFIX));
      // Check that the purge has been run on these segments: the recorded purge time must now
      // be recent (less than a day old), replacing the 88400000-ms-old value set in setUp
      assertTrue(System.currentTimeMillis() - Long.parseLong(metadata.getCustomMap()
          .get(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)) < 86400000);
    }
    // Should not schedule a new purge task, as the time since the last purge has not exceeded
    // the 1-day default purge delay
    assertNull(
        _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));

    // 28057 rows with Quarter = 1 out of 115545 total rows;
    // expecting 115545 - 28057 = 87488 rows after the purge
    String sqlQuery = "SELECT count(*) FROM " + PURGE_DELTA_PASSED_TABLE;
    JsonNode actualJson = postQuery(sqlQuery, _brokerBaseApiUrl);

    assertTrue(actualJson.toString().contains("\"rows\":[[87488]]"),
        "Expected results to contain: \"rows\":[[87488]] but found: " + actualJson);

    // Drop the table
    dropOfflineTable(PURGE_DELTA_PASSED_TABLE);

    // Check if the task metadata is cleaned up on table deletion
    verifyTableDelete(offlineTableName);
  }

  /**
   * Test purge on a table whose purge delay has not elapsed
   */
  @Test
  public void testNotPassedDelayTimePurge()
      throws Exception {
    // Expected: no purge task generation.
    // 1. The segment purge time is set to System.currentTimeMillis() - 4000 (in setUp), so a
    //    new purge should not be triggered, as the delay is 1d (86400000 ms)
    // 2. Check that no task has been scheduled
    // 3. Check the segment metadata to ensure the purge time has not been updated
    // 4. Check that the purge process itself has not been run, by asserting the row count is
    //    unchanged

    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE);

    // No task should be scheduled, as the purge delay has not elapsed
    assertNull(
        _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
      assertTrue(metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE
          + MinionConstants.TASK_TIME_SUFFIX));
      // Check that the purge has NOT been run on these segments: the recorded purge time is
      // still the one set in setUp (at least 4000 ms old, but less than a day old)
      assertTrue(System.currentTimeMillis() - Long.parseLong(metadata.getCustomMap()
          .get(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)) > 4000);
      assertTrue(System.currentTimeMillis() - Long.parseLong(metadata.getCustomMap()
          .get(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)) < 86400000);
    }
    // 28057 rows with Quarter = 1 out of 115545 total rows; since no purge ran, all
    // 115545 rows must still be present
    String sqlQuery = "SELECT count(*) FROM " + PURGE_DELTA_NOT_PASSED_TABLE;
    JsonNode actualJson = postQuery(sqlQuery, _brokerBaseApiUrl);

    assertTrue(actualJson.toString().contains("\"rows\":[[115545]]"),
        "Expected results to contain: \"rows\":[[115545]] but found: " + actualJson);

    // Drop the table
    dropOfflineTable(PURGE_DELTA_NOT_PASSED_TABLE);

    // Check if the task metadata is cleaned up on table deletion
    verifyTableDelete(offlineTableName);
  }

  /**
   * Waits until both the segment lineage and the purge task metadata for the given table have
   * been removed from the property store, failing after 60 seconds.
   *
   * @param tableNameWithType table name with its type suffix (e.g. "myTable1_OFFLINE")
   */
  protected void verifyTableDelete(String tableNameWithType) {
    TestUtils.waitForCondition(input -> {
      // Check if the segment lineage is cleaned up
      if (SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, tableNameWithType) != null) {
        return false;
      }
      // Check if the task metadata is cleaned up
      if (MinionTaskMetadataUtils
          .fetchTaskMetadata(_propertyStore, MinionConstants.PurgeTask.TASK_TYPE, tableNameWithType) != null) {
        return false;
      }
      return true;
    }, 1_000L, 60_000L, "Failed to delete table");
  }

  /**
   * Waits until every Helix task of type "PurgeTask" reaches the COMPLETED state, failing
   * after 10 minutes.
   */
  protected void waitForTaskToComplete() {
    TestUtils.waitForCondition(input -> {
      // Check task state
      for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.PurgeTask.TASK_TYPE)
          .values()) {
        if (taskState != TaskState.COMPLETED) {
          return false;
        }
      }
      return true;
    }, 600_000L, "Failed to complete task");
  }

  /**
   * Stops all cluster components in reverse start order and removes the temp directory.
   */
  @AfterClass
  public void tearDown()
      throws Exception {
    stopMinion();
    stopServer();
    stopBroker();
    stopController();
    stopZk();
    FileUtils.deleteDirectory(_tempDir);
  }
}