blob: 21d12a5d7ea3d093fe55efd07b7a03fb61f61eaa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.IO;
using Apache.Qpid.Proton.Buffer;
using Apache.Qpid.Proton.Engine.Exceptions;
using Apache.Qpid.Proton.Test.Driver;
using Apache.Qpid.Proton.Types;
using Apache.Qpid.Proton.Types.Transport;
using NUnit.Framework;
using Is = Apache.Qpid.Proton.Test.Driver.Matchers.Is;
namespace Apache.Qpid.Proton.Engine.Implementation
{
[TestFixture, Timeout(20000)]
public class ProtonSessionTest : ProtonEngineTestSupport
{
/// <summary>
/// Opens and closes a session against a peer that answers both the Begin and
/// the End, then checks that the local open, local close, remote open and
/// remote close handlers registered on the session were each invoked.
/// </summary>
[Test]
public void TestSessionEmitsOpenAndCloseEvents()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    // Script the remote side: header, Open, Begin and End are all answered.
    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond().WithContainerId("driver");
    testPeer.ExpectBegin().Respond();
    testPeer.ExpectEnd().Respond();

    // One flag per session event handler.
    bool localOpenSeen = false;
    bool localCloseSeen = false;
    bool remoteOpenSeen = false;
    bool remoteCloseSeen = false;

    IConnection conn = protonEngine.Start();
    conn.Open();

    ISession session = conn.Session();
    session.LocalOpenHandler(_ => localOpenSeen = true)
           .LocalCloseHandler(_ => localCloseSeen = true)
           .OpenHandler(_ => remoteOpenSeen = true)
           .CloseHandler(_ => remoteCloseSeen = true);

    session.Open();
    session.Close();

    Assert.IsTrue(localOpenSeen, "Session should have reported local open");
    Assert.IsTrue(localCloseSeen, "Session should have reported local close");
    Assert.IsTrue(remoteOpenSeen, "Session should have reported remote open");
    Assert.IsTrue(remoteCloseSeen, "Session should have reported remote close");

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Engine shutdown event scenario in which the session is closed by neither side.
/// </summary>
[Test]
public void TestEngineShutdownEventNeitherEndClosed() =>
    DoTestEngineShutdownEvent(locallyClosed: false, remotelyClosed: false);
/// <summary>
/// Engine shutdown event scenario in which only the local side closes the session.
/// </summary>
[Test]
public void TestEngineShutdownEventLocallyClosed() =>
    DoTestEngineShutdownEvent(locallyClosed: true, remotelyClosed: false);
/// <summary>
/// Engine shutdown event scenario in which only the remote peer ends the session.
/// </summary>
[Test]
public void TestEngineShutdownEventRemotelyClosed() =>
    DoTestEngineShutdownEvent(locallyClosed: false, remotelyClosed: true);
/// <summary>
/// Engine shutdown event scenario in which both sides have closed the session.
/// </summary>
[Test]
public void TestEngineShutdownEventBothEndsClosed() =>
    DoTestEngineShutdownEvent(locallyClosed: true, remotelyClosed: true);
/// <summary>
/// Shared body of the engine shutdown event tests. Opens a session, optionally
/// closes it from either or both ends, shuts the engine down and then checks
/// whether the session's engine shutdown handler was invoked. The handler is
/// expected to fire unless the session was closed by both ends.
/// </summary>
/// <param name="locallyClosed">close the session locally before shutting down the engine</param>
/// <param name="remotelyClosed">
/// have the peer end the session, either by answering the local End or, when
/// there is no local close, by sending an End of its own
/// </param>
private void DoTestEngineShutdownEvent(bool locallyClosed, bool remotelyClosed)
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    bool shutdownSignaled = false;

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond();
    testPeer.ExpectBegin().Respond();

    IConnection conn = protonEngine.Start();
    conn.Open();

    ISession session = conn.Session();
    session.Open();
    session.EngineShutdownHandler(_ => shutdownSignaled = true);

    if (locallyClosed)
    {
        // The local End is always expected; it is only answered when the
        // remote is supposed to be closed as well.
        var endExpectation = testPeer.ExpectEnd();
        if (remotelyClosed)
        {
            endExpectation.Respond();
        }

        session.Close();
    }
    else if (remotelyClosed)
    {
        // Remote-only close: the peer sends its End unprompted.
        testPeer.RemoteEnd().Now();
    }

    protonEngine.Shutdown();

    if (!locallyClosed || !remotelyClosed)
    {
        Assert.IsTrue(shutdownSignaled, "Should have reported engine shutdown");
    }
    else
    {
        Assert.IsFalse(shutdownSignaled, "Should not have reported engine shutdown");
    }

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Calls Open and Close twice each on a session while the peer script expects
/// exactly one Begin and one End, so a duplicate frame would break the script.
/// </summary>
[Test]
public void TestSessionOpenAndCloseAreIdempotent()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond().WithContainerId("driver");
    testPeer.ExpectBegin().Respond();
    testPeer.ExpectEnd().Respond();

    // A default engine hands back its connection as soon as it is started.
    IConnection conn = protonEngine.Start();
    Assert.IsNotNull(conn);
    conn.Open();

    ISession session = conn.Session();

    // The second Open must not put another Begin on the wire.
    session.Open();
    session.Open();

    // The second Close must not put another End on the wire.
    session.Close();
    session.Close();

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Runs the closed-session link creation check for a sender link.
/// </summary>
[Test]
public void TestSenderCreateOnClosedSessionThrowsIOE() =>
    TestLinkCreateOnClosedSessionThrowsISE(receiver: false);
/// <summary>
/// Runs the closed-session link creation check for a receiver link.
/// </summary>
[Test]
public void TestReceiverCreateOnClosedSessionThrowsIOE() =>
    TestLinkCreateOnClosedSessionThrowsISE(receiver: true);
/// <summary>
/// Opens and immediately closes a session, then attempts to create a link on
/// it and requires that attempt to throw an <see cref="InvalidOperationException"/>.
/// </summary>
/// <param name="receiver">true to attempt a receiver create, false for a sender create</param>
private void TestLinkCreateOnClosedSessionThrowsISE(bool receiver)
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond().WithContainerId("driver");
    testPeer.ExpectBegin().Respond();
    testPeer.ExpectEnd().Respond();

    // A default engine hands back its connection as soon as it is started.
    IConnection conn = protonEngine.Start();
    Assert.IsNotNull(conn);
    conn.Open();

    ISession closedSession = conn.Session().Open().Close();

    try
    {
        if (receiver)
        {
            closedSession.Receiver("test");
            Assert.Fail("Should not allow receiver create on closed session");
        }
        else
        {
            closedSession.Sender("test");
            Assert.Fail("Should not allow sender create on closed session");
        }
    }
    catch (InvalidOperationException)
    {
        // Expected: link creation is rejected on a closed session.
    }

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Opens a session before its parent connection is opened and checks that the
/// peer sees the header, the Open and then the Begin once the connection is
/// finally opened.
/// </summary>
[Test]
public void TestOpenSessionBeforeOpenConnection()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    // The session is opened first; its Begin should be held back until the
    // parent connection is opened and then be written automatically.
    IConnection conn = protonEngine.Start();
    ISession earlySession = conn.Session();
    earlySession.Open();

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen();
    testPeer.ExpectBegin();

    conn.Open();

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Opens a session on an open connection and checks that the peer receives
/// the Begin and that the session's open handler fires on the peer's response.
/// </summary>
[Test]
public void TestEngineEmitsBeginAfterLocalSessionOpened()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond();
    testPeer.ExpectBegin().Respond();

    bool remoteOpenSeen = false;

    // A default engine hands back its connection as soon as it is started.
    IConnection conn = protonEngine.Start();
    Assert.IsNotNull(conn);
    conn.Open();

    ISession session = conn.Session();
    session.OpenHandler(_ => remoteOpenSeen = true);
    session.Open();

    testPeer.WaitForScriptToComplete();

    Assert.IsTrue(remoteOpenSeen);
    Assert.IsNull(failure);
}
/// <summary>
/// Checks that the connection's and then the session's open handlers fire as
/// soon as the peer answers the corresponding Open and Begin.
/// </summary>
[Test]
public void TestSessionFiresOpenedEventAfterRemoteOpensLocallyOpenedSession()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond().WithContainerId("driver");
    testPeer.ExpectBegin().Respond();

    bool connectionOpenSeen = false;
    bool sessionOpenSeen = false;

    IConnection conn = protonEngine.Start();
    conn.OpenHandler(_ => connectionOpenSeen = true);
    conn.Open();

    // The peer responds to the Open right away, so the event is already in.
    Assert.IsTrue(connectionOpenSeen, "Connection remote opened event did not fire");

    ISession session = conn.Session();
    session.OpenHandler(_ => sessionOpenSeen = true);
    session.Open();

    Assert.IsTrue(sessionOpenSeen, "Session remote opened event did not fire");

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Opens a session, then opens and closes the connection before the peer has
/// answered the AMQP header. Once the header response is injected the peer
/// script expects only an Open and a Close; no Begin is scripted.
/// </summary>
[Test]
public void TestNoSessionPerformativesEmittedIfConnectionOpenedAndClosedBeforeAnyRemoteResponses()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    // Session is opened ahead of the connection, so nothing is written yet.
    IConnection conn = protonEngine.Start();
    ISession session = conn.Session();
    session.Open();

    // Only the header goes out at first; the peer stays silent for now.
    testPeer.ExpectAMQPHeader();
    conn.Open();
    testPeer.WaitForScriptToComplete();

    // Close locally while the header response is still outstanding.
    conn.Close();

    testPeer.ExpectOpen().Respond();
    testPeer.ExpectClose().Respond();

    // Deliver the withheld header response to let the exchange continue.
    testPeer.RemoteHeader(AmqpHeader.GetAMQPHeader().ToArray()).Now();

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Assigns null to the session's properties, capabilities and error condition
/// before opening it, then checks that the local and remote views of those
/// values read back as null while the attachments object is still available.
/// </summary>
[Test]
public void TestOpenAndCloseSessionWithNullSetsOnSessionOptions()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond().WithContainerId("driver");
    testPeer.ExpectBegin().OnChannel(0).Respond();
    testPeer.ExpectEnd().OnChannel(0).Respond();
    testPeer.ExpectClose();

    IConnection conn = protonEngine.Start();
    conn.Open();

    ISession session = conn.Session();

    // Explicitly null out every optional value before the Begin is sent.
    session.Properties = null;
    session.OfferedCapabilities = (Symbol[])null;
    session.DesiredCapabilities = (Symbol[])null;
    session.ErrorCondition = null;
    session.Open();

    Assert.IsNotNull(session.Attachments);

    // Local values
    Assert.IsNull(session.Properties);
    Assert.IsNull(session.OfferedCapabilities);
    Assert.IsNull(session.DesiredCapabilities);
    Assert.IsNull(session.ErrorCondition);

    // Remote values
    Assert.IsNull(session.RemoteProperties);
    Assert.IsNull(session.RemoteOfferedCapabilities);
    Assert.IsNull(session.RemoteDesiredCapabilities);
    Assert.IsNull(session.RemoteErrorCondition);

    session.Close();
    conn.Close();

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Opens two sessions on one connection and closes them in reverse order,
/// with the peer expecting the Begin and End frames on channels 0 and 1.
/// </summary>
[Test]
public void TestOpenAndCloseMultipleSessions()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond().WithContainerId("driver");
    testPeer.ExpectBegin().OnChannel(0).Respond();
    testPeer.ExpectBegin().OnChannel(1).Respond();
    testPeer.ExpectEnd().OnChannel(1).Respond();
    testPeer.ExpectEnd().OnChannel(0).Respond();
    testPeer.ExpectClose();

    IConnection conn = protonEngine.Start();
    conn.Open();

    ISession firstSession = conn.Session();
    firstSession.Open();

    ISession secondSession = conn.Session();
    secondSession.Open();

    // Last opened is first closed, matching the scripted End order.
    secondSession.Close();
    firstSession.Close();

    conn.Close();

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Has the peer queue a Begin of its own behind the Open response and checks
/// that the connection's session opened handler is invoked with a non-null
/// session for it.
/// </summary>
[Test]
public void TestEngineFireRemotelyOpenedSessionEventWhenRemoteBeginArrives()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond().WithContainerId("driver");
    testPeer.RemoteBegin().Queue();

    bool connectionOpenSeen = false;
    bool sessionOpenSeen = false;
    ISession peerInitiatedSession = null;

    // A default engine hands back its connection as soon as it is started.
    IConnection conn = protonEngine.Start();
    Assert.IsNotNull(conn);

    conn.OpenHandler(_ => connectionOpenSeen = true);
    conn.SessionOpenedHandler(opened =>
    {
        sessionOpenSeen = true;
        peerInitiatedSession = opened;
    });
    conn.Open();

