blob: d698aa02920f7e8699e1a19a9feead64ba0b713b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
using Org.Apache.REEF.Utilities;
using ProtoBuf;
[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1403:FileMayOnlyContainASingleNamespace", Justification = "Serializers for all protobuf types")]
[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:FileMayOnlyContainASingleClass", Justification = "Serializers for all protobuf types")]
namespace Org.Apache.REEF.Common.Protobuf.ReefProtocol
{
/// <summary>
/// Add serializer/deserializer to REEFMessage
/// </summary>
public partial class REEFMessage
{
public REEFMessage(EvaluatorHeartbeatProto evaluatorHeartbeatProto)
{
_evaluatorHeartBeat = evaluatorHeartbeatProto;
}
public static REEFMessage Deserialize(byte[] bytes)
{
REEFMessage pbuf = null;
using (var s = new MemoryStream(bytes))
{
pbuf = Serializer.Deserialize<REEFMessage>(s);
}
return pbuf;
}
public byte[] Serialize()
{
byte[] b = null;
using (var s = new MemoryStream())
{
Serializer.Serialize<REEFMessage>(s, this);
b = new byte[s.Position];
var fullBuffer = s.GetBuffer();
Array.Copy(fullBuffer, b, b.Length);
}
return b;
}
}
/// <summary>
/// Add serializer/deserializer to EvaluatorHeartbeatProto
/// </summary>
public partial class EvaluatorHeartbeatProto
{
public static EvaluatorHeartbeatProto Deserialize(byte[] bytes)
{
EvaluatorHeartbeatProto pbuf = null;
using (var s = new MemoryStream(bytes))
{
pbuf = Serializer.Deserialize<EvaluatorHeartbeatProto>(s);
}
return pbuf;
}
public byte[] Serialize()
{
byte[] b = null;
using (var s = new MemoryStream())
{
Serializer.Serialize<EvaluatorHeartbeatProto>(s, this);
b = new byte[s.Position];
var fullBuffer = s.GetBuffer();
Array.Copy(fullBuffer, b, b.Length);
}
return b;
}
public override string ToString()
{
string contextStatus = string.Empty;
string taskStatusMessage = string.Empty;
foreach (ContextStatusProto contextStatusProto in context_status)
{
contextStatus += string.Format(CultureInfo.InvariantCulture, "evaluator {0} has context {1} in state {2} with recovery flag {3}",
evaluator_status.evaluator_id,
contextStatusProto.context_id,
contextStatusProto.context_state,
contextStatusProto.recovery);
}
if (task_status != null && task_status.task_message != null && task_status.task_message.Count > 0)
{
foreach (TaskStatusProto.TaskMessageProto taskMessageProto in task_status.task_message)
{
taskStatusMessage += ByteUtilities.ByteArraysToString(taskMessageProto.message);
}
}
return string.Format(CultureInfo.InvariantCulture, "EvaluatorHeartbeatProto: task_id=[{0}], task_status=[{1}], task_message=[{2}], evaluator_status=[{3}], context_status=[{4}], timestamp=[{5}], recoveryFlag =[{6}]",
task_status == null ? string.Empty : task_status.task_id,
task_status == null ? string.Empty : task_status.state.ToString(),
taskStatusMessage,
evaluator_status.state.ToString(),
contextStatus,
timestamp,
recovery);
}
}
}