// blob: b8ea64b355bfab17cb61a29a5feb3d65458c584e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Org.Apache.Rocketmq;
using Org.Apache.Rocketmq.Error;
using Proto = Apache.Rocketmq.V2;
namespace tests
{
[TestClass]
public class SimpleConsumerTest
{
    /// <summary>
    /// Runs <paramref name="action"/> and fails the test unless it throws an exception that is an
    /// instance of <typeparamref name="TException"/>.
    /// </summary>
    /// <remarks>
    /// A plain try/catch that only asserts inside the catch block passes silently when nothing is
    /// thrown; this helper makes the "no exception" outcome an explicit failure.
    /// </remarks>
    /// <typeparam name="TException">Exception type the action is expected to raise.</typeparam>
    /// <param name="action">Asynchronous operation under test.</param>
    /// <param name="code">Status code the mocked RPC returned; used only in failure messages.</param>
    private static async Task AssertThrowsAsync<TException>(Func<Task> action, Proto.Code code)
        where TException : Exception
    {
        try
        {
            await action();
        }
        catch (Exception e)
        {
            // Same check as before: derived exception types are accepted as well.
            Assert.IsInstanceOfType(e, typeof(TException),
                $"Unexpected exception type for status code {code}.");
            return;
        }

        // Reached only when the action completed without throwing.
        Assert.Fail($"Expected {typeof(TException).Name} for status code {code}, but no exception was thrown.");
    }

    /// <summary>
    /// Installs a mocked <see cref="IClientManager"/> whose AckMessage call returns a response carrying
    /// <paramref name="code"/>, then asserts that <c>consumer.Ack</c> throws
    /// <typeparamref name="TException"/>.
    /// </summary>
    /// <typeparam name="TException">Exception type expected for the given status code.</typeparam>
    /// <param name="consumer">Consumer under test; its client manager is replaced by the mock.</param>
    /// <param name="messageView">Message to acknowledge.</param>
    /// <param name="code">Status code placed in the mocked ack response.</param>
    private async Task MockAndAssertAckException<TException>(SimpleConsumer consumer, MessageView messageView,
        Proto.Code code) where TException : Exception
    {
        var mockClientManager = new Mock<IClientManager>();
        consumer.SetClientManager(mockClientManager.Object);
        var metadata = consumer.Sign();
        var response = new Proto.AckMessageResponse
        {
            Status = new Proto.Status { Code = code }
        };
        // The request slot of the invocation is left null; only the response status matters here.
        var invocation =
            new RpcInvocation<Proto.AckMessageRequest, Proto.AckMessageResponse>(null, response, metadata);
        mockClientManager.Setup(cm =>
                cm.AckMessage(It.IsAny<Endpoints>(), It.IsAny<Proto.AckMessageRequest>(), It.IsAny<TimeSpan>()))
            .Returns(Task.FromResult(invocation));

        await AssertThrowsAsync<TException>(async () => await consumer.Ack(messageView), code);
    }

    /// <summary>
    /// Receive on a consumer that was created but never moved to the running state must throw
    /// <see cref="InvalidOperationException"/>.
    /// </summary>
    [TestMethod]
    [ExpectedException(typeof(InvalidOperationException))]
    public async Task TestReceiveWithoutStart()
    {
        var consumer = CreateSimpleConsumer();
        await consumer.Receive(16, TimeSpan.FromSeconds(15));
    }

    /// <summary>
    /// Ack on a consumer that was created but never moved to the running state must throw
    /// <see cref="InvalidOperationException"/>.
    /// </summary>
    [TestMethod]
    [ExpectedException(typeof(InvalidOperationException))]
    public async Task TestAckWithoutStart()
    {
        var consumer = CreateSimpleConsumer();
        var messageView = MessageView.FromProtobuf(CreateMessage());
        await consumer.Ack(messageView);
    }

    /// <summary>
    /// Subscribe on a consumer that was created but never moved to the running state must throw
    /// <see cref="InvalidOperationException"/>.
    /// </summary>
    [TestMethod]
    [ExpectedException(typeof(InvalidOperationException))]
    public async Task TestSubscribeWithoutStart()
    {
        var consumer = CreateSimpleConsumer();
        await consumer.Subscribe("testTopic", new FilterExpression("*"));
    }

    /// <summary>
    /// Unsubscribe on a consumer that was created but never moved to the running state must throw
    /// <see cref="InvalidOperationException"/>.
    /// </summary>
    [TestMethod]
    [ExpectedException(typeof(InvalidOperationException))]
    public void TestUnsubscribeWithoutStart()
    {
        var consumer = CreateSimpleConsumer();
        consumer.Unsubscribe("testTopic");
    }