    Assert.IsTrue(connectionOpenSeen, "Connection remote opened event did not fire");
    Assert.IsTrue(sessionOpenSeen, "Session remote opened event did not fire");
    Assert.IsNotNull(peerInitiatedSession, "Connection did not create a local session for remote open");

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Begin population check with neither max frame size nor incoming capacity configured.
/// </summary>
[Test]
public void TestSessionPopulatesBeginUsingDefaults() =>
    DoTestSessionOpenPopulatesBegin(setMaxFrameSize: false, setIncomingCapacity: false);
/// <summary>
/// Begin population check with only the connection max frame size configured.
/// </summary>
[Test]
public void TestSessionPopulatesBeginWithConfiguredMaxFrameSizeButNoIncomingCapacity() =>
    DoTestSessionOpenPopulatesBegin(setMaxFrameSize: true, setIncomingCapacity: false);
/// <summary>
/// Begin population check with both max frame size and session incoming capacity configured.
/// </summary>
[Test]
public void TestSessionPopulatesBeginWithConfiguredMaxFrameSizeAndIncomingCapacity() =>
    DoTestSessionOpenPopulatesBegin(setMaxFrameSize: true, setIncomingCapacity: true);
/// <summary>
/// Shared body of the Begin population tests. Opens a session and has the
/// peer verify the fields of the emitted Begin: null handle-max, capabilities
/// and properties, next-outgoing-id 0, an outgoing window of int.MaxValue and
/// an incoming window that depends on the configuration flags.
/// </summary>
/// <param name="setMaxFrameSize">configure a max frame size of 32767 on the connection before opening it</param>
/// <param name="setIncomingCapacity">configure an incoming capacity of int.MaxValue on the session before opening it</param>
private void DoTestSessionOpenPopulatesBegin(bool setMaxFrameSize, bool setIncomingCapacity)
{
    uint configuredFrameSize = 32767;
    uint configuredCapacity = int.MaxValue;

    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    uint expectedMaxFrameSize = setMaxFrameSize
        ? configuredFrameSize
        : ProtonConstants.DefaultMaxAmqpFrameSize;

    // With a capacity set the window is capacity divided by the test frame
    // size, otherwise int.MaxValue is expected.
    uint expectedIncomingWindow = setIncomingCapacity
        ? configuredCapacity / configuredFrameSize
        : int.MaxValue;

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().WithMaxFrameSize(expectedMaxFrameSize).Respond().WithContainerId("driver");
    testPeer.ExpectBegin().WithHandleMax(Is.NullValue())
                          .WithNextOutgoingId(0)
                          .WithIncomingWindow(expectedIncomingWindow)
                          .WithOutgoingWindow(int.MaxValue)
                          .WithOfferedCapabilities(Is.NullValue())
                          .WithDesiredCapabilities(Is.NullValue())
                          .WithProperties(Is.NullValue())
                          .Respond();
    testPeer.ExpectEnd().Respond();

    IConnection conn = protonEngine.Start();
    if (setMaxFrameSize)
    {
        conn.MaxFrameSize = configuredFrameSize;
    }
    conn.Open();

    ISession session = conn.Session();
    if (setIncomingCapacity)
    {
        session.IncomingCapacity = configuredCapacity;
    }
    session.Open();
    session.Close();

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Creates a session, opens and closes its connection, and then requires that
/// opening the session throws an <see cref="InvalidOperationException"/>.
/// </summary>
[Test]
public void TestSessionOpenFailsWhenConnectionClosed()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond().WithContainerId("driver");
    testPeer.ExpectClose().Respond();

    bool openSeen = false;
    bool closeSeen = false;

    // A default engine hands back its connection as soon as it is started.
    IConnection conn = protonEngine.Start();
    Assert.IsNotNull(conn);

    conn.OpenHandler(_ => openSeen = true);
    conn.CloseHandler(_ => closeSeen = true);

    // The session is created while the connection is still usable.
    ISession lateSession = conn.Session();

    conn.Open();
    conn.Close();

    Assert.IsTrue(openSeen, "Connection remote opened event did not fire");
    Assert.IsTrue(closeSeen, "Connection remote closed event did not fire");

    Assert.Catch<InvalidOperationException>(
        () => lateSession.Open(),
        "Should not be able to open a session when its Connection was already closed");

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Has the peer queue a Close behind its Open response, then requires that
/// opening a previously created session throws an
/// <see cref="InvalidOperationException"/>.
/// </summary>
[Test]
public void TestSessionOpenFailsWhenConnectionRemotelyClosed()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond().WithContainerId("driver");
    testPeer.RemoteClose().Queue();

    bool openSeen = false;
    bool closeSeen = false;

    // A default engine hands back its connection as soon as it is started.
    IConnection conn = protonEngine.Start();
    Assert.IsNotNull(conn);

    conn.OpenHandler(_ => openSeen = true);
    conn.CloseHandler(_ => closeSeen = true);

    ISession lateSession = conn.Session();

    // Opening triggers the whole scripted exchange including the remote Close.
    conn.Open();

    Assert.IsTrue(openSeen, "Connection remote opened event did not fire");
    Assert.IsTrue(closeSeen, "Connection remote closed event did not fire");

    Assert.Catch<InvalidOperationException>(
        () => lateSession.Open(),
        "Should not be able to open a session when its Connection was already closed");

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Opens a connection against a peer that is told to drop after its last
/// scripted handler, then requires that opening a session throws an
/// <see cref="EngineFailedException"/> and leaves the engine failed, but not
/// shut down, with a recorded failure cause.
/// </summary>
[Test]
public void TestSessionOpenFailsWhenWriteOfBeginFailsWithException()
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().Respond().WithContainerId("driver");
    // NOTE(review): presumably makes the peer reject anything written after
    // the Open exchange, which is what makes the Begin write fail - confirm
    // against the test driver.
    peer.DropAfterLastHandler();

    IConnection connection = engine.Start().Open();

    // Default engine should start and return a connection immediately
    Assert.IsNotNull(connection);
    // AreEqual reports the actual state on failure, unlike IsTrue(a == b).
    Assert.AreEqual(ConnectionState.Active, connection.ConnectionState);
    Assert.AreEqual(ConnectionState.Active, connection.RemoteConnectionState);

    ISession session = connection.Session();

    try
    {
        session.Open();
        // Message describes this scenario; the connection is still active here.
        Assert.Fail("Should not be able to open a session when the write of its Begin fails");
    }
    catch (EngineFailedException)
    {
        // Expected: the failed write surfaces as an engine failure.
    }

    peer.WaitForScriptToComplete();

    // Unlike most tests in this fixture a failure MUST have been recorded.
    Assert.IsNotNull(failure);
    Assert.IsTrue(engine.IsFailed);
    Assert.IsFalse(engine.IsShutdown);
    Assert.IsNotNull(engine.FailureCause);
}
/// <summary>
/// Opens a session before any peer expectations are scripted, then opens and
/// closes the connection and checks that the peer sees the header, Open,
/// Begin and Close in that order.
/// </summary>
[Test]
public void TestSessionOpenNotSentUntilConnectionOpened()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    IConnection conn = protonEngine.Start();

    // No expectations exist yet, so any frame written here would be unexpected.
    ISession earlySession = conn.Session();
    earlySession.Open();

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond();
    testPeer.ExpectBegin().Respond();
    testPeer.ExpectClose();

    conn.Open();
    conn.Close();

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Opens and closes a session before the connection is opened, then opens the
/// connection and checks that the peer sees Begin and End while the session's
/// open handler is never invoked.
/// </summary>
[Test]
public void TestSessionCloseNotSentUntilConnectionOpened()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    bool sessionOpenSeen = false;

    IConnection conn = protonEngine.Start();
    ISession session = conn.Session();
    session.OpenHandler(_ => sessionOpenSeen = true);

    // Both operations happen while the connection is still unopened.
    session.Open();
    session.Close();

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond();
    testPeer.ExpectBegin().Respond();
    testPeer.ExpectEnd().Respond();
    testPeer.ExpectClose();

    Assert.IsFalse(sessionOpenSeen, "Session opened handler should not have been called yet");

    conn.Open();

    // The session was closed before the Begin response arrived, so the open
    // event is still not expected.
    Assert.IsFalse(sessionOpenSeen, "Session opened handler should not have been called yet");

    conn.Close();

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Has the peer end an open session with an error condition and checks the
/// session's local/remote open and closed flags and the reported remote error
/// condition, both before and after the session is also closed locally.
/// </summary>
[Test]
public void TestSessionRemotelyClosedWithError()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond();
    testPeer.ExpectBegin().Respond();

    IConnection conn = protonEngine.Start().Open();
    ISession session = conn.Session();
    session.Open();

    testPeer.WaitForScriptToComplete();

    // Fully open on both ends.
    Assert.IsTrue(session.IsLocallyOpen);
    Assert.IsFalse(session.IsLocallyClosed);
    Assert.IsTrue(session.IsRemotelyOpen);
    Assert.IsFalse(session.IsRemotelyClosed);

    testPeer.ExpectEnd();
    testPeer.ExpectClose();
    testPeer.RemoteEnd().WithErrorCondition(AmqpError.INTERNAL_ERROR.ToString(), "Error").Now();

    // The remote has ended with an error; the local side is still open.
    Assert.IsTrue(session.IsLocallyOpen);
    Assert.IsFalse(session.IsLocallyClosed);
    Assert.IsFalse(session.IsRemotelyOpen);
    Assert.IsTrue(session.IsRemotelyClosed);
    Assert.AreEqual(AmqpError.INTERNAL_ERROR, session.RemoteErrorCondition.Condition);
    Assert.AreEqual("Error", session.RemoteErrorCondition.Description);

    session.Close();

    // Closed on both ends; the remote error condition is still reported.
    Assert.IsFalse(session.IsLocallyOpen);
    Assert.IsTrue(session.IsLocallyClosed);
    Assert.IsFalse(session.IsRemotelyOpen);
    Assert.IsTrue(session.IsRemotelyClosed);
    Assert.AreEqual(AmqpError.INTERNAL_ERROR, session.RemoteErrorCondition.Condition);
    Assert.AreEqual("Error", session.RemoteErrorCondition.Description);

    conn.Close();

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Has the peer leave the Begin unanswered and close the connection with an
/// error, then closes the session and connection locally and checks that the
/// peer still sees an End followed by a Close.
/// </summary>
[Test]
public void TestSessionCloseAfterConnectionRemotelyClosedWhenNoBeginResponseReceived()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    IConnection conn = protonEngine.Start();
    ISession session = conn.Session();
    session.Open();

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond();
    // The Begin is observed but deliberately left without a response.
    testPeer.ExpectBegin();
    testPeer.RemoteClose().WithErrorCondition(AmqpError.NOT_ALLOWED.ToString(), "Error").Queue();

    conn.Open();
    testPeer.WaitForScriptToComplete();

    testPeer.ExpectEnd();
    testPeer.ExpectClose();

