blob: 2519a791a15acae7861e2d7dd1431e5ad532352c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
 * Integration test that provides an example of {@link PinotTaskGenerator} and {@link PinotTaskExecutor} and tests
 * simple minion functionality.
 *
 * <p>The test starts a full cluster (ZK, controller, broker, server, minion), registers three offline tables (two of
 * them with the test task enabled) and then drives a task queue through schedule, stop, resume and delete while
 * checking task states, task counts, notification flags and controller gauges.
 */
public class SimpleMinionClusterIntegrationTest extends ClusterTest {
  // Accessed by the plug-in classes
  public static final String TASK_TYPE = "TestTask";
  public static final String TABLE_NAME_1 = "testTable1";
  public static final String TABLE_NAME_2 = "testTable2";
  public static final String TABLE_NAME_3 = "testTable3";
  public static final int NUM_TASKS = 2;
  public static final int NUM_CONFIGS = 4;
  // Set to true while the test wants the task held, and cleared to let the task complete
  public static final AtomicBoolean HOLD = new AtomicBoolean();
  // Notification flags asserted by the test; they are never written in this class
  // NOTE(review): presumably set by the plug-in classes' event callbacks -- confirm against the plug-in code
  public static final AtomicBoolean TASK_START_NOTIFIED = new AtomicBoolean();
  public static final AtomicBoolean TASK_SUCCESS_NOTIFIED = new AtomicBoolean();
  public static final AtomicBoolean TASK_CANCELLED_NOTIFIED = new AtomicBoolean();
  public static final AtomicBoolean TASK_ERROR_NOTIFIED = new AtomicBoolean();

  private static final long STATE_TRANSITION_TIMEOUT_MS = 60_000L; // 1 minute
  private static final long ZK_CALLBACK_TIMEOUT_MS = 30_000L; // 30 seconds
  private static final long TASK_COUNT_TIMEOUT_MS = 10_000L; // 10 seconds
  // Task timeout written to the cluster config in setUp() and expected back from the task generator
  private static final long TASK_TIMEOUT_MS = 600_000L; // 10 minutes

  private PinotHelixTaskResourceManager _helixTaskResourceManager;
  private PinotTaskManager _taskManager;

