blob: 0ec807896a0b0eb2a51a979a4b3fb011635e8ff6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage
{
    /// <summary>
    /// Streaming codec for MapInputWithControlMessage.
    /// </summary>
    /// <remarks>
    /// Wire format produced and consumed by this codec: one type byte, followed by the
    /// payload encoded with the injected base codec when the type byte is 0 (AnotherRound).
    /// A type byte of 1 (Stop) is not followed by any payload.
    /// </remarks>
    /// <typeparam name="TMapInput">Type of the map input payload carried alongside the control message</typeparam>
    internal sealed class MapInputWithControlMessageCodec<TMapInput> : IStreamingCodec<MapInputWithControlMessage<TMapInput>>
    {
        /// <summary>Type byte written before a message that carries a payload (AnotherRound).</summary>
        private const byte AnotherRoundType = 0;

        /// <summary>Type byte written for a payload-free Stop message.</summary>
        private const byte StopType = 1;

        // NOTE(review): the logger is keyed on the message type rather than on this codec type;
        // kept as-is to preserve the existing logger name - confirm this is intended.
        private static readonly Logger Logger = Logger.GetLogger(typeof(MapInputWithControlMessage<>));

        /// <summary>Codec used to read and write the TMapInput payload.</summary>
        private readonly IStreamingCodec<TMapInput> _baseCodec;

        /// <summary>
        /// Injectable constructor.
        /// </summary>
        /// <param name="baseCodec">Codec used to read and write the payload of AnotherRound messages</param>
        [Inject]
        private MapInputWithControlMessageCodec(IStreamingCodec<TMapInput> baseCodec)
        {
            _baseCodec = baseCodec;
        }

        /// <summary>
        /// Reads message from reader
        /// </summary>
        /// <param name="reader">reader from which to read the message</param>
        /// <returns>Read message</returns>
        MapInputWithControlMessage<TMapInput> IStreamingCodec<MapInputWithControlMessage<TMapInput>>.Read(
            IDataReader reader)
        {
            // The first byte identifies the control message and whether a payload follows.
            byte[] messageType = new byte[1];
            reader.Read(ref messageType, 0, 1);

            switch (messageType[0])
            {
                case AnotherRoundType:
                    TMapInput message = _baseCodec.Read(reader);
                    return new MapInputWithControlMessage<TMapInput>(message, MapControlMessage.AnotherRound);
                case StopType:
                    return new MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop);
            }

            // Unknown type byte: report through the shared exception helper.
            // The trailing return is only reached if the helper itself returns.
            Exceptions.Throw(new Exception("Control message type not valid in Codec read"), Logger);
            return null;
        }

        /// <summary>
        /// Writes message to the writer
        /// </summary>
        /// <param name="obj">Message to write</param>
        /// <param name="writer">Writer used to write the message</param>
        void IStreamingCodec<MapInputWithControlMessage<TMapInput>>.Write(MapInputWithControlMessage<TMapInput> obj,
            IDataWriter writer)
        {
            switch (obj.ControlMessage)
            {
                case MapControlMessage.AnotherRound:
                    writer.Write(new byte[] { AnotherRoundType }, 0, 1);
                    _baseCodec.Write(obj.Message, writer);
                    break;
                case MapControlMessage.Stop:
                    writer.Write(new byte[] { StopType }, 0, 1);
                    break;
                default:
                    // Writing nothing would leave the reader without the type byte it always
                    // consumes first, so fail loudly instead of silently emitting no data.
                    Exceptions.Throw(new Exception("Control message type not valid in Codec write"), Logger);
                    break;
            }
        }

        /// <summary>
        /// Reads message asynchronously from reader
        /// </summary>
        /// <param name="reader">reader from which to read the message</param>
        /// <param name="token">Cancellation token</param>
        /// <returns>Read message</returns>
        async Task<MapInputWithControlMessage<TMapInput>> IStreamingCodec<MapInputWithControlMessage<TMapInput>>.ReadAsync(
            IDataReader reader, CancellationToken token)
        {
            // The first byte identifies the control message and whether a payload follows.
            byte[] messageType = new byte[1];
            await reader.ReadAsync(messageType, 0, 1, token);

            switch (messageType[0])
            {
                case AnotherRoundType:
                    TMapInput message = await _baseCodec.ReadAsync(reader, token);
                    return new MapInputWithControlMessage<TMapInput>(message, MapControlMessage.AnotherRound);
                case StopType:
                    return new MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop);
            }

            // Unknown type byte: report through the shared exception helper.
            // The trailing return is only reached if the helper itself returns.
            Exceptions.Throw(new Exception("Control message type not valid in Codec read"), Logger);
            return null;
        }

        /// <summary>
        /// Writes message asynchronously to the writer
        /// </summary>
        /// <param name="obj">Message to write</param>
        /// <param name="writer">Writer used to write the message</param>
        /// <param name="token">Cancellation token</param>
        async Task IStreamingCodec<MapInputWithControlMessage<TMapInput>>.WriteAsync(
            MapInputWithControlMessage<TMapInput> obj, IDataWriter writer, CancellationToken token)
        {
            switch (obj.ControlMessage)
            {
                case MapControlMessage.AnotherRound:
                    await writer.WriteAsync(new byte[] { AnotherRoundType }, 0, 1, token);
                    await _baseCodec.WriteAsync(obj.Message, writer, token);
                    break;
                case MapControlMessage.Stop:
                    // Use the asynchronous write here as well so the async path never blocks
                    // and the cancellation token is honored for Stop messages too.
                    await writer.WriteAsync(new byte[] { StopType }, 0, 1, token);
                    break;
                default:
                    // Writing nothing would leave the reader without the type byte it always
                    // consumes first, so fail loudly instead of silently emitting no data.
                    Exceptions.Throw(new Exception("Control message type not valid in Codec write"), Logger);
                    break;
            }
        }
    }
}