    // The connection has not been closed locally, so the End is still written.
    session.Close();
    conn.Close();

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Injects a remote Begin carrying remote-channel 3 on a connection with no
/// local sessions and checks that no session opened event fires, that a Close
/// carrying an error is written, and that an engine failure is recorded.
/// </summary>
[Test]
public void TestHandleRemoteBeginWithInvalidRemoteChannelSet()
{
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond().WithContainerId("driver");

    bool connectionOpenSeen = false;
    bool sessionOpenSeen = false;

    // A default engine hands back its connection as soon as it is started.
    IConnection conn = protonEngine.Start();
    Assert.IsNotNull(conn);

    conn.OpenHandler(_ => connectionOpenSeen = true);
    conn.Open();
    conn.SessionOpenedHandler(_ => sessionOpenSeen = true);

    testPeer.WaitForScriptToComplete();

    // These tests run on a single thread, so the late arrival of the bad
    // Begin is simulated by injecting it after the first script finished.
    testPeer.ExpectClose().WithError(Is.NotNullValue());
    testPeer.RemoteBegin().WithRemoteChannel(3).Now();

    testPeer.WaitForScriptToComplete();

    Assert.IsTrue(connectionOpenSeen, "Remote connection should have occurred");
    Assert.IsFalse(sessionOpenSeen, "Should not have seen a remote session open.");
    Assert.IsNotNull(failure);
}
/// <summary>
/// Sets offered and desired capabilities on a session, has the peer verify
/// them in the Begin and answer with its own, then checks that both the local
/// and the remote capability arrays read back from the session as expected.
/// </summary>
[Test]
public void TestCapabilitiesArePopulatedAndAccessible()
{
    Symbol clientOffered = Symbol.Lookup("clientOfferedCapability");
    Symbol clientDesired = Symbol.Lookup("clientDesiredCapability");
    Symbol serverOffered = Symbol.Lookup("serverOfferedCapability");
    Symbol serverDesired = Symbol.Lookup("serverDesiredCapability");

    Symbol[] clientOfferedSet = new[] { clientOffered };
    Symbol[] clientDesiredSet = new[] { clientDesired };
    Symbol[] serverOfferedSet = new[] { serverOffered };
    Symbol[] serverDesiredSet = new[] { serverDesired };

    bool sessionOpenSeen = false;

    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond();
    // The peer API is given the capability names as strings.
    testPeer.ExpectBegin().WithOfferedCapabilities(new string[] { clientOffered.ToString() })
                          .WithDesiredCapabilities(new string[] { clientDesired.ToString() })
                          .Respond()
                          .WithDesiredCapabilities(new string[] { serverDesired.ToString() })
                          .WithOfferedCapabilities(new string[] { serverOffered.ToString() });
    testPeer.ExpectEnd().Respond();

    IConnection conn = protonEngine.Start();
    conn.Open();

    ISession session = conn.Session();
    session.DesiredCapabilities = clientDesiredSet;
    session.OfferedCapabilities = clientOfferedSet;
    session.OpenHandler(_ => sessionOpenSeen = true);
    session.Open();

    Assert.IsTrue(sessionOpenSeen, "Session remote opened event did not fire");

    Assert.AreEqual(clientOfferedSet, session.OfferedCapabilities);
    Assert.AreEqual(clientDesiredSet, session.DesiredCapabilities);
    Assert.AreEqual(serverOfferedSet, session.RemoteOfferedCapabilities);
    Assert.AreEqual(serverDesiredSet, session.RemoteDesiredCapabilities);

    session.Close();

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Sets a properties map on a session, has the peer verify it in the Begin
/// and answer with its own map, then checks that the local and remote property
/// values read back from the session. Currently ignored (see the attribute).
/// </summary>
[Test]
[Ignore("Probable issue with test peer validating Dictionary expectations")]
public void TestPropertiesArePopulatedAndAccessible()
{
    Symbol clientKey = Symbol.Lookup("ClientPropertyName");
    int clientValue = 1234;
    Symbol serverKey = Symbol.Lookup("ServerPropertyName");
    int serverValue = 5678;

    // String-keyed maps are what the peer API is given; the Symbol-keyed
    // maps are the engine side equivalents.
    Dictionary<string, object> peerViewOfClientProperties = new Dictionary<string, object>
    {
        { clientKey.ToString(), clientValue }
    };
    Dictionary<Symbol, object> clientProperties = new Dictionary<Symbol, object>
    {
        { clientKey, clientValue }
    };
    Dictionary<string, object> peerViewOfServerProperties = new Dictionary<string, object>
    {
        { serverKey.ToString(), serverValue }
    };
    Dictionary<Symbol, object> serverProperties = new Dictionary<Symbol, object>
    {
        { serverKey, serverValue }
    };

    bool sessionOpenSeen = false;

    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(err => failure = err.FailureCause);
    ProtonTestConnector testPeer = CreateTestPeer(protonEngine);

    testPeer.ExpectAMQPHeader().RespondWithAMQPHeader();
    testPeer.ExpectOpen().Respond();
    testPeer.ExpectBegin().WithProperties(peerViewOfClientProperties)
                          .Respond()
                          .WithProperties(peerViewOfServerProperties);
    testPeer.ExpectEnd().Respond();

    IConnection conn = protonEngine.Start();
    conn.Open();

    ISession session = conn.Session();
    session.Properties = clientProperties;
    session.OpenHandler(_ => sessionOpenSeen = true);
    session.Open();

    Assert.IsTrue(sessionOpenSeen, "Session remote opened event did not fire");

    Assert.IsNotNull(session.Properties);
    Assert.IsNotNull(session.RemoteProperties);
    Assert.AreEqual(clientValue, session.Properties[clientKey]);
    Assert.AreEqual(serverValue, session.RemoteProperties[serverKey]);

    session.Close();

    testPeer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Incoming window check with neither frame size nor session capacity configured.
/// </summary>
[Test]
public void TestEmittedSessionIncomingWindowOnFirstFlowNoFrameSizeOrSessionCapacitySet() =>
    DoSessionIncomingWindowTestImpl(setFrameSize: false, setSessionCapacity: false);
/// <summary>
/// Incoming window check with only the max frame size configured.
/// </summary>
[Test]
public void TestEmittedSessionIncomingWindowOnFirstFlowWithFrameSizeButNoSessionCapacitySet() =>
    DoSessionIncomingWindowTestImpl(setFrameSize: true, setSessionCapacity: false);
/// <summary>
/// Incoming window check with only the session capacity configured.
/// </summary>
[Test]
public void TestEmittedSessionIncomingWindowOnFirstFlowWithNoFrameSizeButWithSessionCapacitySet() =>
    DoSessionIncomingWindowTestImpl(setFrameSize: false, setSessionCapacity: true);
/// <summary>
/// Incoming window check with both max frame size and session capacity configured.
/// </summary>
[Test]
public void TestEmittedSessionIncomingWindowOnFirstFlowWithFrameSizeAndSessionCapacitySet() =>
    DoSessionIncomingWindowTestImpl(setFrameSize: true, setSessionCapacity: true);
// Shared body of the emitted incoming window tests. Opens a session and a
// receiver, receives one transfer and has the peer verify the incoming window
// carried by the Begin and by three subsequent Flow frames.
//
// setFrameSize:       configure a 5 KiB max frame size on the connection.
// setSessionCapacity: configure a 100 KiB incoming capacity on the session.
private void DoSessionIncomingWindowTestImpl(bool setFrameSize, bool setSessionCapacity)
{
uint TEST_MAX_FRAME_SIZE = 5 * 1024;
uint TEST_SESSION_CAPACITY = 100 * 1024;
IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
engine.ErrorHandler((error) => failure = error.FailureCause);
ProtonTestConnector peer = CreateTestPeer(engine);
// Max frame size the peer should see in the Open: the configured value or the default.
uint expectedMaxFrameSize;
if (setFrameSize)
{
expectedMaxFrameSize = TEST_MAX_FRAME_SIZE;
}
else
{
expectedMaxFrameSize = ProtonConstants.DefaultMaxAmqpFrameSize;
}
// Expected window is int.MaxValue unless a session capacity is set, in which
// case it is the capacity divided by the frame size in effect.
uint expectedWindowSize = 2147483647;
if (setSessionCapacity && setFrameSize)
{
expectedWindowSize = TEST_SESSION_CAPACITY / TEST_MAX_FRAME_SIZE;
}
else if (setSessionCapacity)
{
// No test frame size configured: read the value from the engine's connection.
expectedWindowSize = TEST_SESSION_CAPACITY / engine.Connection.MaxFrameSize;
}
peer.ExpectAMQPHeader().RespondWithAMQPHeader();
peer.ExpectOpen().WithMaxFrameSize(expectedMaxFrameSize).Respond();
peer.ExpectBegin().WithIncomingWindow(expectedWindowSize).Respond();
peer.ExpectAttach().Respond();
IConnection connection = engine.Start();
if (setFrameSize)
{
connection.MaxFrameSize = TEST_MAX_FRAME_SIZE;
}
connection.Open();
ISession session = connection.Session();
// Stays 0 when no capacity is configured; compared with IncomingCapacity below.
uint sessionCapacity = 0;
if (setSessionCapacity)
{
sessionCapacity = TEST_SESSION_CAPACITY;
session.IncomingCapacity = sessionCapacity;
}
// Open session and verify emitted incoming window
session.Open();
if (setSessionCapacity)
{
Assert.AreEqual(sessionCapacity, session.RemainingIncomingCapacity);
}
else
{
Assert.AreEqual(int.MaxValue, session.RemainingIncomingCapacity);
}
Assert.AreEqual(sessionCapacity, session.IncomingCapacity, "Unexpected session capacity");
// Use a receiver to force more session window observations.
IReceiver receiver = session.Receiver("receiver");
receiver.Open();
// Count the deliveries read and keep the last one so its bytes can be consumed later.
uint deliveryArrived = 0;
IIncomingDelivery delivered = null;
receiver.DeliveryReadHandler((delivery) =>
{
deliveryArrived++;
delivered = delivery;
});
// Expect that a flow will be emitted and the window should match either default window
// size or computed value if max frame size and capacity are set
peer.ExpectFlow().WithLinkCredit(1)
.WithIncomingWindow(expectedWindowSize);
// The transfer is queued, so it is delivered as part of handling the flow above.
peer.RemoteTransfer().WithDeliveryId(0)
.WithDeliveryTag(new byte[] { 0 })
.WithMore(false)
.WithMessageFormat(0)
.WithBody().WithString("test-message").Also().Queue();
receiver.AddCredit(1);
Assert.AreEqual(1, deliveryArrived, "Unexpected delivery count");
Assert.IsNotNull(delivered);
// Flow more credit after receiving a message but not consuming it should result in a decrease in
// the incoming window if the capacity and max frame size are configured.
if (setSessionCapacity && setFrameSize)
{
expectedWindowSize = expectedWindowSize - 1;
Assert.IsTrue(TEST_SESSION_CAPACITY > session.RemainingIncomingCapacity);
}
peer.ExpectFlow().WithLinkCredit(1)
.WithIncomingWindow(expectedWindowSize);
receiver.AddCredit(1);
// Read the delivery's bytes then flow more credit, verify the emitted incoming window
// (it should increase 1 if capacity and frame size set) otherwise remains unchanged.
if (setSessionCapacity && setFrameSize)
{
expectedWindowSize = expectedWindowSize + 1;
}
peer.ExpectFlow().WithLinkCredit(2).WithIncomingWindow(expectedWindowSize);
// This will consume the bytes and free them from the session window tracking.
Assert.IsNotNull(delivered.ReadAll());
receiver.AddCredit(1);
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
receiver.Close();
session.Close();
peer.WaitForScriptToComplete();
Assert.IsNull(failure);
}
/// <summary>
/// Opens a connection and session against a peer that initially withholds
/// both the Open and the Begin responses, then injects them and checks that
/// a sender can be attached and that the session's open and close handlers
/// each fire exactly once.
/// </summary>
[Test]
public void TestSessionHandlesDeferredOpenAndBeginResponses()
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    uint sessionOpened = 0;
    uint sessionClosed = 0;

    // Open and Begin are observed but deliberately not answered yet.
    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen();
    peer.ExpectBegin();

    IConnection connection = engine.Start();
    connection.Open();

    ISession session = connection.Session();
    session.OpenHandler((result) => sessionOpened++);
    session.CloseHandler((result) => sessionClosed++);
    session.Open();

    peer.WaitForScriptToComplete();

    // This should happen after we inject the held open and begin
    peer.ExpectAttach().OfSender().Respond();
    peer.ExpectEnd().Respond();
    peer.ExpectClose().Respond();

    // Inject held responses to get the ball rolling again
    peer.RemoteOpen().WithOfferedCapabilities("ANONYMOUS_RELAY").Now();
    peer.RespondToLastBegin().Now();

    ISender sender = session.Sender("sender-1");
    sender.Open();

    session.Close();

    Assert.AreEqual(1, sessionOpened, "Should get one opened event");
    Assert.AreEqual(1, sessionClosed, "Should get one closed event");

    connection.Close();

    peer.WaitForScriptToComplete();

    // The error handler records engine failures, so verify none occurred,
    // as the other tests in this fixture do.
    Assert.IsNull(failure);
}
/// <summary>
/// Close-after-shutdown check where the peer answers the header, the Open and the Begin.
/// </summary>
[Test]
public void TestCloseAfterShutdownDoesNotThrowExceptionOpenWrittenAndResponseBeginWrittenAndResponse() =>
    TestCloseAfterShutdownNoOutputAndNoException(respondToHeader: true, respondToOpen: true, respondToBegin: true);
/// <summary>
/// Shutdown case where the peer answers the AMQP header and the Open but not the Begin.
/// </summary>
[Test]
public void TestCloseAfterShutdownDoesNotThrowExceptionOpenWrittenAndResponseBeginWrittenAndNoResponse()
    => TestCloseAfterShutdownNoOutputAndNoException(respondToHeader: true, respondToOpen: true, respondToBegin: false);
/// <summary>
/// Shutdown case where the peer answers the AMQP header but neither the Open nor the Begin.
/// </summary>
[Test]
public void TestCloseAfterShutdownDoesNotThrowExceptionOpenWrittenButNoResponse()
    => TestCloseAfterShutdownNoOutputAndNoException(respondToHeader: true, respondToOpen: false, respondToBegin: false);
/// <summary>
/// Shutdown case where the peer does not even answer the AMQP header.
/// </summary>
[Test]
public void TestCloseAfterShutdownDoesNotThrowExceptionOpenNotWritten()
    => TestCloseAfterShutdownNoOutputAndNoException(respondToHeader: false, respondToOpen: false, respondToBegin: false);
/// <summary>
/// Opens a connection and session against a peer scripted according to the flags, shuts
/// the engine down, and then checks that closing the session and connection does not throw
/// and that no engine failure was recorded.
/// </summary>
/// <param name="respondToHeader">When false the peer only expects the AMQP header and nothing else is scripted.</param>
/// <param name="respondToOpen">When true (and the header is answered) the peer answers the Open.</param>
/// <param name="respondToBegin">When true (and the Open is answered) the peer answers the Begin.</param>
private void TestCloseAfterShutdownNoOutputAndNoException(bool respondToHeader, bool respondToOpen, bool respondToBegin)
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    if (!respondToHeader)
    {
        peer.ExpectAMQPHeader();
    }
    else
    {
        peer.ExpectAMQPHeader().RespondWithAMQPHeader();

        // Once the header is answered, Open and Begin are always expected; only the replies vary.
        var expectedOpen = peer.ExpectOpen();
        if (respondToOpen)
        {
            expectedOpen.Respond();
        }

        var expectedBegin = peer.ExpectBegin();
        if (respondToOpen && respondToBegin)
        {
            expectedBegin.Respond();
        }
    }

    IConnection connection = engine.Start();
    connection.Open();

    ISession session = connection.Session();
    session.Open();

    engine.Shutdown();

    // After a deliberate shutdown, closing the endpoints must be a quiet clean-up.
    session.Close();
    connection.Close();

    peer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Engine-failed case where the peer answers the AMQP header, the Open and the Begin.
/// </summary>
[Test]
public void TestCloseAfterFailureThrowsEngineStateExceptionOpenWrittenAndResponseBeginWrittenAndResponse()
    => TestCloseAfterEngineFailedThrowsAndNoOutputWritten(respondToHeader: true, respondToOpen: true, respondToBegin: true);
/// <summary>
/// Engine-failed case where the peer answers the AMQP header and the Open but not the Begin.
/// </summary>
[Test]
public void TestCloseAfterFailureThrowsEngineStateExceptionOpenWrittenAndResponseBeginWrittenAndNoResponse()
    => TestCloseAfterEngineFailedThrowsAndNoOutputWritten(respondToHeader: true, respondToOpen: true, respondToBegin: false);
/// <summary>
/// Engine-failed case where the peer answers the AMQP header but neither the Open nor the Begin.
/// </summary>
[Test]
public void TestCloseAfterFailureThrowsEngineStateExceptionOpenWrittenButNoResponse()
    => TestCloseAfterEngineFailedThrowsAndNoOutputWritten(respondToHeader: true, respondToOpen: false, respondToBegin: false);
/// <summary>
/// Engine-failed case where the peer does not even answer the AMQP header.
/// </summary>
[Test]
public void TestCloseAfterFailureThrowsEngineStateExceptionOpenNotWritten()
    => TestCloseAfterEngineFailedThrowsAndNoOutputWritten(respondToHeader: false, respondToOpen: false, respondToBegin: false);
/// <summary>
/// Opens a connection and session against a peer scripted according to the flags, marks
/// the engine as failed, and checks that closing the session or connection throws
/// <see cref="EngineFailedException"/> until the engine is explicitly shut down, after which
/// both closes succeed and the session's engine-shutdown handler has fired.
/// </summary>
/// <param name="respondToHeader">When false the peer only expects the AMQP header and nothing else is scripted.</param>
/// <param name="respondToOpen">When true (and the header is answered) the peer answers the Open.</param>
/// <param name="respondToBegin">When true (and the Open is answered) the peer answers the Begin.</param>
private void TestCloseAfterEngineFailedThrowsAndNoOutputWritten(bool respondToHeader, bool respondToOpen, bool respondToBegin)
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    bool engineFailedEvent = false;

    if (!respondToHeader)
    {
        peer.ExpectAMQPHeader();
    }
    else
    {
        peer.ExpectAMQPHeader().RespondWithAMQPHeader();

        // Once the header is answered, Open, Begin and Close are always expected; only the replies vary.
        var expectedOpen = peer.ExpectOpen();
        if (respondToOpen)
        {
            expectedOpen.Respond();
        }

        var expectedBegin = peer.ExpectBegin();
        if (respondToOpen && respondToBegin)
        {
            expectedBegin.Respond();
        }

        peer.ExpectClose();
    }

    IConnection connection = engine.Start();
    connection.Open();

    ISession session = connection.Session();
    session.EngineShutdownHandler((theEvent) => engineFailedEvent = true);
    session.Open();

    engine.EngineFailed(new IOException());

    // Runs the operation and requires it to be rejected with an EngineFailedException.
    void ExpectEngineFailed(Action operation)
    {
        try
        {
            operation();
            Assert.Fail("Should throw exception indicating engine is in a failed state.");
        }
        catch (EngineFailedException)
        {
        }
    }

    ExpectEngineFailed(() => session.Close());
    ExpectEngineFailed(() => connection.Close());

    Assert.IsFalse(engineFailedEvent, "Session should not have signalled engine failure");

    engine.Shutdown(); // Explicit shutdown now allows local close to complete

    Assert.IsTrue(engineFailedEvent, "Session should have signalled engine failure");

    // After the explicit shutdown, closing the endpoints must be a quiet clean-up.
    session.Close();
    connection.Close();

    peer.WaitForScriptToComplete();

    // The injected IOException must have reached the engine error handler.
    Assert.IsNotNull(failure);
}
/// <summary>
/// Sender closed locally first and then remotely: the session should stop tracking it.
/// </summary>
[Test]
public void TestSessionStopTrackingClosedSenders()
    => DoTestSessionTrackingOfSenders(localDetach: true, RemoteDetach: true, remoteGoesFirst: false, close: true);
/// <summary>
/// Sender detached locally first and then remotely: the session should stop tracking it.
/// </summary>
[Test]
public void TestSessionStopTrackingDetachedSenders()
    => DoTestSessionTrackingOfSenders(localDetach: true, RemoteDetach: true, remoteGoesFirst: false, close: false);
/// <summary>
/// Sender closed remotely first and then locally: the session should stop tracking it.
/// </summary>
[Test]
public void TestSessionStopTrackingClosedSendersRemoteGoesFirst()
    => DoTestSessionTrackingOfSenders(localDetach: true, RemoteDetach: true, remoteGoesFirst: true, close: true);
/// <summary>
/// Sender detached remotely first and then locally: the session should stop tracking it.
/// </summary>
[Test]
public void TestSessionStopTrackingDetachedSendersRemoteGoesFirst()
    => DoTestSessionTrackingOfSenders(localDetach: true, RemoteDetach: true, remoteGoesFirst: true, close: false);
/// <summary>
/// Sender detached locally only (remote end still attached): the session should keep tracking it.
/// </summary>
[Test]
public void TestSessionTracksRemotelyOpenSenders()
    => DoTestSessionTrackingOfSenders(localDetach: true, RemoteDetach: false, remoteGoesFirst: false, close: false);
/// <summary>
/// Sender detached remotely only (local end still open): the session should keep tracking it.
/// </summary>
[Test]
public void TestSessionTracksLocallyOpenSenders()
    => DoTestSessionTrackingOfSenders(localDetach: false, RemoteDetach: true, remoteGoesFirst: false, close: false);
/// <summary>
/// Opens one sender on a session and checks how many senders the session reports
/// after the requested combination of local and remote detach/close operations.
/// The sender is expected to disappear only when both ends have detached.
/// </summary>
/// <param name="localDetach">Detach (or close) the sender locally.</param>
/// <param name="RemoteDetach">Have the peer send a Detach for the sender.</param>
/// <param name="remoteGoesFirst">When both ends detach, inject the remote Detach before the local one.</param>
/// <param name="close">Use a closing Detach (and <c>Close()</c> locally) instead of a plain detach.</param>
private void DoTestSessionTrackingOfSenders(bool localDetach, bool RemoteDetach, bool remoteGoesFirst, bool close)
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().Respond().WithContainerId("driver");
    peer.ExpectBegin().Respond();

