blob: 6152e0596442d6c1d81556b3936adf9064acbe72 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System.Reactive;
using System.Collections.Generic;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Task;
using Org.Apache.REEF.Network.Group.Task.Impl;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Network.Group.Pipelining;
using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.Network.Group.Operators.Impl
{
/// <summary>
/// Group Communication Operator used to receive broadcast messages in pipelined fashion.
/// </summary>
/// <typeparam name="T">The type of message being sent.</typeparam>
[Private]
public sealed class BroadcastReceiver<T> : IBroadcastReceiver<T>, IGroupCommOperatorInternal
{
private const int PipelineVersion = 2;
private readonly IOperatorTopology<PipelineMessage<T>> _topology;
private static readonly Logger Logger = Logger.GetLogger(typeof(BroadcastReceiver<T>));
private readonly bool _initialize;
/// <summary>
/// Creates a new BroadcastReceiver.
/// </summary>
/// <param name="operatorName">The operator identifier</param>
/// <param name="groupName">The name of the CommunicationGroup that the operator belongs to</param>
/// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered.
/// Default is true. For unit testing, it can be set to false.</param>
/// <param name="topology">The node's topology graph</param>
/// <param name="networkHandler">The incoming message handler</param>
/// <param name="dataConverter">The converter used to convert original message to pipelined ones and vice versa.</param>
[Inject]
private BroadcastReceiver(
[Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
[Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
OperatorTopology<PipelineMessage<T>> topology,
ICommunicationGroupNetworkObserver networkHandler,
IPipelineDataConverter<T> dataConverter)
{
OperatorName = operatorName;
GroupName = groupName;
Version = PipelineVersion;
PipelineDataConverter = dataConverter;
_topology = topology;
_initialize = initialize;
var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
networkHandler.Register(operatorName, msgHandler);
}
/// <summary>
/// Returns the operator identifier.
/// </summary>
public string OperatorName { get; private set; }
/// <summary>
/// Returns the name of the CommunicationGroup that the operator belongs to.
/// </summary>
public string GroupName { get; private set; }
/// <summary>
/// Returns the operator version.
/// </summary>
public int Version { get; private set; }
/// <summary>
/// Returns the IPipelineDataConvert used to convert messages to pipeline form and vice-versa
/// </summary>
public IPipelineDataConverter<T> PipelineDataConverter { get; private set; }
/// <summary>
/// Receive a message from parent BroadcastSender.
/// </summary>
/// <returns>The incoming message</returns>
public T Receive()
{
PipelineMessage<T> message;
var messageList = new List<PipelineMessage<T>>();
do
{
message = _topology.ReceiveFromParent();
if (_topology.HasChildren())
{
_topology.SendToChildren(message, MessageType.Data);
}
messageList.Add(message);
}
while (!message.IsLast);
return PipelineDataConverter.FullMessage(messageList);
}
/// <summary>
/// Ensure all parent and children nodes in the topology are registered with teh Name Service.
/// </summary>
void IGroupCommOperatorInternal.WaitForRegistration()
{
if (_initialize)
{
_topology.Initialize();
}
}
}
}