    /// <summary>
    /// Receive with a maximum message count of zero on a running consumer must throw
    /// <see cref="InternalErrorException"/>.
    /// </summary>
    [TestMethod]
    [ExpectedException(typeof(InternalErrorException))]
    public async Task TestReceiveWithZeroMaxMessageNum()
    {
        var consumer = CreateSimpleConsumer();
        // Set the state directly instead of starting the consumer.
        consumer.State = State.Running;
        await consumer.Receive(0, TimeSpan.FromSeconds(15));
    }

    /// <summary>
    /// Verifies the exception type raised by Ack for each non-OK status code returned by the
    /// mocked client manager.
    /// </summary>
    [TestMethod]
    public async Task TestAck()
    {
        var consumer = CreateSimpleConsumer();
        consumer.State = State.Running;
        var messageView = CreateMessageView();

        await MockAndAssertAckException<BadRequestException>(consumer, messageView, Proto.Code.BadRequest);
        await MockAndAssertAckException<BadRequestException>(consumer, messageView, Proto.Code.IllegalTopic);
        await MockAndAssertAckException<BadRequestException>(consumer, messageView,
            Proto.Code.IllegalConsumerGroup);
        await MockAndAssertAckException<BadRequestException>(consumer, messageView,
            Proto.Code.InvalidReceiptHandle);
        await MockAndAssertAckException<BadRequestException>(consumer, messageView, Proto.Code.ClientIdRequired);
        await MockAndAssertAckException<UnauthorizedException>(consumer, messageView, Proto.Code.Unauthorized);
        await MockAndAssertAckException<ForbiddenException>(consumer, messageView, Proto.Code.Forbidden);
        await MockAndAssertAckException<NotFoundException>(consumer, messageView, Proto.Code.NotFound);
        await MockAndAssertAckException<NotFoundException>(consumer, messageView, Proto.Code.TopicNotFound);
        await MockAndAssertAckException<TooManyRequestsException>(consumer, messageView,
            Proto.Code.TooManyRequests);
        await MockAndAssertAckException<InternalErrorException>(consumer, messageView, Proto.Code.InternalError);
        await MockAndAssertAckException<InternalErrorException>(consumer, messageView,
            Proto.Code.InternalServerError);
        await MockAndAssertAckException<ProxyTimeoutException>(consumer, messageView, Proto.Code.ProxyTimeout);
        await MockAndAssertAckException<UnsupportedException>(consumer, messageView, Proto.Code.Unsupported);
    }

    /// <summary>
    /// Verifies that ChangeInvisibleDuration completes when the mocked response status is OK, then
    /// checks the exception type raised for each non-OK status code.
    /// </summary>
    [TestMethod]
    public async Task TestChangeInvisibleDuration()
    {
        var consumer = CreateSimpleConsumer();
        consumer.State = State.Running;
        var messageView = CreateMessageView();
        var invisibleDuration = TimeSpan.FromSeconds(3);

        // Success path: an OK status must not raise.
        var mockClientManager = new Mock<IClientManager>();
        consumer.SetClientManager(mockClientManager.Object);
        var metadata = consumer.Sign();
        var response = new Proto.ChangeInvisibleDurationResponse
        {
            Status = new Proto.Status { Code = Proto.Code.Ok }
        };
        var invocation =
            new RpcInvocation<Proto.ChangeInvisibleDurationRequest, Proto.ChangeInvisibleDurationResponse>(null,
                response, metadata);
        mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny<Endpoints>(),
                It.IsAny<Proto.ChangeInvisibleDurationRequest>(), It.IsAny<TimeSpan>()))
            .Returns(Task.FromResult(invocation));
        await consumer.ChangeInvisibleDuration(messageView, invisibleDuration);