  /**
   * Starts the cluster components, stores the task timeout in the Helix cluster config and adds the three offline
   * tables used by the tests.
   *
   * @throws Exception if any of the cluster components or table configs cannot be set up
   */
  @BeforeClass
  public void setUp()
      throws Exception {
    startZk();
    startController();
    startBroker();
    startServer();
    startMinion();

    // Set task timeout in cluster config
    PinotHelixResourceManager helixResourceManager = _controllerStarter.getHelixResourceManager();
    HelixConfigScope clusterScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
        .forCluster(helixResourceManager.getHelixClusterName()).build();
    helixResourceManager.getHelixAdmin().setConfig(clusterScope,
        Collections.singletonMap(TASK_TYPE + MinionConstants.TIMEOUT_MS_KEY_SUFFIX, Long.toString(TASK_TIMEOUT_MS)));

    // Add 3 offline tables, where 2 of them have TestTask enabled
    TableTaskConfig taskConfig = new TableTaskConfig(Collections.singletonMap(TASK_TYPE, Collections.emptyMap()));
    addTableConfig(
        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_1).setTaskConfig(taskConfig).build());
    addTableConfig(
        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_2).setTaskConfig(taskConfig).build());
    addTableConfig(new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_3).build());

    _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
    _taskManager = _controllerStarter.getTaskManager();
  }

  /**
   * Verifies that the task generator registered for {@link #TASK_TYPE} reports the timeout that {@link #setUp()}
   * stored in the cluster config.
   */
  @Test
  public void testTaskTimeout() {
    PinotTaskGenerator taskGenerator = _taskManager.getTaskGeneratorRegistry().getTaskGenerator(TASK_TYPE);
    assertNotNull(taskGenerator);
    assertEquals(taskGenerator.getTaskTimeoutMs(), TASK_TIMEOUT_MS);
  }

  /**
   * Waits for at most {@link #TASK_COUNT_TIMEOUT_MS} for the task count of the given task to match the expected
   * numbers.
   *
   * @param task name of the task to look up
   * @param errors expected number of sub-tasks in error
   * @param waiting expected number of waiting sub-tasks
   * @param running expected number of running sub-tasks
   * @param total expected total number of sub-tasks
   */
  private void verifyTaskCount(String task, int errors, int waiting, int running, int total) {
    // Helix generates the tasks asynchronously, so poll instead of checking once
    TestUtils.waitForCondition((aVoid) -> {
      PinotHelixTaskResourceManager.TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task);
      return taskCount.getError() == errors && taskCount.getWaiting() == waiting && taskCount.getRunning() == running
          && taskCount.getTotal() == total;
    }, TASK_COUNT_TIMEOUT_MS, "Failed to reach expected task count");
  }

  /**
   * Schedules tasks while they are held, then stops the task queue, deletes one task, resumes the queue, lets the
   * remaining task complete and finally deletes the queue. After each step the task states, the notification flags
   * and the controller gauges are checked.
   */
  @Test
  public void testStopResumeDeleteTaskQueue() {
    // Hold the task
    HOLD.set(true);
    try {
      // No tasks before we start.
      assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(), 0);
      // This task name was never scheduled, so all of its counts are expected to be 0
      String nonExistentTask = "Task_" + TASK_TYPE + "_1624403781879";
      verifyTaskCount(nonExistentTask, 0, 0, 0, 0);

      // Should create the task queues and generate a task
      String task1 = _taskManager.scheduleTasks().get(TASK_TYPE);
      assertNotNull(task1);
      assertTrue(_helixTaskResourceManager.getTaskQueues()
          .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TASK_TYPE)));
      assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task1));

      // Since we have two tables, two sub-tasks are generated -- one for each table.
      // The default concurrent sub-tasks per minion instance is 1, and we have one minion
      // instance spun up. So, one sub-task gets scheduled in a minion, and the other one
      // waits.
      verifyTaskCount(task1, 0, 1, 1, 2);

      // Should generate one more task, with two sub-tasks. Both of these sub-tasks will wait
      // since we have one minion instance that is still running one of the sub-tasks.
      String task2 = _taskManager.scheduleTask(TASK_TYPE);
      assertNotNull(task2);
      assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2));
      verifyTaskCount(task2, 0, 2, 0, 2);

      // Should not generate more tasks since SimpleMinionClusterIntegrationTest.NUM_TASKS is 2.
      // Our test task generator does not generate if there are already this many sub-tasks in the
      // running+waiting count.
      assertNull(_taskManager.scheduleTasks().get(TASK_TYPE));
      assertNull(_taskManager.scheduleTask(TASK_TYPE));

      // Wait at most 60 seconds for all tasks IN_PROGRESS
      TestUtils.waitForCondition(input -> {
        Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
        assertEquals(taskStates.size(), NUM_TASKS);
        if (!allTasksInState(taskStates, TaskState.IN_PROGRESS)) {
          return false;
        }
        assertTaskNotifications(false, false);
        return true;
      }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks IN_PROGRESS");

      // Wait at most 30 seconds for ZK callback to update the controller gauges
      ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
      waitForTaskStatusGauges(controllerMetrics, NUM_TASKS, 0, 0);

      // Stop the task queue
      _helixTaskResourceManager.stopTaskQueue(TASK_TYPE);

      // Wait at most 60 seconds for all tasks STOPPED
      TestUtils.waitForCondition(input -> {
        Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
        assertEquals(taskStates.size(), NUM_TASKS);
        if (!allTasksInState(taskStates, TaskState.STOPPED)) {
          return false;
        }
        assertTaskNotifications(false, true);
        return true;
      }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks STOPPED");

      // Wait at most 30 seconds for ZK callback to update the controller gauges
      waitForTaskStatusGauges(controllerMetrics, 0, NUM_TASKS, 0);

      // Task deletion requires the task queue to be stopped,
      // so deleting task1 here before resuming the task queue.
      assertTrue(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1));
      _helixTaskResourceManager.deleteTask(task1, false);

      // Resume the task queue, and let the task complete
      _helixTaskResourceManager.resumeTaskQueue(TASK_TYPE);
      HOLD.set(false);

      // Wait at most 60 seconds for all tasks COMPLETED
      TestUtils.waitForCondition(input -> {
        Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
        if (!allTasksInState(taskStates, TaskState.COMPLETED)) {
          return false;
        }
        // Task deletion happens eventually along with other state transitions.
        assertFalse(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1));
        assertEquals(taskStates.size(), (NUM_TASKS - 1));
        assertTaskNotifications(true, true);
        return true;
      }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks COMPLETED");

      // Wait at most 30 seconds for ZK callback to update the controller gauges
      waitForTaskStatusGauges(controllerMetrics, 0, 0, NUM_TASKS - 1);

      // Delete the task queue
      _helixTaskResourceManager.deleteTaskQueue(TASK_TYPE, false);

      // Wait at most 60 seconds for task queue to be deleted
      TestUtils.waitForCondition(input -> !_helixTaskResourceManager.getTaskTypes().contains(TASK_TYPE),
          STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");

      // Wait at most 30 seconds for ZK callback to update the controller gauges
      waitForTaskStatusGauges(controllerMetrics, 0, 0, 0);
    } finally {
      // Always release the hold, even when an assertion or wait above fails: the flag is static and shared with
      // the plug-in classes, so a failed run must not leave it set.
      HOLD.set(false);
    }
  }

  /**
   * Returns whether every given task state equals the expected one. An empty collection yields {@code true}.
   *
   * @param taskStates task states to check
   * @param expectedState state every task has to be in
   * @return {@code true} if no task is in a different state
   */
  private static boolean allTasksInState(Collection<TaskState> taskStates, TaskState expectedState) {
    for (TaskState taskState : taskStates) {
      if (taskState != expectedState) {
        return false;
      }
    }
    return true;
  }

  /**
   * Asserts the notification flags: the start flag must be set, the error flag must not be set, and the success and
   * cancelled flags must match the given expectations.
   *
   * @param expectSuccess expected value of {@link #TASK_SUCCESS_NOTIFIED}
   * @param expectCancelled expected value of {@link #TASK_CANCELLED_NOTIFIED}
   */
  private static void assertTaskNotifications(boolean expectSuccess, boolean expectCancelled) {
    assertTrue(TASK_START_NOTIFIED.get());
    assertEquals(TASK_SUCCESS_NOTIFIED.get(), expectSuccess);
    assertEquals(TASK_CANCELLED_NOTIFIED.get(), expectCancelled);
    assertFalse(TASK_ERROR_NOTIFIED.get());
  }

  /**
   * Waits for at most {@link #ZK_CALLBACK_TIMEOUT_MS} for the {@link ControllerGauge#TASK_STATUS} gauges of
   * {@link #TASK_TYPE} to show the expected number of tasks per state.
   *
   * @param controllerMetrics controller metrics to read the gauges from
   * @param expectedInProgress expected value of the IN_PROGRESS gauge
   * @param expectedStopped expected value of the STOPPED gauge
   * @param expectedCompleted expected value of the COMPLETED gauge
   */
  private static void waitForTaskStatusGauges(ControllerMetrics controllerMetrics, long expectedInProgress,
      long expectedStopped, long expectedCompleted) {
    String inProgressGauge = TASK_TYPE + "." + TaskState.IN_PROGRESS;
    String stoppedGauge = TASK_TYPE + "." + TaskState.STOPPED;
    String completedGauge = TASK_TYPE + "." + TaskState.COMPLETED;
    TestUtils.waitForCondition(
        input -> controllerMetrics.getValueOfTableGauge(inProgressGauge, ControllerGauge.TASK_STATUS)
            == expectedInProgress
            && controllerMetrics.getValueOfTableGauge(stoppedGauge, ControllerGauge.TASK_STATUS) == expectedStopped
            && controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS)
            == expectedCompleted, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
  }

  /**
   * Drops the three offline tables and stops the cluster components in the reverse order of {@link #setUp()}.
   *
   * @throws Exception if a table cannot be dropped or a component cannot be stopped
   */
  @AfterClass
  public void tearDown()
      throws Exception {
    dropOfflineTable(TABLE_NAME_1);
    dropOfflineTable(TABLE_NAME_2);
    dropOfflineTable(TABLE_NAME_3);
    stopMinion();
    stopServer();
    stopBroker();
    stopController();
    stopZk();
  }
}