blob: ef3061bca23bbcc81fe0c1d7db323b5d6abe7ca8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.vortex.driver;
import org.apache.commons.lang.SerializationUtils;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.evaluator.*;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.driver.task.TaskMessage;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.vortex.api.VortexStart;
import org.apache.reef.vortex.common.TaskletFailureReport;
import org.apache.reef.vortex.common.TaskletResultReport;
import org.apache.reef.vortex.common.WorkerReport;
import org.apache.reef.vortex.evaluator.VortexWorker;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.SingleThreadStage;
import org.apache.reef.wake.impl.ThreadPoolStage;
import org.apache.reef.wake.time.event.StartTime;
import javax.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * REEF Driver for Vortex.
 *
 * <p>Requests worker Evaluators, submits a {@link VortexWorker} task to each allocated Evaluator,
 * and relays task lifecycle events and worker reports to the {@link VortexMaster}.
 */
@Unit
@DriverSide
final class VortexDriver {
  private static final Logger LOG = Logger.getLogger(VortexDriver.class.getName());

  /**
   * Evaluator failure budget: the driver throws once the number of observed
   * {@link FailedEvaluator} events reaches this value (so {@code MAX_NUM_OF_FAILURES - 1}
   * failures are tolerated and replaced).
   */
  private static final int MAX_NUM_OF_FAILURES = 5;

  /** Dummy event value, only passed to comply with the {@code onNext()} interface. */
  private static final int SCHEDULER_EVENT = 0;

  /** Number of {@link FailedEvaluator} events observed so far. */
  private final AtomicInteger numberOfFailures = new AtomicInteger(0);
  private final EvaluatorRequestor evaluatorRequestor; // for requesting resources
  private final VortexMaster vortexMaster; // Vortex Master
  private final VortexRequestor vortexRequestor; // For sending Commands to remote workers

  // Resources requested for each worker Evaluator (evalNum Evaluators in the initial request)
  private final int evalMem;
  private final int evalNum;
  private final int evalCores;

  // Thread-pool stage that runs the user-supplied VortexStart
  private final EStage<VortexStart> vortexStartEStage;
  private final VortexStart vortexStart;
  // Single-thread stage that runs the PendingTaskletScheduler
  private final EStage<Integer> pendingTaskletSchedulerEStage;

  /**
   * Creates the driver; injected by Tang.
   *
   * @param evaluatorRequestor used to request worker Evaluators
   * @param vortexRequestor used by worker managers to send commands to remote workers
   * @param vortexMaster receives worker lifecycle events and tasklet results/failures
   * @param vortexStart user code started once the driver starts
   * @param vortexStartExecutor handler that executes {@code vortexStart}
   * @param pendingTaskletScheduler handler that schedules pending tasklets
   * @param workerMem memory requested per worker Evaluator
   * @param workerNum number of worker Evaluators in the initial request
   * @param workerCores cores requested per worker Evaluator (also the worker thread count)
   * @param numOfStartThreads number of threads of the stage running {@code vortexStart}
   */
  @Inject
  private VortexDriver(final EvaluatorRequestor evaluatorRequestor,
                       final VortexRequestor vortexRequestor,
                       final VortexMaster vortexMaster,
                       final VortexStart vortexStart,
                       final VortexStartExecutor vortexStartExecutor,
                       final PendingTaskletScheduler pendingTaskletScheduler,
                       @Parameter(VortexMasterConf.WorkerMem.class) final int workerMem,
                       @Parameter(VortexMasterConf.WorkerNum.class) final int workerNum,
                       @Parameter(VortexMasterConf.WorkerCores.class) final int workerCores,
                       @Parameter(VortexMasterConf.NumberOfVortexStartThreads.class)
                       final int numOfStartThreads) {
    this.vortexStartEStage = new ThreadPoolStage<>(vortexStartExecutor, numOfStartThreads);
    this.vortexStart = vortexStart;
    this.pendingTaskletSchedulerEStage = new SingleThreadStage<>(pendingTaskletScheduler, 1);
    this.evaluatorRequestor = evaluatorRequestor;
    this.vortexMaster = vortexMaster;
    this.vortexRequestor = vortexRequestor;
    this.evalMem = workerMem;
    this.evalNum = workerNum;
    this.evalCores = workerCores;
  }

