blob: 25b9fdcd13bc33406f2bfc5510574304bd003853 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
namespace Org.Apache.REEF.Wake.Remote.Impl
{
/// <summary>
/// Establish connections to TransportServer for remote message passing
/// </summary>
public class TransportClient<T> : IDisposable
{
private readonly ILink<T> _link;
private readonly IObserver<TransportEvent<T>> _observer;
private readonly CancellationTokenSource _cancellationSource;
private bool _disposed;
/// <summary>
/// Construct a TransportClient.
/// Used to send messages to the specified remote endpoint.
/// </summary>
/// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
/// <param name="codec">Codec to decode/encode</param>
/// <param name="clientFactory">TcpClient factory</param>
public TransportClient(IPEndPoint remoteEndpoint, ICodec<T> codec, ITcpClientConnectionFactory clientFactory)
{
if (remoteEndpoint == null)
{
throw new ArgumentNullException("remoteEndpoint");
}
if (codec == null)
{
throw new ArgumentNullException("codec");
}
if (clientFactory == null)
{
throw new ArgumentNullException("clientFactory");
}
_link = new Link<T>(remoteEndpoint, codec, clientFactory);
_cancellationSource = new CancellationTokenSource();
_disposed = false;
}
/// <summary>
/// Construct a TransportClient.
/// Used to send messages to the specified remote endpoint.
/// </summary>
/// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
/// <param name="codec">Codec to decode/encodec</param>
/// <param name="observer">Callback used when receiving responses from remote host</param>
/// <param name="clientFactory">TcpClient factory</param>
public TransportClient(IPEndPoint remoteEndpoint,
ICodec<T> codec,
IObserver<TransportEvent<T>> observer,
ITcpClientConnectionFactory clientFactory)
: this(remoteEndpoint, codec, clientFactory)
{
_observer = observer;
Task.Run(() => ResponseLoop());
}
/// <summary>
/// Gets the underlying transport link.
/// </summary>
public ILink<T> Link
{
get { return _link; }
}
/// <summary>
/// Send the remote message.
/// </summary>
/// <param name="message">The message to send</param>
public void Send(T message)
{
if (message == null)
{
throw new ArgumentNullException("message");
}
_link.Write(message);
}
/// <summary>
/// Close all opened connections
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected void Dispose(bool disposing)
{
if (!_disposed && disposing)
{
_link.Dispose();
_disposed = true;
}
}
/// <summary>
/// Continually read responses from remote host
/// </summary>
private async Task ResponseLoop()
{
while (!_cancellationSource.IsCancellationRequested)
{
T message = await _link.ReadAsync(_cancellationSource.Token);
if (message == null)
{
break;
}
TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
_observer.OnNext(transportEvent);
}
}
}
}