blob: 34bc77f4e7ef0a5ceee17d43072ba3841acff010 [file] [log] [blame]
#region License
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#endregion
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Gremlin.Net.Driver.Exceptions;
using Gremlin.Net.Process;
namespace Gremlin.Net.Driver
{
internal class ConnectionPool : IDisposable
{
private const int ConnectionIndexOverflowLimit = int.MaxValue - 1000000;
private readonly ConnectionFactory _connectionFactory;
private readonly CopyOnWriteCollection<Connection> _connections = new CopyOnWriteCollection<Connection>();
private readonly int _poolSize;
private readonly int _maxInProcessPerConnection;
private int _connectionIndex;
private int _poolState;
private const int PoolIdle = 0;
private const int PoolPopulationInProgress = 1;
public ConnectionPool(ConnectionFactory connectionFactory, ConnectionPoolSettings settings)
{
_connectionFactory = connectionFactory;
_poolSize = settings.PoolSize;
_maxInProcessPerConnection = settings.MaxInProcessPerConnection;
PopulatePoolAsync().WaitUnwrap();
}
public int NrConnections => _connections.Count;
public async Task<IConnection> GetAvailableConnectionAsync()
{
await EnsurePoolIsPopulatedAsync().ConfigureAwait(false);
return ProxiedConnection(GetConnectionFromPool());
}
private async Task EnsurePoolIsPopulatedAsync()
{
// The pool could have been (partially) empty because of connection problems. So, we need to populate it again.
if (_poolSize <= NrConnections) return;
var poolState = Interlocked.CompareExchange(ref _poolState, PoolPopulationInProgress, PoolIdle);
if (poolState == PoolPopulationInProgress) return;
try
{
await PopulatePoolAsync().ConfigureAwait(false);
}
finally
{
// We need to remove the PoolPopulationInProgress flag again even if an exception occurred, so we don't block the pool population for ever
Interlocked.CompareExchange(ref _poolState, PoolIdle, PoolPopulationInProgress);
}
}
private async Task PopulatePoolAsync()
{
var nrConnectionsToCreate = _poolSize - _connections.Count;
var connectionCreationTasks = new List<Task<Connection>>(nrConnectionsToCreate);
try
{
for (var i = 0; i < nrConnectionsToCreate; i++)
{
connectionCreationTasks.Add(CreateNewConnectionAsync());
}
var createdConnections = await Task.WhenAll(connectionCreationTasks).ConfigureAwait(false);
_connections.AddRange(createdConnections);
}
catch(Exception)
{
// Dispose created connections if the connection establishment failed
foreach (var creationTask in connectionCreationTasks)
{
if (!creationTask.IsFaulted)
creationTask.Result?.Dispose();
}
throw;
}
}
private async Task<Connection> CreateNewConnectionAsync()
{
var newConnection = _connectionFactory.CreateConnection();
await newConnection.ConnectAsync().ConfigureAwait(false);
return newConnection;
}
private Connection GetConnectionFromPool()
{
var connections = _connections.Snapshot;
if (connections.Length == 0) throw new ServerUnavailableException();
return TryGetAvailableConnection(connections);
}
private Connection TryGetAvailableConnection(Connection[] connections)
{
var index = Interlocked.Increment(ref _connectionIndex);
ProtectIndexFromOverflowing(index);
for (var i = 0; i < connections.Length; i++)
{
var connection = connections[(index + i) % connections.Length];
if (connection.NrRequestsInFlight >= _maxInProcessPerConnection) continue;
if (!connection.IsOpen)
{
RemoveConnectionFromPool(connection);
continue;
}
return connection;
}
if (connections.Length > 0)
{
throw new ConnectionPoolBusyException(_poolSize, _maxInProcessPerConnection);
}
else
{
throw new ServerUnavailableException();
}
}
private void ProtectIndexFromOverflowing(int currentIndex)
{
if (currentIndex > ConnectionIndexOverflowLimit)
Interlocked.Exchange(ref _connectionIndex, 0);
}
private void RemoveConnectionFromPool(Connection connection)
{
if (_connections.TryRemove(connection))
DefinitelyDestroyConnection(connection);
}
private IConnection ProxiedConnection(Connection connection)
{
return new ProxyConnection(connection, ReturnConnectionIfOpen);
}
private void ReturnConnectionIfOpen(Connection connection)
{
if (connection.IsOpen) return;
ConsiderUnavailable();
}
private void ConsiderUnavailable()
{
CloseAndRemoveAllConnectionsAsync().WaitUnwrap();
}
private async Task CloseAndRemoveAllConnectionsAsync()
{
foreach (var connection in _connections.RemoveAndGetAll())
{
await connection.CloseAsync().ConfigureAwait(false);
DefinitelyDestroyConnection(connection);
}
}
private void DefinitelyDestroyConnection(Connection connection)
{
connection.Dispose();
}
#region IDisposable Support
private bool _disposed;
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
CloseAndRemoveAllConnectionsAsync().WaitUnwrap();
_disposed = true;
}
}
#endregion
}
}