/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler.yarn;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import org.apache.heron.api.exception.InvalidTopologyException;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.scheduler.SchedulerMain;
import org.apache.heron.scheduler.yarn.HeronConfigurationOptions.Cluster;
import org.apache.heron.scheduler.yarn.HeronConfigurationOptions.Environ;
import org.apache.heron.scheduler.yarn.HeronConfigurationOptions.HeronCorePackageName;
import org.apache.heron.scheduler.yarn.HeronConfigurationOptions.HttpPort;
import org.apache.heron.scheduler.yarn.HeronConfigurationOptions.Role;
import org.apache.heron.scheduler.yarn.HeronConfigurationOptions.TopologyJar;
import org.apache.heron.scheduler.yarn.HeronConfigurationOptions.TopologyName;
import org.apache.heron.scheduler.yarn.HeronConfigurationOptions.TopologyPackageName;
import org.apache.heron.scheduler.yarn.HeronConfigurationOptions.VerboseLogMode;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.PackingPlan.ContainerPlan;
import org.apache.heron.spi.packing.Resource;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.task.CompletedTask;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.event.StartTime;
/**
 * {@link HeronMasterDriver} is a topology's ApplicationMaster running on a YARN cluster. It
 * performs container management for Heron's YARN scheduler using the REEF framework. This
 * includes making container requests for Heron instances, providing the packages needed to start
 * Heron components, monitoring the containers, and killing them when requested.
* <p></p>
 * REEF libraries communicate with the YARN RM on behalf of this AM. The communication between
 * REEF and {@link HeronMasterDriver} is event-driven. {@link HeronMasterDriver} provides handlers
 * for some of the events generated by the framework, including
* <ol>
* <li>configuring newly allocated containers, {@link ContainerAllocationHandler}</li>
 * <li>launching Heron components in the containers, {@link HeronWorkerLauncher}</li>
 * <li>handling errors/failures, {@link FailedContainerHandler} and
 * {@link HeronWorkerTaskFailureHandler}</li>
* </ol>
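 * <p></p>
 * For illustration, these handlers are bound to REEF driver events through a standard REEF
 * {@code DriverConfiguration}. A minimal sketch (the actual bindings live in Heron's YARN
 * launcher, not in this class):
 * <pre>{@code
 * Configuration driverConf = DriverConfiguration.CONF
 *     .set(DriverConfiguration.ON_DRIVER_STARTED, HeronMasterDriver.HeronSchedulerLauncher.class)
 *     .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HeronMasterDriver.ContainerAllocationHandler.class)
 *     .set(DriverConfiguration.ON_EVALUATOR_FAILED, HeronMasterDriver.FailedContainerHandler.class)
 *     .set(DriverConfiguration.ON_CONTEXT_ACTIVE, HeronMasterDriver.HeronWorkerLauncher.class)
 *     .set(DriverConfiguration.ON_TASK_FAILED, HeronMasterDriver.HeronWorkerTaskFailureHandler.class)
 *     .build();
 * }</pre>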
* <p></p>
 * On initialization, {@link HeronMasterDriver} unpacks the Heron package and starts the Heron
 * scheduler. It then receives a {@link PackingPlan} from the scheduler and submits new container
 * requests of the desired sizes to the REEF framework. Some of the handlers listed above are then
 * invoked and the Heron components are activated.
* <p></p>
 * The containers allocated by YARN may be bigger than the requested size. This is a consequence
 * of a YARN scheduler optimization, which allocates resources in fixed memory and CPU increments.
 * For example, if the memory increment is 100 MB, YARN may allocate a 200 MB container when
 * 150 MB is requested. As a result, the actual resource allocation may be a little more than what
 * was requested. Moreover, YARN does not provide any mechanism to match a container request to an
 * allocated container. So the AM needs to find a suitable Heron {@link ContainerPlan} to be
 * launched in the newly allocated YARN container.
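 * <p></p>
 * A worked example with hypothetical numbers:
 * <pre>
 *   memory increment = 100 MB, requested worker size = 150 MB
 *   allocated container = ceil(150 / 100) * 100 = 200 MB
 *   the AM then maps the 200 MB container to the largest pending ContainerPlan that
 *   fits in it (see findLargestFittingWorker)
 * </pre>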
*/
@Unit
public class HeronMasterDriver {
static final int TM_MEM_SIZE_MB = 1024;
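  // By convention, worker id 0 is reserved for the Topology Manager (TManager).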
static final int TMANAGER_CONTAINER_ID = 0;
static final int MB = 1024 * 1024;
private static final Logger LOG = Logger.getLogger(HeronMasterDriver.class.getName());
private final String topologyPackageName;
private final String heronCorePackageName;
private final EvaluatorRequestor requestor;
private final REEFFileNames reefFileNames;
private final String localHeronConfDir;
private final String cluster;
private final String role;
private final String topologyName;
private final String env;
private final String topologyJar;
private final int httpPort;
private final boolean verboseMode;
// This map contains all the workers this application will be managing. Some of these workers
// may be allocated and running.
private HashMap<Integer, ContainerPlan> containerPlans = new HashMap<>();
  // This map contains all the allocated workers managed by this scheduler. A worker can be
  // looked up by either Heron's executor id or REEF's evaluator id.
private MultiKeyWorkerMap multiKeyWorkerMap;
private TManager tManager;
// TODO: https://github.com/apache/incubator-heron/issues/949: implement Driver HA
private String componentRamMap;
private AtomicBoolean isTopologyKilled = new AtomicBoolean(false);
@Inject
public HeronMasterDriver(EvaluatorRequestor requestor,
final REEFFileNames fileNames,
@Parameter(Cluster.class) String cluster,
@Parameter(Role.class) String role,
@Parameter(TopologyName.class) String topologyName,
@Parameter(Environ.class) String env,
@Parameter(TopologyJar.class) String topologyJar,
@Parameter(TopologyPackageName.class) String topologyPackageName,
@Parameter(HeronCorePackageName.class) String heronCorePackageName,
@Parameter(HttpPort.class) int httpPort,
@Parameter(VerboseLogMode.class) boolean verboseMode)
throws IOException {
// REEF related initialization
this.requestor = requestor;
this.reefFileNames = fileNames;
// Heron related initialization
this.localHeronConfDir = ".";
this.cluster = cluster;
this.role = role;
this.topologyName = topologyName;
this.topologyPackageName = topologyPackageName;
this.heronCorePackageName = heronCorePackageName;
this.env = env;
this.topologyJar = topologyJar;
this.httpPort = httpPort;
this.verboseMode = verboseMode;
this.multiKeyWorkerMap = new MultiKeyWorkerMap();
// This instance of Driver will be used for managing topology containers
HeronMasterDriverProvider.setInstance(this);
}
private static int getCpuForExecutor(Resource resource) {
return (int) Math.ceil(resource.getCpu());
}
/**
* Container allocation is asynchronous. Requests all containers in the input packing plan
* serially to ensure allocated resources match the required resources.
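   * <p></p>
   * Expected call order from the scheduler, a sketch using the methods of this class
   * ({@code driver} stands for a {@link HeronMasterDriver} instance):
   * <pre>{@code
   * driver.scheduleHeronWorkers(packingPlan); // requests containers, records the RAM map
   * driver.launchTManager();                  // must run after the workers are scheduled
   * }</pre>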
*/
void scheduleHeronWorkers(PackingPlan topologyPacking) throws ContainerAllocationException {
this.componentRamMap = topologyPacking.getComponentRamDistribution();
scheduleHeronWorkers(topologyPacking.getContainers());
}
  /**
   * Must be invoked after the workers are scheduled. The TManager needs the component RAM map.
   */
void launchTManager() {
tManager = buildTManager(Executors.newSingleThreadExecutor());
tManager.launch();
}
/**
* Container allocation is asynchronous. Requests all containers in the input set serially
* to ensure allocated resources match the required resources.
*/
void scheduleHeronWorkers(Set<ContainerPlan> containers) throws ContainerAllocationException {
for (ContainerPlan containerPlan : containers) {
int id = containerPlan.getId();
      if (containerPlans.containsKey(id)) {
throw new ContainerAllocationException("Received duplicate allocation request for " + id);
}
Resource reqResource = containerPlan.getRequiredResource();
containerPlans.put(id, containerPlan);
requestContainerForWorker(id, new HeronWorker(id, reqResource));
}
}
/**
   * YARN allocates resources in fixed increments of memory and CPU. As a result, the actual
   * resource allocation may be a little more than what was requested. This method finds the
   * biggest Heron {@link ContainerPlan} that will fit in the allocated YARN container. In some
   * cases YARN CPU scheduling may be disabled, resulting in a default core allocation for each
   * container; this method can ignore the core fit in that case.
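   * <p></p>
   * A worked selection example with hypothetical sizes:
   * <pre>
   *   allocated container: 4096 MB, 4 cores
   *   pending workers:     A(1024 MB, 1 core), B(3072 MB, 3 cores), C(6144 MB, 2 cores)
   *   result:              B, the largest pending worker that still fits (C is too big)
   * </pre>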
*/
@VisibleForTesting
Optional<HeronWorker> findLargestFittingWorker(AllocatedEvaluator evaluator,
Collection<HeronWorker> pendingWorkers,
boolean ignoreCpu) {
ByteAmount allocatedRam
= ByteAmount.fromMegabytes(evaluator.getEvaluatorDescriptor().getMemory());
int allocatedCores = evaluator.getEvaluatorDescriptor().getNumberOfCores();
HeronWorker biggestFittingWorker = null;
for (HeronWorker worker : pendingWorkers) {
if (worker.mem.greaterThan(allocatedRam)) {
continue;
}
if (!ignoreCpu) {
if (worker.cores > allocatedCores) {
continue;
}
}
if (biggestFittingWorker != null) {
if (worker.mem.lessThan(biggestFittingWorker.mem)
|| worker.cores < biggestFittingWorker.cores) {
continue;
}
}
biggestFittingWorker = worker;
}
return Optional.fromNullable(biggestFittingWorker);
}
@VisibleForTesting
Set<HeronWorker> getWorkersAwaitingAllocation() {
Set<HeronWorker> workersAwaitingAllocation = new HashSet<>();
for (Integer id : containerPlans.keySet()) {
if (multiKeyWorkerMap.lookupByWorkerId(id).isPresent()) {
// this container plan is already allocated to a container
continue;
}
workersAwaitingAllocation
.add(new HeronWorker(id, containerPlans.get(id).getRequiredResource()));
}
LOG.info("Number of workers awaiting allocation: " + workersAwaitingAllocation.size());
return workersAwaitingAllocation;
}
public void killTopology() {
LOG.log(Level.INFO, "Kill topology: {0}", topologyName);
isTopologyKilled.set(true);
tManager.killTManager();
for (HeronWorker worker : multiKeyWorkerMap.getHeronWorkers()) {
AllocatedEvaluator evaluator = multiKeyWorkerMap.detachEvaluatorAndRemove(worker);
LOG.log(Level.INFO, "Killing container {0} for worker {1}",
new Object[]{evaluator.getId(), worker.workerId});
evaluator.close();
}
}
/**
   * Terminates any YARN containers associated with the given container plans.
*/
public void killWorkers(Set<ContainerPlan> containers) {
for (ContainerPlan container : containers) {
LOG.log(Level.INFO, "Find and kill container for worker {0}", container.getId());
Optional<HeronWorker> worker = multiKeyWorkerMap.lookupByWorkerId(container.getId());
if (worker.isPresent()) {
LOG.log(Level.INFO, "Killing container {0} for worker {1}",
new Object[]{worker.get().evaluator.getId(), worker.get().workerId});
AllocatedEvaluator evaluator = multiKeyWorkerMap.detachEvaluatorAndRemove(worker.get());
evaluator.close();
} else {
LOG.log(Level.WARNING, "Did not find worker for {0}", container.getId());
}
containerPlans.remove(container.getId());
}
}
public void restartTopology() throws ContainerAllocationException {
for (HeronWorker worker : multiKeyWorkerMap.getHeronWorkers()) {
restartWorker(worker.workerId);
}
}
public void restartWorker(int id) throws ContainerAllocationException {
LOG.log(Level.INFO, "Find & restart container for id={0}", id);
Optional<HeronWorker> worker = multiKeyWorkerMap.lookupByWorkerId(id);
if (!worker.isPresent()) {
LOG.log(Level.WARNING, "Requesting a new container for: {0}", id);
ContainerPlan containerPlan = containerPlans.get(id);
if (containerPlan == null) {
throw new IllegalArgumentException(
String.format("There is no container for %s in packing plan.", id));
}
worker = Optional.of(new HeronWorker(id, containerPlan.getRequiredResource()));
} else {
AllocatedEvaluator evaluator = multiKeyWorkerMap.detachEvaluatorAndRemove(worker.get());
LOG.log(Level.INFO, "Shutting down container {0}", evaluator.getId());
evaluator.close();
}
requestContainerForWorker(worker.get().workerId, worker.get());
}
@VisibleForTesting
void requestContainerForWorker(int id, final HeronWorker worker) {
int cpu = worker.cores;
ByteAmount mem = worker.mem;
EvaluatorRequest evaluatorRequest = createEvaluatorRequest(cpu, mem);
LOG.info(String.format("Requesting container for worker: %d, RAM: %s, CPU: %d", id, mem, cpu));
requestor.submit(evaluatorRequest);
}
@VisibleForTesting
EvaluatorRequest createEvaluatorRequest(int cpu, ByteAmount mem) {
return EvaluatorRequest
.newBuilder()
.setNumber(1)
        .setMemory((int) mem.asMegabytes())
.setNumberOfCores(cpu)
.build();
}
@VisibleForTesting
Configuration createContextConfig(int executorId) {
return ContextConfiguration.CONF
.set(ContextConfiguration.IDENTIFIER, executorId + "")
.build();
}
String getComponentRamMap() {
return componentRamMap;
}
void submitHeronExecutorTask(int workerId) {
Optional<HeronWorker> worker = multiKeyWorkerMap.lookupByWorkerId(workerId);
if (!worker.isPresent()) {
return;
}
LOG.log(Level.INFO, "Submitting evaluator task for id: {0}", workerId);
    // topologyName and other configuration values are required by the Heron executor and task to
    // load configuration files. Using the REEF configuration model is better than depending on
    // external persistence.
final Configuration taskConf = HeronTaskConfiguration.CONF
.set(TaskConfiguration.TASK, HeronExecutorTask.class)
.set(TaskConfiguration.ON_CLOSE, HeronExecutorTask.HeronExecutorTaskTerminator.class)
.set(TaskConfiguration.IDENTIFIER, workerId + "")
.set(HeronTaskConfiguration.TOPOLOGY_NAME, topologyName)
.set(HeronTaskConfiguration.TOPOLOGY_JAR, topologyJar)
.set(HeronTaskConfiguration.TOPOLOGY_PACKAGE_NAME, topologyPackageName)
.set(HeronTaskConfiguration.HERON_CORE_PACKAGE_NAME, heronCorePackageName)
.set(HeronTaskConfiguration.ROLE, role)
.set(HeronTaskConfiguration.ENV, env)
.set(HeronTaskConfiguration.CLUSTER, cluster)
.set(HeronTaskConfiguration.COMPONENT_RAM_MAP, getComponentRamMap())
.set(HeronTaskConfiguration.CONTAINER_ID, workerId)
.set(HeronTaskConfiguration.VERBOSE, verboseMode)
.build();
worker.get().context.submitTask(taskConf);
}
@VisibleForTesting
Optional<Integer> lookupByEvaluatorId(String id) {
Optional<HeronWorker> result = multiKeyWorkerMap.lookupByEvaluatorId(id);
if (!result.isPresent()) {
return Optional.absent();
}
return Optional.of(result.get().workerId);
}
@VisibleForTesting
Optional<ContainerPlan> lookupByContainerPlan(int id) {
return Optional.fromNullable(containerPlans.get(id));
}
@VisibleForTesting
TManager buildTManager(ExecutorService executor) {
TManager tManagerManager = new TManager();
tManagerManager.executor = executor;
return tManagerManager;
}
/**
   * {@link HeronWorker} is a data class which connects REEF ids, Heron ids, and related objects.
   * All the references in an instance belong to one container. A container is a REEF object that
   * owns one Heron worker id and its handlers.
*/
@VisibleForTesting
static final class HeronWorker {
private int workerId;
private int cores;
private ByteAmount mem;
private AllocatedEvaluator evaluator;
private ActiveContext context;
HeronWorker(int id, int cores, ByteAmount mem) {
this.workerId = id;
this.cores = cores;
this.mem = mem;
}
HeronWorker(int id, Resource resource) {
this.workerId = id;
this.cores = getCpuForExecutor(resource);
this.mem = resource.getRam();
}
public int getWorkerId() {
return workerId;
}
}
/**
   * {@link MultiKeyWorkerMap} is a helper class providing multi-key lookup of a
   * {@link HeronWorker} instance. It also ensures thread safety and update ordering.
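   * <p></p>
   * Typical usage, a sketch based on the methods below:
   * <pre>{@code
   * map.assignEvaluatorToWorker(worker, evaluator); // indexes the worker under both keys
   * map.lookupByWorkerId(worker.getWorkerId());     // -> Optional.of(worker)
   * map.lookupByEvaluatorId(evaluator.getId());     // -> Optional.of(worker)
   * map.detachEvaluatorAndRemove(worker);           // removes both index entries
   * }</pre>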
*/
private static final class MultiKeyWorkerMap {
private Map<Integer, HeronWorker> workerMap = new HashMap<>();
private Map<String, HeronWorker> evaluatorWorkerMap = new HashMap<>();
void assignEvaluatorToWorker(HeronWorker worker, AllocatedEvaluator evaluator) {
worker.evaluator = evaluator;
synchronized (workerMap) {
workerMap.put(worker.workerId, worker);
evaluatorWorkerMap.put(evaluator.getId(), worker);
}
}
Optional<HeronWorker> lookupByEvaluatorId(String evaluatorId) {
synchronized (workerMap) {
return Optional.fromNullable(evaluatorWorkerMap.get(evaluatorId));
}
}
Optional<HeronWorker> lookupByWorkerId(int workerId) {
HeronWorker worker;
synchronized (workerMap) {
worker = workerMap.get(workerId);
}
if (worker == null) {
LOG.log(Level.INFO, "Container for executor id: {0} not found.", workerId);
}
return Optional.fromNullable(worker);
}
AllocatedEvaluator detachEvaluatorAndRemove(HeronWorker worker) {
synchronized (workerMap) {
workerMap.remove(worker.workerId);
evaluatorWorkerMap.remove(worker.evaluator.getId());
}
AllocatedEvaluator evaluator = worker.evaluator;
worker.evaluator = null;
return evaluator;
}
ArrayList<HeronWorker> getHeronWorkers() {
synchronized (workerMap) {
return new ArrayList<>(workerMap.values());
}
}
}
/**
   * {@link ContainerAllocationException} represents an error encountered while trying to allocate
   * a container.
*/
public static final class ContainerAllocationException extends Exception {
static final long serialVersionUID = 1L;
public ContainerAllocationException(String message) {
this(message, null);
}
public ContainerAllocationException(String message, Exception e) {
super(message, e);
}
}
/**
* This class manages the TManager executor process, including launching the TManager, monitoring it
* and killing it when needed.
*/
@VisibleForTesting
class TManager implements Runnable {
private ExecutorService executor;
private Future<?> tManagerFuture;
private CountDownLatch tManagerErrorCounter = new CountDownLatch(3);
void launch() {
LOG.log(Level.INFO, "Launching executor for TM: {0}", topologyName);
tManagerFuture = executor.submit(this);
// the following task will restart the tManager if it fails
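      // tManagerErrorCounter starts at 3 and counts down once per unexpected termination,
      // so after the initial launch the TManager is relaunched at most twice before giving up.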
executor.submit(new Runnable() {
@Override
public void run() {
try {
tManagerFuture.get();
LOG.log(Level.INFO, "TManager executor terminated, {0}", topologyName);
} catch (InterruptedException | ExecutionException e) {
LOG.log(Level.WARNING, "Error while waiting for TManager executor", e);
}
if (isTopologyKilled.get()) {
LOG.log(Level.INFO, "The topology is killed, {0}", topologyName);
return;
}
tManagerErrorCounter.countDown();
long counter = tManagerErrorCounter.getCount();
if (counter > 0) {
LOG.log(Level.WARNING, "Restarting TManager, attempts left: {0}", counter);
launch();
}
}
});
}
void killTManager() {
LOG.log(Level.INFO, "Killing TManager process: {0}", topologyName);
if (!tManagerFuture.isDone()) {
tManagerFuture.cancel(true);
}
executor.shutdownNow();
}
HeronExecutorTask getTManagerExecutorTask() {
return new HeronExecutorTask(reefFileNames, TMANAGER_CONTAINER_ID,
cluster, role, topologyName, env, topologyPackageName, heronCorePackageName, topologyJar,
getComponentRamMap(), verboseMode);
}
@Override
public void run() {
HeronExecutorTask tManagerTask = getTManagerExecutorTask();
try {
tManagerTask.startExecutor();
} catch (InvalidTopologyException e) {
throw new RuntimeException(e);
}
}
}
/**
   * {@link HeronSchedulerLauncher} is the first class REEF initializes on the server. It is
   * responsible for unpacking the binaries and launching the Heron scheduler.
*/
class HeronSchedulerLauncher implements EventHandler<StartTime> {
@Override
public void onNext(StartTime value) {
String globalFolder = reefFileNames.getGlobalFolder().getPath();
HeronReefUtils.extractPackageInSandbox(globalFolder, topologyPackageName, localHeronConfDir);
HeronReefUtils.extractPackageInSandbox(globalFolder, heronCorePackageName, localHeronConfDir);
launchScheduler();
}
private void launchScheduler() {
try {
LOG.log(Level.INFO, "Launching Heron scheduler");
SchedulerMain schedulerMain = SchedulerMain.createInstance(cluster,
role,
env,
topologyJar,
topologyName,
httpPort,
false);
schedulerMain.runScheduler();
} catch (IOException | InvalidTopologyException e) {
throw new RuntimeException("Failed to launch Heron Scheduler", e);
}
}
}
/**
   * Initializes a Heron worker on a newly allocated container.
*/
class ContainerAllocationHandler implements EventHandler<AllocatedEvaluator> {
@Override
public void onNext(AllocatedEvaluator evaluator) {
EvaluatorDescriptor descriptor = evaluator.getEvaluatorDescriptor();
LOG.log(Level.INFO, String.format("New container received, id: %s, mem: %d, cores: %d",
evaluator.getId(), descriptor.getMemory(), descriptor.getNumberOfCores()));
Optional<HeronWorker> result;
HeronWorker worker;
synchronized (containerPlans) {
Set<HeronWorker> workersAwaitingAllocation = getWorkersAwaitingAllocation();
if (workersAwaitingAllocation.isEmpty()) {
LOG.log(Level.INFO, "Could not find any workers waiting for allocation, closing {0}",
evaluator.getId());
evaluator.close();
return;
}
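        // CPU fit is ignored here (ignoreCpu = true) because YARN CPU scheduling may be
        // disabled, in which case each container reports a default core count
        // (see findLargestFittingWorker).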
result = findLargestFittingWorker(evaluator, workersAwaitingAllocation, true);
if (!result.isPresent()) {
LOG.warning("Could not find a fitting worker in awaiting workers");
// TODO may need counting of missed allocation
evaluator.close();
return;
}
worker = result.get();
LOG.info(String.format("Worker:%d, cores:%d, mem:%s fits in the allocated container",
worker.workerId, worker.cores, worker.mem));
workersAwaitingAllocation.remove(worker);
multiKeyWorkerMap.assignEvaluatorToWorker(worker, evaluator);
}
LOG.log(Level.INFO, "Activating container {0} for heron worker, id: {1}",
new Object[]{evaluator.getId(), worker.workerId});
Configuration context = createContextConfig(worker.workerId);
evaluator.submitContext(context);
}
}
/**
   * Handles container failures by detaching the failed container from its worker and requesting
   * a replacement container for that worker.
*/
class FailedContainerHandler implements EventHandler<FailedEvaluator> {
@Override
public void onNext(FailedEvaluator evaluator) {
LOG.log(Level.WARNING, "Container:{0} failed", evaluator.getId());
Optional<HeronWorker> worker = multiKeyWorkerMap.lookupByEvaluatorId(evaluator.getId());
if (!worker.isPresent()) {
LOG.log(Level.WARNING,
"Unknown executor id for failed container: {0}, skip renew action",
evaluator.getId());
return;
}
LOG.log(Level.INFO, "Trying to relaunch worker {0} running on failed container {1}",
new Object[]{worker.get().workerId, evaluator.getId()});
multiKeyWorkerMap.detachEvaluatorAndRemove(worker.get());
requestContainerForWorker(worker.get().workerId, worker.get());
}
}
/**
   * Once the container starts, this class starts Heron's executor process. The Heron executor is
   * started as a task, so the task can be killed and the container reused.
*/
public final class HeronWorkerLauncher implements EventHandler<ActiveContext> {
@Override
public void onNext(ActiveContext context) {
if (isTopologyKilled.get()) {
LOG.log(Level.WARNING, "Topology has been killed, close new context: {0}", context.getId());
context.close();
return;
}
int workerId = Integer.valueOf(context.getId());
Optional<HeronWorker> worker = multiKeyWorkerMap.lookupByWorkerId(workerId);
if (!worker.isPresent()) {
context.close();
return;
}
worker.get().context = context;
submitHeronExecutorTask(workerId);
}
}
/**
   * This class manages active tasks. The task handlers provided by REEF are retained for later
   * use in operations like topology restart. Restarting a task does not require a new container
   * request.
*/
public final class HeronWorkerStartHandler implements EventHandler<RunningTask> {
@Override
public void onNext(RunningTask runningTask) {
LOG.log(Level.INFO, "Task, id:{0}, has started.", runningTask.getId());
}
}
public final class HeronWorkerTaskFailureHandler implements EventHandler<FailedTask> {
@Override
public void onNext(FailedTask failedTask) {
LOG.log(Level.WARNING, "Task {0} failed. Relaunching the task", failedTask.getId());
if (isTopologyKilled.get()) {
LOG.info("The topology is killed. Ignore task fail event");
return;
}
submitHeronExecutorTask(Integer.valueOf(failedTask.getId()));
}
}
public final class HeronWorkerTaskCompletedErrorHandler implements EventHandler<CompletedTask> {
@Override
public void onNext(CompletedTask task) {
LOG.log(Level.INFO, "Task {0} completed.", task.getId());
if (isTopologyKilled.get()) {
LOG.info("The topology is killed. Ignore task complete event");
return;
}
LOG.log(Level.WARNING, "Task should not complete, relaunching {0}", task.getId());
submitHeronExecutorTask(Integer.valueOf(task.getId()));
}
}
}