blob: a1ecf82d52b46a0861ff95d302eabf5aebe494c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using grpc = Grpc.Core;
using rmq = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
[TestClass]
public class RpcClientTest
{
[TestMethod]
public async Task testTelemetry()
{
Console.WriteLine("Test Telemetry streaming");
string target = "https://11.166.42.94:8081";
var rpc_client = new RpcClient(target);
var client_config = new ClientConfig();
var metadata = new grpc::Metadata();
Signature.sign(client_config, metadata);
var cmd = new rmq::TelemetryCommand();
cmd.Settings = new rmq::Settings();
cmd.Settings.ClientType = rmq::ClientType.Producer;
cmd.Settings.AccessPoint = new rmq::Endpoints();
cmd.Settings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
var address = new rmq::Address();
address.Port = 8081;
address.Host = "11.166.42.94";
cmd.Settings.AccessPoint.Addresses.Add(address);
cmd.Settings.RequestTimeout = new Google.Protobuf.WellKnownTypes.Duration();
cmd.Settings.RequestTimeout.Seconds = 3;
cmd.Settings.RequestTimeout.Nanos = 0;
cmd.Settings.Publishing = new rmq::Publishing();
var topic = new rmq::Resource();
topic.Name = "cpp_sdk_standard";
cmd.Settings.Publishing.Topics.Add(topic);
cmd.Settings.UserAgent = new rmq::UA();
cmd.Settings.UserAgent.Language = rmq::Language.DotNet;
cmd.Settings.UserAgent.Version = "1.0";
cmd.Settings.UserAgent.Hostname = System.Net.Dns.GetHostName();
cmd.Settings.UserAgent.Platform = System.Environment.OSVersion.ToString();
var duplexStreaming = rpc_client.Telemetry(metadata);
var reader = duplexStreaming.ResponseStream;
var writer = duplexStreaming.RequestStream;
var cts = new CancellationTokenSource();
await writer.WriteAsync(cmd);
Console.WriteLine("Command written");
if (await reader.MoveNext(cts.Token))
{
var response = reader.Current;
switch (response.CommandCase)
{
case rmq::TelemetryCommand.CommandOneofCase.Settings:
{
var responded_settings = response.Settings;
Console.WriteLine($"{responded_settings.ToString()}");
break;
}
case rmq::TelemetryCommand.CommandOneofCase.None:
{
Console.WriteLine($"Unknown response command type: {response.Status.ToString()}");
break;
}
}
Console.WriteLine("Server responded ");
}
else
{
Console.WriteLine("Server is not responding");
var status = duplexStreaming.GetStatus();
Console.WriteLine($"status={status.ToString()}");
var trailers = duplexStreaming.GetTrailers();
Console.WriteLine($"trailers={trailers.ToString()}");
}
}
[TestMethod]
public void testQueryRoute()
{
string target = "https://11.166.42.94:8081";
var rpc_client = new RpcClient(target);
var client_config = new ClientConfig();
var metadata = new grpc::Metadata();
Signature.sign(client_config, metadata);
var request = new rmq::QueryRouteRequest();
request.Topic = new rmq::Resource();
request.Topic.Name = "cpp_sdk_standard";
request.Endpoints = new rmq::Endpoints();
request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
var address = new rmq::Address();
address.Port = 8081;
address.Host = "11.166.42.94";
request.Endpoints.Addresses.Add(address);
var response = rpc_client.QueryRoute(metadata, request, client_config.RequestTimeout);
var result = response.GetAwaiter().GetResult();
}
[TestMethod]
public async Task TestSend()
{
string target = "https://11.166.42.94:8081";
var rpc_client = new RpcClient(target);
var client_config = new ClientConfig();
var metadata = new grpc::Metadata();
Signature.sign(client_config, metadata);
var request = new rmq::SendMessageRequest();
var message = new rmq::Message();
message.Topic = new rmq::Resource();
message.Topic.Name = "cpp_sdk_standard";
message.Body = Google.Protobuf.ByteString.CopyFromUtf8("Test Body");
message.SystemProperties = new rmq::SystemProperties();
message.SystemProperties.Tag = "TagA";
message.SystemProperties.MessageId = "abc";
request.Messages.Add(message);
var response = await rpc_client.SendMessage(metadata, request, TimeSpan.FromSeconds(3));
Assert.AreEqual(rmq::Code.Ok, response.Status.Code);
}
}
}