blob: 8fdfa861dc3617d4e05bfc97ac5d4d913f559b7d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Apache.Rocketmq.V2;
using Grpc.Core;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Org.Apache.Rocketmq;
using Endpoints = Org.Apache.Rocketmq.Endpoints;
using Proto = Apache.Rocketmq.V2;
namespace tests
{
[TestClass]
public class ClientTest
{
[TestMethod]
public async Task TestOnVerifyMessageCommand()
{
var testClient = CreateTestClient();
var endpoints = new Endpoints("testEndpoints");
var command = new VerifyMessageCommand { Nonce = "testNonce" };
var mockCall = new AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand>(
new MockClientStreamWriter<TelemetryCommand>(),
new MockAsyncStreamReader<TelemetryCommand>(),
null,
null,
null,
null);
var mockClientManager = new Mock<IClientManager>();
mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockCall);
testClient.SetClientManager(mockClientManager.Object);
testClient.OnVerifyMessageCommand(endpoints, command);
mockClientManager.Verify(cm => cm.Telemetry(endpoints), Times.Once);
}
[TestMethod]
public async Task TestOnTopicRouteDataFetchedFailure()
{
var testClient = CreateTestClient();
var endpoints = new Endpoints("testEndpoints");
var mq = new Proto.MessageQueue
{
Topic = new Proto::Resource
{
ResourceNamespace = "testNamespace",
Name = "testTopic"
},
Id = 0,
Permission = Proto.Permission.ReadWrite,
Broker = new Proto::Broker
{
Name = "testBroker",
Id = 0,
Endpoints = new Proto.Endpoints
{
Scheme = Proto.AddressScheme.Ipv4,
Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 } }
}
}
};
var topicRouteData = new TopicRouteData(new[] { mq });
var mockCall = new AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand>(
new MockClientStreamWriter<TelemetryCommand>(),
new MockAsyncStreamReader<TelemetryCommand>(),
null,
null,
null,
null);
var mockClientManager = new Mock<IClientManager>();
mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockCall);
testClient.SetClientManager(mockClientManager.Object);
try
{
await testClient.OnTopicRouteDataFetched("testTopic", topicRouteData);
Assert.Fail();
}
catch (Exception e)
{
mockClientManager.Verify(cm => cm.Telemetry(It.IsAny<Endpoints>()), Times.Once);
}
}
[TestMethod]
public async Task TestOnPrintThreadStackTraceCommand()
{
var testClient = CreateTestClient();
var endpoints = new Endpoints("testEndpoints");
var command = new PrintThreadStackTraceCommand { Nonce = "testNonce" };
var mockCall = new AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand>(
new MockClientStreamWriter<TelemetryCommand>(),
new MockAsyncStreamReader<TelemetryCommand>(),
null,
null,
null,
null);
var mockClientManager = new Mock<IClientManager>();
mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockCall);
testClient.SetClientManager(mockClientManager.Object);
// Act
testClient.OnPrintThreadStackTraceCommand(endpoints, command);
// Assert
mockClientManager.Verify(cm => cm.Telemetry(endpoints), Times.Once);
}
private Client CreateTestClient()
{
return new Producer(new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build(),
new ConcurrentDictionary<string, bool>(), 1, null);
}
private class MockClientStreamWriter<T> : IClientStreamWriter<T>
{
public Task WriteAsync(T message)
{
// Simulate async operation
return Task.CompletedTask;
}
public WriteOptions WriteOptions { get; set; }
public Task CompleteAsync()
{
throw new NotImplementedException();
}
}
private class MockAsyncStreamReader<T> : IAsyncStreamReader<T>
{
public Task<bool> MoveNext(CancellationToken cancellationToken)
{
throw new System.NotImplementedException();
}
public T Current => throw new NotImplementedException();
}
}
}