/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.controller.helix.core.minion;

import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.quartz.CronTrigger;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
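
/**
 * Stateless tests for {@link PinotTaskManager}, verifying that the Quartz scheduler is only created when enabled,
 * and that cron jobs are added, updated and removed as table task configs change and across controller restarts.
 */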
@Test(groups = "stateless")
public class PinotTaskManagerStatelessTest extends ControllerTest {
private static final String RAW_TABLE_NAME = "myTable";
private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
private static final long TIMEOUT_IN_MS = 10_000L;
private static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskManagerStatelessTest.class);

  @BeforeClass
public void setUp()
throws Exception {
startZk();
}
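
  /**
   * Without the task scheduler enabled in the controller config, the task manager should not create a Quartz
   * scheduler.
   */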
@Test
public void testDefaultPinotTaskManagerNoScheduler()
throws Exception {
startController();
PinotTaskManager taskManager = _controllerStarter.getTaskManager();
assertNull(taskManager.getScheduler());
stopController();
}
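
  /**
   * With the scheduler enabled and skip-late-cron-schedule configured (10-second max delay), the cron job for
   * SegmentGenerationAndPushTask should carry the skip flag and the max delay in its job data map.
   */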
@Test
public void testSkipLateCronSchedule()
throws Exception {
Map<String, Object> properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true);
properties.put(ControllerConf.ControllerPeriodicTasksConf.TASK_MANAGER_SKIP_LATE_CRON_SCHEDULE, "true");
properties.put(ControllerConf.ControllerPeriodicTasksConf.TASK_MANAGER_MAX_CRON_SCHEDULE_DELAY_IN_SECONDS, "10");
startController(properties);
addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
addFakeServerInstancesToAutoJoinHelixCluster(1, true);
Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
.addSingleValueDimension("myMap", FieldSpec.DataType.STRING)
.addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING)
.addSingleValueDimension("complexMapStr", FieldSpec.DataType.STRING).build();
addSchema(schema);
PinotTaskManager taskManager = _controllerStarter.getTaskManager();
Scheduler scheduler = taskManager.getScheduler();
assertNotNull(scheduler);
// Add Table with one task.
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
new TableTaskConfig(
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 * * ? * * *")))).build();
waitForEVToDisappear(tableConfig.getTableName());
addTableConfig(tableConfig);
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
"JobGroupNames should have SegmentGenerationAndPushTask only");
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */10 * ? * * *", true, 10);
dropOfflineTable(RAW_TABLE_NAME);
waitForJobGroupNames(_controllerStarter.getTaskManager(), List::isEmpty, "JobGroupNames should be empty");
stopFakeInstances();
stopController();
}
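
  /**
   * Adding, updating and removing task configs on a table should create, reschedule and remove the corresponding cron
   * jobs in the scheduler.
   */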
@Test
public void testPinotTaskManagerSchedulerWithUpdate()
throws Exception {
Map<String, Object> properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true);
startController(properties);
addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
addFakeServerInstancesToAutoJoinHelixCluster(1, true);
Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
.addSingleValueDimension("myMap", FieldSpec.DataType.STRING)
.addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING)
.addSingleValueDimension("complexMapStr", FieldSpec.DataType.STRING).build();
addSchema(schema);
PinotTaskManager taskManager = _controllerStarter.getTaskManager();
Scheduler scheduler = taskManager.getScheduler();
assertNotNull(scheduler);
// 1. Add Table
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
new TableTaskConfig(
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build();
waitForEVToDisappear(tableConfig.getTableName());
addTableConfig(tableConfig);
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
"JobGroupNames should have SegmentGenerationAndPushTask only");
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */10 * ? * * *");
// 2. Update table to new schedule
tableConfig.setTaskConfig(new TableTaskConfig(
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */20 * ? * * *"))));
updateTableConfig(tableConfig);
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
"JobGroupNames should have SegmentGenerationAndPushTask only");
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */20 * ? * * *");
// 3. Update table to new task and schedule
tableConfig.setTaskConfig(new TableTaskConfig(
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */30 * ? * * *"),
"MergeRollupTask", ImmutableMap.of("schedule", "0 */10 * ? * * *"))));
updateTableConfig(tableConfig);
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 2 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) && jgn.contains(
MinionConstants.MergeRollupTask.TASK_TYPE),
"JobGroupNames should have SegmentGenerationAndPushTask and MergeRollupTask");
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */30 * ? * * *");
validateJob(MinionConstants.MergeRollupTask.TASK_TYPE, "0 */10 * ? * * *");
// 4. Remove one task from the table
tableConfig.setTaskConfig(
new TableTaskConfig(ImmutableMap.of("MergeRollupTask", ImmutableMap.of("schedule", "0 */10 * ? * * *"))));
updateTableConfig(tableConfig);
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.MergeRollupTask.TASK_TYPE),
"JobGroupNames should have MergeRollupTask only");
validateJob(MinionConstants.MergeRollupTask.TASK_TYPE, "0 */10 * ? * * *");
    // 5. Drop table
dropOfflineTable(RAW_TABLE_NAME);
waitForJobGroupNames(_controllerStarter.getTaskManager(), List::isEmpty, "JobGroupNames should be empty");
stopFakeInstances();
stopController();
}
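
  /**
   * Cron jobs should survive a controller restart, and table config updates made after the restart should still be
   * picked up by the scheduler.
   */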
@Test
public void testPinotTaskManagerSchedulerWithRestart()
throws Exception {
Map<String, Object> properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true);
startController(properties);
addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
addFakeServerInstancesToAutoJoinHelixCluster(1, true);
Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
.addSingleValueDimension("myMap", FieldSpec.DataType.STRING)
.addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING)
.addSingleValueDimension("complexMapStr", FieldSpec.DataType.STRING).build();
addSchema(schema);
PinotTaskManager taskManager = _controllerStarter.getTaskManager();
Scheduler scheduler = taskManager.getScheduler();
assertNotNull(scheduler);
// Add Table with one task.
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
new TableTaskConfig(
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build();
waitForEVToDisappear(tableConfig.getTableName());
addTableConfig(tableConfig);
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
"JobGroupNames should have SegmentGenerationAndPushTask only");
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */10 * ? * * *");
// Restart controller.
restartController();
TestUtils.waitForCondition((aVoid) -> {
try {
long tableSize = getTableSize(OFFLINE_TABLE_NAME);
return tableSize == TableSizeReader.DEFAULT_SIZE_WHEN_MISSING_OR_ERROR;
} catch (Exception e) {
return false;
}
}, 5000L, "Failed to restart controller");
// Update table to add a new task
tableConfig.setTaskConfig(new TableTaskConfig(
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *"),
"MergeRollupTask", ImmutableMap.of("schedule", "0 */20 * ? * * *"))));
updateTableConfig(tableConfig);
// Task is put into table config.
TableConfig tableConfigAfterRestart =
_controllerStarter.getHelixResourceManager().getTableConfig(OFFLINE_TABLE_NAME);
Map<String, String> taskCfgs =
tableConfigAfterRestart.getTaskConfig().getConfigsForTaskType(MinionConstants.MergeRollupTask.TASK_TYPE);
assertTrue(taskCfgs.containsKey("schedule"));
    // The new MergeRollupTask would not be scheduled if the restarted controller did not eagerly check table configs
    // after setting up the subscriber on the ChildChanges ZK event.
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 2 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) && jgn.contains(
MinionConstants.MergeRollupTask.TASK_TYPE),
"JobGroupNames should have SegmentGenerationAndPushTask and MergeRollupTask");
dropOfflineTable(RAW_TABLE_NAME);
waitForJobGroupNames(_controllerStarter.getTaskManager(), List::isEmpty, "JobGroupNames should be empty");
stopFakeInstances();
stopController();
}
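
  /**
   * Waits until the scheduler's job group names satisfy the given predicate, or fails with the given error message
   * after the timeout.
   */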
private void waitForJobGroupNames(PinotTaskManager taskManager, Predicate<List<String>> predicate,
String errorMessage) {
TestUtils.waitForCondition(aVoid -> {
try {
Scheduler scheduler = taskManager.getScheduler();
List<String> jobGroupNames = scheduler.getJobGroupNames();
return predicate.test(jobGroupNames);
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}, TIMEOUT_IN_MS, errorMessage);
}
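
  /**
   * Validates the cron job with the default settings: skip-late-cron-schedule disabled and a 600-second max delay.
   */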
private void validateJob(String taskType, String cronExpression)
throws Exception {
validateJob(taskType, cronExpression, false, 600);
}
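
  /**
   * Validates that the scheduler holds exactly one cron job for the given task type, with the expected cron
   * expression, job data and skip-late-cron-schedule settings.
   */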
private void validateJob(String taskType, String cronExpression, boolean skipLateCronSchedule, int maxDelayInSeconds)
throws Exception {
PinotTaskManager taskManager = _controllerStarter.getTaskManager();
Scheduler scheduler = taskManager.getScheduler();
    assertNotNull(scheduler);
Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.groupEquals(taskType));
assertEquals(jobKeys.size(), 1);
JobKey jobKey = jobKeys.iterator().next();
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
assertEquals(jobDetail.getJobClass(), CronJobScheduleJob.class);
assertEquals(jobDetail.getKey().getName(), OFFLINE_TABLE_NAME);
assertEquals(jobDetail.getKey().getGroup(), taskType);
assertSame(jobDetail.getJobDataMap().get("PinotTaskManager"), taskManager);
assertSame(jobDetail.getJobDataMap().get("LeadControllerManager"), _controllerStarter.getLeadControllerManager());
assertEquals(jobDetail.getJobDataMap().get("SkipLateCronSchedule"), skipLateCronSchedule);
assertEquals(jobDetail.getJobDataMap().get("MaxCronScheduleDelayInSeconds"), maxDelayInSeconds);
    // The jobDetail and jobTrigger are not added atomically by the scheduler: the jobDetail is added to one internal
    // map first, and the jobTrigger is added to another internal map afterwards, so we wait for the jobTrigger to
    // show up to be more defensive.
TestUtils.waitForCondition(aVoid -> {
try {
return scheduler.getTriggersOfJob(jobKey).size() == 1;
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}, TIMEOUT_IN_MS, "JobDetail exiting but missing JobTrigger");
    // There is no guarantee that the previous changes have been applied yet, so retry the check for a while.
TestUtils.waitForCondition(aVoid -> {
try {
List<? extends Trigger> triggersOfJob = scheduler.getTriggersOfJob(jobKey);
Trigger trigger = triggersOfJob.iterator().next();
assertTrue(trigger instanceof CronTrigger);
assertEquals(((CronTrigger) trigger).getCronExpression(), cronExpression);
} catch (SchedulerException ex) {
throw new RuntimeException(ex);
      } catch (AssertionError assertionError) {
        LOGGER.warn("Unexpected cron expression. Hasn't been replicated yet?", assertionError);
        return false;
      }
return true;
}, TIMEOUT_IN_MS, 500L, "Cron expression didn't change to " + cronExpression);
}

  @AfterClass
public void tearDown() {
stopZk();
}
}