blob: 0944680104653e09c92ee8aba5fd9d7b6ab0fac8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Net;
using System.Text;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Examples.Tasks.HelloTask;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;
using ContextConfiguration = Org.Apache.REEF.Common.Context.ContextConfiguration;
using IRunningTask = Org.Apache.REEF.Driver.Task.IRunningTask;
namespace Org.Apache.REEF.Tests.Functional.Bridge
enum DriverStatus
Init = 0,
Idle = 1,
RunningTasks = 2,
CompleteTasks = 3
internal enum TaskStatus
Submitting = 0,
Running = 1,
Completed = 2
/// <summary>
/// A demo class that contains basic handlers. It runs given tasks and is able to get request from http server and start to run the tasks again.
/// It handle various http requests. It also monitoring task status and driver status.
/// When IsRetain is configured as false. Context will be closed after executing all the tasks.
/// The default of IsRetain is true, that would retain the evaluators and wait for http requests to submit more jobs.
/// </summary>
public class HelloSimpleEventHandlers :
private const int NumberOfTasks = 5;
private static readonly Logger Logger = Logger.GetLogger(typeof(HelloSimpleEventHandlers));
private readonly IList<IActiveContext> _activeContexts = new List<IActiveContext>();
private DriverStatus _driveStatus;
private TaskContext _taskContext;
private readonly bool _isRetain;
private readonly int _numberOfEvaluators;
private readonly IEvaluatorRequestor _evaluatorRequestor;
private HelloSimpleEventHandlers(IEvaluatorRequestor evaluatorRequestor,
[Parameter(typeof(NumberOfEvaluators))] int numberOfEvaluators,
[Parameter(typeof(IsRetain))] bool isRetain)
Logger.Log(Level.Info, "HelloSimpleEventHandlers constructor");
_taskContext = new TaskContext();
_taskContext.TotalTasks = NumberOfTasks;
_driveStatus = DriverStatus.Init;
_isRetain = isRetain;
_numberOfEvaluators = numberOfEvaluators;
_evaluatorRequestor = evaluatorRequestor;
public static string ParsePathInfo(string pathInfo)
string[] p = pathInfo.Split('/');
foreach (string s in p)
Logger.Log(Level.Info, s);
if (p.Length > 3)
return p[3];
return null;
public static void BuildHttpResponse(
ReefHttpResponse response,
HttpStatusCode httpStatusCode,
string strResponse)
response.Status = httpStatusCode;
response.OutputStream = ByteUtilities.StringToByteArrays(strResponse);
public static void BuildHttpResponse(
ReefHttpResponse response,
HttpStatusCode httpStatusCode,
byte[] bytesResponse)
response.Status = httpStatusCode;
response.OutputStream = bytesResponse;
public void OnNext(IDriverStarted driverStarted)
using (Logger.LogFunction("HelloSimpleEventHandlers::evaluatorRequestor received"))
int evaluatorsNumber = _numberOfEvaluators;
int memory = 1024 * 3;
int cpuCoreCount = 1;
string rack = "WonderlandRack";
string evaluatorBatchId = "evaluatorThatRequires3GBofMemory";
var request =
public void OnNext(IAllocatedEvaluator allocatedEvaluator)
string taskId = "Task_" + allocatedEvaluator.Id;
var descriptor = allocatedEvaluator.GetEvaluatorDescriptor();
Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator is assigned with {0} MB of memory and {1} cores.", descriptor.Memory, descriptor.VirtualCore));
using (Logger.LogFunction("HelloSimpleEventHandlers::allocatedEvaluator received {0}.", taskId))
IConfiguration contextConfiguration = ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier, "HelloSimpleEventHandlersContext_" + Guid.NewGuid().ToString("N")).Build();
public void OnNext(IActiveContext activeContext)
using (Logger.LogFunction("HelloSimpleEventHandlers::activeContext received"))
Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received activeContext, EvaluatorId id: {0}", activeContext.EvaluatorId));
_driveStatus = DriverStatus.RunningTasks;
public void OnNext(ICompletedTask value)
using (Logger.LogFunction("HelloSimpleEventHandlers::CompletedTask received"))
Logger.Log(Level.Info, "Received CompletedTask: {0}, task id: {1}.", value.Id, _taskContext.CurrentTaskId());
var messageStr = value.Message == null || value.Message.Length == 0 ?
string.Empty : ByteUtilities.ByteArraysToString(value.Message);
Logger.Log(Level.Verbose, "Message received from CompletedTask {0} is: [{1}]", value.Id, messageStr);
_taskContext.UpdateTaskStatus(value.Id, TaskStatus.Completed);
public void OnError(Exception error)
Logger.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Exception in coral handlers Msg: {1} Stack: {2}", error.Message, error.StackTrace));
public void OnCompleted()
public void OnNext(IRunningTask value)
_taskContext.UpdateTaskStatus(_taskContext.CurrentTaskId(), TaskStatus.Running);
public void OnNext(IFailedTask value)
public void OnNext(IFailedEvaluator value)
public void OnNext(ICompletedEvaluator completedEvaluator)
string messageStr = string.Format(
"HelloSimpleEventHandlers: Evaluator [{0}] is done.",
public string GetSpecification()
return "crystal";
public void OnHttpRequest(ReefHttpRequest request, ReefHttpResponse response)
string target = ParsePathInfo(request.PathInfo);
Logger.Log(Level.Info, "Target: " + target + ". PathInfo: " + request.PathInfo);
if (target != null && target.Equals("driverstatus"))
Logger.Log(Level.Info, "Target: " + target + ". Driver status: " + _driveStatus.ToString());
string msg = string.Format(CultureInfo.CurrentCulture, "Current Driver status: {0} ", _driveStatus.ToString());
BuildHttpResponse(response, HttpStatusCode.OK, msg);
if (target != null && target.Equals("taskstatus"))
Logger.Log(Level.Info, "Target: " + target + ". TaskStatus string: " + _taskContext.TaskStatusString());
BuildHttpResponse(response, HttpStatusCode.OK, _taskContext.TaskStatusString());
if (target != null && target.ToLower(CultureInfo.CurrentCulture).Equals("run") && _driveStatus == DriverStatus.Init)
BuildHttpResponse(response, HttpStatusCode.OK, "Driver is not ready, wait a few second then send request again!!!");
if (target != null && target.ToLower(CultureInfo.CurrentCulture).Equals("run") && _driveStatus == DriverStatus.RunningTasks)
string msg = string.Format(CultureInfo.CurrentCulture,
"A job is running. Please check driver status and then submit your job again.");
BuildHttpResponse(response, HttpStatusCode.OK, msg);
if (target != null && target.ToLower(CultureInfo.CurrentCulture).Equals("run") && _driveStatus == DriverStatus.Idle)
string numberOfTasks = GetQueryValue(request.Querystring, "numberoftasks");
if (numberOfTasks == null)
BuildHttpResponse(response, HttpStatusCode.OK, "Please specify number of tasks to run");
_driveStatus = DriverStatus.RunningTasks;
using (Logger.LogFunction("HelloSimpleEventHandlers::Processing a new Job from web request"))
_taskContext = new TaskContext();
_taskContext.TotalTasks = int.Parse(numberOfTasks, CultureInfo.CurrentCulture);
BuildHttpResponse(response, HttpStatusCode.OK, "Job from web request is submitted and is running!!!");
foreach (var c in _activeContexts)
BuildHttpResponse(response, HttpStatusCode.OK, "Unsupported query");
private static IDictionary<string, string> ParseQueryString(string queryString)
IDictionary<string, string> queryPairs = new Dictionary<string, string>();
if (!string.IsNullOrEmpty(queryString))
string[] queries = queryString.Split('&');
foreach (string query in queries)
string[] pairs = query.Split('=');
if (pairs.Length == 2 && !pairs[0].Equals(string.Empty) && !pairs[1].Equals(string.Empty))
queryPairs[pairs[0]] = pairs[1];
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "query key: {0}, Query value: {1}.", pairs[0], pairs[1]));
return queryPairs;
private static string GetQueryValue(string queryString, string name)
IDictionary<string, string> pairs = ParseQueryString(queryString);
string v;
pairs.TryGetValue(name, out v);
return v;
private void SubmitNextTask(IActiveContext activeContext)
Logger.Log(Level.Info, "SubmitNextTask with evaluator id: " + activeContext.EvaluatorId);
IConfiguration finalConfiguration = GetNextTaskConfiguration();
if (null != finalConfiguration)
Logger.Log(Level.Info, "Executing task id " + _taskContext.CurrentTaskId());
Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Submitting Task {0}", _taskContext.CurrentTaskId()));
if (_taskContext.TaskCompleted == _taskContext.TotalTasks)
Logger.Log(Level.Info, "All tasks submitted and completed, active context remains idle");
_driveStatus = DriverStatus.Idle;
if (!_isRetain)
private IConfiguration GetNextTaskConfiguration()
string nextTaskId = _taskContext.NextTaskId();
Logger.Log(Level.Info, "GetNextTaskConfiguration, nextTaskId: " + nextTaskId);
if (nextTaskId != null)
IConfiguration taskConfiguration = TaskConfiguration.ConfigurationModule
.Set(TaskConfiguration.Identifier, nextTaskId)
.Set(TaskConfiguration.Task, GenericType<HelloTask>.Class)
.Set(TaskConfiguration.OnMessage, GenericType<HelloTask.HelloDriverMessageHandler>.Class)
.Set(TaskConfiguration.OnSendMessage, GenericType<HelloTaskMessage>.Class)
return taskConfiguration;
return null;
class TaskContext
private static readonly Logger Logger = Logger.GetLogger(typeof(TaskContext));
private readonly IList<string> _taskIds = new List<string>();
private readonly IDictionary<string, TaskStatus> _tasks = new Dictionary<string, TaskStatus>();
public TaskContext()
NextTaskIndex = 0;
TaskCompleted = 0;
public int TotalTasks { get; set; }
public int NextTaskIndex { get; set; }
public int TaskCompleted { get; set; }
public string NextTaskId()
Logger.Log(Level.Verbose, "NextTaskId: " + NextTaskIndex);
if (NextTaskIndex < TotalTasks)
string id = "Jan7DemoTask_" + DateTime.Now.Ticks;
_tasks.Add(id, TaskStatus.Submitting);
return id;
return null;
public string CurrentTaskId()
Logger.Log(Level.Verbose, "CurrentTaskIndex: " + (NextTaskIndex - 1));
if (NextTaskIndex <= TotalTasks)
Logger.Log(Level.Verbose, "CurrentTaskId: " + _taskIds[NextTaskIndex - 1]);
return _taskIds[NextTaskIndex - 1];
return null; // either not started or completed
public void UpdateTaskStatus(string taskId, TaskStatus status)
_tasks[taskId] = status;
public string TaskStatusString()
Logger.Log(Level.Verbose, "TaskStatusString 1, nextTaskIndex: " + NextTaskIndex);
StringBuilder sb = new StringBuilder();
if (_tasks.Count > 0)
foreach (var pair in _tasks)
sb.AppendLine("Task id: " + pair.Key + " Task status: " + pair.Value.ToString());
sb.Append("No task is running yet");
return sb.ToString();
[NamedParameter(Documentation = "NumberOfTasks", ShortName = "NumberOfTasks", DefaultValue = "2")]
class NumberOfEvaluators : Name<int>
[NamedParameter(Documentation = "Retain", ShortName = "Retain", DefaultValue = "true")]
class IsRetain : Name<bool>