blob: cc56c6e80df9c157765257ab947fb7e313f261d8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using Google.Protobuf;
using Grpc.Core;
using Org.Apache.REEF.Bridge.Core.Common.Client;
using Org.Apache.REEF.Bridge.Core.Common.Client.Config;
using Org.Apache.REEF.Bridge.Core.Common.Client.Events;
using Org.Apache.REEF.Bridge.Core.Proto;
using Org.Apache.REEF.Common.Client;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;
using Void = Org.Apache.REEF.Bridge.Core.Proto.Void;
namespace Org.Apache.REEF.Bridge.Core.Grpc.Client
{
/// <summary>
/// gRPC based client service.
/// </summary>
internal sealed class ClientService : BridgeClient.BridgeClientBase, IClientService
{
private static readonly Logger Log = Logger.GetLogger(typeof(ClientService));
private static readonly Void Void = new Void();
private readonly IObserver<ISubmittedJob> _submittedJobHandler;
private readonly IObserver<IRunningJob> _runningJobHandler;
private readonly IObserver<ICompletedJob> _completedJobHandler;
private readonly IObserver<IFailedJob> _failedJobHandler;
private readonly IObserver<IJobMessage> _jobMessageHandler;
private readonly IObserver<IFailedRuntime> _failedRuntimeHandler;
private readonly IObserver<IWakeError> _wakeErrorHandler;
private REEFClient.REEFClientClient _clientStub = null;
private string _jobId = "unknown";
[Inject]
private ClientService(
[Parameter(Value = typeof(ClientParameters.SubmittedJobHandler))] IObserver<ISubmittedJob> submittedJobHandler,
[Parameter(Value = typeof(ClientParameters.RunningJobHandler))] IObserver<IRunningJob> runningJobHandler,
[Parameter(Value = typeof(ClientParameters.CompletedJobHandler))] IObserver<ICompletedJob> completedJobHandler,
[Parameter(Value = typeof(ClientParameters.FailedJobHandler))] IObserver<IFailedJob> failedJobHandler,
[Parameter(Value = typeof(ClientParameters.JobMessageHandler))] IObserver<IJobMessage> jobMessageHandler,
[Parameter(Value = typeof(ClientParameters.FailedRuntimeHandler))] IObserver<IFailedRuntime> failedRuntimeHandler,
[Parameter(Value = typeof(ClientParameters.WakeErrorHandler))] IObserver<IWakeError> wakeErrorHandler)
{
_submittedJobHandler = submittedJobHandler;
_runningJobHandler = runningJobHandler;
_completedJobHandler = completedJobHandler;
_failedJobHandler = failedJobHandler;
_jobMessageHandler = jobMessageHandler;
_failedRuntimeHandler = failedRuntimeHandler;
_wakeErrorHandler = wakeErrorHandler;
LauncherStatus = LauncherStatus.InitStatus;
}
public bool IsDone => LauncherStatus.IsDone;
public void Reset()
{
LauncherStatus = LauncherStatus.InitStatus;
}
public LauncherStatus LauncherStatus { get; private set; }
public void Close(byte[] message = null)
{
try
{
_clientStub?.DriverControlHandler(new DriverControlOp()
{
JobId = _jobId,
Message = message == null ? ByteString.Empty : ByteString.CopyFrom(message),
Operation = DriverControlOp.Types.Operation.Close
});
}
catch (Exception e)
{
Log.Log(Level.Warning, "exception occurred when trying to close job", e);
}
LauncherStatus = LauncherStatus.ForceCloseStatus;
_clientStub = null;
}
public void Send(byte[] message)
{
if (_clientStub != null)
{
_clientStub.DriverControlHandler(new DriverControlOp()
{
JobId = _jobId,
Message = ByteString.CopyFrom(message),
Operation = DriverControlOp.Types.Operation.Message
});
}
else
{
throw new IllegalStateException("Client service is closed");
}
}
public override Task<Void> RegisterREEFClient(REEFClientRegistration request, ServerCallContext context)
{
Log.Log(Level.Info, "REEF Client registered on port {0}", request.Port);
Channel driverServiceChannel = new Channel("127.0.0.1", (int)request.Port, ChannelCredentials.Insecure);
_clientStub = new REEFClient.REEFClientClient(driverServiceChannel);
return Task.FromResult(Void);
}
public override Task<Void> JobMessageHandler(JobMessageEvent request, ServerCallContext context)
{
Log.Log(Level.Info, "Job message from job id {0}", request.JobId);
_jobMessageHandler.OnNext(new JobMessage(request.JobId, request.Message.ToByteArray()));
return Task.FromResult(Void);
}
public override Task<Void> JobSumittedHandler(JobSubmittedEvent request, ServerCallContext context)
{
Log.Log(Level.Info, "Job id {0} submitted", request.JobId);
UpdateStatusAndNotify(LauncherStatus.SubmittedStatus);
_submittedJobHandler.OnNext(new SubmittedJob(request.JobId));
_jobId = request.JobId;
return Task.FromResult(Void);
}
public override Task<Void> JobRunningHandler(JobRunningEvent request, ServerCallContext context)
{
Log.Log(Level.Info, "Job id {0} running", request.JobId);
UpdateStatusAndNotify(LauncherStatus.RunningStatus);
_runningJobHandler.OnNext(new RunningJob(this, request.JobId));
return Task.FromResult(Void);
}
public override Task<Void> JobCompletedHandler(JobCompletedEvent request, ServerCallContext context)
{
if (IsDone) return Task.FromResult(Void);
Log.Log(Level.Info, "Job id {0} completed", request.JobId);
UpdateStatusAndNotify(LauncherStatus.CompletedStatus);
_completedJobHandler.OnNext(new CompletedJob(request.JobId));
return Task.FromResult(Void);
}
public override Task<Void> JobFailedHandler(JobFailedEvent request, ServerCallContext context)
{
if (IsDone) return Task.FromResult(Void);
Log.Log(Level.Info, "Job id {0} failed on {1}", request.JobId, request.Exception.Name);
var jobFailedEvent = new FailedJob(request.JobId,
request.Exception.Message,
request.Exception.Data.ToByteArray());
UpdateStatusAndNotify(LauncherStatus.Failed(jobFailedEvent.AsError()));
_failedJobHandler.OnNext(jobFailedEvent);
return Task.FromResult(Void);
}
public override Task<Void> RuntimeErrorHandler(ExceptionInfo request, ServerCallContext context)
{
if (!IsDone)
{
Log.Log(Level.Info, "Runtime error {0}", request.Message);
UpdateStatusAndNotify(LauncherStatus.FailedStatus);
_failedRuntimeHandler.OnNext(new FailedRuntime(_jobId, request.Message, request.Data.ToByteArray()));
}
return Task.FromResult(Void);
}
public override Task<Void> WakeErrorHandler(ExceptionInfo request, ServerCallContext context)
{
if (!IsDone)
{
Log.Log(Level.Info, "Wake error {0}", request.Message);
UpdateStatusAndNotify(LauncherStatus.FailedStatus);
_wakeErrorHandler.OnNext(new WakeError(_jobId,
request.Message,
Optional<byte[]>.Of(request.Data.ToByteArray())));
}
return Task.FromResult(Void);
}
private void UpdateStatusAndNotify(LauncherStatus status)
{
lock (this)
{
LauncherStatus = status;
Monitor.PulseAll(this);
}
}
}
}