blob: 6f09fef0d16e2ceaad6cd908289d406e769a162d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using Apache.Qpid.Proton.Test.Driver;
using Apache.Qpid.Proton.Types;
using Apache.Qpid.Proton.Types.Messaging;
using Apache.Qpid.Proton.Types.Transactions;
using Apache.Qpid.Proton.Types.Transport;
using NUnit.Framework;
namespace Apache.Qpid.Proton.Engine.Implementation
{
[TestFixture, Timeout(20000)]
public class ProtonTransactionLinkTest : ProtonEngineTestSupport
{
private Symbol[] DEFAULT_OUTCOMES = new Symbol[] { Accepted.DescriptorSymbol,
Rejected.DescriptorSymbol,
Released.DescriptorSymbol,
Modified.DescriptorSymbol };
private String[] DEFAULT_OUTCOMES_STRINGS = new String[] { Accepted.DescriptorSymbol.ToString(),
Rejected.DescriptorSymbol.ToString(),
Released.DescriptorSymbol.ToString(),
Modified.DescriptorSymbol.ToString() };
[Test]
public void TestCreateDefaultCoordinatorSender()
{
IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
engine.ErrorHandler((result) => failure = result.FailureCause);
ProtonTestConnector peer = CreateTestPeer(engine);
Coordinator coordinator = new Coordinator();
Source source = new Source();
peer.ExpectAMQPHeader().RespondWithAMQPHeader();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectCoordinatorAttach().Respond();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
IConnection connection = engine.Start().Open();
ISession session = connection.Session().Open();
ISender sender = session.Sender("test-coordinator");
sender.Source = source;
sender.Coordinator = coordinator;
sender.Open();
sender.Detach();
session.Close();
connection.Close();
peer.WaitForScriptToComplete();
Assert.IsNull(failure);
}
[Test]
public void TestCreateCoordinatorSender()
{
IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
engine.ErrorHandler((result) => failure = result.FailureCause);
ProtonTestConnector peer = CreateTestPeer(engine);
Coordinator coordinator = new Coordinator();
coordinator.Capabilities = new Symbol[] { TxnCapability.LOCAL_TXN };
Source source = new Source();
source.Outcomes = DEFAULT_OUTCOMES;
peer.ExpectAMQPHeader().RespondWithAMQPHeader();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().WithSource().WithOutcomes(DEFAULT_OUTCOMES_STRINGS).And()
.WithCoordinator().WithCapabilities(TxnCapability.LOCAL_TXN.ToString()).And().Respond();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
IConnection connection = engine.Start().Open();
ISession session = connection.Session().Open();
ISender sender = session.Sender("test-coordinator");
sender.Source = source;
sender.Coordinator = coordinator;
bool openedWithCoordinatorTarget = false;
sender.OpenHandler((result) =>
{
if (result.RemoteTerminus is Coordinator)
{
openedWithCoordinatorTarget = true;
}
});
sender.Open();
Assert.IsTrue(openedWithCoordinatorTarget);
Coordinator remoteCoordinator = (Coordinator)sender.RemoteTerminus;
Assert.AreEqual(TxnCapability.LOCAL_TXN, remoteCoordinator.Capabilities[0]);
sender.Detach();
session.Close();
connection.Close();
peer.WaitForScriptToComplete();
Assert.IsNull(failure);
}
[Test]
public void TestRemoteCoordinatorTriggersSenderCreateWhenManagerHandlerNotSet()
{
IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
engine.ErrorHandler((result) => failure = result.FailureCause);
ProtonTestConnector peer = CreateTestPeer(engine);
peer.ExpectAMQPHeader().RespondWithAMQPHeader();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.RemoteAttach().WithName("TXN-Link")
.WithHandle(0)
.WithRole(Role.Sender.ToBoolean())
.WithSource().WithOutcomes(DEFAULT_OUTCOMES_STRINGS).And()
.WithInitialDeliveryCount(0)
.WithCoordinator().WithCapabilities(TxnCapability.LOCAL_TXN.ToString()).And().Queue();
IConnection connection = engine.Start().Open();
ISession session = connection.Session();
IReceiver transactionReceiver = null;
session.ReceiverOpenHandler(txnReceiver =>
{
transactionReceiver = txnReceiver;
});
session.Open();
peer.WaitForScriptToComplete();
peer.ExpectAttach().OfReceiver()
.WithSource().WithOutcomes(DEFAULT_OUTCOMES_STRINGS).And()
.WithCoordinator().WithCapabilities(TxnCapability.LOCAL_TXN.ToString());
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
IReceiver manager = transactionReceiver;
Assert.IsNotNull(transactionReceiver);
Assert.IsNotNull(transactionReceiver.RemoteTerminus);
Assert.AreEqual(TxnCapability.LOCAL_TXN, ((Coordinator)manager.RemoteTerminus).Capabilities[0]);
manager.Coordinator = ((Coordinator)manager.RemoteTerminus).Copy();
manager.Source = manager.RemoteSource.Copy();
manager.Open();
manager.Close();
session.Close();
connection.Close();
peer.WaitForScriptToComplete();
Assert.IsNull(failure);
}
}
}