| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.nemo.runtime.master.resource; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.node.ObjectNode; |
| import com.google.protobuf.ByteString; |
| import org.apache.commons.lang3.SerializationUtils; |
| import org.apache.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty; |
| import org.apache.nemo.runtime.common.RuntimeIdManager; |
| import org.apache.nemo.runtime.common.comm.ControlMessage; |
| import org.apache.nemo.runtime.common.message.MessageEnvironment; |
| import org.apache.nemo.runtime.common.message.MessageSender; |
| import org.apache.nemo.runtime.common.plan.Task; |
| import org.apache.reef.driver.context.ActiveContext; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.concurrent.NotThreadSafe; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| /** |
| * (WARNING) This class is not thread-safe, and thus should only be accessed through ExecutorRegistry. |
| * <p> |
| * Contains information/state regarding an executor. |
| * Such information may include: |
| * a) The executor's resource type. |
| * b) The executor's capacity (ex. number of cores). |
| * c) Tasks scheduled/launched for the executor. |
| * d) Name of the physical node which hosts this executor. |
| * e) (Please add other information as we implement more features). |
| */ |
| @NotThreadSafe |
| public final class ExecutorRepresenter { |
| private static final Logger LOG = LoggerFactory.getLogger(ExecutorRepresenter.class.getName()); |
| |
| private final String executorId; |
| private final ResourceSpecification resourceSpecification; |
| private final Map<String, Task> runningComplyingTasks; |
| private final Map<String, Task> runningNonComplyingTasks; |
| private final Map<Task, Integer> runningTaskToAttempt; |
| private final Set<Task> completeTasks; |
| private final Set<Task> failedTasks; |
| private final MessageSender<ControlMessage.Message> messageSender; |
| private final ActiveContext activeContext; |
| private final ExecutorService serializationExecutorService; |
| private final String nodeName; |
| |
| /** |
| * Creates a reference to the specified executor. |
| * |
| * @param executorId the executor id |
| * @param resourceSpecification specification for the executor |
| * @param messageSender provides communication context for this executor |
| * @param activeContext context on the corresponding REEF evaluator |
| * @param serializationExecutorService provides threads for message serialization |
| * @param nodeName physical name of the node where this executor resides |
| */ |
| public ExecutorRepresenter(final String executorId, |
| final ResourceSpecification resourceSpecification, |
| final MessageSender<ControlMessage.Message> messageSender, |
| final ActiveContext activeContext, |
| final ExecutorService serializationExecutorService, |
| final String nodeName) { |
| this.executorId = executorId; |
| this.resourceSpecification = resourceSpecification; |
| this.messageSender = messageSender; |
| this.runningComplyingTasks = new HashMap<>(); |
| this.runningNonComplyingTasks = new HashMap<>(); |
| this.runningTaskToAttempt = new HashMap<>(); |
| this.completeTasks = new HashSet<>(); |
| this.failedTasks = new HashSet<>(); |
| this.activeContext = activeContext; |
| this.serializationExecutorService = serializationExecutorService; |
| this.nodeName = nodeName; |
| } |
| |
| /** |
| * Marks all Tasks which were running in this executor as failed. |
| * |
| * @return set of identifiers of tasks that were running in this executor. |
| */ |
| public Set<String> onExecutorFailed() { |
| failedTasks.addAll(runningComplyingTasks.values()); |
| failedTasks.addAll(runningNonComplyingTasks.values()); |
| final Set<String> taskIds = Stream.concat(runningComplyingTasks.keySet().stream(), |
| runningNonComplyingTasks.keySet().stream()).collect(Collectors.toSet()); |
| runningComplyingTasks.clear(); |
| runningNonComplyingTasks.clear(); |
| return taskIds; |
| } |
| |
| /** |
| * Marks the Task as running, and sends scheduling message to the executor. |
| * |
| * @param task the task to run |
| */ |
| public void onTaskScheduled(final Task task) { |
| (task.getPropertyValue(ResourceSlotProperty.class).orElse(true) |
| ? runningComplyingTasks : runningNonComplyingTasks).put(task.getTaskId(), task); |
| runningTaskToAttempt.put(task, task.getAttemptIdx()); |
| failedTasks.remove(task); |
| |
| |
| serializationExecutorService.execute(() -> { |
| final byte[] serialized = SerializationUtils.serialize(task); |
| sendControlMessage( |
| ControlMessage.Message.newBuilder() |
| .setId(RuntimeIdManager.generateMessageId()) |
| .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID) |
| .setType(ControlMessage.MessageType.ScheduleTask) |
| .setScheduleTaskMsg( |
| ControlMessage.ScheduleTaskMsg.newBuilder() |
| .setTask(ByteString.copyFrom(serialized)) |
| .build()) |
| .build()); |
| }); |
| } |
| |
| /** |
| * Sends control message to the executor. |
| * |
| * @param message Message object to send |
| */ |
| public void sendControlMessage(final ControlMessage.Message message) { |
| messageSender.send(message); |
| } |
| |
| /** |
| * Marks the specified Task as completed. |
| * |
| * @param taskId id of the completed task |
| */ |
| public void onTaskExecutionComplete(final String taskId) { |
| final Task completedTask = removeFromRunningTasks(taskId); |
| runningTaskToAttempt.remove(completedTask); |
| completeTasks.add(completedTask); |
| } |
| |
| /** |
| * Marks the specified Task as failed. |
| * |
| * @param taskId id of the Task |
| */ |
| public void onTaskExecutionFailed(final String taskId) { |
| final Task failedTask = removeFromRunningTasks(taskId); |
| runningTaskToAttempt.remove(failedTask); |
| failedTasks.add(failedTask); |
| } |
| |
| /** |
| * @return how many Tasks can this executor simultaneously run |
| */ |
| public int getExecutorCapacity() { |
| return resourceSpecification.getCapacity(); |
| } |
| |
| /** |
| * @return the current snapshot of set of Tasks that are running in this executor. |
| */ |
| public Set<Task> getRunningTasks() { |
| return Stream.concat(runningComplyingTasks.values().stream(), |
| runningNonComplyingTasks.values().stream()).collect(Collectors.toSet()); |
| } |
| |
| /** |
| * @return the number of running {@link Task}s. |
| */ |
| public int getNumOfRunningTasks() { |
| return getNumOfComplyingRunningTasks() + getNumOfNonComplyingRunningTasks(); |
| } |
| |
| /** |
| * @return the number of running {@link Task}s that complies to the executor slot restriction. |
| */ |
| public int getNumOfComplyingRunningTasks() { |
| return runningComplyingTasks.size(); |
| } |
| |
| /** |
| * @return the number of running {@link Task}s that does not comply to the executor slot restriction. |
| */ |
| public int getNumOfNonComplyingRunningTasks() { |
| return runningNonComplyingTasks.size(); |
| } |
| |
| /** |
| * @return the executor id |
| */ |
| public String getExecutorId() { |
| return executorId; |
| } |
| |
| /** |
| * @return the container type |
| */ |
| public String getContainerType() { |
| return resourceSpecification.getContainerType(); |
| } |
| |
| /** |
| * @return physical name of the node where this executor resides |
| */ |
| public String getNodeName() { |
| return nodeName; |
| } |
| |
| /** |
| * Shuts down this executor. |
| */ |
| public void shutDown() { |
| activeContext.close(); |
| } |
| |
| @Override |
| public String toString() { |
| final ObjectMapper mapper = new ObjectMapper(); |
| final ObjectNode node = mapper.createObjectNode(); |
| node.put("executorId", executorId); |
| node.put("runningTasks", getRunningTasks().toString()); |
| node.put("failedTasks", failedTasks.toString()); |
| return node.toString(); |
| } |
| |
| /** |
| * Removes the specified {@link Task} from the map of running tasks. |
| * |
| * @param taskId id of the task to remove |
| * @return the removed {@link Task} |
| */ |
| private Task removeFromRunningTasks(final String taskId) { |
| final Task task; |
| if (runningComplyingTasks.containsKey(taskId)) { |
| task = runningComplyingTasks.remove(taskId); |
| } else if (runningNonComplyingTasks.containsKey(taskId)) { |
| task = runningNonComplyingTasks.remove(taskId); |
| } else { |
| throw new RuntimeException(String.format("Task %s not found in its ExecutorRepresenter", taskId)); |
| } |
| return task; |
| } |
| } |
| |