blob: 54399fad422a64ef95da5181742c3d5ff1d127c8 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Tests;
using Ductus.FluentDocker.Builders;
using Ductus.FluentDocker.Services;
using Ductus.FluentDocker.Services.Extensions;
using System;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
public class IntegrationFixture : IAsyncLifetime
{
private const string AuthenticationPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
private const string SecretKeyPath = "/pulsar/secret.key";
private const string UserName = "test-user";
private const int Port = 6650;
private readonly IContainerService _cluster;
public IntegrationFixture()
{
var environmentVariables = new[]
{
$"PULSAR_PREFIX_tokenSecretKey=file://{SecretKeyPath}",
"PULSAR_PREFIX_authenticationRefreshCheckSeconds=5",
$"superUserRoles={UserName}",
"authenticationEnabled=true",
"authorizationEnabled=true",
"authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken",
"authenticateOriginalAuthData=false",
$"brokerClientAuthenticationPlugin={AuthenticationPlugin}",
$"CLIENT_PREFIX_authPlugin={AuthenticationPlugin}",
};
var arguments = "\"" +
$"bin/pulsar tokens create-secret-key --output {SecretKeyPath} && " +
$"export brokerClientAuthenticationParameters=token:$(bin/pulsar tokens create --secret-key {SecretKeyPath} --subject {UserName}) && " +
"export CLIENT_PREFIX_authParams=$brokerClientAuthenticationParameters && " +
"bin/apply-config-from-env.py conf/standalone.conf && " +
"bin/apply-config-from-env-with-prefix.py CLIENT_PREFIX_ conf/client.conf && " +
"bin/pulsar standalone --no-functions-worker"
+ "\"";
_cluster = new Builder()
.UseContainer()
.UseImage("apachepulsar/pulsar:2.9.2")
.WithEnvironment(environmentVariables)
.ExposePort(Port)
.Command("/bin/bash -c", arguments)
.Build();
ServiceUrl = new Uri("pulsar:://localhost:6650");
}
public Uri ServiceUrl { get; private set; }
public Task DisposeAsync()
{
_cluster.Dispose();
return Task.CompletedTask;
}
public Task InitializeAsync()
{
_cluster.Start();
_cluster.WaitForMessageInLogs("Successfully updated the policies on namespace public/default", int.MaxValue);
var endpoint = _cluster.ToHostExposedEndpoint($"{Port}/tcp");
ServiceUrl = new Uri($"pulsar://localhost:{endpoint.Port}");
return Task.CompletedTask;
}
public string CreateToken(TimeSpan expiryTime)
{
var arguments = $"bin/pulsar tokens create --secret-key {SecretKeyPath} --subject {UserName}";
if (expiryTime != Timeout.InfiniteTimeSpan)
arguments += $" --expiry-time {expiryTime.TotalSeconds}s";
var result = _cluster.Execute(arguments);
if (!result.Success)
throw new InvalidOperationException($"Could not create the token: {result.Error}");
return result.Data[0];
}
public void CreatePartitionedTopic(string topic, int numberOfPartitions)
{
var arguments = $"bin/pulsar-admin topics create-partitioned-topic {topic} -p {numberOfPartitions}";
var result = _cluster.Execute(arguments);
if (!result.Success)
throw new Exception($"Could not create the partitioned topic: {result.Error}");
}
}