blob: 706fac40f06352795f2f44cc252871b9d536b9e0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.REEF.Utilities.AsyncUtils;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.StreamingCodec;
using Org.Apache.REEF.Wake.Util;
namespace Org.Apache.REEF.Wake.Remote.Impl
{
/// <summary>
/// Server to handle incoming remote messages.
/// </summary>
/// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam>
internal sealed class StreamingTransportServer<T> : IDisposable
{
private static readonly Logger LOGGER = Logger.GetLogger(typeof(StreamingTransportServer<>));
private TcpListener _listener;
private readonly CancellationTokenSource _cancellationSource;
private readonly IObserver<TransportEvent<T>> _remoteObserver;
private readonly ITcpPortProvider _tcpPortProvider;
private readonly IStreamingCodec<T> _streamingCodec;
private bool _disposed;
private Task _serverTask;
/// <summary>
/// Constructs a TransportServer to listen for remote events.
/// Listens on the specified remote endpoint. When it receives a remote
/// event, it will invoke the specified remote handler.
/// </summary>
/// <param name="address">Endpoint address to listen on</param>
/// <param name="remoteHandler">The handler to invoke when receiving incoming
/// remote messages</param>
/// <param name="tcpPortProvider">Find port numbers if listenport is 0</param>
/// <param name="streamingCodec">Streaming codec</param>
internal StreamingTransportServer(
IPAddress address,
IObserver<TransportEvent<T>> remoteHandler,
ITcpPortProvider tcpPortProvider,
IStreamingCodec<T> streamingCodec)
{
_listener = new TcpListener(address, 0);
_remoteObserver = remoteHandler;
_tcpPortProvider = tcpPortProvider;
_cancellationSource = new CancellationTokenSource();
_cancellationSource.Token.ThrowIfCancellationRequested();
_streamingCodec = streamingCodec;
_disposed = false;
}
/// <summary>
/// Returns the listening endpoint for the TransportServer
/// </summary>
public IPEndPoint LocalEndpoint
{
get { return _listener.LocalEndpoint as IPEndPoint; }
}
/// <summary>
/// Starts listening for incoming remote messages.
/// </summary>
public void Run()
{
FindAPortAndStartListener();
_serverTask = Task.Factory.StartNew(() => StartServer(), TaskCreationOptions.LongRunning);
}
private void FindAPortAndStartListener()
{
var foundAPort = false;
var exception = new SocketException((int)SocketError.AddressAlreadyInUse);
for (var enumerator = _tcpPortProvider.GetEnumerator();
!foundAPort && enumerator.MoveNext();)
{
_listener = new TcpListener(LocalEndpoint.Address, enumerator.Current);
try
{
_listener.Start();
foundAPort = true;
}
catch (SocketException e)
{
exception = e;
}
}
if (!foundAPort)
{
Exceptions.Throw(exception, "Could not find a port to listen on", LOGGER);
}
LOGGER.Log(Level.Info,
string.Format("Listening on {0}", _listener.LocalEndpoint.ToString()));
}
/// <summary>
/// Close the TransportServer and all open connections
/// </summary>
public void Dispose()
{
if (!_disposed)
{
_cancellationSource.Cancel();
try
{
_listener.Stop();
}
catch (SocketException)
{
LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
}
if (_serverTask != null)
{
// Give the TransportServer Task 500ms to shut down, ignore any timeout errors
try
{
CancellationTokenSource serverDisposeTimeout = new CancellationTokenSource(500);
_serverTask.Wait(serverDisposeTimeout.Token);
_serverTask.Dispose();
}
catch (Exception e)
{
Exceptions.Caught(e, Level.Warning, LOGGER);
}
}
}
_disposed = true;
}
/// <summary>
/// Helper method to start TransportServer. This will
/// be run in an asynchronous Task.
/// </summary>
/// <returns>An asynchronous Task for the running server.</returns>
private async Task StartServer()
{
try
{
while (!_cancellationSource.Token.IsCancellationRequested)
{
TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
ProcessClient(client).LogAndIgnoreExceptionIfAny(
LOGGER, "Task Exception observed processing client in StreamingTransportServer.", Level.Warning);
}
}
catch (InvalidOperationException)
{
LOGGER.Log(Level.Info, "StreamingTransportServer has been closed.");
}
catch (OperationCanceledException)
{
LOGGER.Log(Level.Info, "StreamingTransportServer has been closed.");
}
}
/// <summary>
/// Receives event from connected TcpClient and invokes handler on the event.
/// </summary>
/// <param name="client">The connected client</param>
private async Task ProcessClient(TcpClient client)
{
// Keep reading messages from client until they disconnect or timeout
CancellationToken token = _cancellationSource.Token;
using (ILink<T> link = new StreamingLink<T>(client, _streamingCodec))
{
while (!token.IsCancellationRequested)
{
T message = await link.ReadAsync(token);
if (message == null)
{
break;
}
TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
_remoteObserver.OnNext(transportEvent);
}
LOGGER.Log(Level.Error,
"ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested);
}
}
}
}