  /**
   * Driver started: requests the initial worker Evaluators, then kicks off VortexStart and the
   * pending-tasklet scheduler on their respective stages.
   */
  final class StartHandler implements EventHandler<StartTime> {
    @Override
    public void onNext(final StartTime startTime) {
      // Initial Evaluator Request
      evaluatorRequestor.submit(EvaluatorRequest.newBuilder()
          .setNumber(evalNum)
          .setMemory(evalMem)
          .setNumberOfCores(evalCores)
          .build());

      // Run Vortex Start
      vortexStartEStage.onNext(vortexStart);

      // Run Scheduler
      pendingTaskletSchedulerEStage.onNext(SCHEDULER_EVENT);
    }
  }

  /**
   * Container allocated: submits a {@link VortexWorker} task whose identifier is the Evaluator id
   * suffixed with {@code "_vortex_worker"}, using one worker thread per requested core.
   */
  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
    @Override
    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
      LOG.log(Level.INFO, "Container allocated");
      final String workerId = allocatedEvaluator.getId() + "_vortex_worker";

      final Configuration workerConfiguration = VortexWorkerConf.CONF
          .set(VortexWorkerConf.NUM_OF_THREADS, evalCores) // NUM_OF_THREADS = evalCores
          .build();

      final Configuration taskConfiguration = TaskConfiguration.CONF
          .set(TaskConfiguration.IDENTIFIER, workerId)
          .set(TaskConfiguration.TASK, VortexWorker.class)
          .set(TaskConfiguration.ON_SEND_MESSAGE, VortexWorker.class)
          .set(TaskConfiguration.ON_MESSAGE, VortexWorker.DriverMessageHandler.class)
          .set(TaskConfiguration.ON_CLOSE, VortexWorker.TaskCloseHandler.class)
          .build();

      allocatedEvaluator.submitTask(Configurations.merge(workerConfiguration, taskConfiguration));
    }
  }

  /**
   * Worker task up and running: registers it with the {@link VortexMaster} through a new
   * {@link VortexWorkerManager}.
   */
  final class RunningTaskHandler implements EventHandler<RunningTask> {
    @Override
    public void onNext(final RunningTask reefTask) {
      LOG.log(Level.INFO, "Worker up and running");
      vortexMaster.workerAllocated(new VortexWorkerManager(vortexRequestor, reefTask));
    }
  }

  /**
   * Message received from a worker: deserializes the {@link WorkerReport} and forwards the
   * tasklet result or failure to the {@link VortexMaster}.
   *
   * @throws RuntimeException if the report type is not recognized
   */
  final class TaskMessageHandler implements EventHandler<TaskMessage> {
    @Override
    public void onNext(final TaskMessage taskMessage) {
      final String workerId = taskMessage.getId();
      // NOTE(review): Java-native deserialization; only acceptable because messages originate
      // from this job's own worker tasks. Do not reuse this path for untrusted input.
      final WorkerReport workerReport =
          (WorkerReport) SerializationUtils.deserialize(taskMessage.get());
      switch (workerReport.getType()) {
      case TaskletResult:
        final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport;
        vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(),
            taskletResultReport.getResult());
        break;
      case TaskletFailure:
        final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
        vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(),
            taskletFailureReport.getException());
        break;
      default:
        throw new RuntimeException("Unknown Report");
      }
    }
  }

  /**
   * Evaluator preempted: counts the failure, aborts once {@link #MAX_NUM_OF_FAILURES} is reached,
   * and otherwise requests a replacement Evaluator and notifies the {@link VortexMaster} about the
   * lost worker.
   * TODO[REEF-501]: Distinguish different types of FailedEvaluator in Vortex.
   *
   * @throws RuntimeException when the failure budget is exhausted
   */
  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
    @Override
    public void onNext(final FailedEvaluator failedEvaluator) {
      LOG.log(Level.INFO, "Evaluator preempted");
      if (numberOfFailures.incrementAndGet() >= MAX_NUM_OF_FAILURES) {
        throw new RuntimeException("Exceeded max number of failures");
      }

      // We request a new evaluator to take the place of the preempted one
      evaluatorRequestor.submit(EvaluatorRequest.newBuilder()
          .setNumber(1)
          .setMemory(evalMem)
          .setNumberOfCores(evalCores)
          .build());

      // The Evaluator may fail without an attached task (e.g. before the worker task was
      // submitted). Calling get() unconditionally would throw here after the replacement was
      // already requested, so only notify the master when there is a worker task to report.
      if (failedEvaluator.getFailedTask().isPresent()) {
        vortexMaster.workerPreempted(failedEvaluator.getFailedTask().get().getId());
      } else {
        LOG.log(Level.WARNING, "Failed evaluator {0} had no task; no worker to report as preempted",
            failedEvaluator.getId());
      }
    }
  }
}