| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pinot.controller.api.resources; |
| |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.databind.JsonNode; |
| import com.google.common.collect.HashBiMap; |
| import io.swagger.annotations.Api; |
| import io.swagger.annotations.ApiKeyAuthDefinition; |
| import io.swagger.annotations.ApiOperation; |
| import io.swagger.annotations.ApiParam; |
| import io.swagger.annotations.ApiResponse; |
| import io.swagger.annotations.ApiResponses; |
| import io.swagger.annotations.Authorization; |
| import io.swagger.annotations.SecurityDefinition; |
| import io.swagger.annotations.SwaggerDefinition; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Executor; |
| import java.util.stream.Collectors; |
| import javax.annotation.Nullable; |
| import javax.inject.Inject; |
| import javax.ws.rs.DELETE; |
| import javax.ws.rs.DefaultValue; |
| import javax.ws.rs.GET; |
| import javax.ws.rs.NotFoundException; |
| import javax.ws.rs.POST; |
| import javax.ws.rs.PUT; |
| import javax.ws.rs.Path; |
| import javax.ws.rs.PathParam; |
| import javax.ws.rs.Produces; |
| import javax.ws.rs.QueryParam; |
| import javax.ws.rs.container.AsyncResponse; |
| import javax.ws.rs.container.Suspended; |
| import javax.ws.rs.core.Context; |
| import javax.ws.rs.core.HttpHeaders; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.Response; |
| import javax.ws.rs.core.UriInfo; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.exception.ExceptionUtils; |
| import org.apache.helix.model.InstanceConfig; |
| import org.apache.helix.task.TaskPartitionState; |
| import org.apache.helix.task.TaskState; |
| import org.apache.http.conn.HttpClientConnectionManager; |
| import org.apache.pinot.common.exception.TableNotFoundException; |
| import org.apache.pinot.common.minion.BaseTaskGeneratorInfo; |
| import org.apache.pinot.common.minion.TaskManagerStatusCache; |
| import org.apache.pinot.common.utils.DatabaseUtils; |
| import org.apache.pinot.controller.ControllerConf; |
| import org.apache.pinot.controller.api.access.AccessType; |
| import org.apache.pinot.controller.api.access.Authenticate; |
| import org.apache.pinot.controller.api.exception.ControllerApplicationException; |
| import org.apache.pinot.controller.api.exception.NoTaskScheduledException; |
| import org.apache.pinot.controller.api.exception.TaskAlreadyExistsException; |
| import org.apache.pinot.controller.api.exception.UnknownTaskTypeException; |
| import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; |
| import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; |
| import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; |
| import org.apache.pinot.controller.util.CompletionServiceHelper; |
| import org.apache.pinot.core.auth.Actions; |
| import org.apache.pinot.core.auth.Authorize; |
| import org.apache.pinot.core.auth.TargetType; |
| import org.apache.pinot.core.minion.PinotTaskConfig; |
| import org.apache.pinot.spi.config.task.AdhocTaskConfig; |
| import org.apache.pinot.spi.utils.JsonUtils; |
| import org.glassfish.grizzly.http.server.Request; |
| import org.glassfish.jersey.server.ManagedAsync; |
| import org.quartz.CronTrigger; |
| import org.quartz.JobDataMap; |
| import org.quartz.JobDetail; |
| import org.quartz.JobKey; |
| import org.quartz.Scheduler; |
| import org.quartz.SchedulerException; |
| import org.quartz.SchedulerMetaData; |
| import org.quartz.SimpleTrigger; |
| import org.quartz.Trigger; |
| import org.quartz.impl.matchers.GroupMatcher; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.pinot.spi.utils.CommonConstants.DATABASE; |
| import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE; |
| import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; |
| |
| |
| /** |
| * Task related rest APIs. |
| * <ul> |
| * <li>GET '/tasks/tasktypes': List all task types</li> |
| * <li>GET '/tasks/{taskType}/state': Get the state (task queue state) for the given task type</li> |
| * <li>GET '/tasks/{taskType}/tasks': List all tasks for the given task type</li> |
| * <li>GET '/tasks/{taskType}/taskstates': Get a map from task to task state for the given task type</li> |
| * <li>GET '/tasks/task/{taskName}/state': Get the task state for the given task</li> |
| * <li>GET '/tasks/task/{taskName}/config': Get the task config (a list of child task configs) for the given task</li> |
| * <li>POST '/tasks/schedule': Schedule tasks</li> |
| * <li>POST '/tasks/execute': Execute an adhoc task</li> |
| * <li>PUT '/tasks/{taskType}/cleanup': Clean up finished tasks (COMPLETED, FAILED) for the given task type</li> |
| * <li>PUT '/tasks/{taskType}/stop': Stop all running/pending tasks (as well as the task queue) for the given task |
| * type</li> |
| * <li>PUT '/tasks/{taskType}/resume': Resume all stopped tasks (as well as the task queue) for the given task |
| * type</li> |
| * <li>DELETE '/tasks/{taskType}': Delete all tasks (as well as the task queue) for the given task type</li> |
| * </ul> |
| */ |
| @Api(tags = Constants.TASK_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY), |
| @Authorization(value = DATABASE)}) |
| @SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = { |
| @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, |
| key = SWAGGER_AUTHORIZATION_KEY), |
| @ApiKeyAuthDefinition(name = DATABASE, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE, |
| description = "Database context passed through http header. If no context is provided 'default' database " |
| + "context will be considered.")})) |
| @Path("/") |
| public class PinotTaskRestletResource { |
| public static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskRestletResource.class); |
| |
| private static final String TASK_QUEUE_STATE_STOP = "STOP"; |
| private static final String TASK_QUEUE_STATE_RESUME = "RESUME"; |
| |
| @Inject |
| PinotHelixTaskResourceManager _pinotHelixTaskResourceManager; |
| |
| @Inject |
| PinotTaskManager _pinotTaskManager; |
| |
| @Inject |
| TaskManagerStatusCache _taskManagerStatusCache; |
| |
| @Inject |
| PinotHelixResourceManager _pinotHelixResourceManager; |
| |
| @Inject |
| Executor _executor; |
| |
| @Inject |
| HttpClientConnectionManager _connectionManager; |
| |
| @Inject |
| ControllerConf _controllerConf; |
| |
| @Context |
| private UriInfo _uriInfo; |
| |
| @GET |
| @Path("/tasks/tasktypes") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("List all task types") |
| public Set<String> listTaskTypes() { |
| return _pinotHelixTaskResourceManager.getTaskTypes(); |
| } |
| |
| @GET |
| @Path("/tasks/{taskType}/state") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Get the state (task queue state) for the given task type") |
| public TaskState getTaskQueueState( |
| @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType) { |
| return _pinotHelixTaskResourceManager.getTaskQueueState(taskType); |
| } |
| |
| @GET |
| @Path("/tasks/{taskType}/tasks") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("List all tasks for the given task type") |
| public Set<String> getTasks(@ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType) { |
| return _pinotHelixTaskResourceManager.getTasks(taskType); |
| } |
| |
| @GET |
| @Path("/tasks/{taskType}/{tableNameWithType}/state") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("List all tasks for the given task type") |
| public Map<String, TaskState> getTaskStatesByTable( |
| @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType, |
| @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType") |
| String tableNameWithType, @Context HttpHeaders headers) { |
| tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers); |
| return _pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableNameWithType); |
| } |
| |
| @GET |
| @Path("/tasks/{taskType}/{tableNameWithType}/metadata") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Get task metadata for the given task type and table") |
| public String getTaskMetadataByTable( |
| @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType, |
| @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType") |
| String tableNameWithType, @Context HttpHeaders headers) { |
| try { |
| tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers); |
| return _pinotHelixTaskResourceManager.getTaskMetadataByTable(taskType, tableNameWithType); |
| } catch (JsonProcessingException e) { |
| throw new ControllerApplicationException(LOGGER, String |
| .format("Failed to format task metadata into Json for task type: %s from table: %s", taskType, |
| tableNameWithType), Response.Status.INTERNAL_SERVER_ERROR, e); |
| } |
| } |
| |
| @DELETE |
| @Path("/tasks/{taskType}/{tableNameWithType}/metadata") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.DELETE_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Delete task metadata for the given task type and table") |
| public SuccessResponse deleteTaskMetadataByTable( |
| @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType, |
| @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType") |
| String tableNameWithType, @Context HttpHeaders headers) { |
| tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers); |
| _pinotHelixTaskResourceManager.deleteTaskMetadataByTable(taskType, tableNameWithType); |
| return new SuccessResponse( |
| String.format("Successfully deleted metadata for task type: %s from table: %s", taskType, tableNameWithType)); |
| } |
| |
| @GET |
| @Path("/tasks/{taskType}/taskcounts") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Fetch count of sub-tasks for each of the tasks for the given task type") |
| public Map<String, PinotHelixTaskResourceManager.TaskCount> getTaskCounts( |
| @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType) { |
| return _pinotHelixTaskResourceManager.getTaskCounts(taskType); |
| } |
| |
| @GET |
| @Path("/tasks/{taskType}/debug") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.DEBUG_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Fetch information for all the tasks for the given task type") |
| public Map<String, PinotHelixTaskResourceManager.TaskDebugInfo> getTasksDebugInfo( |
| @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType, |
| @ApiParam(value = "verbosity (Prints information for all the tasks for the given task type." |
| + "By default, only prints subtask details for running and error tasks. " |
| + "Value of > 0 prints subtask details for all tasks)") |
| @DefaultValue("0") @QueryParam("verbosity") int verbosity) { |
| return _pinotHelixTaskResourceManager.getTasksDebugInfo(taskType, verbosity); |
| } |
| |
| @GET |
| @Path("/tasks/{taskType}/{tableNameWithType}/debug") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.DEBUG_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Fetch information for all the tasks for the given task type and table") |
| public Map<String, PinotHelixTaskResourceManager.TaskDebugInfo> getTasksDebugInfo( |
| @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType, |
| @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType") |
| String tableNameWithType, |
| @ApiParam(value = "verbosity (Prints information for all the tasks for the given task type and table." |
| + "By default, only prints subtask details for running and error tasks. " |
| + "Value of > 0 prints subtask details for all tasks)") |
| @DefaultValue("0") @QueryParam("verbosity") int verbosity, @Context HttpHeaders headers) { |
| tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers); |
| return _pinotHelixTaskResourceManager.getTasksDebugInfoByTable(taskType, tableNameWithType, verbosity); |
| } |
| |
| @GET |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("/tasks/generator/{tableNameWithType}/{taskType}/debug") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @ApiOperation("Fetch task generation information for the recent runs of the given task for the given table") |
| public String getTaskGenerationDebugInto( |
| @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType, |
| @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType") |
| String tableNameWithType, |
| @ApiParam(value = "Whether to only lookup local cache for logs", defaultValue = "false") @QueryParam("localOnly") |
| boolean localOnly, @Context HttpHeaders httpHeaders) |
| throws JsonProcessingException { |
| tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, httpHeaders); |
| if (localOnly) { |
| BaseTaskGeneratorInfo taskGeneratorMostRecentRunInfo = |
| _taskManagerStatusCache.fetchTaskGeneratorInfo(tableNameWithType, taskType); |
| if (taskGeneratorMostRecentRunInfo == null) { |
| throw new ControllerApplicationException(LOGGER, "Task generation information not found", |
| Response.Status.NOT_FOUND); |
| } |
| |
| return JsonUtils.objectToString(taskGeneratorMostRecentRunInfo); |
| } |
| |
| // Call all controllers |
| List<InstanceConfig> controllers = _pinotHelixResourceManager.getAllControllerInstanceConfigs(); |
| // Relying on original schema that was used to query the controller |
| URI uri = _uriInfo.getRequestUri(); |
| String scheme = uri.getScheme(); |
| String finalTableNameWithType = tableNameWithType; |
| List<String> controllerUrls = controllers.stream().map(controller -> String |
| .format("%s://%s:%d/tasks/generator/%s/%s/debug?localOnly=true", scheme, controller.getHostName(), |
| Integer.parseInt(controller.getPort()), finalTableNameWithType, taskType)).collect(Collectors.toList()); |
| |
| CompletionServiceHelper completionServiceHelper = |
| new CompletionServiceHelper(_executor, _connectionManager, HashBiMap.create(0)); |
| Map<String, String> requestHeaders = new HashMap<>(); |
| httpHeaders.getRequestHeaders().keySet().forEach(header -> { |
| requestHeaders.put(header, httpHeaders.getHeaderString(header)); |
| }); |
| LOGGER.debug("Getting task generation info with controllerUrls: {}", controllerUrls); |
| CompletionServiceHelper.CompletionServiceResponse serviceResponse = |
| completionServiceHelper.doMultiGetRequest(controllerUrls, null, true, requestHeaders, 10000); |
| |
| List<JsonNode> result = new ArrayList<>(); |
| serviceResponse._httpResponses.values().forEach(resp -> { |
| try { |
| result.add(JsonUtils.stringToJsonNode(resp)); |
| } catch (IOException e) { |
| LOGGER.error("Failed to parse controller response {}", resp, e); |
| } |
| }); |
| |
| return JsonUtils.objectToString(result); |
| } |
| |
| @GET |
| @Path("/tasks/task/{taskName}/debug") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.DEBUG_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Fetch information for the given task name") |
| public PinotHelixTaskResourceManager.TaskDebugInfo getTaskDebugInfo( |
| @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName, |
| @ApiParam(value = "verbosity (Prints information for the given task name." |
| + "By default, only prints subtask details for running and error tasks. " |
| + "Value of > 0 prints subtask details for all tasks)") |
| @DefaultValue("0") @QueryParam("verbosity") int verbosity) { |
| return _pinotHelixTaskResourceManager.getTaskDebugInfo(taskName, verbosity); |
| } |
| |
| @GET |
| @Path("/tasks/{taskType}/taskstates") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Get a map from task to task state for the given task type") |
| public Map<String, TaskState> getTaskStates( |
| @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType) { |
| return _pinotHelixTaskResourceManager.getTaskStates(taskType); |
| } |
| |
| @GET |
| @Path("/tasks/task/{taskName}/state") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Get the task state for the given task") |
| public TaskState getTaskState( |
| @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) { |
| return _pinotHelixTaskResourceManager.getTaskState(taskName); |
| } |
| |
| @GET |
| @Path("/tasks/subtask/{taskName}/state") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Get the states of all the sub tasks for the given task") |
| public Map<String, TaskPartitionState> getSubtaskStates( |
| @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) { |
| return _pinotHelixTaskResourceManager.getSubtaskStates(taskName); |
| } |
| |
| @GET |
| @Path("/tasks/task/{taskName}/config") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Get the task config (a list of child task configs) for the given task") |
| public List<PinotTaskConfig> getTaskConfigs( |
| @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) { |
| return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName); |
| } |
| |
| @GET |
| @Path("/tasks/task/{taskName}/runtime/config") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Get the task runtime config for the given task") |
| public Map<String, String> getTaskConfig( |
| @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) { |
| return _pinotHelixTaskResourceManager.getTaskRuntimeConfig(taskName); |
| } |
| |
| @GET |
| @Path("/tasks/subtask/{taskName}/config") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Get the configs of specified sub tasks for the given task") |
| public Map<String, PinotTaskConfig> getSubtaskConfigs( |
| @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName, |
| @ApiParam(value = "Sub task names separated by comma") @QueryParam("subtaskNames") @Nullable |
| String subtaskNames) { |
| return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName, subtaskNames); |
| } |
| |
| @GET |
| @Path("/tasks/subtask/{taskName}/progress") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Get progress of specified sub tasks for the given task tracked by minion worker in memory") |
| public String getSubtaskProgress(@Context HttpHeaders httpHeaders, |
| @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName, |
| @ApiParam(value = "Sub task names separated by comma") @QueryParam("subtaskNames") @Nullable |
| String subtaskNames) { |
| // Relying on original schema that was used to query the controller |
| String scheme = _uriInfo.getRequestUri().getScheme(); |
| List<InstanceConfig> workers = _pinotHelixResourceManager.getAllMinionInstanceConfigs(); |
| Map<String, String> workerEndpoints = new HashMap<>(); |
| for (InstanceConfig worker : workers) { |
| workerEndpoints.put(worker.getId(), |
| String.format("%s://%s:%d", scheme, worker.getHostName(), Integer.parseInt(worker.getPort()))); |
| } |
| Map<String, String> requestHeaders = new HashMap<>(); |
| httpHeaders.getRequestHeaders().keySet().forEach(header -> { |
| requestHeaders.put(header, httpHeaders.getHeaderString(header)); |
| }); |
| int timeoutMs = _controllerConf.getMinionAdminRequestTimeoutSeconds() * 1000; |
| try { |
| Map<String, Object> progress = _pinotHelixTaskResourceManager |
| .getSubtaskProgress(taskName, subtaskNames, _executor, _connectionManager, workerEndpoints, requestHeaders, |
| timeoutMs); |
| return JsonUtils.objectToString(progress); |
| } catch (UnknownTaskTypeException | NoTaskScheduledException e) { |
| throw new ControllerApplicationException(LOGGER, "Not task with name: " + taskName, Response.Status.NOT_FOUND, e); |
| } catch (Exception e) { |
| throw new ControllerApplicationException(LOGGER, String |
| .format("Failed to get worker side progress for task: %s due to error: %s", taskName, |
| ExceptionUtils.getStackTrace(e)), Response.Status.INTERNAL_SERVER_ERROR, e); |
| } |
| } |
| |
| @GET |
| @Path("/tasks/subtask/workers/progress") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Get progress of all subtasks with specified state tracked by minion worker in memory") |
| @ApiResponses(value = { |
| @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error") |
| }) |
| public String getSubtaskOnWorkerProgress(@Context HttpHeaders httpHeaders, |
| @ApiParam(value = "Subtask state (UNKNOWN,IN_PROGRESS,SUCCEEDED,CANCELLED,ERROR)", required = true) |
| @QueryParam("subTaskState") String subTaskState, |
| @ApiParam(value = "Minion worker IDs separated by comma") @QueryParam("minionWorkerIds") @Nullable |
| String minionWorkerIds) { |
| Set<String> selectedMinionWorkers = new HashSet<>(); |
| if (StringUtils.isNotEmpty(minionWorkerIds)) { |
| selectedMinionWorkers.addAll( |
| Arrays.stream(StringUtils.split(minionWorkerIds, ',')).map(String::trim).collect(Collectors.toList())); |
| } |
| // Relying on original schema that was used to query the controller |
| String scheme = _uriInfo.getRequestUri().getScheme(); |
| List<InstanceConfig> allMinionWorkerInstanceConfigs = _pinotHelixResourceManager.getAllMinionInstanceConfigs(); |
| Map<String, String> selectedMinionWorkerEndpoints = new HashMap<>(); |
| for (InstanceConfig worker : allMinionWorkerInstanceConfigs) { |
| if (selectedMinionWorkers.isEmpty() || selectedMinionWorkers.contains(worker.getId())) { |
| selectedMinionWorkerEndpoints.put(worker.getId(), |
| String.format("%s://%s:%d", scheme, worker.getHostName(), Integer.parseInt(worker.getPort()))); |
| } |
| } |
| Map<String, String> requestHeaders = new HashMap<>(); |
| httpHeaders.getRequestHeaders().keySet().forEach(header -> |
| requestHeaders.put(header, httpHeaders.getHeaderString(header))); |
| int timeoutMs = _controllerConf.getMinionAdminRequestTimeoutSeconds() * 1000; |
| try { |
| Map<String, Object> minionWorkerIdSubtaskProgressMap = |
| _pinotHelixTaskResourceManager.getSubtaskOnWorkerProgress(subTaskState, _executor, _connectionManager, |
| selectedMinionWorkerEndpoints, requestHeaders, timeoutMs); |
| return JsonUtils.objectToString(minionWorkerIdSubtaskProgressMap); |
| } catch (Exception e) { |
| throw new ControllerApplicationException(LOGGER, |
| String.format("Failed to get minion worker side progress for subtasks with state %s due to error: %s", |
| subTaskState, ExceptionUtils.getStackTrace(e)), Response.Status.INTERNAL_SERVER_ERROR, e); |
| } |
| } |
| |
| @GET |
| @Path("/tasks/scheduler/information") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_SCHEDULER_INFO) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Fetch cron scheduler information") |
| public Map<String, Object> getCronSchedulerInformation() |
| throws SchedulerException { |
| Scheduler scheduler = _pinotTaskManager.getScheduler(); |
| if (scheduler == null) { |
| throw new NotFoundException("Task scheduler is disabled"); |
| } |
| SchedulerMetaData metaData = scheduler.getMetaData(); |
| Map<String, Object> schedulerMetaData = new HashMap<>(); |
| schedulerMetaData.put("Version", metaData.getVersion()); |
| schedulerMetaData.put("SchedulerName", metaData.getSchedulerName()); |
| schedulerMetaData.put("SchedulerInstanceId", metaData.getSchedulerInstanceId()); |
| schedulerMetaData.put("getThreadPoolClass", metaData.getThreadPoolClass()); |
| schedulerMetaData.put("getThreadPoolSize", metaData.getThreadPoolSize()); |
| schedulerMetaData.put("SchedulerClass", metaData.getSchedulerClass()); |
| schedulerMetaData.put("Clustered", metaData.isJobStoreClustered()); |
| schedulerMetaData.put("JobStoreClass", metaData.getJobStoreClass()); |
| schedulerMetaData.put("NumberOfJobsExecuted", metaData.getNumberOfJobsExecuted()); |
| schedulerMetaData.put("InStandbyMode", metaData.isInStandbyMode()); |
| schedulerMetaData.put("RunningSince", metaData.getRunningSince()); |
| List<Map> jobDetails = new ArrayList<>(); |
| for (String groupName : scheduler.getJobGroupNames()) { |
| for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))) { |
| Map<String, Object> jobMap = new HashMap<>(); |
| List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey); |
| jobMap.put("JobKey", jobKey); |
| jobMap.put("NextFireTime", triggers.get(0).getNextFireTime()); |
| jobMap.put("PreviousFireTime", triggers.get(0).getPreviousFireTime()); |
| jobDetails.add(jobMap); |
| } |
| } |
| schedulerMetaData.put("JobDetails", jobDetails); |
| return schedulerMetaData; |
| } |
| |
| @GET |
| @Path("/tasks/scheduler/jobKeys") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_SCHEDULER_INFO) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Fetch cron scheduler job keys") |
| public List<JobKey> getCronSchedulerJobKeys() |
| throws SchedulerException { |
| Scheduler scheduler = _pinotTaskManager.getScheduler(); |
| if (scheduler == null) { |
| throw new NotFoundException("Task scheduler is disabled"); |
| } |
| List<JobKey> jobKeys = new ArrayList<>(); |
| for (String group : scheduler.getTriggerGroupNames()) { |
| jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.groupEquals(group))); |
| } |
| return jobKeys; |
| } |
| |
| @GET |
| @Path("/tasks/scheduler/jobDetails") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_SCHEDULER_INFO) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation("Fetch cron scheduler job keys") |
| public Map<String, Object> getCronSchedulerJobDetails( |
| @ApiParam(value = "Table name (with type suffix)", required = true) @QueryParam("tableName") String tableName, |
| @ApiParam(value = "Task type") @QueryParam("taskType") String taskType, @Context HttpHeaders headers) |
| throws SchedulerException { |
| Scheduler scheduler = _pinotTaskManager.getScheduler(); |
| if (scheduler == null) { |
| throw new NotFoundException("Task scheduler is disabled"); |
| } |
| tableName = DatabaseUtils.translateTableName(tableName, headers); |
| JobKey jobKey = JobKey.jobKey(tableName, taskType); |
| if (!scheduler.checkExists(jobKey)) { |
| throw new NotFoundException( |
| "Unable to find job detail for table name - " + tableName + ", task type - " + taskType); |
| } |
| JobDetail schedulerJobDetail = scheduler.getJobDetail(jobKey); |
| Map<String, Object> jobDetail = new HashMap<>(); |
| jobDetail.put("JobKey", schedulerJobDetail.getKey()); |
| jobDetail.put("Description", schedulerJobDetail.getDescription()); |
| jobDetail.put("JobClass", schedulerJobDetail.getJobClass()); |
| JobDataMap jobData = schedulerJobDetail.getJobDataMap(); |
| Map<String, String> jobDataMap = new HashMap<>(); |
| for (String key : jobData.getKeys()) { |
| jobDataMap.put(key, jobData.get(key).toString()); |
| } |
| jobDetail.put("JobDataMap", jobDataMap); |
| List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey); |
| List<Map> triggerMaps = new ArrayList<>(); |
| if (!triggers.isEmpty()) { |
| for (Trigger trigger : triggers) { |
| Map<String, Object> triggerMap = new HashMap<>(); |
| if (trigger instanceof SimpleTrigger) { |
| SimpleTrigger simpleTrigger = (SimpleTrigger) trigger; |
| triggerMap.put("TriggerType", SimpleTrigger.class.getSimpleName()); |
| triggerMap.put("RepeatInterval", simpleTrigger.getRepeatInterval()); |
| triggerMap.put("RepeatCount", simpleTrigger.getRepeatCount()); |
| triggerMap.put("TimesTriggered", simpleTrigger.getTimesTriggered()); |
| triggerMap.put("NextFireTime", simpleTrigger.getNextFireTime()); |
| triggerMap.put("PreviousFireTime", simpleTrigger.getPreviousFireTime()); |
| } else if (trigger instanceof CronTrigger) { |
| CronTrigger cronTrigger = (CronTrigger) trigger; |
| triggerMap.put("TriggerType", CronTrigger.class.getSimpleName()); |
| triggerMap.put("TimeZone", cronTrigger.getTimeZone()); |
| triggerMap.put("CronExpression", cronTrigger.getCronExpression()); |
| triggerMap.put("ExpressionSummary", cronTrigger.getExpressionSummary()); |
| triggerMap.put("NextFireTime", cronTrigger.getNextFireTime()); |
| triggerMap.put("PreviousFireTime", cronTrigger.getPreviousFireTime()); |
| } |
| triggerMaps.add(triggerMap); |
| } |
| } |
| jobDetail.put("Triggers", triggerMaps); |
| return jobDetail; |
| } |
| |
| @POST |
| @Path("/tasks/schedule") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CREATE_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Authenticate(AccessType.UPDATE) |
| @ApiOperation("Schedule tasks and return a map from task type to task name scheduled") |
| public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryParam("taskType") String taskType, |
| @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName, |
| @ApiParam(value = "Minion Instance tag to schedule the task explicitly on") @QueryParam("minionInstanceTag") |
| @Nullable String minionInstanceTag, @Context HttpHeaders headers) { |
| String database = headers != null ? headers.getHeaderString(DATABASE) : DEFAULT_DATABASE; |
| if (taskType != null) { |
| // Schedule task for the given task type |
| List<String> taskNames = tableName != null ? _pinotTaskManager.scheduleTaskForTable(taskType, |
| DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag) |
| : _pinotTaskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag); |
| return Collections.singletonMap(taskType, taskNames == null ? null : StringUtils.join(taskNames, ',')); |
| } else { |
| // Schedule tasks for all task types |
| Map<String, List<String>> allTaskNames = tableName != null ? _pinotTaskManager.scheduleAllTasksForTable( |
| DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag) |
| : _pinotTaskManager.scheduleAllTasksForDatabase(database, minionInstanceTag); |
| return allTaskNames.entrySet().stream() |
| .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.join(",", entry.getValue()))); |
| } |
| } |
| |
| @POST |
| @ManagedAsync |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("/tasks/execute") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.EXECUTE_TASK) |
| @Authenticate(AccessType.CREATE) |
| @ApiOperation("Execute a task on minion") |
| public void executeAdhocTask(AdhocTaskConfig adhocTaskConfig, @Suspended AsyncResponse asyncResponse, |
| @Context Request requestContext) { |
| try { |
| asyncResponse.resume(_pinotTaskManager.createTask(adhocTaskConfig.getTaskType(), adhocTaskConfig.getTableName(), |
| adhocTaskConfig.getTaskName(), adhocTaskConfig.getTaskConfigs())); |
| } catch (TableNotFoundException e) { |
| throw new ControllerApplicationException(LOGGER, "Failed to find table: " + adhocTaskConfig.getTableName(), |
| Response.Status.NOT_FOUND, e); |
| } catch (TaskAlreadyExistsException e) { |
| throw new ControllerApplicationException(LOGGER, "Task already exists: " + adhocTaskConfig.getTaskName(), |
| Response.Status.CONFLICT, e); |
| } catch (UnknownTaskTypeException e) { |
| throw new ControllerApplicationException(LOGGER, "Unknown task type: " + adhocTaskConfig.getTaskType(), |
| Response.Status.NOT_FOUND, e); |
| } catch (NoTaskScheduledException e) { |
| throw new ControllerApplicationException(LOGGER, |
| "No task is generated for table: " + adhocTaskConfig.getTableName() + ", with task type: " |
| + adhocTaskConfig.getTaskType(), Response.Status.BAD_REQUEST); |
| } catch (Exception e) { |
| throw new ControllerApplicationException(LOGGER, |
| "Failed to create adhoc task: " + ExceptionUtils.getStackTrace(e), Response.Status.INTERNAL_SERVER_ERROR, e); |
| } |
| } |
| |
| @PUT |
| @Path("/tasks/{taskType}/cleanup") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CLEANUP_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Authenticate(AccessType.UPDATE) |
| @ApiOperation("Clean up finished tasks (COMPLETED, FAILED) for the given task type") |
| public SuccessResponse cleanUpTasks( |
| @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType) { |
| _pinotHelixTaskResourceManager.cleanUpTaskQueue(taskType); |
| return new SuccessResponse("Successfully cleaned up tasks for task type: " + taskType); |
| } |
| |
| @PUT |
| @Path("/tasks/{taskType}/stop") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.STOP_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Authenticate(AccessType.UPDATE) |
| @ApiOperation("Stop all running/pending tasks (as well as the task queue) for the given task type") |
| public SuccessResponse stopTasks( |
| @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType) { |
| _pinotHelixTaskResourceManager.stopTaskQueue(taskType); |
| return new SuccessResponse("Successfully stopped tasks for task type: " + taskType); |
| } |
| |
| @PUT |
| @Path("/tasks/{taskType}/resume") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.RESUME_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Authenticate(AccessType.UPDATE) |
| @ApiOperation("Resume all stopped tasks (as well as the task queue) for the given task type") |
| public SuccessResponse resumeTasks( |
| @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType) { |
| _pinotHelixTaskResourceManager.resumeTaskQueue(taskType); |
| return new SuccessResponse("Successfully resumed tasks for task type: " + taskType); |
| } |
| |
| @DELETE |
| @Path("/tasks/{taskType}") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.DELETE_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Authenticate(AccessType.DELETE) |
| @ApiOperation("Delete all tasks (as well as the task queue) for the given task type") |
| public SuccessResponse deleteTasks( |
| @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType, |
| @ApiParam(value = "Whether to force deleting the tasks (expert only option, enable with cautious") |
| @DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete) { |
| _pinotHelixTaskResourceManager.deleteTaskQueue(taskType, forceDelete); |
| return new SuccessResponse("Successfully deleted tasks for task type: " + taskType); |
| } |
| |
| @DELETE |
| @Path("/tasks/task/{taskName}") |
| @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.DELETE_TASK) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Authenticate(AccessType.DELETE) |
| @ApiOperation("Delete a single task given its task name") |
| public SuccessResponse deleteTask( |
| @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName, |
| @ApiParam(value = "Whether to force deleting the task (expert only option, enable with cautious") |
| @DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete) { |
| _pinotHelixTaskResourceManager.deleteTask(taskName, forceDelete); |
| return new SuccessResponse("Successfully deleted task: " + taskName); |
| } |
| } |