Merge pull request #28 from Havret/extend_logging
AMQNET-614: Extend logging
diff --git a/src/NMS.AMQP/NmsConnection.cs b/src/NMS.AMQP/NmsConnection.cs
index f8bf283..a34d343 100644
--- a/src/NMS.AMQP/NmsConnection.cs
+++ b/src/NMS.AMQP/NmsConnection.cs
@@ -314,10 +314,14 @@
{
session.OnInboundMessage(envelope);
}
+ else
+ {
+ Tracer.Error($"Could not dispatch message {envelope.Message.NMSMessageId} because session {envelope.ConsumerInfo.SessionId} not found.");
+ }
if (connectionListeners.Any())
{
- foreach (INmsConnectionListener listener in connectionListeners)
+ foreach (INmsConnectionListener listener in connectionListeners)
listener.OnInboundMessage(envelope.Message);
}
}
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index e84f458..b4cf301 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -200,6 +200,11 @@
public void OnInboundMessage(InboundMessageDispatch envelope)
{
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug($"Message {envelope.Message.NMSMessageId} passed to consumer {Info.Id}");
+ }
+
SetAcknowledgeCallback(envelope);
if (envelope.EnqueueFirst)
@@ -215,6 +220,11 @@
private void DeliverNextPending()
{
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug($"{Info.Id} is about to deliver next pending message.");
+ }
+
if (Session.IsStarted && started && Listener != null)
{
lock (syncRoot)
@@ -225,19 +235,30 @@
{
var envelope = messageQueue.DequeueNoWait();
if (envelope == null)
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug($"No message available for delivery.");
+ }
+
return;
+ }
if (IsMessageExpired(envelope))
{
if (Tracer.IsDebugEnabled)
+ {
Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
+ }
DoAckExpired(envelope);
}
else if (IsRedeliveryExceeded(envelope))
{
if (Tracer.IsDebugEnabled)
- Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount}");
+ {
+ Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
+ }
// TODO: Apply redelivery policy
DoAckExpired(envelope);
@@ -320,6 +341,11 @@
while (true)
{
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Trying to dequeue next message.");
+ }
+
InboundMessageDispatch envelope = messageQueue.Dequeue(timeout);
if (failureCause != null)
@@ -331,7 +357,9 @@
if (IsMessageExpired(envelope))
{
if (Tracer.IsDebugEnabled)
+ {
Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
+ }
DoAckExpired(envelope);
@@ -341,7 +369,9 @@
else if (IsRedeliveryExceeded(envelope))
{
if (Tracer.IsDebugEnabled)
- Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount}");
+ {
+ Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
+ }
// TODO: Apply redelivery policy
DoAckExpired(envelope);
@@ -349,7 +379,9 @@
else
{
if (Tracer.IsDebugEnabled)
- Tracer.Debug($"{Info.Id} received message {envelope.Message.NMSMessageId}.");
+ {
+ Tracer.Debug($"{Info.Id} received message {envelope.Message.NMSMessageId}.");
+ }
AckFromReceive(envelope);
return envelope.Message.Copy();
@@ -423,11 +455,23 @@
{
if (closed.CompareAndSet(false, true))
{
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Shutting down NmsMessageConsumer.");
+ }
+
failureCause = exception;
Session.Remove(this);
started.Set(false);
messageQueue.Dispose();
}
+ else
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("NmsMessageConsumer already closed.");
+ }
+ }
}
public void Start()
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index 1b1c0f6..9904098 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -344,6 +344,10 @@
{
messageConsumer.OnInboundMessage(envelope);
}
+ else
+ {
+ Tracer.Error($"Could not dispatch message {envelope.Message.NMSMessageId} because consumer {envelope.ConsumerId} not found.");
+ }
}
public void Acknowledge(AckType ackType)
@@ -422,6 +426,11 @@
internal void EnqueueForDispatch(NmsMessageConsumer.MessageDeliveryTask task)
{
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Message enqueued for dispatch."); // NOTE(review): written before Post() and regardless of whether dispatcher is null, so this line alone does not prove the task was posted.
+ }
+
dispatcher?.Post(task);
}
@@ -451,7 +460,7 @@
public NmsMessageProducer ProducerClosed(Id producerId, Exception cause)
{
- Tracer.Info($"A NMS MessageProducer has been closed: {producerId}");
+ Tracer.Info($"A NmsMessageProducer has been closed: {producerId}. Cause: {cause}");
if (producers.TryGetValue(producerId, out NmsMessageProducer producer))
{
@@ -464,6 +473,14 @@
Tracer.DebugFormat("Ignoring exception thrown during cleanup of closed producer", error);
}
}
+ else
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug($"NmsMessageProducer: {producerId} not found in session {this.SessionInfo.Id}.");
+ }
+
+ }
return producer;
}
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index fb4fd37..1481248 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -195,6 +195,11 @@
private void OnMessage(IReceiverLink receiver, global::Amqp.Message amqpMessage)
{
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug($"Received message {amqpMessage?.Properties?.MessageId}");
+ }
+
NmsMessage message;
try
{
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
index 24022a8..99330db 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
@@ -41,6 +41,66 @@
this.transport = transport;
}
+        /// <summary>
+        /// Installs a trace listener that forwards AMQP.Net Lite trace output to the NMS <c>Tracer</c>.
+        /// </summary>
+        /// <remarks>
+        /// Level mapping (AMQP -> Tracer):
+        /// Verbose and Frame -> Debug; Information and Output -> Info; Warning -> Warn; Error -> Error.
+        /// Any other level is reported at Info, followed by the original message at Info.
+        /// </remarks>
+        static AmqpProvider()
+        {
+            // Initial level is Verbose only; frame tracing is switched on separately via TraceFrames.
+            Trace.TraceLevel = TraceLevel.Verbose;
+            Trace.TraceListener = (level, format, args) =>
+            {
+                switch (level)
+                {
+                    case TraceLevel.Verbose:
+                    case TraceLevel.Frame:
+                        Tracer.DebugFormat(format, args);
+                        break;
+                    case TraceLevel.Information:
+                    case TraceLevel.Output:
+                        // Applications should not access AmqpLite directly so there
+                        // should be no 'Output' level logs.
+                        Tracer.InfoFormat(format, args);
+                        break;
+                    case TraceLevel.Warning:
+                        Tracer.WarnFormat(format, args);
+                        break;
+                    case TraceLevel.Error:
+                        Tracer.ErrorFormat(format, args);
+                        break;
+                    default:
+                        // The placeholder must be indexed ("{0}"): "{}" is not a valid composite
+                        // format item and makes the formatter throw instead of logging the level.
+                        Tracer.InfoFormat("Unknown AMQP LogLevel: {0}", level);
+                        Tracer.InfoFormat(format, args);
+                        break;
+                }
+            };
+        }
+
+        /// <summary>
+        /// Gets or sets whether the <c>TraceLevel.Frame</c> flag is present in AMQP.Net Lite's
+        /// <c>Trace.TraceLevel</c>. The level is a static setting, so the value is shared by
+        /// every provider instance rather than stored per provider.
+        /// </summary>
+        public bool TraceFrames
+        {
+            get
+            {
+                TraceLevel frameBit = Trace.TraceLevel & TraceLevel.Frame;
+                return frameBit == TraceLevel.Frame;
+            }
+            set
+            {
+                TraceLevel current = Trace.TraceLevel;
+
+                // Add the Frame flag when enabling, strip it when disabling; other flags are untouched.
+                Trace.TraceLevel = value
+                    ? current | TraceLevel.Frame
+                    : current & ~TraceLevel.Frame;
+            }
+        }
+
public long SendTimeout => connectionInfo?.SendTimeout ?? ConnectionInfo.DEFAULT_SEND_TIMEOUT;
public Uri RemoteUri { get; }
public IProviderListener Listener { get; private set; }
@@ -58,6 +118,11 @@
internal void OnConnectionClosed(Error error)
{
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug($"Connection closed. {error}");
+ }
+
bool connectionExplicitlyClosed = error == null;
if (!connectionExplicitlyClosed)
{
diff --git a/src/NMS.AMQP/Transport/ITransportContext.cs b/src/NMS.AMQP/Transport/ITransportContext.cs
index ad225be..a7afefb 100644
--- a/src/NMS.AMQP/Transport/ITransportContext.cs
+++ b/src/NMS.AMQP/Transport/ITransportContext.cs
@@ -31,13 +31,13 @@
int SendBufferSize { get; set; }
int SendTimeout { get; set; }
+
bool TcpNoDelay { get; set; }
-
- bool UseLogging { get; set; }
+
bool IsSecure { get; }
-
+
ITransportContext Copy();
-
+
Task<Connection> CreateAsync(Address address, IHandler handler);
}
}
diff --git a/src/NMS.AMQP/Transport/TransportContext.cs b/src/NMS.AMQP/Transport/TransportContext.cs
index 229d8e7..d5aead9 100644
--- a/src/NMS.AMQP/Transport/TransportContext.cs
+++ b/src/NMS.AMQP/Transport/TransportContext.cs
@@ -43,49 +43,6 @@
connectionBuilder.SASL.Profile = Amqp.Sasl.SaslProfile.Anonymous;
}
- static TransportContext()
- {
- //
- // Set up tracing in AMQP. We capture all AMQP traces in the TraceListener below
- // and map to NMS 'Tracer' logs as follows:
- // AMQP Tracer
- // Verbose Debug
- // Frame Debug
- // Information Info
- // Output Info (should not happen)
- // Warning Warn
- // Error Error
- //
- Amqp.Trace.TraceLevel = Amqp.TraceLevel.Verbose | Amqp.TraceLevel.Frame;
- Amqp.Trace.TraceListener = (level, format, args) =>
- {
- switch (level)
- {
- case Amqp.TraceLevel.Verbose:
- case Amqp.TraceLevel.Frame:
- Tracer.DebugFormat(format, args);
- break;
- case Amqp.TraceLevel.Information:
- case Amqp.TraceLevel.Output:
- //
- // Applications should not access AmqpLite directly so there
- // should be no 'Output' level logs.
- Tracer.InfoFormat(format, args);
- break;
- case Amqp.TraceLevel.Warning:
- Tracer.WarnFormat(format, args);
- break;
- case Amqp.TraceLevel.Error:
- Tracer.ErrorFormat(format, args);
- break;
- default:
- Tracer.InfoFormat("Unknown AMQP LogLevel: {}", level);
- Tracer.InfoFormat(format, args);
- break;
- }
- };
- }
-
#region Transport Options
public int ReceiveBufferSize { get => this.connectionBuilder.TCP.ReceiveBufferSize; set => this.connectionBuilder.TCP.ReceiveBufferSize = value; }
@@ -127,25 +84,6 @@
}
}
}
-
- /// <summary>
- /// UseLogging Enables AmqpNetLite's Frame logging level.
- /// </summary>
- public bool UseLogging
- {
- get => ((Amqp.Trace.TraceLevel & Amqp.TraceLevel.Frame) == Amqp.TraceLevel.Frame);
- set
- {
- if (value)
- {
- Amqp.Trace.TraceLevel = Amqp.Trace.TraceLevel | Amqp.TraceLevel.Frame;
- }
- else
- {
- Amqp.Trace.TraceLevel = Amqp.Trace.TraceLevel & ~Amqp.TraceLevel.Frame;
- }
- }
- }
#endregion
diff --git a/src/NMS.AMQP/Util/IdGenerator.cs b/src/NMS.AMQP/Util/IdGenerator.cs
index 1c91fbf..f3d1c7a 100644
--- a/src/NMS.AMQP/Util/IdGenerator.cs
+++ b/src/NMS.AMQP/Util/IdGenerator.cs
@@ -125,7 +125,6 @@
public void Add(object component)
{
- Tracer.DebugFormat("Adding Component To Id, component {0}, current index: {1}", component, current);
if (isReadOnly)
{
throw new NMSException("Invalid Operation when generating Component Id. Can not change id once generated.");
diff --git a/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj b/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj
index a66cc6b..68c6937 100644
--- a/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj
+++ b/test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj
@@ -31,6 +31,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.1.1" />
<PackageReference Include="Moq" Version="4.11.0" />
+ <PackageReference Include="NLog" Version="4.6.7" />
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="NUnit.Console" Version="3.10.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.13.0" />
@@ -44,6 +45,9 @@
<None Include=".\config\cert\client.crt" Link="client.crt">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
+ <None Update="NLog.config">
+ <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+ </None>
<None Update="TestSuite.config">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
diff --git a/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs
index 09bdc04..ba09d9c 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs
@@ -296,7 +296,7 @@
testPeer.ExpectDispositionThatIsAcceptedAndSettled();
for (int i = 0; i < msgCount; i++)
- Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(3000)));
+ Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(3000)), $"Message {i} not received within given timeout.");
testPeer.WaitForAllMatchersToComplete(3000);
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index d4eaeb1..816dc84 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -24,6 +24,7 @@
using Apache.NMS;
using Apache.NMS.AMQP;
using Moq;
+using NLog;
using NMS.AMQP.Test.TestAmqp;
using NMS.AMQP.Test.TestAmqp.BasicTypes;
using NUnit.Framework;
@@ -33,6 +34,8 @@
[TestFixture]
public class FailoverIntegrationTest : IntegrationTestFixture
{
+ private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
+
[Test, Timeout(20_000), Ignore("Ignore as we cannot detect connection disconnect on Linux.")]
public void TestFailoverHandlesDropThenRejectionCloseAfterConnect()
{
@@ -47,6 +50,10 @@
var originalUri = CreatePeerUri(originalPeer);
var rejectingUri = CreatePeerUri(rejectingPeer);
var finalUri = CreatePeerUri(finalPeer);
+
+ Logger.Info($"Original peer is at: {originalUri}");
+ Logger.Info($"Rejecting peer is at: {rejectingUri}");
+ Logger.Info($"Final peer is at: {finalUri}");
// Connect to the first
originalPeer.ExpectSaslAnonymous();
@@ -85,7 +92,7 @@
finalPeer.ExpectBegin();
// Close the original peer and wait for things to shake out.
- originalPeer.Close(sendClose: false);
+ originalPeer.Close(sendClose: true);
rejectingPeer.WaitForAllMatchersToComplete(2000);
diff --git a/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs b/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs
index cd52602..9a13133 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs
@@ -27,7 +27,7 @@
{
static IntegrationTestFixture()
{
- Tracer.Trace = new Logger(Logger.LogLevel.DEBUG);
+ Tracer.Trace = new NLogAdapter();
}
protected IConnection EstablishConnection(TestAmqpPeer testPeer, string optionsString = null, Symbol[] serverCapabilities = null, Fields serverProperties = null, bool setClientId = true)
diff --git a/test/Apache-NMS-AMQP-Test/NLog.config b/test/Apache-NMS-AMQP-Test/NLog.config
new file mode 100644
index 0000000..71b7c9c
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/NLog.config
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="utf-8"?>
+
+<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+ <targets> <!-- Single console target; each entry is rendered as: time | level | call-site class (no namespace, no method name) | message. -->
+ <target name="logconsole" xsi:type="Console" layout="${time} | ${level} | ${callsite:includeNamespace=false:methodName=false} | ${message}" />
+ </targets>
+
+ <rules> <!-- Send entries from every logger at Debug level and above to the console target. -->
+ <logger name="*" minlevel="Debug" writeTo="logconsole" />
+ </rules>
+</nlog>
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpProviderFactoryTest.cs b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpProviderFactoryTest.cs
index 3bf5383..ea0de01 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpProviderFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpProviderFactoryTest.cs
@@ -16,6 +16,7 @@
*/
using System;
+using Amqp;
using Apache.NMS.AMQP.Provider;
using Apache.NMS.AMQP.Provider.Amqp;
using NUnit.Framework;
@@ -26,7 +27,7 @@
public class AmqpProviderFactoryTest
{
private uint customMaxHandle = 2048;
-
+
[Test]
public void TestCreateAmqpProvider()
{
@@ -38,21 +39,31 @@
public void TestCreateWithDefaultOptions()
{
AmqpProvider provider = ProviderFactory.Create(new Uri("amqp://localhost:5672")) as AmqpProvider;
-
+
Assert.IsNotNull(provider);
Assert.AreEqual(AmqpProvider.DEFAULT_MAX_HANDLE, provider.MaxHandle);
+ Assert.IsFalse(provider.TraceFrames);
}
[Test]
public void TestCreateWithCustomOptions()
{
- Uri uri = new Uri("amqp://localhost:5672" + "?" +
- "amqp.maxHandle=" + customMaxHandle);
-
+ Uri uri = new Uri("amqp://localhost:5672" +
+ "?amqp.maxHandle=" + customMaxHandle +
+ "&amqp.traceFrames=true");
+
AmqpProvider provider = ProviderFactory.Create(uri) as AmqpProvider;
-
+
Assert.IsNotNull(provider);
Assert.AreEqual(customMaxHandle, provider.MaxHandle);
+ Assert.IsTrue(provider.TraceFrames);
+ }
+
+        /// <summary>
+        /// Clears the frame-tracing flag that a test may have enabled through the
+        /// <c>amqp.traceFrames</c> URI option.
+        /// </summary>
+        [TearDown]
+        public void TearDown()
+        {
+            // Trace.TraceLevel is process-wide. Remove only the Frame bit these tests can set;
+            // assigning 0 would also drop the base level that AmqpProvider configures once in its
+            // static constructor and would silence AMQP tracing for every test that runs afterwards.
+            Trace.TraceLevel &= ~TraceLevel.Frame;
+        }
}
}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/Logger.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/Logger.cs
deleted file mode 100644
index ce24b3d..0000000
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/Logger.cs
+++ /dev/null
@@ -1,89 +0,0 @@
-using System;
-using Apache.NMS;
-
-namespace NMS.AMQP.Test.TestAmqp
-{
- class Logger : ITrace
- {
- public enum LogLevel
- {
- OFF = -1,
- FATAL,
- ERROR,
- WARN,
- INFO,
- DEBUG
- }
-
- private LogLevel lv;
-
- public void LogException(Exception ex)
- {
- this.Warn("Exception: " + ex.Message);
- }
-
- public Logger() : this(LogLevel.WARN)
- {
- }
-
- public Logger(LogLevel lvl)
- {
- lv = lvl;
- }
-
- public bool IsDebugEnabled
- {
- get { return lv >= LogLevel.DEBUG; }
- }
-
- public bool IsErrorEnabled
- {
- get { return lv >= LogLevel.ERROR; }
- }
-
- public bool IsFatalEnabled
- {
- get { return lv >= LogLevel.FATAL; }
- }
-
- public bool IsInfoEnabled
- {
- get { return lv >= LogLevel.INFO; }
- }
-
- public bool IsWarnEnabled
- {
- get { return lv >= LogLevel.WARN; }
- }
-
- public void Debug(string message)
- {
- if (IsDebugEnabled)
- Console.WriteLine("Debug: {0}", message);
- }
-
- public void Error(string message)
- {
- if (IsErrorEnabled)
- Console.WriteLine("Error: {0}", message);
- }
-
- public void Fatal(string message)
- {
- if (IsFatalEnabled)
- Console.WriteLine("Fatal: {0}", message);
- }
-
- public void Info(string message)
- {
- if (IsInfoEnabled)
- Console.WriteLine("Info: {0}", message);
- }
-
- public void Warn(string message)
- {
- if (IsWarnEnabled)
- Console.WriteLine("Warn: {0}", message);
- }
- }
-}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs
new file mode 100644
index 0000000..326be86
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/NLogAdapter.cs
@@ -0,0 +1,30 @@
+using Apache.NMS;
+using NLog;
+
+namespace NMS.AMQP.Test.TestAmqp
+{
+    /// <summary>
+    /// <see cref="ITrace"/> implementation that forwards every level check and every
+    /// message to an NLog logger, so NMS trace output follows the NLog configuration.
+    /// </summary>
+    class NLogAdapter : ITrace
+    {
+        // One shared logger obtained from LogManager.GetCurrentClassLogger().
+        private static readonly Logger Sink = LogManager.GetCurrentClassLogger();
+
+        /// <summary>True when the underlying logger accepts Debug entries.</summary>
+        public bool IsDebugEnabled
+        {
+            get { return Sink.IsDebugEnabled; }
+        }
+
+        /// <summary>True when the underlying logger accepts Error entries.</summary>
+        public bool IsErrorEnabled
+        {
+            get { return Sink.IsErrorEnabled; }
+        }
+
+        /// <summary>True when the underlying logger accepts Fatal entries.</summary>
+        public bool IsFatalEnabled
+        {
+            get { return Sink.IsFatalEnabled; }
+        }
+
+        /// <summary>True when the underlying logger accepts Info entries.</summary>
+        public bool IsInfoEnabled
+        {
+            get { return Sink.IsInfoEnabled; }
+        }
+
+        /// <summary>True when the underlying logger accepts Warn entries.</summary>
+        public bool IsWarnEnabled
+        {
+            get { return Sink.IsWarnEnabled; }
+        }
+
+        /// <summary>Writes <paramref name="message"/> at Debug level.</summary>
+        public void Debug(string message)
+        {
+            Sink.Debug(message);
+        }
+
+        /// <summary>Writes <paramref name="message"/> at Error level.</summary>
+        public void Error(string message)
+        {
+            Sink.Error(message);
+        }
+
+        /// <summary>Writes <paramref name="message"/> at Fatal level.</summary>
+        public void Fatal(string message)
+        {
+            Sink.Fatal(message);
+        }
+
+        /// <summary>Writes <paramref name="message"/> at Info level.</summary>
+        public void Info(string message)
+        {
+            Sink.Info(message);
+        }
+
+        /// <summary>Writes <paramref name="message"/> at Warn level.</summary>
+        public void Warn(string message)
+        {
+            Sink.Warn(message);
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 9a519f2..5646215 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -28,6 +28,7 @@
using Amqp.Sasl;
using Amqp.Types;
using Apache.NMS.AMQP.Util;
+using NLog;
using NMS.AMQP.Test.TestAmqp.BasicTypes;
using NMS.AMQP.Test.TestAmqp.Matchers;
using NUnit.Framework;
@@ -36,6 +37,8 @@
{
public class TestAmqpPeer : IDisposable
{
+ private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
+
public static readonly string MESSAGE_NUMBER = "MessageNumber";
private static readonly Symbol ANONYMOUS = new Symbol("ANONYMOUS");
@@ -93,6 +96,8 @@
public bool OnFrame(Stream stream, ushort channel, DescribedList command, Amqp.Message message)
{
+ Logger.Debug($"Received frame, descriptor={command.GetType().Name} on port: {ServerPort}");
+
var matcher = GetFirstMatcher();
if (matcher != null)
{
@@ -327,10 +332,19 @@
message.ApplicationProperties[MESSAGE_NUMBER] = i;
}
+ if (message.Properties == null)
+ message.Properties = new Properties();
+
+ message.Properties.MessageId = $"ID:{i.ToString()}";
+
+ var messageId = message.Properties.MessageId;
+
ByteBuffer payload = message.Encode();
flowMatcher.WithOnComplete(context =>
{
+ Logger.Debug($"Sending message {messageId}");
+
var transfer = new Transfer()
{
DeliveryId = (uint) nextId,
@@ -340,7 +354,15 @@
Handle = context.Command.Handle,
};
- context.SendCommand(transfer, payload);
+                    try
+                    {
+                        context.SendCommand(transfer, payload);
+                    }
+                    catch (Exception e)
+                    {
+                        // Hand the exception to NLog and include its message in the text, so the
+                        // failure reason is visible even with a layout that renders only ${message}.
+                        // Rethrow with "throw;" to keep the original stack trace.
+                        Logger.Error(e, $"Sending message {messageId} failed: {e.Message}");
+                        throw;
+                    }
});
}
@@ -837,6 +859,8 @@
public void Close(bool sendClose = false)
{
+ Logger.Info($"Closing {nameof(TestAmqpPeer)}: {this.ServerPort}");
+
if (sendClose)
{
var close = new Close();
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeerRunner.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeerRunner.cs
index f0341cd..21f0687 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeerRunner.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeerRunner.cs
@@ -23,12 +23,15 @@
using Amqp;
using Amqp.Framing;
using Amqp.Types;
+using NLog.Fluent;
using NMS.AMQP.Test.TestAmqp.BasicTypes;
namespace NMS.AMQP.Test.TestAmqp
{
public class TestAmqpPeerRunner
{
+ private static readonly NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger();
+
private readonly TestAmqpPeer testAmqpPeer;
private readonly IPEndPoint ip;
private Socket socket;
@@ -123,8 +126,9 @@
}
}
}
- catch
+ catch(Exception e)
{
+ Logger.Info(e);
stream.Dispose();
}
}