// blob: 32f388b7c5dd0b41dd88f39624ac54bc7484c6e8
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Net;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Util;
namespace Org.Apache.REEF.Wake.Remote.Impl
{
/// <summary>
/// Manages incoming and outgoing messages between remote hosts.
/// </summary>
public sealed class DefaultRemoteManager<T> : IRemoteManager<T>
{
/// <summary>Logger shared by all instances of this closed generic type.</summary>
private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultRemoteManager<T>));
/// <summary>
/// Holds the observers registered through the RegisterObserver overloads; it is
/// also handed to the transport server and to every transport client created here.
/// </summary>
private readonly ObserverContainer<T> _observerContainer;
/// <summary>
/// Transport server for incoming messages. Only assigned by the listening
/// constructor; it stays null when the manager was built without a server.
/// </summary>
private readonly TransportServer<IRemoteEvent<T>> _server;
/// <summary>
/// Observers handed out by GetRemoteObserver, keyed by remote endpoint.
/// Everything in this cache is disposed when the manager is disposed.
/// </summary>
private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
/// <summary>Remote-event codec wrapping the message codec passed to the constructor.</summary>
private readonly ICodec<IRemoteEvent<T>> _codec;
/// <summary>Connection factory passed to each TransportClient created by this manager.</summary>
private readonly ITcpClientConnectionFactory _tcpClientFactory;
/// <summary>
/// Constructs a DefaultRemoteManager that listens for incoming messages on the
/// specified address and port.
/// </summary>
/// <remarks>
/// The transport server is created and run by this constructor.
/// <see cref="LocalEndpoint"/> and <see cref="Identifier"/> are taken from the
/// endpoint the server reports afterwards, not from the requested one.
/// NOTE(review): presumably a port of 0 lets the port provider choose the actual
/// port - confirm against TransportServer.
/// </remarks>
/// <param name="localAddress">The address to listen on</param>
/// <param name="port">The port to listen on; must not be negative</param>
/// <param name="codec">The codec used for serializing messages</param>
/// <param name="tcpPortProvider">provides port numbers to listen</param>
/// <param name="tcpClientFactory">provides TcpClient for given endpoint</param>
/// <exception cref="ArgumentNullException">localAddress or codec is null</exception>
/// <exception cref="ArgumentOutOfRangeException">port is negative</exception>
internal DefaultRemoteManager(IPAddress localAddress,
    int port,
    ICodec<T> codec,
    ITcpPortProvider tcpPortProvider,
    ITcpClientConnectionFactory tcpClientFactory)
{
    if (localAddress == null)
    {
        throw new ArgumentNullException("localAddress");
    }
    if (port < 0)
    {
        // ArgumentOutOfRangeException derives from ArgumentException, so existing
        // handlers still match; it also matches what IPEndPoint throws for a port
        // above the valid range and reports the offending parameter.
        throw new ArgumentOutOfRangeException(
            "port",
            port,
            "Listening port must be greater than or equal to zero");
    }
    if (codec == null)
    {
        throw new ArgumentNullException("codec");
    }

    _tcpClientFactory = tcpClientFactory;
    _observerContainer = new ObserverContainer<T>();
    _codec = new RemoteEventCodec<T>(codec);
    _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();

    IPEndPoint localEndpoint = new IPEndPoint(localAddress, port);

    // Begin to listen for incoming messages
    _server = new TransportServer<IRemoteEvent<T>>(localEndpoint,
        _observerContainer,
        _codec,
        tcpPortProvider);
    _server.Run();

    // Publish the endpoint reported by the running server.
    LocalEndpoint = _server.LocalEndpoint;
    Identifier = new SocketRemoteIdentifier(LocalEndpoint);
}
/// <summary>
/// Constructs a DefaultRemoteManager. Does not listen for incoming messages.
/// </summary>
/// <remarks>
/// No transport server is created. <see cref="LocalEndpoint"/> is built from the
/// provider's local address with port 0, and <see cref="Identifier"/> wraps that
/// endpoint.
/// </remarks>
/// <param name="localAddressProvider">The local address provider</param>
/// <param name="codec">The codec used for serializing messages</param>
/// <param name="tcpClientFactory">provides TcpClient for given endpoint</param>
/// <exception cref="ArgumentNullException">codec or localAddressProvider is null</exception>
internal DefaultRemoteManager(
    ILocalAddressProvider localAddressProvider,
    ICodec<T> codec,
    ITcpClientConnectionFactory tcpClientFactory)
{
    using (LOGGER.LogFunction("DefaultRemoteManager::DefaultRemoteManager"))
    {
        if (codec == null)
        {
            throw new ArgumentNullException("codec");
        }
        if (localAddressProvider == null)
        {
            // Previously this surfaced as a NullReferenceException when the
            // provider was dereferenced below; fail with a descriptive argument
            // error instead. Checked after codec to keep the original precedence.
            throw new ArgumentNullException("localAddressProvider");
        }

        _tcpClientFactory = tcpClientFactory;
        _observerContainer = new ObserverContainer<T>();
        _codec = new RemoteEventCodec<T>(codec);
        _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();

        LocalEndpoint = new IPEndPoint(localAddressProvider.LocalAddress, 0);
        Identifier = new SocketRemoteIdentifier(LocalEndpoint);
    }
}
/// <summary>
/// Gets the RemoteIdentifier for the DefaultRemoteManager.
/// Both constructors set it to a SocketRemoteIdentifier built from
/// <see cref="LocalEndpoint"/>.
/// </summary>
public IRemoteIdentifier Identifier { get; private set; }
/// <summary>
/// Gets the local IPEndPoint for the DefaultRemoteManager.
/// For a listening manager this is the endpoint reported by the transport
/// server; otherwise it is the local address provider's address with port 0.
/// </summary>
public IPEndPoint LocalEndpoint { get; private set; }
/// <summary>
/// Returns an IObserver used to send messages to the remote host described by
/// the given RemoteEventEndPoint.
/// </summary>
/// <remarks>
/// The endpoint's Id must be a SocketRemoteIdentifier; its address is passed on
/// to the IPEndPoint overload.
/// </remarks>
/// <param name="remoteEndpoint">The remote event endpoint identifying the remote host</param>
/// <returns>An IObserver used to send messages to the remote host</returns>
/// <exception cref="ArgumentNullException">remoteEndpoint is null</exception>
/// <exception cref="ArgumentException">the endpoint's Id is not a SocketRemoteIdentifier</exception>
public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
{
    if (remoteEndpoint == null)
    {
        throw new ArgumentNullException("remoteEndpoint");
    }

    var socketId = remoteEndpoint.Id as SocketRemoteIdentifier;
    if (socketId != null)
    {
        return GetRemoteObserver(socketId.Addr);
    }

    // Only socket-based identifiers can be resolved to an address.
    throw new ArgumentException("ID not supported");
}
/// <summary>
/// Returns an IObserver used to send messages to the remote host at
/// the specified IPEndpoint.
/// </summary>
/// <remarks>
/// Observers are cached per endpoint: the first request for an endpoint creates
/// one, and later requests return the same instance. Cached observers are
/// disposed together with the manager.
/// NOTE(review): the cache is a plain Dictionary with no synchronization in this
/// method - confirm callers do not invoke it concurrently.
/// </remarks>
/// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
/// <returns>An IObserver used to send messages to the remote host</returns>
/// <exception cref="ArgumentNullException">remoteEndpoint is null</exception>
public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
{
    if (remoteEndpoint == null)
    {
        throw new ArgumentNullException("remoteEndpoint");
    }

    ProxyObserver cached;
    if (_cachedClients.TryGetValue(remoteEndpoint, out cached))
    {
        return cached;
    }

    // Cache miss: create the observer and remember it for later calls.
    ProxyObserver created = CreateRemoteObserver(remoteEndpoint);
    _cachedClients[remoteEndpoint] = created;
    return created;
}
/// <summary>
/// Returns a newly created observer for the remote host described by the given
/// RemoteEventEndPoint, without adding it to this manager's cache.
/// </summary>
/// <remarks>
/// The endpoint's Id must be a SocketRemoteIdentifier; its address is passed on
/// to <see cref="GetUnmanagedObserver"/>.
/// </remarks>
/// <param name="remoteEndpoint">The remote event endpoint identifying the remote host</param>
/// <returns>A new observer used to send messages to the remote host</returns>
/// <exception cref="ArgumentNullException">remoteEndpoint is null</exception>
/// <exception cref="ArgumentException">the endpoint's Id is not a SocketRemoteIdentifier</exception>
public IRemoteObserver<T> GetUnmanagedRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
{
    if (remoteEndpoint == null)
    {
        throw new ArgumentNullException("remoteEndpoint");
    }

    var socketId = remoteEndpoint.Id as SocketRemoteIdentifier;
    if (socketId != null)
    {
        return GetUnmanagedObserver(socketId.Addr);
    }

    // Only socket-based identifiers can be resolved to an address.
    throw new ArgumentException("ID not supported");
}
/// <summary>
/// Returns a newly created observer for the remote host at the specified
/// IPEndPoint, without adding it to this manager's cache.
/// </summary>
/// <remarks>
/// Every call creates a fresh observer. Because it is not cached, disposing the
/// manager does not dispose it.
/// NOTE(review): presumably the caller is expected to dispose the returned
/// observer - confirm against IRemoteObserver.
/// </remarks>
/// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
/// <returns>A new observer used to send messages to the remote host</returns>
/// <exception cref="ArgumentNullException">remoteEndpoint is null</exception>
public IRemoteObserver<T> GetUnmanagedObserver(IPEndPoint remoteEndpoint)
{
    if (remoteEndpoint == null)
    {
        throw new ArgumentNullException("remoteEndpoint");
    }

    return CreateRemoteObserver(remoteEndpoint);
}
/// <summary>
/// Creates a TransportClient for the given remote endpoint, logs the local and
/// remote endpoints of its link, and wraps the client in a ProxyObserver.
/// </summary>
/// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
/// <returns>A ProxyObserver that sends through the new client</returns>
private ProxyObserver CreateRemoteObserver(IPEndPoint remoteEndpoint)
{
    var transportClient = new TransportClient<IRemoteEvent<T>>(
        remoteEndpoint,
        _codec,
        _observerContainer,
        _tcpClientFactory);

    // Record both ends of the new link at Info level.
    string localText = transportClient.Link.LocalEndpoint.ToString();
    string remoteText = transportClient.Link.RemoteEndpoint.ToString();
    LOGGER.Log(
        Level.Info,
        string.Format("NewClientConnection: Local {0} connected to Remote {1}", localText, remoteText));

    return new ProxyObserver(transportClient);
}
/// <summary>
/// Registers an IObserver used to handle incoming messages from the remote host
/// described by the given RemoteEventEndPoint.
/// The IDisposable that is returned can be used to unregister the IObserver.
/// </summary>
/// <remarks>
/// The endpoint's Id must be a SocketRemoteIdentifier; its address and the
/// observer are passed on to the IPEndPoint overload, which validates the observer.
/// </remarks>
/// <param name="remoteEndpoint">The remote event endpoint identifying the remote host</param>
/// <param name="observer">The IObserver to handle incoming messages</param>
/// <returns>An IDisposable used to unregister the observer with</returns>
/// <exception cref="ArgumentNullException">remoteEndpoint or observer is null</exception>
/// <exception cref="ArgumentException">the endpoint's Id is not a SocketRemoteIdentifier</exception>
public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
{
    if (remoteEndpoint == null)
    {
        throw new ArgumentNullException("remoteEndpoint");
    }

    var socketId = remoteEndpoint.Id as SocketRemoteIdentifier;
    if (socketId != null)
    {
        return RegisterObserver(socketId.Addr, observer);
    }

    // Only socket-based identifiers can be resolved to an address.
    throw new ArgumentException("ID not supported");
}
/// <summary>
/// Registers an IObserver used to handle incoming messages from the remote host
/// at the specified IPEndPoint.
/// The IDisposable that is returned can be used to unregister the IObserver.
/// </summary>
/// <remarks>
/// After validating both arguments, the registration is delegated to the
/// manager's observer container.
/// </remarks>
/// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
/// <param name="observer">The IObserver to handle incoming messages</param>
/// <returns>An IDisposable used to unregister the observer with</returns>
/// <exception cref="ArgumentNullException">remoteEndpoint or observer is null</exception>
public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
{
    // The endpoint is validated first, so it is the one reported when both are null.
    if (remoteEndpoint == null)
    {
        throw new ArgumentNullException("remoteEndpoint");
    }
    if (observer == null)
    {
        throw new ArgumentNullException("observer");
    }

    IDisposable registration = _observerContainer.RegisterObserver(remoteEndpoint, observer);
    return registration;
}
/// <summary>
/// Registers an IObserver of IRemoteMessage used to handle incoming messages.
/// Unlike the other overloads, no remote endpoint is specified.
/// The IDisposable that is returned can be used to unregister the IObserver.
/// </summary>
/// <remarks>
/// The registration is delegated to the manager's observer container.
/// NOTE(review): presumably this observer receives messages from any remote
/// host - confirm against ObserverContainer.
/// </remarks>
/// <param name="observer">The IObserver to handle incoming messages</param>
/// <returns>An IDisposable used to unregister the observer with</returns>
/// <exception cref="ArgumentNullException">observer is null</exception>
public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
{
    if (observer == null)
    {
        throw new ArgumentNullException("observer");
    }

    IDisposable registration = _observerContainer.RegisterObserver(observer);
    return registration;
}
/// <summary>
/// Release all resources for the DefaultRemoteManager.
/// </summary>
/// <remarks>
/// Disposes every cached observer and then the transport server, if this manager
/// has one. The server is disposed even when disposing an observer throws, and
/// the cache is emptied once all observers were disposed so that a repeated call
/// does not dispose the same observers again.
/// </remarks>
public void Dispose()
{
    try
    {
        foreach (ProxyObserver cachedClient in _cachedClients.Values)
        {
            cachedClient.Dispose();
        }

        // Only reached when every observer was disposed; on failure the entries
        // are kept so a later Dispose call can retry them.
        _cachedClients.Clear();
    }
    finally
    {
        // _server is null for managers constructed without a listening server.
        if (_server != null)
        {
            _server.Dispose();
        }
    }
}
/// <summary>
/// Observer that forwards each message it receives to a remote host through a
/// transport client, tagging messages with an increasing sequence number.
/// </summary>
private class ProxyObserver : IRemoteObserver<T>
{
    private readonly TransportClient<IRemoteEvent<T>> _client;