    IConnection connection = engine.Start();
    connection.Open();
    ISession session = connection.Session();
    session.Open();

    Assert.AreEqual(0, CountElements(session.Senders));

    peer.ExpectAttach().OfSender().Respond();

    ISender sender = session.Sender("test").Open();

    Assert.AreEqual(1, CountElements(session.Senders));

    bool remoteBeforeLocal = RemoteDetach && remoteGoesFirst;
    bool remoteAfterLocal = RemoteDetach && !remoteGoesFirst;

    if (remoteBeforeLocal)
    {
        peer.RemoteDetach().WithClosed(close).Now();
    }

    if (localDetach)
    {
        peer.ExpectDetach().WithClosed(close);

        if (!close)
        {
            sender.Detach();
        }
        else
        {
            sender.Close();
        }
    }

    if (remoteAfterLocal)
    {
        peer.RemoteDetach().WithClosed(close).Now();
    }

    // Tracking ends only once both sides have let go of the link.
    int expectedTracked = (RemoteDetach && localDetach) ? 0 : 1;
    Assert.AreEqual(expectedTracked, CountElements(session.Senders));

    peer.ExpectEnd().Respond();

    session.Close();

    peer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Receiver closed locally first and then remotely: the session should stop tracking it.
/// </summary>
[Test]
public void TestSessionStopTrackingClosedReceivers()
    => DoTestSessionTrackingOfReceivers(localDetach: true, RemoteDetach: true, remoteGoesFirst: false, close: true);
/// <summary>
/// Receiver detached locally first and then remotely: the session should stop tracking it.
/// </summary>
[Test]
public void TestSessionStopTrackingDetachedReceivers()
    => DoTestSessionTrackingOfReceivers(localDetach: true, RemoteDetach: true, remoteGoesFirst: false, close: false);
/// <summary>
/// Receiver closed remotely first and then locally: the session should stop tracking it.
/// </summary>
[Test]
public void TestSessionStopTrackingClosedReceiversRemoteGoesFirst()
    => DoTestSessionTrackingOfReceivers(localDetach: true, RemoteDetach: true, remoteGoesFirst: true, close: true);
/// <summary>
/// Receiver detached remotely first and then locally: the session should stop tracking it.
/// </summary>
[Test]
public void TestSessionStopTrackingDetachedReceiversRemoteGoesFirst()
    => DoTestSessionTrackingOfReceivers(localDetach: true, RemoteDetach: true, remoteGoesFirst: true, close: false);
/// <summary>
/// Receiver detached locally only (remote end still attached): the session should keep tracking it.
/// </summary>
[Test]
public void TestSessionTracksRemotelyOpenReceivers()
    => DoTestSessionTrackingOfReceivers(localDetach: true, RemoteDetach: false, remoteGoesFirst: false, close: false);
/// <summary>
/// Receiver detached remotely only (local end still open): the session should keep tracking it.
/// </summary>
[Test]
public void TestSessionTracksLocallyOpenReceivers()
    => DoTestSessionTrackingOfReceivers(localDetach: false, RemoteDetach: true, remoteGoesFirst: false, close: false);
/// <summary>
/// Opens one receiver on a session and checks how many receivers the session reports
/// after the requested combination of local and remote detach/close operations.
/// The receiver is expected to disappear only when both ends have detached.
/// </summary>
/// <param name="localDetach">Detach (or close) the receiver locally.</param>
/// <param name="RemoteDetach">Have the peer send a Detach for the receiver.</param>
/// <param name="remoteGoesFirst">When both ends detach, inject the remote Detach before the local one.</param>
/// <param name="close">Use a closing Detach (and <c>Close()</c> locally) instead of a plain detach.</param>
private void DoTestSessionTrackingOfReceivers(bool localDetach, bool RemoteDetach, bool remoteGoesFirst, bool close)
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().Respond().WithContainerId("driver");
    peer.ExpectBegin().Respond();

    IConnection connection = engine.Start();
    connection.Open();
    ISession session = connection.Session();
    session.Open();

    Assert.AreEqual(0, CountElements(session.Receivers));

    peer.ExpectAttach().OfReceiver().Respond();

    IReceiver receiver = session.Receiver("test").Open();

    Assert.AreEqual(1, CountElements(session.Receivers));

    bool remoteBeforeLocal = RemoteDetach && remoteGoesFirst;
    bool remoteAfterLocal = RemoteDetach && !remoteGoesFirst;

    if (remoteBeforeLocal)
    {
        peer.RemoteDetach().WithClosed(close).Now();
    }

    if (localDetach)
    {
        peer.ExpectDetach().WithClosed(close);

        if (!close)
        {
            receiver.Detach();
        }
        else
        {
            receiver.Close();
        }
    }

    if (remoteAfterLocal)
    {
        peer.RemoteDetach().WithClosed(close).Now();
    }

    // Tracking ends only once both sides have let go of the link.
    int expectedTracked = (RemoteDetach && localDetach) ? 0 : 1;
    Assert.AreEqual(expectedTracked, CountElements(session.Receivers));

    peer.ExpectEnd().Respond();

    session.Close();

    peer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Verifies that asking a session for a sender by name returns the same instance while
/// that sender is open, and a different instance once the original has been closed.
/// </summary>
[Test]
public void TestGetSenderFromSessionByName()
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().Respond().WithContainerId("driver");
    peer.ExpectBegin().Respond();
    peer.ExpectAttach().OfSender().Respond();
    peer.ExpectDetach().Respond();
    peer.ExpectEnd().Respond();

    IConnection connection = engine.Start();
    connection.Open();
    ISession session = connection.Session();
    session.Open();

    // Precondition for a sender test: the session starts with no senders
    // (this previously inspected the receivers collection by mistake).
    Assert.AreEqual(0, CountElements(session.Senders));

    ISender link = session.Sender("test").Open();

    // Same name while the link is open must resolve to the existing sender.
    ISender lookup = session.Sender("test");
    Assert.AreSame(link, lookup);

