blob: 231acd2d30fd30a7b7a811db56f40f42459405ad [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Org.Apache.Rocketmq;
using Proto = Apache.Rocketmq.V2;
namespace tests
{
[TestClass]
public class TransactionTest
{
private const string FakeTag = "fakeTag";
private const string FakeTopic = "fakeTopic";
private const string FakeMsgKey = "fakeMsgKey";
private const string BrokerName = "broker0";
private const string Host = "127.0.0.1";
private const int Port = 8080;
private Producer _producer;
private byte[] _bytes;
[TestInitialize]
public void SetUp()
{
_producer = CreateTestClient();
_bytes = Encoding.UTF8.GetBytes("fakeBytes");
}
[TestMethod]
public void TestTryAddMessage()
{
var transaction = new Transaction(_producer);
var message = CreateMessage();
var publishingMessage = transaction.TryAddMessage(message);
Assert.AreEqual(MessageType.Transaction, publishingMessage.MessageType);
}
[TestMethod]
[ExpectedException(typeof(ArgumentException))]
public void TestTryAddExceededMessages()
{
var transaction = new Transaction(_producer);
var message = CreateMessage();
transaction.TryAddMessage(message);
transaction.TryAddMessage(message);
}
[TestMethod]
public void TestTryAddReceipt()
{
var transaction = new Transaction(_producer);
var message = CreateMessage();
var publishingMessage = transaction.TryAddMessage(message);
var mq0 = CreateMessageQueue();
var sendReceipt = CreateSendReceipt(mq0);
transaction.TryAddReceipt(publishingMessage, sendReceipt.First());
}
[TestMethod]
[ExpectedException(typeof(ArgumentException))]
public void TestTryAddReceiptNotContained()
{
var transaction = new Transaction(_producer);
var message = CreateMessage();
var publishingMessage = new PublishingMessage(message, new PublishingSettings("fakeNamespace",
"fakeClientId", new Endpoints("fakeEndpoints"), new Mock<IRetryPolicy>().Object,
TimeSpan.FromSeconds(10), new ConcurrentDictionary<string, bool>()), true);
var mq0 = CreateMessageQueue();
var sendReceipt = CreateSendReceipt(mq0);
transaction.TryAddReceipt(publishingMessage, sendReceipt.First());
}
[TestMethod]
[ExpectedException(typeof(InvalidOperationException))]
public async Task TestCommitWithNoReceipts()
{
var transaction = new Transaction(_producer);
await transaction.Commit();
}
[TestMethod]
[ExpectedException(typeof(InvalidOperationException))]
public async Task TestRollbackWithNoReceipts()
{
var transaction = new Transaction(_producer);
await transaction.Rollback();
}
[TestMethod]
public async Task TestCommit()
{
var transaction = new Transaction(_producer);
var message = CreateMessage();
var publishingMessage = transaction.TryAddMessage(message);
var mq0 = CreateMessageQueue();
var sendReceipt = CreateSendReceipt(mq0);
transaction.TryAddReceipt(publishingMessage, sendReceipt.First());
var mockClientManager = new Mock<IClientManager>();
_producer.SetClientManager(mockClientManager.Object);
SetupCommitOrRollback(mockClientManager, true);
await transaction.Commit();
}
[TestMethod]
public async Task TestRollback()
{
var transaction = new Transaction(_producer);
var message = CreateMessage();
var publishingMessage = transaction.TryAddMessage(message);
var mq0 = CreateMessageQueue();
var sendReceipt = CreateSendReceipt(mq0);
transaction.TryAddReceipt(publishingMessage, sendReceipt.First());
var mockClientManager = new Mock<IClientManager>();
_producer.SetClientManager(mockClientManager.Object);
SetupCommitOrRollback(mockClientManager, false);
await transaction.Rollback();
}
private Producer CreateTestClient()
{
var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build();
return new Producer(clientConfig, new ConcurrentDictionary<string, bool>(),
1, null);
}
private Message CreateMessage()
{
return new Message.Builder()
.SetTopic(FakeTopic)
.SetBody(_bytes)
.SetTag(FakeTag)
.SetKeys(FakeMsgKey)
.Build();
}
private Proto.MessageQueue CreateMessageQueue()
{
return new Proto.MessageQueue
{
Broker = new Proto.Broker
{
Name = BrokerName,
Endpoints = new Proto.Endpoints
{
Scheme = Proto.AddressScheme.Ipv4,
Addresses = { new Proto.Address { Host = Host, Port = Port } }
}
},
Id = 0,
Permission = Proto.Permission.ReadWrite,
Topic = new Proto.Resource { ResourceNamespace = "foo-bar-namespace", Name = "TestTopic" }
};
}
private IEnumerable<SendReceipt> CreateSendReceipt(Proto.MessageQueue mq0)
{
var metadata = _producer.Sign();
var sendResultEntry = new Proto.SendResultEntry
{
MessageId = "fakeMsgId",
TransactionId = "fakeTxId",
Status = new Proto.Status { Code = Proto.Code.Ok },
Offset = 1
};
var sendMessageResponse = new Proto.SendMessageResponse
{
Status = new Proto.Status { Code = Proto.Code.Ok },
Entries = { sendResultEntry }
};
var invocation = new RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>(null, sendMessageResponse, metadata);
return SendReceipt.ProcessSendMessageResponse(new MessageQueue(mq0), invocation);
}
private void SetupCommitOrRollback(Mock<IClientManager> mockClientManager, bool commit)
{
var endTransactionMetadata = _producer.Sign();
var endTransactionResponse = new Proto.EndTransactionResponse
{
Status = new Proto.Status { Code = Proto.Code.Ok }
};
var endTransactionInvocation = new RpcInvocation<Proto.EndTransactionRequest, Proto.EndTransactionResponse>(null,
endTransactionResponse, endTransactionMetadata);
mockClientManager.Setup(cm => cm.EndTransaction(It.IsAny<Endpoints>(),
It.IsAny<Proto.EndTransactionRequest>(), It.IsAny<TimeSpan>())).Returns(Task.FromResult(endTransactionInvocation));
_producer.State = State.Running;
}
}
}