        // Failure paths: one call per status code.
        await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
            invisibleDuration, Proto.Code.BadRequest);
        await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
            invisibleDuration, Proto.Code.IllegalTopic);
        await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
            invisibleDuration, Proto.Code.IllegalConsumerGroup);
        await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
            invisibleDuration, Proto.Code.IllegalInvisibleTime);
        await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
            invisibleDuration, Proto.Code.InvalidReceiptHandle);
        await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
            invisibleDuration, Proto.Code.ClientIdRequired);
        await MockAndAssertChangeInvisibleDurationException<UnauthorizedException>(consumer, messageView,
            invisibleDuration, Proto.Code.Unauthorized);
        await MockAndAssertChangeInvisibleDurationException<NotFoundException>(consumer, messageView,
            invisibleDuration, Proto.Code.NotFound);
        await MockAndAssertChangeInvisibleDurationException<NotFoundException>(consumer, messageView,
            invisibleDuration, Proto.Code.TopicNotFound);
        await MockAndAssertChangeInvisibleDurationException<TooManyRequestsException>(consumer, messageView,
            invisibleDuration, Proto.Code.TooManyRequests);
        await MockAndAssertChangeInvisibleDurationException<InternalErrorException>(consumer, messageView,
            invisibleDuration, Proto.Code.InternalError);
        await MockAndAssertChangeInvisibleDurationException<InternalErrorException>(consumer, messageView,
            invisibleDuration, Proto.Code.InternalServerError);
        await MockAndAssertChangeInvisibleDurationException<ProxyTimeoutException>(consumer, messageView,
            invisibleDuration, Proto.Code.ProxyTimeout);
        await MockAndAssertChangeInvisibleDurationException<UnsupportedException>(consumer, messageView,
            invisibleDuration, Proto.Code.Unsupported);
    }

    /// <summary>
    /// Installs a mocked <see cref="IClientManager"/> whose ChangeInvisibleDuration call returns a
    /// response carrying <paramref name="code"/>, then asserts that
    /// <c>consumer.ChangeInvisibleDuration</c> throws <typeparamref name="TException"/>.
    /// </summary>
    /// <typeparam name="TException">Exception type expected for the given status code.</typeparam>
    /// <param name="consumer">Consumer under test; its client manager is replaced by the mock.</param>
    /// <param name="messageView">Message whose invisible duration is changed.</param>
    /// <param name="invisibleDuration">Duration passed through to the consumer call.</param>
    /// <param name="code">Status code placed in the mocked response.</param>
    private async Task MockAndAssertChangeInvisibleDurationException<TException>(SimpleConsumer consumer,
        MessageView messageView, TimeSpan invisibleDuration, Proto.Code code) where TException : Exception
    {
        var mockClientManager = new Mock<IClientManager>();
        consumer.SetClientManager(mockClientManager.Object);
        var metadata = consumer.Sign();
        var response = new Proto.ChangeInvisibleDurationResponse
        {
            Status = new Proto.Status { Code = code }
        };
        var invocation =
            new RpcInvocation<Proto.ChangeInvisibleDurationRequest, Proto.ChangeInvisibleDurationResponse>(null,
                response, metadata);
        mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny<Endpoints>(),
                It.IsAny<Proto.ChangeInvisibleDurationRequest>(), It.IsAny<TimeSpan>()))
            .Returns(Task.FromResult(invocation));

        await AssertThrowsAsync<TException>(
            async () => await consumer.ChangeInvisibleDuration(messageView, invisibleDuration), code);
    }

    /// <summary>
    /// Builds a <see cref="SimpleConsumer"/> for group "testConsumerGroup" with a 15-second duration
    /// argument and a single "testTopic" subscription. The consumer is returned without being started.
    /// </summary>
    private SimpleConsumer CreateSimpleConsumer()
    {
        var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build();
        var subscription = new Dictionary<string, FilterExpression> { { "testTopic", new FilterExpression("*") } };
        return new SimpleConsumer(clientConfig, "testConsumerGroup", TimeSpan.FromSeconds(15), subscription);
    }

    /// <summary>
    /// Builds a protobuf message on "testTopic" with body "foobar" (UTF-8), a CRC32 digest entry and a
    /// freshly generated message id.
    /// </summary>
    private Proto.Message CreateMessage()
    {
        var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" };
        var systemProperties = new Proto.SystemProperties
        {
            MessageType = Proto.MessageType.Normal,
            MessageId = MessageIdGenerator.GetInstance().Next(),
            BornHost = "127.0.0.1",
            BodyDigest = digest,
            BornTimestamp = new Timestamp()
        };
        return new Proto.Message
        {
            SystemProperties = systemProperties,
            Topic = new Proto.Resource { Name = "testTopic" },
            Body = ByteString.CopyFrom("foobar", Encoding.UTF8)
        };
    }

    /// <summary>
    /// Builds a <see cref="MessageView"/> from <see cref="CreateMessage"/> bound to a message queue on
    /// broker "broker0" at 127.0.0.1:8080.
    /// </summary>
    private MessageView CreateMessageView()
    {
        var message = CreateMessage();
        var messageQueue = new MessageQueue(new Proto.MessageQueue
        {
            Broker = new Proto.Broker
            {
                Name = "broker0",
                Endpoints = new Proto.Endpoints
                {
                    Scheme = Proto.AddressScheme.Ipv4,
                    Addresses =
                    {
                        new Proto.Address
                        {
                            Host = "127.0.0.1",
                            Port = 8080
                        }
                    }
                }
            },
            Id = 0,
            Permission = Proto.Permission.ReadWrite,
            Topic = new Proto.Resource { ResourceNamespace = "testNamespace", Name = "testTopic" }
        });
        return MessageView.FromProtobuf(message, messageQueue);
    }
}
}