    link.Close();

    // After the close the name must yield a different sender instance.
    ISender newLink = session.Sender("test");
    Assert.AreNotSame(link, newLink);

    session.Close();

    peer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Verifies that asking a session for a receiver by name returns the same instance while
/// that receiver is open, and a different instance once the original has been closed.
/// </summary>
[Test]
public void TestGetReceiverFromSessionByName()
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().Respond().WithContainerId("driver");
    peer.ExpectBegin().Respond();
    peer.ExpectAttach().OfReceiver().Respond();
    peer.ExpectDetach().Respond();
    peer.ExpectEnd().Respond();

    IConnection connection = engine.Start().Open();
    ISession session = connection.Session().Open();

    Assert.AreEqual(0, CountElements(session.Receivers));

    IReceiver original = session.Receiver("test").Open();

    // Same name while the link is open must resolve to the existing receiver.
    Assert.AreSame(original, session.Receiver("test"));

    original.Close();

    // After the close the name must yield a different receiver instance.
    Assert.AreNotSame(original, session.Receiver("test"));

    session.Close();

    peer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Verifies that an error condition assigned to a session before it is closed is carried
/// in the End frame the peer receives.
/// </summary>
[Test]
public void TestCloseOrDetachWithErrorCondition()
{
    string condition = "amqp:session:window-violation";
    string description = "something bad happened.";

    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().Respond();
    peer.ExpectBegin().Respond();
    // The End must carry exactly the condition and description set on the session below.
    peer.ExpectEnd().WithError(condition, description).Respond();
    peer.ExpectClose();

    IConnection connection = engine.Start();
    connection.Open();

    ISession session = connection.Session().Open();
    session.ErrorCondition = new ErrorCondition(Symbol.Lookup(condition), description);
    session.Close();

    connection.Close();

    peer.WaitForScriptToComplete();

    // The error handler is installed above, so check it was never triggered,
    // as the other tests in this fixture do.
    Assert.IsNull(failure);
}
/// <summary>
/// Verifies that the session's sender-open handler runs when the peer injects an
/// Attach (in the receiver role) for a link the local side has not created.
/// </summary>
[Test]
public void TestSessionNotifiedOfRemoteSenderOpened()
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    bool remoteOpenReported = false;

    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().Respond().WithContainerId("driver");
    peer.ExpectBegin().Respond();
    peer.ExpectEnd().Respond();

    IConnection connection = engine.Start();
    connection.Open();

    ISession session = connection.Session();
    session.SenderOpenHandler((result) => remoteOpenReported = true);
    session.Open();

    var remoteAttach = peer.RemoteAttach().OfReceiver().WithHandle(1)
                                                       .WithInitialDeliveryCount(1)
                                                       .WithName("remote-sender");
    remoteAttach.Now();

    session.Close();

    Assert.IsTrue(remoteOpenReported, "Session should have reported remote sender open");

    peer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Verifies that a sender and a receiver sharing one link name can be opened on the same
/// session and that each one is matched to its own remote Attach, so both report as
/// locally and remotely open and both open handlers run.
/// </summary>
[Test]
public void TestOpenSenderAndReceiverWithSameLinkNames()
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    bool senderRemotelyOpened = false;
    bool receiverRemotelyOpened = false;

    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().Respond().WithContainerId("driver");
    peer.ExpectBegin().Respond();
    // The Attach frames are not answered by the script; the remote attaches are injected below.
    peer.ExpectAttach().OfSender().WithHandle(0).WithName("link-name");
    peer.ExpectAttach().OfReceiver().WithHandle(1).WithName("link-name");
    peer.ExpectEnd().Respond();

    IConnection connection = engine.Start().Open();
    ISession session = connection.Session().Open();
    ISender sender = session.Sender("link-name").Open();
    IReceiver receiver = session.Receiver("link-name").Open();

    sender.OpenHandler((link) => senderRemotelyOpened = true);
    receiver.OpenHandler((link) => receiverRemotelyOpened = true);

    peer.RemoteAttach().OfSender().WithHandle(1)
                                  .WithInitialDeliveryCount(1)
                                  .WithName("link-name").Now();
    peer.RemoteAttach().OfReceiver().WithHandle(0)
                                    .WithInitialDeliveryCount(1)
                                    .WithName("link-name").Now();

    Assert.IsTrue(sender.IsLocallyOpen);
    Assert.IsTrue(sender.IsRemotelyOpen);
    Assert.IsTrue(receiver.IsLocallyOpen);
    Assert.IsTrue(receiver.IsRemotelyOpen);

    session.Close();

    Assert.IsTrue(senderRemotelyOpened, "Sender should have reported remote sender open");
    // Message corrected: this assertion is about the receiver's handler, not the sender's.
    Assert.IsTrue(receiverRemotelyOpened, "Receiver should have reported remote receiver open");

    peer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Verifies that a session opened and closed locally before the peer answers tolerates
/// the peer's Begin and End arriving afterwards without an engine failure.
/// </summary>
[Test]
public void TestBeginAndEndSessionBeforeRemoteBeginArrives()
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    // Begin and End are recorded by the peer but deliberately left unanswered.
    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().Respond().WithContainerId("driver");
    peer.ExpectBegin();
    peer.ExpectEnd();

    IConnection connection = engine.Start().Open();
    ISession session = connection.Session().Open();
    session.Close();

    peer.WaitForScriptToComplete();

    // The late remote Begin / End now arrive for the already locally-ended session.
    peer.RemoteBegin().WithRemoteChannel(0).WithNextOutgoingId(1).Now();
    peer.RemoteEnd().Now();

    peer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Verifies local channel selection when a session has been ended locally but the peer
/// has not yet answered: channel 0 is not reused while another channel is available,
/// and becomes reusable once the remote Begin / End for it have arrived.
/// </summary>
[Test]
public void TestHalfClosedSessiOnChannelNotImmediatelyRecycled()
{
IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
engine.ErrorHandler((error) => failure = error.FailureCause);
ProtonTestConnector peer = CreateTestPeer(engine);
peer.ExpectAMQPHeader().RespondWithAMQPHeader();
peer.ExpectOpen().Respond().WithContainerId("driver");
// First session: Begin and End on channel 0 are left unanswered by the peer.
peer.ExpectBegin().OnChannel(0);
peer.ExpectEnd();
IConnection connection = engine.Start();
connection.Open();
connection.Session().Open().Close();
// Channel 0 should be skipped since we are still waiting for the remote begin / end and
// we have a free slot that can be used instead.
peer.WaitForScriptToComplete();
peer.ExpectBegin().OnChannel(1).Respond();
peer.ExpectEnd().OnChannel(1).Respond();
connection.Session().Open().Close();
// Now channel 1 should be reused since it was opened and closed properly
peer.WaitForScriptToComplete();
peer.ExpectBegin().OnChannel(1).Respond();
peer.ExpectBegin().OnChannel(0).Respond();
peer.ExpectEnd().OnChannel(0).Respond();
connection.Session().Open();
// Close the original session now and its slot should be free to be reused.
peer.RemoteBegin().WithRemoteChannel(0).WithNextOutgoingId(1).Now();
peer.RemoteEnd().Now();
connection.Session().Open().Close();
peer.WaitForScriptToComplete();
Assert.IsNull(failure);
}
/// <summary>
/// Verifies that with the channel max limited to 1 (two channels), a channel whose
/// session was ended locally but not yet answered by the peer is reused when no other
/// channel is free.
/// </summary>
[Test]
public void TestHalfClosedSessiOnChannelRecycledIfNoOtherAvailableChannels()
{
IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
engine.ErrorHandler((error) => failure = error.FailureCause);
ProtonTestConnector peer = CreateTestPeer(engine);
peer.ExpectAMQPHeader().RespondWithAMQPHeader();
peer.ExpectOpen().WithChannelMax(1).Respond().WithContainerId("driver");
// None of the Begin / End frames below are answered by the script; the
// responses are injected manually further down.
peer.ExpectBegin().OnChannel(0);
peer.ExpectEnd().OnChannel(0);
peer.ExpectBegin().OnChannel(1);
peer.ExpectBegin().OnChannel(0);
IConnection connection = engine.Start();
connection.ChannelMax = 1; // at most two channels
connection.Open();
connection.Session().Open().Close(); // Ch: 0
connection.Session().Open(); // Ch: 1
connection.Session().Open(); // Ch: 0 (recycled)
peer.WaitForScriptToComplete();
// Answer to initial Begin / End of session on Ch: 0
peer.RemoteBegin().WithRemoteChannel(0).OnChannel(1).Now();
peer.RemoteEnd().OnChannel(1).Now();
// Answer to second session which should have begun on Ch: 1
peer.RemoteBegin().WithRemoteChannel(1).OnChannel(0).Now();
// Answer to third session which should have begun on Ch: 0 recycled
peer.RemoteBegin().WithRemoteChannel(0).OnChannel(1).Now();
peer.WaitForScriptToComplete();
Assert.IsNull(failure);
}
/// <summary>
/// Handle-max enforcement where the single allowed local link is a sender.
/// </summary>
[Test]
public void TestSessionEnforcesHandleMaxForLocalSenders()
    => DoTestSessionEnforcesHandleMaxForLocalEndpoints(receiver: false);
/// <summary>
/// Handle-max enforcement where the single allowed local link is a receiver.
/// </summary>
[Test]
public void TestSessionEnforcesHandleMaxForLocalReceivers()
    => DoTestSessionEnforcesHandleMaxForLocalEndpoints(receiver: true);
/// <summary>
/// Opens a session with <c>HandleMax</c> set to 0 (one usable handle), opens one link, and
/// checks that opening any further sender or receiver is rejected with
/// <see cref="InvalidOperationException"/>.
/// </summary>
/// <param name="receiver">When true the first (allowed) link is a receiver, otherwise a sender.</param>
private void DoTestSessionEnforcesHandleMaxForLocalEndpoints(bool receiver)
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    // Only a single Attach is scripted: the rejected opens must not reach the peer.
    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().Respond().WithContainerId("driver");
    peer.ExpectBegin().WithHandleMax(0).Respond();
    peer.ExpectAttach().Respond();
    peer.ExpectEnd().Respond();

    IConnection connection = engine.Start().Open();
    ISession session = connection.Session();
    session.HandleMax = 0;
    session.Open();

    Assert.AreEqual(0, session.HandleMax);

    // Runs the link creation and requires it to be refused with an InvalidOperationException.
    void ExpectRejected(Action createLink, string failureMessage)
    {
        try
        {
            createLink();
            Assert.Fail(failureMessage);
        }
        catch (InvalidOperationException)
        {
            // Expected
        }
    }

    if (!receiver)
    {
        session.Sender("sender1").Open();

        ExpectRejected(() => session.Sender("sender2").Open(),
                       "Should not allow second sender create on session with one handle maximum");
        ExpectRejected(() => session.Receiver("receiver1").Open(),
                       "Should not allow additional receiver create on session with one handle maximum");
    }
    else
    {
        session.Receiver("receiver1").Open();

        ExpectRejected(() => session.Receiver("receiver2").Open(),
                       "Should not allow receiver create on session with one handle maximum");
        ExpectRejected(() => session.Sender("sender1").Open(),
                       "Should not allow additional sender create on session with one handle maximum");
    }

    session.Close();

    peer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Remote handle-max violation where the offending Attach is sent in the sender role.
/// </summary>
[Test]
public void TestSessionEnforcesHandleMaxFromRemoteAttachOfSender()
    => DoTestSessionEnforcesHandleMaxFromRemoteAttach(sender: true);
/// <summary>
/// Remote handle-max violation where the offending Attach is sent in the receiver role.
/// </summary>
[Test]
public void TestSessionEnforcesHandleMaxFromRemoteAttachOfReceiver()
    => DoTestSessionEnforcesHandleMaxFromRemoteAttach(sender: false);
/// <summary>
/// Opens a session with a local <c>HandleMax</c> of 0 and has the peer attach a link on
/// handle 1; the engine is expected to close the connection with a framing error whose
/// description is "Session handle-max exceeded".
/// </summary>
/// <param name="sender">When true the peer attaches in the sender role, otherwise in the receiver role.</param>
public void DoTestSessionEnforcesHandleMaxFromRemoteAttach(bool sender)
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().Respond().WithContainerId("driver");
    peer.ExpectBegin().WithHandleMax(0).Respond().WithHandleMax(42);

    // Queue the offending Attach so it follows the Begin response.
    if (!sender)
    {
        peer.RemoteAttach().OfReceiver().WithHandle(1).WithName("link-name").Queue();
    }
    else
    {
        peer.RemoteAttach().OfSender().WithHandle(1).WithName("link-name").Queue();
    }

    peer.ExpectClose().WithError(ConnectionError.FRAMING_ERROR.ToString(), "Session handle-max exceeded").Respond();

    IConnection connection = engine.Start().Open();

    // Opening the session triggers the scripted remote Attach that breaks the local handle-max limit.
    ISession session = connection.Session();
    session.HandleMax = 0;
    session.Open();