    // Number of messages passed to OnNext so far; starts at zero (the int default).
    private int _messageCount;

    /// <summary>
    /// Creates a ProxyObserver that sends through the given client.
    /// </summary>
    /// <param name="client">The connected transport client used to send
    /// messages to remote host</param>
    public ProxyObserver(TransportClient<IRemoteEvent<T>> client)
    {
        _client = client;
    }

    /// <summary>
    /// Wraps the message in a remote event and sends it to the remote host.
    /// </summary>
    /// <param name="message">The message to send</param>
    public void OnNext(T message)
    {
        IRemoteEvent<T> outgoing = WrapMessage(message);

        // The counter advances before the send, as soon as the event is built.
        _messageCount++;
        _client.Send(outgoing);
    }

    /// <summary>
    /// Close underlying transport client
    /// </summary>
    public void Dispose()
    {
        _client.Dispose();
    }

    /// <summary>
    /// Not supported; always throws NotImplementedException.
    /// </summary>
    /// <param name="error">Ignored</param>
    public void OnError(Exception error)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Not supported; always throws NotImplementedException.
    /// </summary>
    public void OnCompleted()
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Builds the remote event for a message from the client's link endpoints,
    /// using the current message count as the sequence number.
    /// </summary>
    /// <param name="message">The message to wrap</param>
    /// <returns>The remote event to send</returns>
    private IRemoteEvent<T> WrapMessage(T message)
    {
        return new RemoteEvent<T>(_client.Link.LocalEndpoint, _client.Link.RemoteEndpoint, message)
        {
            Sequence = _messageCount
        };
    }
}
}
}