blob: bf71f4dce9d33c587090f5dab531cfe5bb285e6a [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.reef.examples.hellohttp;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.task.CompletedTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.examples.library.Command;
import org.apache.reef.examples.library.ShellTask;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
* The Driver code for the Hello REEF Http Distributed Shell Application.
// Driver state lives in the fields below; the nested handler classes in this
// file read and update it, mostly while holding the monitor of this instance.
public final class HttpShellJobDriver {
private static final Logger LOG = Logger.getLogger(HttpShellJobDriver.class.getName());
* String codec is used to encode the results
* before passing them back to the client.
// The same codec also decodes completed-task payloads and client messages.
public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
* Evaluator Requester.
// Assigned once in the constructor.
private final EvaluatorRequestor evaluatorRequestor;
* Number of Evaluators to request (default is 2).
// Final and initialized inline, so the value is fixed at 2 for every instance.
private final int numEvaluators = 2;
* Shell execution results from each Evaluator.
// Each entry has the form "<task id> :: <decoded result>".
private final List<String> results = new ArrayList<>();
* Map from context ID to running evaluator context.
// Plain HashMap, not a concurrent map.
// NOTE(review): confirm every access holds the lock on this driver instance.
private final Map<String, ActiveContext> contexts = new HashMap<>();
* Job driver state.
// Moves INIT -> WAIT_EVALUATORS -> READY <-> WAIT_TASKS in the code visible here.
private State state = State.INIT;
* First command to execute. Sometimes client can send us the first command
* before Evaluators are available; we need to store this command here.
// null when no command is pending; reset to null once a command is submitted.
private String cmd;
* Number of evaluators/tasks to complete.
// Counted down by the context and task handlers; READY is entered at <= 0.
private int expectCount = 0;
* Callback handler for http return message.
// NOTE(review): only assigned in the constructor as far as this file shows; could likely be final.
private HttpServerShellCmdHandler.ClientCallBackHandler httpCallbackHandler;
* Job Driver Constructor.
* @param requestor used to request Evaluators for the job.
* @param clientCallBackHandler callback handler for the HTTP return message.
// Stores the two collaborators passed in and emits a FINE-level log line;
// no other work is done here.
// NOTE(review): the constructor's closing brace is not present in this listing.
public HttpShellJobDriver(final EvaluatorRequestor requestor,
final HttpServerShellCmdHandler.ClientCallBackHandler clientCallBackHandler) {
this.evaluatorRequestor = requestor;
this.httpCallbackHandler = clientCallBackHandler;
LOG.log(Level.FINE, "Instantiated 'HttpShellJobDriver'");
* Construct the final result and forward it to the Client.
// Logs the accumulated results at INFO level.
// NOTE(review): this method is not declared synchronized although it reads
// this.results; confirm that callers hold the driver lock.
private void returnResults() {
// Buffer whose content is logged below.
final StringBuilder sb = new StringBuilder();
// NOTE(review): the loop body is missing from this listing, so nothing visible
// here appends to sb; presumably each result is appended - confirm against
// the full source. No call that forwards the text to the client is visible either.
for (final String result : this.results) {
LOG.log(Level.INFO, "Return results to the client:\n{0}", sb);
* Submit command to all available evaluators.
* @param command shell command to execute.
// Fans the command out to every context currently registered in this.contexts.
// Runs under the driver's monitor (synchronized method).
private synchronized void submit(final String command) {
LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}",
new Object[]{command, this.contexts.size(), this.state});
// Precondition; only checked when the JVM runs with assertions enabled (-ea).
assert this.state == State.READY;
// One completion is expected per context that receives the command.
this.expectCount = this.contexts.size();
this.state = State.WAIT_TASKS;
// The pending command (if any) is consumed by this submission.
this.cmd = null;
for (final ActiveContext context : this.contexts.values()) {
this.submit(context, command);
* Submit a Task that executes the command to a single Evaluator.
* This method is called from <code>submit(command)</code>.
// Builds the task configuration for one context and binds the shell command to it.
// A BindException is logged at SEVERE and rethrown wrapped in a RuntimeException.
private void submit(final ActiveContext context, final String command) {
try {
LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context});
final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
// NOTE(review): the next line starts with ".set(", so the expression it chains
// from is missing from this listing, as is whatever follows the second .set(...).
// The task identifier is the context id with a "_task" suffix.
.set(TaskConfiguration.IDENTIFIER, context.getId() + "_task")
.set(TaskConfiguration.TASK, ShellTask.class)
// The shell command is passed to the task as the Command named parameter.
cb.bindNamedParameter(Command.class, command);
// NOTE(review): no call that hands the configuration to the context is visible here.
} catch (final BindException ex) {
LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
throw new RuntimeException(ex);
* Request the evaluators.
// Moves the driver from INIT to WAIT_EVALUATORS and records how many
// evaluators are expected. Runs under the driver's monitor.
// NOTE(review): no call on evaluatorRequestor is visible in this listing;
// the actual request is presumably issued here - confirm against the full source.
private synchronized void requestEvaluators() {
// Precondition; only checked when assertions are enabled (-ea).
assert this.state == State.INIT;
LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators);
this.state = State.WAIT_EVALUATORS;
// Counted down by ActiveContextHandler as each context becomes available.
this.expectCount = this.numEvaluators;
* Possible states of the job driver. Can be one of:
* <dl>
* <dt><code>INIT</code></dt><dd>initial state, ready to request the evaluators.</dd>
* <dt><code>WAIT_EVALUATORS</code></dt><dd>Wait for requested evaluators to initialize.</dd>
* <dt><code>READY</code></dt><dd>Ready to submit a new task.</dd>
* <dt><code>WAIT_TASKS</code></dt><dd>Wait for tasks to complete.</dd>
* </dl>
// NOTE(review): the enum constants are not present in this listing. Code in this
// file references State.INIT, State.WAIT_EVALUATORS, State.READY and State.WAIT_TASKS.
private enum State {
* Receive notification that an Evaluator has been allocated,
* and submit a new Context to that Evaluator.
// Handles AllocatedEvaluator events while holding the driver lock.
final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
public void onNext(final AllocatedEvaluator eval) {
synchronized (HttpShellJobDriver.this) {
LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}",
new Object[]{eval.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.contexts.size()});
// Precondition; only checked when assertions are enabled (-ea).
assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS;
try {
// NOTE(review): the start of the following statement is missing from this listing.
// What is visible builds a context configuration whose identifier is the
// evaluator id with a "_context" suffix.
ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build());
} catch (final BindException ex) {
// Configuration failure: log at SEVERE and rethrow wrapped in a RuntimeException.
LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex);
throw new RuntimeException(ex);
* Receive notification that the entire Evaluator had failed.
* Stop other jobs and pass this error to the job observer on the client.
// Handles FailedEvaluator events while holding the driver lock; the visible
// code ends by throwing a RuntimeException that carries the evaluator's exception.
final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
public void onNext(final FailedEvaluator eval) {
synchronized (HttpShellJobDriver.this) {
LOG.log(Level.SEVERE, "FailedEvaluator", eval);
// NOTE(review): the loop body is missing from this listing; presumably each
// failed context is removed from the contexts map - confirm against the full source.
for (final FailedContext failedContext : eval.getFailedContextList()) {
throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException());
* Receive notification that a new Context is available.
* Submit a new Distributed Shell Task to that Context.
// Registers each newly active context and, once the expected number has
// arrived, switches the driver to READY. Runs under the driver lock.
final class ActiveContextHandler implements EventHandler<ActiveContext> {
public void onNext(final ActiveContext context) {
synchronized (HttpShellJobDriver.this) {
LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}",
new Object[]{context.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.state});
// Precondition; only checked when assertions are enabled (-ea).
assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS;
HttpShellJobDriver.this.contexts.put(context.getId(), context);
// Pre-decrement: the branch is taken when the last expected context arrives.
if (--HttpShellJobDriver.this.expectCount <= 0) {
HttpShellJobDriver.this.state = State.READY;
if (HttpShellJobDriver.this.cmd == null) {
// NOTE(review): the argument list of this log call is cut off in this listing.
LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}",
} else {
// NOTE(review): the else body is missing from this listing; presumably the
// stored command is submitted here - confirm against the full source.
* Receive notification that the Context had completed.
* Remove context from the list of active context.
// Handles ClosedContext events: logs the context id, then takes the driver lock.
final class ClosedContextHandler implements EventHandler<ClosedContext> {
public void onNext(final ClosedContext context) {
LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
// NOTE(review): the body of this synchronized block is missing from this listing.
synchronized (HttpShellJobDriver.this) {
// Handles the close notification from the HTTP client: logs it and then
// iterates over all registered contexts.
final class HttpClientCloseHandler implements EventHandler<Void> {
public void onNext(final Void aVoid) throws RuntimeException {
LOG.log(Level.INFO, "Received a close message from the client. " +
"You can put code here to properly close drivers and evaluators.");
// NOTE(review): unlike the other handlers in this file, no synchronized block
// is visible around this iteration over the shared contexts map.
// NOTE(review): the loop body is missing from this listing.
for (final ActiveContext c : contexts.values()) {
* Receive notification that the Context had failed.
* Remove context from the list of active context and notify the client.
// Handles FailedContext events: logs at SEVERE, takes the driver lock, and
// throws a RuntimeException whose cause is context.asError().
final class FailedContextHandler implements EventHandler<FailedContext> {
public void onNext(final FailedContext context) {
LOG.log(Level.SEVERE, "FailedContext", context);
synchronized (HttpShellJobDriver.this) {
// NOTE(review): any statements between the lock acquisition and this throw
// are not present in this listing.
throw new RuntimeException("Failed context: ", context.asError());
* Receive notification that the Task has completed successfully.
// Collects the result of each completed task and returns the driver to READY
// once the expected number of tasks has finished.
final class CompletedTaskHandler implements EventHandler<CompletedTask> {
public void onNext(final CompletedTask task) {
LOG.log(Level.INFO, "Completed task: {0}", task.getId());
// Take the message returned by the task and add it to the running result.
// Decoding happens before the driver lock is taken.
final String result = CODEC.decode(task.get());
synchronized (HttpShellJobDriver.this) {
// Stored as "<task id> :: <result>".
HttpShellJobDriver.this.results.add(task.getId() + " :: " + result);
LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{
task.getId(), HttpShellJobDriver.this.results.size(), result, HttpShellJobDriver.this.state});
// Pre-decrement: the branch is taken when the last expected task completes.
if (--HttpShellJobDriver.this.expectCount <= 0) {
HttpShellJobDriver.this.state = State.READY;
// NOTE(review): the rest of this branch is missing from this listing;
// presumably results are returned and a pending command is submitted - confirm.
if (HttpShellJobDriver.this.cmd != null) {
* Receive notification from the client.
// Decodes a command sent by the client and either acts on it immediately
// (driver READY) or stores it until the evaluators are available.
final class ClientMessageHandler implements EventHandler<byte[]> {
public void onNext(final byte[] message) {
synchronized (HttpShellJobDriver.this) {
final String command = CODEC.decode(message);
LOG.log(Level.INFO, "Client message: {0} state: {1}",
new Object[]{command, HttpShellJobDriver.this.state});
// At most one pending command is expected; only checked with assertions enabled (-ea).
assert HttpShellJobDriver.this.cmd == null;
if (HttpShellJobDriver.this.state == State.READY) {
// NOTE(review): the body of the READY branch is missing from this listing;
// presumably the command is submitted right away - confirm against the full source.
} else {
// not ready yet - save the command for better times.
assert HttpShellJobDriver.this.state == State.WAIT_EVALUATORS;
HttpShellJobDriver.this.cmd = command;
* Job Driver is ready and the clock is set up: request the evaluators.
// Handles the StartTime event under the driver lock: logs the current state
// and asserts the driver is still in INIT.
// NOTE(review): the call that requests evaluators is not visible in this listing.
final class StartHandler implements EventHandler<StartTime> {
public void onNext(final StartTime startTime) {
synchronized (HttpShellJobDriver.this) {
LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime});
// Only checked when assertions are enabled (-ea).
assert state == State.INIT;
* Shutting down the job driver: close the evaluators.
// Handles the StopTime event under the driver lock: logs the current state
// and iterates over all registered contexts.
final class StopHandler implements EventHandler<StopTime> {
public void onNext(final StopTime time) {
synchronized (HttpShellJobDriver.this) {
LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time});
// NOTE(review): the loop body is missing from this listing; presumably each
// context is closed here - confirm against the full source.
for (final ActiveContext context : contexts.values()) {