    peer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Outgoing capacity equal to the max frame size: remaining capacity is expected to be 1024.
/// </summary>
[Test]
public void TestSessionOutgoingSetEqualToMaxFrameSize()
    => TestSessionConfigureOutgoingCapacity(frameSize: 1024, sessionCapacity: 1024, remainingCapacity: 1024);
/// <summary>
/// Outgoing capacity of twice the max frame size: remaining capacity is expected to be 2048.
/// </summary>
[Test]
public void TestSessionOutgoingSetToTwiceMaxFrameSize()
    => TestSessionConfigureOutgoingCapacity(frameSize: 1024, sessionCapacity: 2048, remainingCapacity: 2048);
/// <summary>
/// Outgoing capacity below the max frame size (512 vs 1024): remaining capacity is
/// expected to be reported as 1024.
/// </summary>
[Test]
public void TestSessionOutgoingSetToSmallerThanMaxFrameSize()
    => TestSessionConfigureOutgoingCapacity(frameSize: 1024, sessionCapacity: 512, remainingCapacity: 1024);
/// <summary>
/// Outgoing capacity that is not a multiple of the frame size (8199 vs 1024): remaining
/// capacity is expected to be 8192.
/// </summary>
[Test]
public void TestSessionOutgoingSetToLargerThanMaxFrameSizeAndNotEven()
    => TestSessionConfigureOutgoingCapacity(frameSize: 1024, sessionCapacity: 8199, remainingCapacity: 8192);
/// <summary>
/// Outgoing capacity of zero: remaining capacity is expected to be 0.
/// </summary>
[Test]
public void TestSessionOutgoingSetToZeroToDisableOutAdd()
    => TestSessionConfigureOutgoingCapacity(frameSize: 1024, sessionCapacity: 0, remainingCapacity: 0);
/// <summary>
/// Opens a session on a connection with the given max frame size, sets the session's
/// outgoing capacity, and checks both the configured value and the remaining capacity.
/// </summary>
/// <param name="frameSize">Max frame size configured on the connection and expected in the Open.</param>
/// <param name="sessionCapacity">Value assigned to the session's outgoing capacity.</param>
/// <param name="remainingCapacity">Remaining outgoing capacity expected after the assignment.</param>
private void TestSessionConfigureOutgoingCapacity(uint frameSize, uint sessionCapacity, uint remainingCapacity)
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().WithMaxFrameSize(frameSize).Respond();
    peer.ExpectBegin().Respond();

    IConnection connection = engine.Start();
    connection.MaxFrameSize = frameSize;
    connection.Open();

    ISession session = connection.Session();
    session.Open();

    peer.WaitForScriptToComplete();

    // Before any capacity is configured the session reports int.MaxValue as remaining.
    Assert.AreEqual(int.MaxValue, session.RemainingOutgoingCapacity);

    session.OutgoingCapacity = sessionCapacity;

    Assert.AreEqual(sessionCapacity, session.OutgoingCapacity);
    Assert.AreEqual(remainingCapacity, session.RemainingOutgoingCapacity);

    peer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Verifies that setting a session's outgoing capacity to zero makes a sender that holds
/// link credit report as not sendable, and the remaining capacity drop to 0.
/// </summary>
[Test]
public void TestSessionNotWritableWhenOutgoingCapacitySetToZeroAlsoReflectsInSenders()
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    ProtonTestConnector peer = CreateTestPeer(engine);

    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().Respond();
    peer.ExpectBegin().Respond();
    peer.ExpectAttach().Respond();
    peer.RemoteFlow().WithLinkCredit(2).Queue();

    IConnection connection = engine.Start();
    connection.Open();
    ISession session = connection.Session();
    session.Open();
    ISender sender = session.Sender("test");
    sender.Open();

    peer.WaitForScriptToComplete();

    // With credit granted and no capacity limit the sender can send.
    Assert.IsTrue(sender.IsSendable);
    Assert.AreEqual(int.MaxValue, session.RemainingOutgoingCapacity);

    session.OutgoingCapacity = 0;

    Assert.IsFalse(sender.IsSendable);
    Assert.AreEqual(0, session.RemainingOutgoingCapacity);

    peer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Verifies that with a 1024 byte max frame size and a session outgoing capacity of 2048,
/// a sender stops being sendable and the remaining capacity reaches 0 after two transfers
/// whose queued callbacks have not yet been run.
/// </summary>
[Test]
public void TestSenderCannotSendAfterUsingUpOutgoingCapacityLimit()
{
IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
engine.ErrorHandler((error) => failure = error.FailureCause);
// Callbacks queued by the test peer are collected here and only run when the test invokes them.
Queue<Action> asyncIOCallbacks = new Queue<Action>();
ProtonTestConnector peer = CreateTestPeer(engine, asyncIOCallbacks);
byte[] payload = new byte[] { 0, 1, 2, 3, 4 };
IDeliveryTagGenerator generator = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
peer.ExpectAMQPHeader().RespondWithAMQPHeader();
peer.ExpectOpen().WithMaxFrameSize(1024).Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().Respond();
peer.RemoteFlow().WithLinkCredit(20).Queue();
IConnection connection = engine.Start();
connection.MaxFrameSize = 1024;
connection.Open();
ISession session = connection.Session();
session.OutgoingCapacity = 2048;
session.Open();
ISender sender = session.Sender("test");
sender.DeliveryTagGenerator = generator;
sender.Open();
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payload);
peer.ExpectTransfer().WithPayload(payload);
Assert.IsTrue(sender.IsSendable);
Assert.AreEqual(2048, session.RemainingOutgoingCapacity);
// Open, Begin, Attach
Assert.AreEqual(3, asyncIOCallbacks.Count);
// Run and discard the callbacks queued for the setup frames before sending.
foreach (Action action in asyncIOCallbacks)
{
action.Invoke();
}
asyncIOCallbacks.Clear();
IOutgoingDelivery delivery1 = sender.Next();
delivery1.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
IOutgoingDelivery delivery2 = sender.Next();
delivery2.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
peer.WaitForScriptToComplete();
// One callback per transfer is still pending, so the capacity stays used up.
Assert.AreEqual(2, asyncIOCallbacks.Count);
Assert.IsFalse(sender.IsSendable);
Assert.AreEqual(0, session.RemainingOutgoingCapacity);
peer.WaitForScriptToComplete();
Assert.IsNull(failure);
}
/// <summary>
/// Verifies that after three transfers exhaust a session outgoing capacity of 3072
/// (1024 byte max frame size), running the queued callbacks restores the full capacity,
/// makes the sender sendable again and fires its credit state update handler exactly once.
/// </summary>
[Test]
public void TestSenderGetsUpdatedOnceSessionOutgoingWindowIsExpandedByWriteCallbacks()
{
IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
engine.ErrorHandler((error) => failure = error.FailureCause);
// Callbacks queued by the test peer are collected here and only run when the test invokes them.
Queue<Action> asyncIOCallbacks = new Queue<Action>();
ProtonTestConnector peer = CreateTestPeer(engine, asyncIOCallbacks);
byte[] payload = new byte[] { 0, 1, 2, 3, 4 };
IDeliveryTagGenerator generator = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
peer.ExpectAMQPHeader().RespondWithAMQPHeader();
peer.ExpectOpen().WithMaxFrameSize(1024).Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().Respond();
peer.RemoteFlow().WithLinkCredit(20).Queue();
IConnection connection = engine.Start();
connection.MaxFrameSize = 1024;
connection.Open();
ISession session = connection.Session();
session.OutgoingCapacity = 3072;
session.Open();
ISender sender = session.Sender("test");
sender.DeliveryTagGenerator = generator;
sender.Open();
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payload);
peer.ExpectTransfer().WithPayload(payload);
peer.ExpectTransfer().WithPayload(payload);
// Counts how often the sender's credit state update handler is invoked from here on.
int creditStateUpdated = 0;
sender.CreditStateUpdateHandler((self) => creditStateUpdated++);
Assert.IsTrue(sender.IsSendable);
Assert.AreEqual(3072, session.RemainingOutgoingCapacity);
// Open, Begin, Attach
Assert.AreEqual(3, asyncIOCallbacks.Count);
// Run and discard the callbacks queued for the setup frames before sending.
foreach (Action action in asyncIOCallbacks)
{
action.Invoke();
}
asyncIOCallbacks.Clear();
IOutgoingDelivery delivery1 = sender.Next();
delivery1.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
IOutgoingDelivery delivery2 = sender.Next();
delivery2.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
IOutgoingDelivery delivery3 = sender.Next();
delivery3.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
peer.WaitForScriptToComplete();
// One callback per transfer is pending, so the capacity is fully used up.
Assert.AreEqual(3, asyncIOCallbacks.Count);
Assert.IsFalse(sender.IsSendable);
Assert.AreEqual(0, session.RemainingOutgoingCapacity);
// Running the pending callbacks is what frees the capacity again.
foreach (Action action in asyncIOCallbacks)
{
action.Invoke();
}
asyncIOCallbacks.Clear();
Assert.AreEqual(1, creditStateUpdated);
Assert.IsTrue(sender.IsSendable);
Assert.AreEqual(3072, session.RemainingOutgoingCapacity);
peer.WaitForScriptToComplete();
Assert.IsNull(failure);
}
/// <summary>
/// Re-applying the same 2048 byte capacity after the window is exhausted.
/// </summary>
[Test]
public void TestSetSameOutgoingWindowAfterBecomingNotWritableDoesNotTriggerWritable()
    // Should not become writable because two outstanding writes but low water mark remains one frame pending.
    => TestSessionOutgoingWindowExpandedAfterItBecomeNotWritable(updatedWindow: 2048, becomesWritable: false);
/// <summary>
/// Growing the capacity by one frame (to 3072) after the window is exhausted.
/// </summary>
[Test]
public void TestExpandingOutgoingWindowAfterBecomingNotWritableUpdateSenderAsWritableOneFrameBigger()
    // Should not become writable because two outstanding writes but low water mark remains one frame pending.
    => TestSessionOutgoingWindowExpandedAfterItBecomeNotWritable(updatedWindow: 3072, becomesWritable: false);
/// <summary>
/// Growing the capacity by two frames (to 4096) after the window is exhausted.
/// </summary>
[Test]
public void TestExpandingOutgoingWindowAfterBecomingNotWritableUpdateSenderAsWritableTwoFramesBuffer()
    // Should become writable since low water mark was one but becomes two and we have only two pending.
    => TestSessionOutgoingWindowExpandedAfterItBecomeNotWritable(updatedWindow: 4096, becomesWritable: true);
/// <summary>
/// Removing the capacity limit (no updated window value) after the window is exhausted.
/// </summary>
[Test]
public void TestDisableOutgoingWindowingAfterBecomingNotWritableUpdateSenderAsWritable()
    // Should become writable since we are lifting restrictions
    => TestSessionOutgoingWindowExpandedAfterItBecomeNotWritable(updatedWindow: null, becomesWritable: true);
/// <summary>
/// Exhausts a 2048 byte session outgoing capacity with two transfers (1024 byte frames),
/// then assigns a new capacity and checks whether the sender becomes sendable again and
/// what remaining capacity the session reports.
/// </summary>
/// <param name="updatedWindow">New outgoing capacity; null assigns <c>int.MaxValue</c> instead.</param>
/// <param name="becomesWritable">Whether the sender is expected to be sendable, with one credit state update, after the change.</param>
private void TestSessionOutgoingWindowExpandedAfterItBecomeNotWritable(uint? updatedWindow, bool becomesWritable)
{
    IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
    engine.ErrorHandler((error) => failure = error.FailureCause);
    Queue<Action> asyncIOCallbacks = new Queue<Action>();
    ProtonTestConnector peer = CreateTestPeer(engine, asyncIOCallbacks);

    uint maxFrameSize = 1024;
    byte[] payload = new byte[] { 0, 1, 2, 3, 4 };
    IDeliveryTagGenerator generator = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();

    peer.ExpectAMQPHeader().RespondWithAMQPHeader();
    peer.ExpectOpen().WithMaxFrameSize(maxFrameSize).Respond();
    peer.ExpectBegin().Respond();
    peer.ExpectAttach().Respond();
    peer.RemoteFlow().WithLinkCredit(20).Queue();

    IConnection connection = engine.Start();
    connection.MaxFrameSize = maxFrameSize;
    connection.Open();
    ISession session = connection.Session();
    session.OutgoingCapacity = 2048;
    session.Open();
    ISender sender = session.Sender("test");
    sender.DeliveryTagGenerator = generator;
    sender.Open();

    peer.WaitForScriptToComplete();
    peer.ExpectTransfer().WithPayload(payload);
    peer.ExpectTransfer().WithPayload(payload);

    int creditStateUpdated = 0;
    sender.CreditStateUpdateHandler((self) => creditStateUpdated++);

    Assert.IsTrue(sender.IsSendable);
    Assert.AreEqual(2048, session.RemainingOutgoingCapacity);

    // Open, Begin, Attach
    Assert.AreEqual(3, asyncIOCallbacks.Count);
    foreach (Action pendingCallback in asyncIOCallbacks)
    {
        pendingCallback.Invoke();
    }
    asyncIOCallbacks.Clear();

    IOutgoingDelivery firstDelivery = sender.Next();
    firstDelivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
    IOutgoingDelivery secondDelivery = sender.Next();
    secondDelivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));

    peer.WaitForScriptToComplete();

    // Both transfer callbacks stay queued, so the window is fully consumed here.
    Assert.AreEqual(2, asyncIOCallbacks.Count);
    Assert.IsFalse(sender.IsSendable);
    Assert.AreEqual(0, session.RemainingOutgoingCapacity);

    session.OutgoingCapacity = updatedWindow ?? (uint)int.MaxValue;

    if (!becomesWritable)
    {
        Assert.AreEqual(0, creditStateUpdated);
        Assert.IsFalse(sender.IsSendable);
    }
    else
    {
        Assert.AreEqual(1, creditStateUpdated);
        Assert.IsTrue(sender.IsSendable);
    }

    if (!updatedWindow.HasValue)
    {
        Assert.AreEqual(int.MaxValue, session.RemainingOutgoingCapacity);
    }
    else
    {
        // Each still-pending callback accounts for one max-size frame of the new window.
        Assert.AreEqual(updatedWindow - (asyncIOCallbacks.Count * maxFrameSize), session.RemainingOutgoingCapacity);
    }

    peer.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
