Add more client transport impl and improve logging
diff --git a/src/Proton.Client/Client/Implementation/ClientConnection.cs b/src/Proton.Client/Client/Implementation/ClientConnection.cs
index 6e58785..2c05ebc 100644
--- a/src/Proton.Client/Client/Implementation/ClientConnection.cs
+++ b/src/Proton.Client/Client/Implementation/ClientConnection.cs
@@ -24,12 +24,16 @@
using Apache.Qpid.Proton.Client.Concurrent;
using Apache.Qpid.Proton.Client.Utilities;
using Apache.Qpid.Proton.Engine.Sasl.Client;
+using Apache.Qpid.Proton.Client.Transport;
+using Apache.Qpid.Proton.Logging;
namespace Apache.Qpid.Proton.Client.Implementation
{
// TODO
public class ClientConnection : IConnection
{
+ private static IProtonLogger LOG = ProtonLoggerFactory.GetLogger<ClientConnection>();
+
private const int UNLIMITED = -1;
private const int UNDEFINED = -1;
@@ -42,9 +46,11 @@
private readonly ClientConnectionCapabilities capabilities = new ClientConnectionCapabilities();
private readonly TaskCompletionSource<IConnection> openFuture = new TaskCompletionSource<IConnection>();
private readonly TaskCompletionSource<IConnection> closeFuture = new TaskCompletionSource<IConnection>();
+ private readonly IOContext ioContext;
private Engine.IEngine engine;
private Engine.IConnection protonConnection;
+ private ITransport transport;
private AtomicReference<Exception> failureCause = new AtomicReference<Exception>();
private ClientSession connectionSession;
private ClientSender connectionSender;
@@ -58,6 +64,7 @@
this.options = options;
this.connectionId = client.NextConnectionId();
this.sessionBuilder = new ClientSessionBuilder(this);
+ this.ioContext = new IOContext(options.TransportOptions, options.SslOptions);
reconnectPool.Add(new ReconnectLocation(host, port));
reconnectPool.AddAll(options.ReconnectOptions.ReconnectLocations);
@@ -102,7 +109,7 @@
{
try
{
- CloseAsync(error).Wait();
+ CloseAsync(error).GetAwaiter().GetResult();
}
catch (Exception)
{
@@ -409,7 +416,7 @@
failureCause.CompareAndSet(null, ClientExceptionSupport.CreateOrPassthroughFatal(ex));
_ = openFuture.TrySetException(failureCause);
_ = closeFuture.TrySetResult(this);
- // TODO ioContext.shutdown();
+ ioContext.Shutdown();
throw failureCause;
}
@@ -439,12 +446,13 @@
internal void Execute(Action action)
{
- // TODO Add task to work list
+ ioContext.EventLoop.Execute(action);
}
internal void Schedule(Action action, TimeSpan delay)
{
- // TODO Add task to work list
+ // TODO: Either add scheduling to event loop or handle timeouts here somehow
+ Task.Delay(delay).ContinueWith((t) => ioContext.EventLoop.Execute(action));
}
internal TaskCompletionSource<T> Request<T>(Object requestor, TaskCompletionSource<T> request)
@@ -528,12 +536,12 @@
if (totalConnections == 1)
{
- // TODO LOG.info("Connection {} connected to server: {}:{}", getId(), transport.getHost(), transport.getPort());
+ LOG.Info("Connection {0} connected to server: {1}", ConnectionId, transport.EndPoint);
// TODO SubmitConnectionEvent(options.connectedHandler(), transport.getHost(), transport.getPort(), null);
}
else
{
- // TODO LOG.info("Connection {} reconnected to server: {}:{}", getId(), transport.getHost(), transport.getPort());
+ LOG.Info("Connection {0} reconnected to server: {1}", ConnectionId, transport.EndPoint);
// TODO SubmitConnectionEvent(options.reconnectedHandler(), transport.getHost(), transport.getPort(), null);
}
@@ -552,9 +560,9 @@
{
connection.Engine.Shutdown();
}
- catch (Exception)
+ catch (Exception ignore)
{
- //LOG.debug("Unexpected exception thrown from engine shutdown: ", ignore);
+ LOG.Debug("Unexpected exception thrown from engine shutdown: {0}", ignore.Message);
}
}
else
@@ -597,7 +605,7 @@
try
{
- // TODO transport.close();
+ transport.Close();
}
catch (Exception)
{
@@ -627,11 +635,11 @@
failureCause = new ClientConnectionRemotelyClosedException("Unknown error led to connection disconnect");
}
- // TODO LOG.trace("Engine reports failure with error: {}", failureCause.getMessage());
+ LOG.Trace("Engine reports failure with error: {}", failureCause.Message);
if (IsReconnectAllowed(failureCause))
{
- // TODO LOG.info("Connection {} interrupted to server: {}:{}", getId(), transport.getHost(), transport.getPort());
+ LOG.Info("Connection {0} interrupted to server: {1}", ConnectionId, transport.EndPoint);
// TODO SubmitDisconnectionEvent(options.interruptedHandler(), transport.getHost(), transport.getPort(), failureCause);
// Initial configuration validation happens here, if this step fails then the
@@ -679,7 +687,7 @@
engine.Configuration.TraceFrames = true;
if (!engine.Configuration.TraceFrames)
{
- // TODO LOG.warn("Connection {} frame tracing was enabled but protocol engine does not support it", getId());
+ LOG.Warn("Connection {0} frame tracing was enabled but protocol engine does not support it", ConnectionId);
}
}
@@ -786,10 +794,10 @@
_ = openFuture.TrySetException(failureCause);
_ = closeFuture.TrySetResult(this);
- // TODO
- // LOG.warn("Connection {} has failed due to: {}", ConnectionId, failureCause != null ?
- // failureCause.GetType().Name + " -> " + failureCause.Message : "No failure details provided.");
+ LOG.Warn("Connection {0} has failed due to: {1}", ConnectionId, failureCause != null ?
+ failureCause.Get().GetType().Name + " -> " + failureCause.Get().Message : "No failure details provided.");
+ // TODO
// SubmitDisconnectionEvent(options.DisconnectedHandler, transport.Host, transport.Port, failureCause);
}
@@ -802,10 +810,16 @@
try
{
reconnectAttempts++;
+ transport = ioContext.NewTransport();
+ LOG.Trace("Connection {0} Attempting connection to remote {1}", ConnectionId, location.Host, location.Port);
+
// TODO
- //transport = ioContext.newTransport();
- //LOG.trace("Connection {} Attempting connection to remote {}:{}", getId(), location.getHost(), location.getPort());
- // transport.connect(location.getHost(), location.getPort(), new ClientTransportListener(engine));
+ // transport.TransportConnectedHandler(HandleTransportConnected);
+ // transport.TransportConnectFailedHandler(HandleConnectFailed);
+ // transport.TransportDisconnectedHandler(HandleTransportDisconnected);
+ // transport.TransportReadHandler(HandleTransportRead);
+
+ transport.Connect(location.Host, location.Port);
}
catch (Exception error)
{
@@ -819,7 +833,7 @@
int warnInterval = options.ReconnectOptions.WarnAfterReconnectAttempts;
if (reconnectAttempts > 0 && warnInterval > 0 && (reconnectAttempts % warnInterval) == 0)
{
- // TODO LOG.warn("Connection {}: Failed to connect after: {} attempt(s) continuing to retry.", getId(), reconnectAttempts);
+ LOG.Warn("Connection {0}: Failed to connect after: {1} attempt(s) continuing to retry.", ConnectionId, reconnectAttempts);
}
// If no connection recovery required then we have never fully connected to a remote
@@ -829,25 +843,25 @@
{
if (reconnectAttempts == 0)
{
- // TODO LOG.trace("Initial connect attempt will be performed immediately");
+ LOG.Trace("Initial connect attempt will be performed immediately");
// executor.execute(()->attemptConnection(location));
}
else
{
long delay = NextReconnectDelay();
- // TODO LOG.trace("Next connect attempt will be in {} milliseconds", delay);
+ LOG.Trace("Next connect attempt will be in {0} milliseconds", delay);
// executor.schedule(()->attemptConnection(location), delay, TimeUnit.MILLISECONDS);
}
}
else if (reconnectAttempts == 0)
{
- // TODO LOG.trace("Initial reconnect attempt will be performed immediately");
+ LOG.Trace("Initial reconnect attempt will be performed immediately");
// executor.execute(()->attemptConnection(location));
}
else
{
long delay = NextReconnectDelay();
- // TODO LOG.trace("Next reconnect attempt will be in {} milliseconds", delay);
+ LOG.Trace("Next reconnect attempt will be in {0} milliseconds", delay);
// executor.schedule(()->attemptConnection(location), delay, TimeUnit.MILLISECONDS);
}
}
@@ -962,7 +976,7 @@
public string Password => options.Password;
- public IPrincipal LocalPrincipal => throw new NotImplementedException(); // TODO
+ public IPrincipal LocalPrincipal => connection.transport?.LocalPrincipal;
}
diff --git a/src/Proton.Client/Client/Implementation/ClientSession.cs b/src/Proton.Client/Client/Implementation/ClientSession.cs
index 075cc57..49af498 100644
--- a/src/Proton.Client/Client/Implementation/ClientSession.cs
+++ b/src/Proton.Client/Client/Implementation/ClientSession.cs
@@ -21,6 +21,7 @@
using Apache.Qpid.Proton.Client.Exceptions;
using Apache.Qpid.Proton.Client.Concurrent;
using Apache.Qpid.Proton.Client.Utilities;
+using Apache.Qpid.Proton.Logging;
namespace Apache.Qpid.Proton.Client.Implementation
{
@@ -30,6 +31,8 @@
/// </summary>
public class ClientSession : ISession
{
+ private static IProtonLogger LOG = ProtonLoggerFactory.GetLogger<ClientSession>();
+
private static readonly IClientTransactionContext NoOpTransactionContext = new ClientNoOpTransactionContext();
private const long INFINITE = -1;
@@ -292,7 +295,7 @@
}
});
- return connection.Request(this, rollbackFuture).Task.Result;
+ return connection.Request(this, rollbackFuture).Task.GetAwaiter().GetResult();
}
#region Internal client session API
@@ -525,7 +528,7 @@
private void HandleRemoteOpen(Engine.ISession session)
{
openFuture.SetResult(this);
- // LOG.trace("Session:{} opened successfully.", id());
+ LOG.Trace("Session:{0} opened successfully.", SessionId);
foreach (Engine.ISender sender in protonSession.Senders)
{
diff --git a/src/Proton.Client/Client/Transport/IOContext.cs b/src/Proton.Client/Client/Transport/IOContext.cs
new file mode 100644
index 0000000..1042ea2
--- /dev/null
+++ b/src/Proton.Client/Client/Transport/IOContext.cs
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.Qpid.Proton.Client.Concurrent;
+using Apache.Qpid.Proton.Utilities;
+
+namespace Apache.Qpid.Proton.Client.Transport
+{
+ /// <summary>
+ /// Represents an IO context used by clients to commicate with remote
+ /// services and provides a single threaded event loop used to manage
+ /// IO based work and connection related services.
+ /// </summary>
+ public sealed class IOContext
+ {
+ private static readonly int SHUTDOWN_TIMEOUT = 50;
+
+ private readonly IEventLoop eventLoop;
+ private readonly TransportOptions transportOptions;
+ private readonly SslOptions sslOptions;
+
+ public IOContext(TransportOptions transportOptions, SslOptions sslOptions)
+ {
+ Statics.RequireNonNull(transportOptions, "Transport Options cannot be null");
+ Statics.RequireNonNull(sslOptions, "Transport SSL Options cannot be null");
+
+ this.eventLoop = new DefaultEventLoop();
+ this.transportOptions = transportOptions;
+ this.sslOptions = sslOptions;
+ }
+
+ /// <summary>
+ /// Provides access to the event loop used to process all IO related
+ /// work done within a client instance.
+ /// </summary>
+ public IEventLoop EventLoop => eventLoop;
+
+ public void Shutdown()
+ {
+ eventLoop.Shutdown(); // TODO graceful shutdown with quiesce
+ }
+
+ public ITransport NewTransport()
+ {
+ if (eventLoop.IsShutdown || eventLoop.IsTerminated)
+ {
+ throw new InvalidOperationException("Cannot create new transport when context is shutdown.");
+ }
+
+ // TODO - WebSockets
+
+ return new TcpTransport(transportOptions, sslOptions, eventLoop);
+ }
+ }
+}
diff --git a/src/Proton.Client/Client/Transport/ITransport.cs b/src/Proton.Client/Client/Transport/ITransport.cs
index 0e0e3b3..7c95dfd 100644
--- a/src/Proton.Client/Client/Transport/ITransport.cs
+++ b/src/Proton.Client/Client/Transport/ITransport.cs
@@ -17,6 +17,7 @@
using System;
using System.Net;
+using System.Security.Principal;
using Apache.Qpid.Proton.Buffer;
using Apache.Qpid.Proton.Client.Concurrent;
@@ -42,6 +43,13 @@
EndPoint EndPoint { get; }
/// <summary>
+ /// Returns a local principal that was created following successful SSL
+ /// handshaking with the remote. Before a handshake or in the case of non-SSL
+ /// transport types this method returns null.
+ /// </summary>
+ IPrincipal LocalPrincipal { get; }
+
+ /// <summary>
/// Initiates an orderly close of the transport.
/// </summary>
void Close();
diff --git a/src/Proton.Client/Client/Transport/TcpTransport.cs b/src/Proton.Client/Client/Transport/TcpTransport.cs
index ce808f2..bc00488 100644
--- a/src/Proton.Client/Client/Transport/TcpTransport.cs
+++ b/src/Proton.Client/Client/Transport/TcpTransport.cs
@@ -19,6 +19,7 @@
using System.IO;
using System.Net;
using System.Net.Sockets;
+using System.Security.Principal;
using System.Threading.Channels;
using System.Threading.Tasks;
using Apache.Qpid.Proton.Buffer;
@@ -65,6 +66,8 @@
public EndPoint EndPoint => channelEndpoint;
+ public IPrincipal LocalPrincipal => null; // TODO
+
#endregion
public TcpTransport(TransportOptions options, SslOptions sslOptions, IEventLoop eventLoop)
diff --git a/src/Proton.TestPeer/Network/PeerTcpServer.cs b/src/Proton.TestPeer/Network/PeerTcpServer.cs
index 9877177..7d891c5 100644
--- a/src/Proton.TestPeer/Network/PeerTcpServer.cs
+++ b/src/Proton.TestPeer/Network/PeerTcpServer.cs
@@ -36,8 +36,6 @@
private ILoggerFactory loggerFactory;
private ILogger<PeerTcpServer> logger;
-
-
public PeerTcpServer(in ILoggerFactory loggerFactory)
{
this.loggerFactory = loggerFactory;
@@ -65,6 +63,9 @@
}
serverListener.Listen(1);
+
+ logger.LogInformation("Peer TCP Server listen started on endpoint: {0}", serverListener.LocalEndPoint);
+
serverListener.BeginAccept(new AsyncCallback(NewTcpClientConnection), this);
return ((IPEndPoint)serverListener.LocalEndPoint).Port;
diff --git a/test/Proton.Client.Tests/UnitTest1.cs b/test/Proton.Client.Tests/UnitTest1.cs
deleted file mode 100644
index 65df964..0000000
--- a/test/Proton.Client.Tests/UnitTest1.cs
+++ /dev/null
@@ -1,18 +0,0 @@
-using NUnit.Framework;
-
-namespace Proton.Client.Tests
-{
- public class Tests
- {
- [SetUp]
- public void Setup()
- {
- }
-
- [Test]
- public void Test1()
- {
- Assert.Pass();
- }
- }
-}
\ No newline at end of file