blob: 255b87a831a8bb257009722f3f0e7de62e1f775c [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Tests.Internal
{
using Abstractions;
using DotPulsar.Internal;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Events;
using NSubstitute;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Xunit;
public class PartitionedProducerProcessTests
{
[Fact]
public void TestPartitionedProducerStateManage_WhenSubProducersStateChange_ThenPartitionedProducerStateChangeCorrectly()
{
var connectionPool = Substitute.For<IConnectionPool>();
var establishNewChannel = Substitute.For<IEstablishNewChannel>();
var producer = Substitute.For<IProducer>();
var processManager = new ProcessManager(connectionPool);
var producerGuids = new Dictionary<uint, Guid>(3);
var producersGroup = new ConcurrentDictionary<uint, IProducer>(Environment.ProcessorCount, 3);
var partitionedProducerGuid = Guid.NewGuid();
for (uint i = 0; i < 3; i++)
{
var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
var correlationId = Guid.NewGuid();
var process = new ProducerProcess(correlationId, stateManager, establishNewChannel, processManager, partitionedProducerGuid);
producerGuids[i] = correlationId;
producersGroup[i] = producer;
processManager.Add(process);
}
var partitionedStateManager =
new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
var producerProcess = new ProducerProcess(partitionedProducerGuid, partitionedStateManager, establishNewChannel, new ProcessManager(connectionPool));
processManager.Add(producerProcess);
processManager.Register(new UpdatePartitions(partitionedProducerGuid, (uint) producersGroup.Count));
// Test initial channel
processManager.Register(new ChannelDisconnected(producerGuids[0]));
Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
processManager.Register(new ChannelDisconnected(producerGuids[1]));
Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
processManager.Register(new ChannelDisconnected(producerGuids[2]));
Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
// Test connect
Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
processManager.Register(new ChannelConnected(producerGuids[0]));
Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
processManager.Register(new ChannelConnected(producerGuids[1]));
Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
processManager.Register(new ChannelConnected(producerGuids[2]));
Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
// Test disconnect
processManager.Register(new ChannelDisconnected(producerGuids[1]));
Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
// Test reconnect
processManager.Register(new ChannelConnected(producerGuids[1]));
Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
// Test fault
processManager.Register(new ExecutorFaulted(producerGuids[1]));
Assert.Equal(ProducerState.Faulted, partitionedStateManager.CurrentState);
}
[Fact]
public void TestUpdatePartitions_WhenIncreasePartitions_ThenPartitionedProducerStateChangeCorrectly()
{
var connectionPool = Substitute.For<IConnectionPool>();
var processManager = new ProcessManager(connectionPool);
var establishNewChannel = Substitute.For<IEstablishNewChannel>();
var guid = Guid.NewGuid();
var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
var process = new ProducerProcess(guid, stateManager, establishNewChannel, new ProcessManager(connectionPool));
processManager.Add(process);
processManager.Register(new UpdatePartitions(guid, 1));
Assert.Equal(ProducerState.Disconnected, stateManager.CurrentState);
processManager.Register(new PartitionedSubProducerStateChanged(guid, ProducerState.Connected));
Assert.Equal(ProducerState.Connected, stateManager.CurrentState);
processManager.Register(new UpdatePartitions(guid, 2));
Assert.Equal(ProducerState.PartiallyConnected, stateManager.CurrentState);
processManager.Register(new PartitionedSubProducerStateChanged(guid, ProducerState.Connected));
Assert.Equal(ProducerState.Connected, stateManager.CurrentState);
}
}
}