blob: 2c9777e85419c96468e40e4d8b7d011c359bb6c8 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Internal;
using Abstractions;
using Events;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
public sealed class ProcessManager : IRegisterEvent, IAsyncDisposable
{
private readonly ConcurrentDictionary<Guid, IProcess> _processes;
private readonly IConnectionPool _connectionPool;
public ProcessManager(IConnectionPool connectionPool)
{
_processes = new ConcurrentDictionary<Guid, IProcess>();
_connectionPool = connectionPool;
}
public async ValueTask DisposeAsync()
{
foreach (var proc in _processes.Values.ToArray())
await proc.DisposeAsync().ConfigureAwait(false);
await _connectionPool.DisposeAsync().ConfigureAwait(false);
}
public void Add(IProcess process)
=> _processes[process.CorrelationId] = process;
private async void Remove(Guid correlationId)
{
if (_processes.TryRemove(correlationId, out var process))
await process.DisposeAsync().ConfigureAwait(false);
}
public void Register(IEvent e)
{
switch (e)
{
case ConsumerCreated _:
DotPulsarMeter.ConsumerCreated();
break;
case ConsumerDisposed consumerDisposed:
Remove(consumerDisposed.CorrelationId);
DotPulsarMeter.ConsumerDisposed();
break;
case ProducerCreated _:
DotPulsarMeter.ProducerCreated();
break;
case ProducerDisposed producerDisposed:
Remove(producerDisposed.CorrelationId);
DotPulsarMeter.ProducerDisposed();
break;
case ReaderCreated _:
DotPulsarMeter.ReaderCreated();
break;
case ReaderDisposed readerDisposed:
Remove(readerDisposed.CorrelationId);
DotPulsarMeter.ReaderDisposed();
break;
default:
if (_processes.TryGetValue(e.CorrelationId, out var process))
process.Handle(e);
break;
}
}
}