/// <summary>
/// Verifies that a session outgoing capacity of 2048 (1024 byte max frame size) is shared
/// by two senders: after each has written one transfer, neither is sendable and the
/// remaining capacity is 0.
/// </summary>
[Test]
public void TestMultiplySendersCannotSendAfterUsingUpOutgoingCapacityLimit()
{
IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
engine.ErrorHandler((error) => failure = error.FailureCause);
// Callbacks queued by the test peer are collected here and only run when the test invokes them.
Queue<Action> asyncIOCallbacks = new Queue<Action>();
ProtonTestConnector peer = CreateTestPeer(engine, asyncIOCallbacks);
byte[] payload = new byte[] { 0, 1, 2, 3, 4 };
IDeliveryTagGenerator generator = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
peer.ExpectAMQPHeader().RespondWithAMQPHeader();
peer.ExpectOpen().WithMaxFrameSize(1024).Respond();
peer.ExpectBegin().Respond();
// Each sender attach is answered and followed by a queued flow granting 20 credits.
peer.ExpectAttach().Respond();
peer.RemoteFlow().WithLinkCredit(20).Queue();
peer.ExpectAttach().Respond();
peer.RemoteFlow().WithLinkCredit(20).Queue();
IConnection connection = engine.Start();
connection.MaxFrameSize = 1024;
connection.Open();
ISession session = connection.Session();
session.OutgoingCapacity = 2048;
session.Open();
ISender sender1 = session.Sender("test1");
sender1.DeliveryTagGenerator = generator;
sender1.Open();
ISender sender2 = session.Sender("test2");
sender2.DeliveryTagGenerator = generator;
sender2.Open();
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payload);
peer.ExpectTransfer().WithPayload(payload);
Assert.IsTrue(sender1.IsSendable);
Assert.IsTrue(sender2.IsSendable);
Assert.AreEqual(2048, session.RemainingOutgoingCapacity);
// Open, Begin, Attach, Attach
Assert.AreEqual(4, asyncIOCallbacks.Count);
// Run and discard the callbacks queued for the setup frames before sending.
foreach (Action action in asyncIOCallbacks)
{
action.Invoke();
}
asyncIOCallbacks.Clear();
IOutgoingDelivery delivery1 = sender1.Next();
delivery1.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
IOutgoingDelivery delivery2 = sender2.Next();
delivery2.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
peer.WaitForScriptToComplete();
// One callback per transfer is still pending, so the shared capacity stays used up.
Assert.AreEqual(2, asyncIOCallbacks.Count);
Assert.IsFalse(sender1.IsSendable);
Assert.IsFalse(sender2.IsSendable);
Assert.AreEqual(0, session.RemainingOutgoingCapacity);
peer.WaitForScriptToComplete();
Assert.IsNull(failure);
}
[Test]
public void TestOnlyOneSenderNotifiedOfNewCapacityIfFirstOneUsesItUp()
{
    // Scenario: two senders exhaust a 2048 outgoing capacity (1024 max frame size).
    // Completing a single pending write is then expected to trigger exactly one
    // credit state update, whose handler immediately writes again and leaves the
    // session with no remaining capacity.
    byte[] body = { 0, 1, 2, 3, 4 };
    var pendingWrites = new Queue<Action>();

    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(failed => failure = failed.FailureCause);
    ProtonTestConnector driver = CreateTestPeer(protonEngine, pendingWrites);
    IDeliveryTagGenerator tags = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();

    driver.ExpectAMQPHeader().RespondWithAMQPHeader();
    driver.ExpectOpen().WithMaxFrameSize(1024).Respond();
    driver.ExpectBegin().Respond();

    // Script an attach followed by a 20 credit flow for each of the two senders.
    for (int link = 0; link < 2; link++)
    {
        driver.ExpectAttach().Respond();
        driver.RemoteFlow().WithLinkCredit(20).Queue();
    }

    IConnection conn = protonEngine.Start();
    conn.MaxFrameSize = 1024;
    conn.Open();

    ISession ssn = conn.Session();
    ssn.OutgoingCapacity = 2048;
    ssn.Open();

    ISender firstSender = ssn.Sender("test1");
    firstSender.DeliveryTagGenerator = tags;
    firstSender.Open();

    ISender secondSender = ssn.Sender("test2");
    secondSender.DeliveryTagGenerator = tags;
    secondSender.Open();

    driver.WaitForScriptToComplete();
    driver.ExpectTransfer().WithPayload(body);
    driver.ExpectTransfer().WithPayload(body);

    // Whichever sender is signalled first should write up to the limit again,
    // which is expected to prevent the other sender's handler from being called.
    int creditUpdates = 0;
    foreach (ISender candidate in new[] { firstSender, secondSender })
    {
        candidate.CreditStateUpdateHandler(link =>
        {
            creditUpdates++;
            IOutgoingDelivery next = link.Next();
            next.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(body));
        });
    }

    Assert.IsTrue(firstSender.IsSendable);
    Assert.IsTrue(secondSender.IsSendable);
    Assert.AreEqual(2048, ssn.RemainingOutgoingCapacity);

    // Captured write completions so far: Open, Begin, Attach, Attach
    Assert.AreEqual(4, pendingWrites.Count);
    foreach (Action completion in pendingWrites)
    {
        completion();
    }
    pendingWrites.Clear();

    IOutgoingDelivery firstDelivery = firstSender.Next();
    firstDelivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(body));
    IOutgoingDelivery secondDelivery = secondSender.Next();
    secondDelivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(body));

    driver.WaitForScriptToComplete();
    driver.ExpectTransfer().WithPayload(body);

    Assert.AreEqual(2, pendingWrites.Count);
    Assert.IsFalse(firstSender.IsSendable);
    Assert.IsFalse(secondSender.IsSendable);
    Assert.AreEqual(0, ssn.RemainingOutgoingCapacity);

    // Complete one pending write, freeing a frame's worth of window; that should
    // allow one new write from a single sender.
    Action oldestWrite = pendingWrites.Dequeue();
    oldestWrite();

    // The handler's new write takes the place of the completed one.
    Assert.AreEqual(2, pendingWrites.Count);
    Assert.IsFalse(firstSender.IsSendable);
    Assert.IsFalse(secondSender.IsSendable);
    Assert.AreEqual(0, ssn.RemainingOutgoingCapacity);
    Assert.AreEqual(1, creditUpdates);

    driver.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
[Test]
public void TestReduceOutgoingWindowDoesNotStopSenderIfSomeWindowRemaining()
{
    // Scenario: a single sender writes one delivery against a 4096 outgoing
    // capacity (1024 max frame size). With that write still pending the capacity
    // is lowered to 2048; the test expects 1024 to remain and the sender to stay
    // sendable.
    byte[] body = { 0, 1, 2, 3, 4 };
    var pendingWrites = new Queue<Action>();

    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(failed => failure = failed.FailureCause);
    ProtonTestConnector driver = CreateTestPeer(protonEngine, pendingWrites);
    IDeliveryTagGenerator tags = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();

    driver.ExpectAMQPHeader().RespondWithAMQPHeader();
    driver.ExpectOpen().WithMaxFrameSize(1024).Respond();
    driver.ExpectBegin().Respond();
    driver.ExpectAttach().Respond();
    driver.RemoteFlow().WithLinkCredit(20).Queue();

    IConnection conn = protonEngine.Start();
    conn.MaxFrameSize = 1024;
    conn.Open();

    ISession ssn = conn.Session();
    ssn.OutgoingCapacity = 4096;
    ssn.Open();

    ISender onlySender = ssn.Sender("test1");
    onlySender.DeliveryTagGenerator = tags;
    onlySender.Open();

    driver.WaitForScriptToComplete();
    driver.ExpectTransfer().WithPayload(body);

    Assert.IsTrue(onlySender.IsSendable);
    Assert.AreEqual(4096, ssn.RemainingOutgoingCapacity);

    // Captured write completions so far: Open, Begin, Attach
    Assert.AreEqual(3, pendingWrites.Count);
    foreach (Action completion in pendingWrites)
    {
        completion();
    }
    pendingWrites.Clear();

    IOutgoingDelivery firstDelivery = onlySender.Next();
    firstDelivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(body));

    driver.WaitForScriptToComplete();

    // One transfer write is pending and is charged against the capacity.
    Assert.AreEqual(1, pendingWrites.Count);
    Assert.IsTrue(onlySender.IsSendable);
    Assert.AreEqual(3072, ssn.RemainingOutgoingCapacity);

    // Shrink the capacity while the write is still outstanding.
    ssn.OutgoingCapacity = 2048;

    Assert.AreEqual(1024, ssn.RemainingOutgoingCapacity);
    Assert.IsTrue(onlySender.IsSendable);

    driver.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
[Test]
public void TestDisableOutgoingWindowMarksSenderAsNotSendableWhenWriteStillPending()
{
    // Scenario: a single sender writes one delivery against a 4096 outgoing
    // capacity (1024 max frame size). With that write still pending the capacity
    // is set to zero; the test expects no remaining capacity and the sender to
    // stop being sendable.
    byte[] body = { 0, 1, 2, 3, 4 };
    var pendingWrites = new Queue<Action>();

    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(failed => failure = failed.FailureCause);
    ProtonTestConnector driver = CreateTestPeer(protonEngine, pendingWrites);
    IDeliveryTagGenerator tags = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();

    driver.ExpectAMQPHeader().RespondWithAMQPHeader();
    driver.ExpectOpen().WithMaxFrameSize(1024).Respond();
    driver.ExpectBegin().Respond();
    driver.ExpectAttach().Respond();
    driver.RemoteFlow().WithLinkCredit(20).Queue();

    IConnection conn = protonEngine.Start();
    conn.MaxFrameSize = 1024;
    conn.Open();

    ISession ssn = conn.Session();
    ssn.OutgoingCapacity = 4096;
    ssn.Open();

    ISender onlySender = ssn.Sender("test1");
    onlySender.DeliveryTagGenerator = tags;
    onlySender.Open();

    driver.WaitForScriptToComplete();
    driver.ExpectTransfer().WithPayload(body);

    Assert.IsTrue(onlySender.IsSendable);
    Assert.AreEqual(4096, ssn.RemainingOutgoingCapacity);

    // Captured write completions so far: Open, Begin, Attach
    Assert.AreEqual(3, pendingWrites.Count);
    foreach (Action completion in pendingWrites)
    {
        completion();
    }
    pendingWrites.Clear();

    IOutgoingDelivery firstDelivery = onlySender.Next();
    firstDelivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(body));

    driver.WaitForScriptToComplete();

    // One transfer write is pending and is charged against the capacity.
    Assert.AreEqual(1, pendingWrites.Count);
    Assert.IsTrue(onlySender.IsSendable);
    Assert.AreEqual(3072, ssn.RemainingOutgoingCapacity);

    // Set the capacity to zero while the write is still outstanding.
    ssn.OutgoingCapacity = 0;

    Assert.AreEqual(0, ssn.RemainingOutgoingCapacity);
    Assert.IsFalse(onlySender.IsSendable);

    driver.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
