blob: cf882d97e31a30e5367d80860d193c2d060cbf90 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Tests;
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;
using DotNet.Testcontainers.Networks;
using DotPulsar.Abstractions;
using Toxiproxy.Net;
using Xunit.Abstractions;
using Xunit.Sdk;
public class IntegrationFixture : IAsyncLifetime
{
private const string AuthenticationPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
private const string SecretKeyPath = "/pulsar/secret.key";
private const string UserName = "test-user";
private const int PulsarPort = 6650;
private const int ToxiProxyControlPort = 8474;
private const int ToxiProxyPort = 15124;
private readonly CancellationTokenSource _cts;
private readonly IMessageSink _messageSink;
private readonly INetwork _network;
private readonly IContainer _pulsarCluster;
private readonly IContainer _toxiProxy;
private Client _toxiProxyClient;
private Connection _toxiProxyConnection;
private Proxy _toxiProxyPulsarProxy;
private string? _token;
public IntegrationFixture(IMessageSink messageSink)
{
_messageSink = messageSink;
_cts = new CancellationTokenSource(TimeSpan.FromMinutes(10));
var environmentVariables = new Dictionary<string, string>
{
{ "PULSAR_PREFIX_tokenSecretKey", $"file://{SecretKeyPath}" },
{ "PULSAR_PREFIX_authenticationRefreshCheckSeconds", "5" },
{ "superUserRoles", UserName },
{ "authenticationEnabled", "true" },
{ "authorizationEnabled", "true" },
{ "authenticationProviders", "org.apache.pulsar.broker.authentication.AuthenticationProviderToken" },
{ "authenticateOriginalAuthData", "false" },
{ "brokerClientAuthenticationPlugin", AuthenticationPlugin },
{ "CLIENT_PREFIX_authPlugin", AuthenticationPlugin }
};
var arguments =
$"bin/pulsar tokens create-secret-key --output {SecretKeyPath} && " +
$"export brokerClientAuthenticationParameters=token:$(bin/pulsar tokens create --secret-key {SecretKeyPath} --subject {UserName}) && " +
$"export CLIENT_PREFIX_authParams=$brokerClientAuthenticationParameters && bin/apply-config-from-env.py conf/standalone.conf && " +
$"bin/apply-config-from-env-with-prefix.py CLIENT_PREFIX_ conf/client.conf && bin/pulsar standalone --no-functions-worker";
_network = new NetworkBuilder()
.WithName(Guid.NewGuid().ToString("D"))
.Build();
_toxiProxy = new ContainerBuilder()
.WithImage("ghcr.io/shopify/toxiproxy:2.7.0")
.WithPortBinding(ToxiProxyControlPort, true)
.WithPortBinding(ToxiProxyPort, true)
.WithHostname("toxiproxy")
.WithNetwork(_network)
.WithWaitStrategy(Wait.ForUnixContainer().UntilHttpRequestIsSucceeded(strategy => strategy.ForPath("/version").ForPort(ToxiProxyControlPort)))
.Build();
_pulsarCluster = new ContainerBuilder()
.WithImage("apachepulsar/pulsar:3.1.3")
.WithEnvironment(environmentVariables)
.WithHostname("pulsar")
.WithNetwork(_network)
.WithWaitStrategy(Wait.ForUnixContainer().UntilCommandIsCompleted(["/bin/bash", "-c", "bin/pulsar-admin clusters list"]))
.WithCommand("/bin/bash", "-c", arguments)
.Build();
ServiceUrl = new Uri($"pulsar://{_pulsarCluster.Hostname}:{PulsarPort}");
_toxiProxyConnection = new Connection();
_toxiProxyClient = _toxiProxyConnection.Client();
_toxiProxyPulsarProxy = new Proxy();
}
public Uri ServiceUrl { get; private set; }
public IAuthentication Authentication => AuthenticationFactory.Token(ct => ValueTask.FromResult(_token!));
public async Task DisposeAsync()
{
await _pulsarCluster.DisposeAsync();
await _toxiProxy.DisposeAsync();
_toxiProxyConnection.Dispose();
_cts.Dispose();
}
public async Task InitializeAsync()
{
SubscribeToContainerEvents(_toxiProxy, "Toxiproxy");
SubscribeToContainerEvents(_pulsarCluster, "Pulsar cluster");
await _network.CreateAsync(_cts.Token);
_messageSink.OnMessage(new DiagnosticMessage("Starting Toxiproxy"));
await _toxiProxy.StartAsync(_cts.Token);
_messageSink.OnMessage(new DiagnosticMessage("Starting Pulsar Cluster"));
await _pulsarCluster.StartAsync(_cts.Token);
_messageSink.OnMessage(new DiagnosticMessage("The containers has initiated. Next, we'll configure Toxiproxy mappings."));
_toxiProxyConnection = new Connection(_toxiProxy.Hostname, _toxiProxy.GetMappedPublicPort(ToxiProxyControlPort));
_toxiProxyClient = _toxiProxyConnection.Client();
_toxiProxyPulsarProxy = new Proxy
{
Name = "localToPulsarCluster",
Enabled = true,
Listen = $"{"0.0.0.0"}:{ToxiProxyPort}",
Upstream = $"{"pulsar"}:{PulsarPort}"
};
await _toxiProxyClient.AddAsync(_toxiProxyPulsarProxy);
_messageSink.OnMessage(new DiagnosticMessage("Toxiproxy successfully mapped connections between host and the Pulsar Cluster."));
ServiceUrl = new Uri($"pulsar://{_toxiProxy.Hostname}:{_toxiProxy.GetMappedPublicPort(ToxiProxyPort)}");
_messageSink.OnMessage(new DiagnosticMessage("You can connect with: " + ServiceUrl));
_token = await CreateToken(Timeout.InfiniteTimeSpan, _cts.Token);
}
private void HandleClusterStateChange(string containerName, string state) =>
_messageSink.OnMessage(new DiagnosticMessage($"The {containerName} changed state to: {state}"));
private void SubscribeToContainerEvents(IContainer container, string containerName)
{
container.Created += (_, _) => HandleClusterStateChange(containerName, "Created");
container.Creating += (_, _) => HandleClusterStateChange(containerName, "Creating");
container.Started += (_, _) => HandleClusterStateChange(containerName, "Started");
container.Starting += (_, _) => HandleClusterStateChange(containerName, "Starting");
container.Stopped += (_, _) => HandleClusterStateChange(containerName, "Stopped");
container.Stopping += (_, _) => HandleClusterStateChange(containerName, "Stopping");
}
public async Task<string> CreateToken(TimeSpan expiryTime, CancellationToken cancellationToken)
{
var arguments = $"bin/pulsar tokens create --secret-key {SecretKeyPath} --subject {UserName}";
if (expiryTime != Timeout.InfiniteTimeSpan)
arguments += $" --expiry-time {expiryTime.TotalSeconds}s";
var result = await _pulsarCluster.ExecAsync(["/bin/bash", "-c", arguments], cancellationToken);
if (result.ExitCode != 0)
throw new InvalidOperationException($"Could not create the token: {result.Stderr}");
return result.Stdout.Trim();
}
private static string CreateTopicName() => $"persistent://public/default/{Guid.NewGuid():N}";
public async Task<string> CreateTopic(CancellationToken cancellationToken)
{
var topic = CreateTopicName();
await CreateTopic(topic, cancellationToken);
return topic;
}
public async Task CreateTopic(string topic, CancellationToken cancellationToken)
{
var arguments = $"bin/pulsar-admin topics create {topic}";
var result = await _pulsarCluster.ExecAsync(["/bin/bash", "-c", arguments], cancellationToken);
if (result.ExitCode != 0)
throw new Exception($"Could not create the topic: {result.Stderr}");
}
public async Task<string> CreatePartitionedTopic(int numberOfPartitions, CancellationToken cancellationToken)
{
var topic = CreateTopicName();
await CreatePartitionedTopic(topic, numberOfPartitions, cancellationToken);
return topic;
}
public async Task CreatePartitionedTopic(string topic, int numberOfPartitions, CancellationToken cancellationToken)
{
var arguments = $"bin/pulsar-admin topics create-partitioned-topic {topic} -p {numberOfPartitions}";
var result = await _pulsarCluster.ExecAsync(["/bin/bash", "-c", arguments], cancellationToken);
if (result.ExitCode != 0)
throw new Exception($"Could not create the partitioned topic: {result.Stderr}");
}
public async Task<IAsyncDisposable> DisableThePulsarConnection()
{
_toxiProxyPulsarProxy.Enabled = false;
await _toxiProxyPulsarProxy.UpdateAsync();
return new Releaser(_toxiProxyPulsarProxy);
}
private class Releaser : IAsyncDisposable
{
private readonly Proxy _proxy;
public Releaser(Proxy proxy) => _proxy = proxy;
public async ValueTask DisposeAsync()
{
_proxy.Enabled = true;
await _proxy.UpdateAsync();
}
}
}