blob: ae48068c1bdb53d19a9c40fb901791755d7a53a0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using Org.Apache.REEF.Common.Exceptions;
using Org.Apache.REEF.Common.Runtime;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Common.Tasks.Events;
using Org.Apache.REEF.IMRU.OnREEF.Driver;
using Org.Apache.REEF.Network.Group.Task;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote.Impl;
namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
{
internal abstract class TaskHostBase : ITask, IObserver<ICloseEvent>
{
private static readonly Logger Logger = Logger.GetLogger(typeof(TaskHostBase));
/// <summary>
/// Shows if the object has been disposed.
/// </summary>
protected int _disposed;
/// <summary>
/// Group Communication client for the task
/// </summary>
protected readonly IGroupCommClient _groupCommunicationsClient;
/// <summary>
/// Task close Coordinator to handle the work when receiving task close event
/// </summary>
private readonly TaskCloseCoordinator _taskCloseCoordinator;
/// <summary>
/// Specify whether to invoke garbage collector or not
/// </summary>
protected readonly bool _invokeGc;
/// <summary>
/// CommunicationGroupClient for the task
/// </summary>
protected readonly ICommunicationGroupClient _communicationGroupClient;
/// <summary>
/// The cancellation token to control the group communication operation cancellation
/// </summary>
protected readonly CancellationTokenSource _cancellationSource;
/// <summary>
/// Machine status for log purpose
/// </summary>
private readonly MachineStatus _machineStatus = new MachineStatus();
/// <summary>
/// Task host base class to hold the common stuff of both mapper and update tasks
/// </summary>
/// <param name="groupCommunicationsClient">Group Communication Client</param>
/// <param name="taskCloseCoordinator">The class that handles the close event for the task</param>
/// <param name="invokeGc">specify if want to invoke garbage collector or not </param>
protected TaskHostBase(
IGroupCommClient groupCommunicationsClient,
TaskCloseCoordinator taskCloseCoordinator,
bool invokeGc)
{
Logger.Log(Level.Info, "Entering TaskHostBase constructor with machine status {0}.", _machineStatus.ToString());
_groupCommunicationsClient = groupCommunicationsClient;
_communicationGroupClient = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
_invokeGc = invokeGc;
_taskCloseCoordinator = taskCloseCoordinator;
_cancellationSource = new CancellationTokenSource();
}
/// <summary>
/// Handle the exceptions in the Call() method
/// Default to IMRUSystemException to make it recoverable
/// </summary>
public byte[] Call(byte[] memento)
{
Logger.Log(Level.Info, "Entering {0} Call() with machine status {1}.", TaskHostName, _machineStatus.ToString());
try
{
_groupCommunicationsClient.Initialize(_cancellationSource);
return TaskBody(memento);
}
catch (Exception e)
{
if (e is IMRUTaskAppException)
{
throw;
}
if (IsCommunicationException(e))
{
HandleCommunicationException(e);
}
else
{
HandleSystemException(e);
}
}
finally
{
Logger.Log(Level.Info, "TaskHostBase::Finally");
_taskCloseCoordinator.SignalTaskStopped();
}
Logger.Log(Level.Info, "{0} returned with cancellation token:{1}.", TaskHostName, _cancellationSource.IsCancellationRequested);
return null;
}
private static bool IsCommunicationException(Exception e)
{
if (e is OperationCanceledException || e is IOException || e is TcpClientConnectionException ||
e is RecoverableNetworkException || e is SocketException ||
(e is AggregateException && e.InnerException != null && e.InnerException is IOException))
{
return true;
}
return false;
}
/// <summary>
/// The body of Call method. Subclass must override it.
/// </summary>
protected abstract byte[] TaskBody(byte[] memento);
/// <summary>
/// Task host name
/// </summary>
protected abstract string TaskHostName { get; }
/// <summary>
/// Task close handler. Call TaskCloseCoordinator to handle the event.
/// </summary>
public void OnNext(ICloseEvent closeEvent)
{
_taskCloseCoordinator.HandleEvent(closeEvent, _cancellationSource);
}
/// <summary>
/// Dispose function. Dispose IGroupCommunicationsClient.
/// </summary>
public virtual void Dispose()
{
if (Interlocked.Exchange(ref _disposed, 1) == 0)
{
_groupCommunicationsClient.Dispose();
}
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnCompleted()
{
throw new NotImplementedException();
}
/// <summary>
/// Convert the exception into IMRUTaskGroupCommunicationException
/// </summary>
protected void HandleCommunicationException(Exception e)
{
HandleException(e, new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError, e));
}
/// <summary>
/// Convert the exception into IMRUSystemException
/// </summary>
protected void HandleSystemException(Exception e)
{
HandleException(e, new IMRUTaskSystemException(TaskManager.TaskSystemError, e));
}
/// <summary>
/// Convert the exception into IMRUTaskAppException
/// </summary>
protected void HandleTaskAppException(Exception e)
{
HandleException(e, new IMRUTaskAppException(TaskManager.TaskAppError, e));
}
/// <summary>
/// Log and throw target exception if cancellation token is not set
/// In the cancellation case, simply log and return.
/// </summary>
private void HandleException(Exception originalException, Exception targetException)
{
Logger.Log(Level.Error,
"Received exception in {0} with cancellation token {1}: [{2}]",
TaskHostName,
_cancellationSource.IsCancellationRequested,
originalException);
if (!_cancellationSource.IsCancellationRequested)
{
Logger.Log(Level.Error,
"{0} is throwing {1} with cancellation token: {2}.",
TaskHostName,
targetException.GetType(),
_cancellationSource.IsCancellationRequested);
throw targetException;
}
}
}
}