blob: bd0c94b677390b086709ba6ad9cafe776cbb8e01 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Net;
using Org.Apache.REEF.Common.Io;
using Org.Apache.REEF.Network.NetworkService.Codec;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake;
using Org.Apache.REEF.Wake.Remote;
namespace Org.Apache.REEF.Network.NetworkService
{
/// <summary>
/// Network service used for Reef Task communication.
/// </summary>
/// <typeparam name="T">The message type</typeparam>
public class NetworkService<T> : INetworkService<T>
{
private static readonly Logger LOGGER = Logger.GetLogger(typeof(NetworkService<>));
private readonly IRemoteManager<NsMessage<T>> _remoteManager;
private readonly IObserver<NsMessage<T>> _messageHandler;
private readonly ICodec<NsMessage<T>> _codec;
private IIdentifier _localIdentifier;
private IDisposable _messageHandlerDisposable;
private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
/// <summary>
/// Create a new NetworkService.
/// </summary>
/// <param name="nsPort">The port that the NetworkService will listen on</param>
/// <param name="messageHandler">The observer to handle incoming messages</param>
/// <param name="idFactory">The factory used to create IIdentifiers</param>
/// <param name="codec">The codec used for serialization</param>
/// <param name="nameClient"></param>
/// <param name="localAddressProvider">The local address provider</param>
/// <param name="remoteManagerFactory">Used to instantiate remote manager instances.</param>
[Inject]
public NetworkService(
[Parameter(typeof(NetworkServiceOptions.NetworkServicePort))] int nsPort,
IObserver<NsMessage<T>> messageHandler,
IIdentifierFactory idFactory,
ICodec<T> codec,
INameClient nameClient,
ILocalAddressProvider localAddressProvider,
IRemoteManagerFactory remoteManagerFactory)
{
_codec = new NsMessageCodec<T>(codec, idFactory);
IPAddress localAddress = localAddressProvider.LocalAddress;
_remoteManager = remoteManagerFactory.GetInstance(localAddress, nsPort, _codec);
_messageHandler = messageHandler;
NamingClient = nameClient;
_connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
LOGGER.Log(Level.Verbose, "Started network service");
}
/// <summary>
/// Name client for registering ids
/// </summary>
public INameClient NamingClient { get; private set; }
/// <summary>
/// Open a new connection to the remote host registered to
/// the name service with the given identifier
/// </summary>
/// <param name="destinationId">The identifier of the remote host</param>
/// <returns>The IConnection used for communication</returns>
public IConnection<T> NewConnection(IIdentifier destinationId)
{
if (_localIdentifier == null)
{
throw new IllegalStateException("Cannot open connection without first registering an ID");
}
IConnection<T> connection;
if (_connectionMap.TryGetValue(destinationId, out connection))
{
return connection;
}
connection = new NsConnection<T>(_localIdentifier, destinationId,
NamingClient, _remoteManager, _connectionMap);
_connectionMap[destinationId] = connection;
return connection;
}
/// <summary>
/// Register the identifier for the NetworkService with the NameService.
/// </summary>
/// <param name="id">The identifier to register</param>
public void Register(IIdentifier id)
{
LOGGER.Log(Level.Verbose, "Registering id {0} with network service.", id);
_localIdentifier = id;
NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
// Create and register incoming message handler
var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
_messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler);
LOGGER.Log(Level.Verbose, "End of Registering id {0} with network service.", id);
}
/// <summary>
/// Unregister the identifier for the NetworkService with the NameService.
/// </summary>
public void Unregister()
{
if (_localIdentifier == null)
{
throw new IllegalStateException("Cannot unregister a non existant identifier");
}
NamingClient.Unregister(_localIdentifier.ToString());
_localIdentifier = null;
_messageHandlerDisposable.Dispose();
}
/// <summary>
/// Dispose of the NetworkService's resources
/// </summary>
public void Dispose()
{
NamingClient.Dispose();
_remoteManager.Dispose();
LOGGER.Log(Level.Verbose, "Disposed of network service");
}
}
}