[Test]
public void TestReduceAndThenIncreaseOutgoingWindowRemembersPreviouslyPendingWrites()
{
    // Scenario: a single sender writes one delivery against a 4096 outgoing
    // capacity (1024 max frame size). With that write still pending the capacity
    // is lowered to 1024 (expected: nothing remaining, not sendable) and then
    // raised back to 4096 (expected: 3072 remaining, sendable again), showing the
    // pending write is still accounted for.
    byte[] body = { 0, 1, 2, 3, 4 };
    var pendingWrites = new Queue<Action>();

    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(failed => failure = failed.FailureCause);
    ProtonTestConnector driver = CreateTestPeer(protonEngine, pendingWrites);
    IDeliveryTagGenerator tags = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();

    driver.ExpectAMQPHeader().RespondWithAMQPHeader();
    driver.ExpectOpen().WithMaxFrameSize(1024).Respond();
    driver.ExpectBegin().Respond();
    driver.ExpectAttach().Respond();
    driver.RemoteFlow().WithLinkCredit(20).Queue();

    IConnection conn = protonEngine.Start();
    conn.MaxFrameSize = 1024;
    conn.Open();

    ISession ssn = conn.Session();
    ssn.OutgoingCapacity = 4096;
    ssn.Open();

    ISender onlySender = ssn.Sender("test1");
    onlySender.DeliveryTagGenerator = tags;
    onlySender.Open();

    driver.WaitForScriptToComplete();
    driver.ExpectTransfer().WithPayload(body);

    Assert.IsTrue(onlySender.IsSendable);
    Assert.AreEqual(4096, ssn.RemainingOutgoingCapacity);

    // Captured write completions so far: Open, Begin, Attach
    Assert.AreEqual(3, pendingWrites.Count);
    foreach (Action completion in pendingWrites)
    {
        completion();
    }
    pendingWrites.Clear();

    IOutgoingDelivery firstDelivery = onlySender.Next();
    firstDelivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(body));

    driver.WaitForScriptToComplete();

    // One transfer write is pending and is charged against the capacity.
    Assert.AreEqual(1, pendingWrites.Count);
    Assert.IsTrue(onlySender.IsSendable);
    Assert.AreEqual(3072, ssn.RemainingOutgoingCapacity);

    // Shrink the capacity down to what the pending write already occupies.
    ssn.OutgoingCapacity = 1024;

    Assert.AreEqual(0, ssn.RemainingOutgoingCapacity);
    Assert.IsFalse(onlySender.IsSendable);

    // Restore the original capacity; the pending write must still be counted.
    ssn.OutgoingCapacity = 4096;

    Assert.AreEqual(3072, ssn.RemainingOutgoingCapacity);
    Assert.IsTrue(onlySender.IsSendable);

    driver.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
[Test]
public void TestSenderNotifiedAfterSessionRemoteWindowOpenedAfterLocalCapacityRestored()
{
    // Scenario: a single sender with a 1024 outgoing capacity and a remote
    // incoming window of 1 writes one delivery. Completing the pending write
    // restores local capacity first, which on its own is not expected to signal
    // the sender; only the later remote flow that reopens the incoming window is
    // expected to trigger the credit state handler (which writes again).
    byte[] body = { 0, 1, 2, 3, 4 };
    var pendingWrites = new Queue<Action>();

    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(failed => failure = failed.FailureCause);
    ProtonTestConnector driver = CreateTestPeer(protonEngine, pendingWrites);
    IDeliveryTagGenerator tags = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();

    driver.ExpectAMQPHeader().RespondWithAMQPHeader();
    driver.ExpectOpen().WithMaxFrameSize(1024).Respond();
    driver.ExpectBegin().WithNextOutgoingId(0).Respond().WithNextOutgoingId(0);
    driver.ExpectAttach().Respond();
    driver.RemoteFlow()
          .WithLinkCredit(20)
          .WithNextIncomingId(0)
          .WithIncomingWindow(1)
          .Queue();

    IConnection conn = protonEngine.Start();
    conn.MaxFrameSize = 1024;
    conn.Open();

    ISession ssn = conn.Session();
    ssn.OutgoingCapacity = 1024;
    ssn.Open();

    ISender onlySender = ssn.Sender("test1");
    onlySender.DeliveryTagGenerator = tags;
    onlySender.Open();

    driver.WaitForScriptToComplete();
    driver.ExpectTransfer().WithPayload(body);

    // Each credit state update is counted and answered with a new delivery write.
    int creditUpdates = 0;
    onlySender.CreditStateUpdateHandler(link =>
    {
        creditUpdates++;
        IOutgoingDelivery next = link.Next();
        next.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(body));
    });

    Assert.IsTrue(onlySender.IsSendable);
    Assert.AreEqual(1024, ssn.RemainingOutgoingCapacity);

    // Captured write completions so far: Open, Begin, Attach
    Assert.AreEqual(3, pendingWrites.Count);
    foreach (Action completion in pendingWrites)
    {
        completion();
    }
    pendingWrites.Clear();

    IOutgoingDelivery initial = onlySender.Next();
    initial.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(body));

    driver.WaitForScriptToComplete();

    Assert.AreEqual(1, pendingWrites.Count);
    Assert.IsFalse(onlySender.IsSendable);
    Assert.AreEqual(0, ssn.RemainingOutgoingCapacity);

    // Free a frame's worth of window which shouldn't signal writable as there is
    // still no remote capacity.
    Action oldestWrite = pendingWrites.Dequeue();
    oldestWrite();

    Assert.AreEqual(0, pendingWrites.Count);
    Assert.IsFalse(onlySender.IsSendable);
    Assert.AreEqual(1024, ssn.RemainingOutgoingCapacity);
    Assert.AreEqual(0, creditUpdates);

    driver.WaitForScriptToComplete();
    driver.ExpectTransfer().WithPayload(body);

    // Reopen the remote incoming window; the handler is expected to fire once and
    // its write uses up the local capacity again.
    driver.RemoteFlow()
          .WithLinkCredit(19)
          .WithNextIncomingId(1)
          .WithIncomingWindow(1)
          .Now();

    Assert.AreEqual(1, pendingWrites.Count);
    Assert.IsFalse(onlySender.IsSendable);
    Assert.AreEqual(0, ssn.RemainingOutgoingCapacity);
    Assert.AreEqual(1, creditUpdates);

    driver.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
[Test]
public void TestSenderNotifiedAfterSessionRemoteWindowOpenedBeforeLocalCapacityRestored()
{
    // Scenario: a single sender with a 1024 outgoing capacity and a remote
    // incoming window of 1 writes one delivery. The remote window is reopened
    // while the local write is still pending, so the handler is signalled but
    // cannot send; once the pending write completes the handler is signalled a
    // second time and is expected to write the next delivery.
    byte[] body = { 0, 1, 2, 3, 4 };
    var pendingWrites = new Queue<Action>();

    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(failed => failure = failed.FailureCause);
    ProtonTestConnector driver = CreateTestPeer(protonEngine, pendingWrites);
    IDeliveryTagGenerator tags = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();

    driver.ExpectAMQPHeader().RespondWithAMQPHeader();
    driver.ExpectOpen().WithMaxFrameSize(1024).Respond();
    driver.ExpectBegin().WithNextOutgoingId(0).Respond().WithNextOutgoingId(0);
    driver.ExpectAttach().Respond();
    driver.RemoteFlow()
          .WithLinkCredit(20)
          .WithNextIncomingId(0)
          .WithIncomingWindow(1)
          .Queue();

    IConnection conn = protonEngine.Start();
    conn.MaxFrameSize = 1024;
    conn.Open();

    ISession ssn = conn.Session();
    ssn.OutgoingCapacity = 1024;
    ssn.Open();

    ISender onlySender = ssn.Sender("test1");
    onlySender.DeliveryTagGenerator = tags;
    onlySender.Open();

    driver.WaitForScriptToComplete();
    driver.ExpectTransfer().WithPayload(body);

    // Each credit state update is counted; a new delivery is only written when
    // the sender reports that it is currently sendable.
    int creditUpdates = 0;
    onlySender.CreditStateUpdateHandler(link =>
    {
        creditUpdates++;
        if (!onlySender.IsSendable)
        {
            return;
        }

        IOutgoingDelivery next = link.Next();
        next.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(body));
    });

    Assert.IsTrue(onlySender.IsSendable);
    Assert.AreEqual(1024, ssn.RemainingOutgoingCapacity);

    // Captured write completions so far: Open, Begin, Attach
    Assert.AreEqual(3, pendingWrites.Count);
    foreach (Action completion in pendingWrites)
    {
        completion();
    }
    pendingWrites.Clear();

    IOutgoingDelivery initial = onlySender.Next();
    initial.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(body));

    driver.WaitForScriptToComplete();

    Assert.AreEqual(1, pendingWrites.Count);
    Assert.IsFalse(onlySender.IsSendable);
    Assert.AreEqual(0, ssn.RemainingOutgoingCapacity);

    // Restore session remote incoming capacity but the sender should not send since
    // there should still be pending I/O work to be signaled.
    driver.RemoteFlow()
          .WithLinkCredit(19)
          .WithNextIncomingId(1)
          .WithIncomingWindow(1)
          .Now();

    Assert.AreEqual(1, pendingWrites.Count);
    Assert.IsFalse(onlySender.IsSendable);
    Assert.AreEqual(0, ssn.RemainingOutgoingCapacity);
    Assert.AreEqual(1, creditUpdates); // For now all flow events create a signal.

    driver.WaitForScriptToComplete();
    driver.ExpectTransfer().WithPayload(body);

    // Now local outgoing capacity should be opened up.
    Action oldestWrite = pendingWrites.Dequeue();
    oldestWrite();

    // The handler's new write immediately consumes the restored capacity.
    Assert.AreEqual(1, pendingWrites.Count);
    Assert.IsFalse(onlySender.IsSendable);
    Assert.AreEqual(0, ssn.RemainingOutgoingCapacity);
    Assert.AreEqual(2, creditUpdates);

    driver.WaitForScriptToComplete();
    Assert.IsNull(failure);
}
[Test]
public void TestHandleInUseErrorReturnedIfAttachWithAlreadyBoundHandleArrives()
{
    // Scenario: the peer answers the second sender's attach (local handle 1) with
    // handle 0, which it already used when answering the first attach. The test
    // expects the session to be ended with a HANDLE_IN_USE error while the engine
    // itself records no failure.
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(failed => failure = failed.FailureCause);
    ProtonTestConnector driver = CreateTestPeer(protonEngine);

    driver.ExpectAMQPHeader().RespondWithAMQPHeader();
    driver.ExpectOpen().Respond().WithContainerId("driver");
    driver.ExpectBegin().Respond();
    driver.ExpectAttach().WithHandle(0).Respond().WithHandle(0);
    // Second response reuses remote handle 0.
    driver.ExpectAttach().WithHandle(1).Respond().WithHandle(0);
    driver.ExpectEnd().WithError(
        SessionError.HANDLE_IN_USE.ToString(),
        "Attach received with handle that is already in use");

    IConnection conn = protonEngine.Start().Open();
    ISession ssn = conn.Session().Open();
    ssn.Sender("test1").Open();
    ssn.Sender("test2").Open();

    driver.WaitForScriptToComplete();
    driver.ExpectClose().Respond();

    conn.Close();

    Assert.IsNull(failure);
}
[Test]
public void TestEngineFailedWhenSessionReceivesDetachForUnknownLink()
{
    // Scenario: after the session begins, the peer sends a Detach for handle 2 on
    // channel 0 although no link was ever attached. The test expects a Close
    // carrying an error and the engine failure to be a ProtocolViolationException.
    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(failed => failure = failed.FailureCause);
    ProtonTestConnector driver = CreateTestPeer(protonEngine);

    driver.ExpectAMQPHeader().RespondWithAMQPHeader();
    driver.ExpectOpen().Respond().WithContainerId("driver");
    driver.ExpectBegin().Respond();
    driver.RemoteDetach()
          .WithHandle(2)
          .OnChannel(0)
          .Queue();
    driver.ExpectClose().WithError(Is.NotNullValue());

    IConnection conn = protonEngine.Start().Open();
    conn.Session().Open();

    driver.WaitForScriptToComplete();

    Assert.IsNotNull(failure);
    Assert.IsTrue(failure is ProtocolViolationException);
}
[Test]
public void TestEngineFailedWhenSessionReceivesTransferForUnknownLink()
{
    // Scenario: the peer answers a receiver attach, then queues a Detach followed
    // by a Transfer on handle 0 / channel 0. The test expects a Close carrying an
    // error and the engine failure to be a ProtocolViolationException.
    byte[] deliveryTag = { 1 };

    IEngine protonEngine = IEngineFactory.Proton.CreateNonSaslEngine();
    protonEngine.ErrorHandler(failed => failure = failed.FailureCause);
    ProtonTestConnector driver = CreateTestPeer(protonEngine);

    driver.ExpectAMQPHeader().RespondWithAMQPHeader();
    driver.ExpectOpen().Respond().WithContainerId("driver");
    driver.ExpectBegin().Respond();
    driver.ExpectAttach().OfReceiver().Respond();
    driver.RemoteDetach().Queue();
    // This transfer is queued behind the detach above.
    driver.RemoteTransfer()
          .WithHandle(0)
          .WithDeliveryId(1)
          .WithDeliveryTag(deliveryTag)
          .OnChannel(0)
          .Queue();
    driver.ExpectClose().WithError(Is.NotNullValue());

    IConnection conn = protonEngine.Start().Open();
    ISession ssn = conn.Session().Open();
    ssn.Receiver("test").Open();

    driver.WaitForScriptToComplete();

    Assert.IsNotNull(failure);
    Assert.IsTrue(failure is ProtocolViolationException);
}
}
}