/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.batch.parallel.LegacySinglePhaseSubTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialRangeSegmentGenerateTask;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTask;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import java.util.Map;
/**
* Represents a task that can run on a worker. The general contracts surrounding Tasks are:
* <ul>
* <li>Tasks must operate on a single datasource.</li>
* <li>Tasks should be immutable, since the task ID is used as a proxy for the task in many locations.</li>
* <li>Task IDs must be unique. Uniqueness can be ensured by naming tasks with UUIDs or the current timestamp
* (see the sketch below).</li>
* <li>Tasks are each part of a "task group", which is a set of tasks that can share interval locks. These are
* useful for producing sharded segments.</li>
* <li>Tasks do not need to explicitly release locks; they are released upon task completion. Tasks may choose
* to release locks early if they desire.</li>
* </ul>
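*
* For example, a hedged sketch of the ID and group ID conventions above ({@code dataSource} is an illustrative
* variable, not part of this interface's contract):
* <pre>{@code
* // A unique task ID built from the task type, datasource, and a UUID.
* final String taskId = "index_" + dataSource + "_" + UUID.randomUUID();
* // Tasks that do not share locks conventionally reuse the task ID as the group ID.
* final String groupId = taskId;
* }</pre>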
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@Type(name = "kill", value = KillUnusedSegmentsTask.class),
@Type(name = "move", value = MoveTask.class),
@Type(name = "archive", value = ArchiveTask.class),
@Type(name = "restore", value = RestoreTask.class),
@Type(name = "index", value = IndexTask.class),
@Type(name = ParallelIndexSupervisorTask.TYPE, value = ParallelIndexSupervisorTask.class),
@Type(name = SinglePhaseSubTask.TYPE, value = SinglePhaseSubTask.class),
// for backward compatibility
@Type(name = SinglePhaseSubTask.OLD_TYPE_NAME, value = LegacySinglePhaseSubTask.class),
@Type(name = PartialHashSegmentGenerateTask.TYPE, value = PartialHashSegmentGenerateTask.class),
@Type(name = PartialDimensionCardinalityTask.TYPE, value = PartialDimensionCardinalityTask.class),
@Type(name = PartialRangeSegmentGenerateTask.TYPE, value = PartialRangeSegmentGenerateTask.class),
@Type(name = PartialDimensionDistributionTask.TYPE, value = PartialDimensionDistributionTask.class),
@Type(name = PartialGenericSegmentMergeTask.TYPE, value = PartialGenericSegmentMergeTask.class),
@Type(name = "index_hadoop", value = HadoopIndexTask.class),
@Type(name = "index_realtime", value = RealtimeIndexTask.class),
@Type(name = "index_realtime_appenderator", value = AppenderatorDriverRealtimeIndexTask.class),
@Type(name = "noop", value = NoopTask.class),
@Type(name = "compact", value = CompactionTask.class)
})
public interface Task
{
/**
* Returns the ID of this task. Must be unique across all tasks ever created.
*
* @return task ID
*/
String getId();
/**
* Returns the group ID of this task. Tasks with the same group ID can share locks. If tasks do not need to share
* locks, a common convention is to set the group ID equal to the task ID.
*
* @return task group ID
*/
String getGroupId();
/**
* Returns the task priority. The task priority is currently used only for prioritized locking, but, in the
* future, it could also be used for task scheduling, cluster resource management, etc.
*
* The task priority is set in the task context if the task is submitted to the proper Overlord endpoint.
*
* During a rolling update, the priority might be missing from the context; in that case, this method returns
* {@link Tasks#DEFAULT_TASK_PRIORITY}.
*
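* For example, a client might set a non-default priority in the task context before submission; a hedged sketch
* (75 is an illustrative value, not a Druid default):
* <pre>{@code
* task.addToContextIfAbsent(Tasks.PRIORITY_KEY, 75);
* }</pre>
*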
* @return task priority
*
* @see Tasks for default task priorities
* @see org.apache.druid.indexing.overlord.http.OverlordResource#taskPost
*/
default int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY);
}
/**
* Returns a {@link TaskResource} for this task. Task resources define the specific worker requirements of a
* task.
*
* @return {@link TaskResource} for this task
*/
TaskResource getTaskResource();
/**
* Returns a descriptive label for this task type. Used for metrics emission and logging.
*
* @return task type label
*/
String getType();
/**
* Returns the nodeType to use if/when this task publishes on ZooKeeper.
*
* @return the nodeType to use when publishing the server to ZooKeeper, or null if the task does not expect to
* publish to ZooKeeper
*/
String getNodeType();
/**
* Returns the datasource this task operates on. Each task can operate on only one datasource.
*/
String getDataSource();
/**
* Returns the query runner for this task. If this task is not meant to answer queries over its datasource, this
* method should return null.
*
* @param <T> query result type
*
* @return query runners for this task
*/
<T> QueryRunner<T> getQueryRunner(Query<T> query);
/**
* @return true if this Task type is queryable, such as streaming ingestion tasks
*/
boolean supportsQueries();
/**
* Returns an extra classpath that should be prepended to the default classpath when running this task. If no
* extra classpath should be prepended, this should return null or the empty string.
*/
String getClasspathPrefix();
/**
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. This
* typically runs on the Overlord. If this method throws an exception, the task should be considered a failure.
* <p/>
* This method must be idempotent, as it may be run multiple times per task.
*
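* A hedged sketch of a typical implementation, which tries to acquire a time-chunk lock and reports readiness
* only once the lock is granted ({@code interval} is an illustrative field; the use of
* {@code TimeChunkLockTryAcquireAction} is an assumption about the task-action API):
* <pre>{@code
* final TaskLock lock = taskActionClient.submit(
*     new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)
* );
* return lock != null;  // ready only once the lock is held
* }</pre>
*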
* @param taskActionClient action client for this task (not the full toolbox)
*
* @return true if ready, false if not ready yet
*
* @throws Exception if the task should be considered a failure
*/
boolean isReady(TaskActionClient taskActionClient) throws Exception;
/**
* Returns whether this task can restore its progress from its on-disk working directory. Restorable tasks
* may be started with a non-empty working directory. Tasks that exit uncleanly may still have a chance to attempt
* restores, meaning that restorable tasks should be able to deal with potentially partially written on-disk state.
*/
boolean canRestore();
/**
* Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be
* terminated with extreme prejudice.
*
* This method can be called at any time: before, during, or after {@link #run} executes. Regardless of when it
* is called, implementations must not leak resources or leave lingering executions (local or remote).
*
* Depending on the task executor type, one of the two cases below can happen when the task is killed.
* - When the task is executed by a middleManager, {@link org.apache.druid.indexing.overlord.ForkingTaskRunner} kills
* the process running the task, which triggers
* {@link org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner#stop}.
* - When the task is executed by an indexer, {@link org.apache.druid.indexing.overlord.ThreadingTaskRunner#shutdown}
* calls this method directly.
*
* If the task has resources to clean up on abnormal exit, e.g., the sub-tasks of a parallel indexing task or the
* Hadoop jobs spawned by a Hadoop indexing task, that cleanup should be done in this method.
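*
* A hedged sketch of an implementation ({@code stopped} and {@code runThread} are illustrative fields):
* <pre>{@code
* public void stopGracefully(TaskConfig taskConfig)
* {
*   stopped = true;            // signal run() to exit its loop promptly
*   if (runThread != null) {
*     runThread.interrupt();   // unblock any waiting I/O or sleeps
*   }
* }
* }</pre>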
*
* @param taskConfig TaskConfig for this task
*/
void stopGracefully(TaskConfig taskConfig);
/**
* Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while
* holding a lock on our dataSource and implicit lock interval (if any). If this method throws an exception, the task
* should be considered a failure.
*
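* For example, a minimal implementation might look like the following hedged sketch ({@code doWork} is a
* hypothetical helper, not part of this interface):
* <pre>{@code
* doWork(toolbox);                     // task-specific work
* return TaskStatus.success(getId());  // report a terminal status
* }</pre>
*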
* @param toolbox Toolbox for this task
*
* @return Some kind of finished status (isRunnable must be false).
*
* @throws Exception if this task failed
*/
TaskStatus run(TaskToolbox toolbox) throws Exception;
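/**
* Adds the given key/value pair to the task context and returns the updated context. Note that this mutates the
* map returned by {@link #getContext()}, so it only works for tasks whose context map is mutable.
*/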
default Map<String, Object> addToContext(String key, Object val)
{
getContext().put(key, val);
return getContext();
}
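/**
* Adds the given key/value pair to the task context only if the key is not already present, and returns the
* updated context.
*/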
default Map<String, Object> addToContextIfAbsent(String key, Object val)
{
getContext().putIfAbsent(key, val);
return getContext();
}
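/**
* Returns the context map for this task: arbitrary key/value pairs that tune the behavior of the task.
*/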
Map<String, Object> getContext();
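/**
* Returns the context value for the given key, or null if the key is absent. The value is cast (unchecked) to
* the caller's expected type, so the caller must know the runtime type of the value. For example:
* <pre>{@code
* final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY);
* }</pre>
*/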
default <ContextValueType> ContextValueType getContextValue(String key)
{
return (ContextValueType) getContext().get(key);
}
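/**
* Returns the context value for the given key, or {@code defaultValue} if the key is absent or explicitly
* mapped to null.
*/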
default <ContextValueType> ContextValueType getContextValue(String key, ContextValueType defaultValue)
{
final ContextValueType value = getContextValue(key);
return value == null ? defaultValue : value;
}
}