| #region License |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #endregion |
| |
| using System; |
| using System.Collections.Generic; |
| using System.Threading; |
| using System.Threading.Tasks; |
| using Gremlin.Net.Driver.Exceptions; |
| using Gremlin.Net.Process; |
| |
| namespace Gremlin.Net.Driver |
| { |
| internal class ConnectionPool : IDisposable |
| { |
| private const int ConnectionIndexOverflowLimit = int.MaxValue - 1000000; |
| |
| private readonly ConnectionFactory _connectionFactory; |
| private readonly CopyOnWriteCollection<Connection> _connections = new CopyOnWriteCollection<Connection>(); |
| private readonly int _poolSize; |
| private readonly int _maxInProcessPerConnection; |
| private int _connectionIndex; |
| private int _poolState; |
| private const int PoolIdle = 0; |
| private const int PoolPopulationInProgress = 1; |
| |
| public ConnectionPool(ConnectionFactory connectionFactory, ConnectionPoolSettings settings) |
| { |
| _connectionFactory = connectionFactory; |
| _poolSize = settings.PoolSize; |
| _maxInProcessPerConnection = settings.MaxInProcessPerConnection; |
| PopulatePoolAsync().WaitUnwrap(); |
| } |
| |
| public int NrConnections => _connections.Count; |
| |
| public async Task<IConnection> GetAvailableConnectionAsync() |
| { |
| await EnsurePoolIsPopulatedAsync().ConfigureAwait(false); |
| return ProxiedConnection(GetConnectionFromPool()); |
| } |
| |
| private async Task EnsurePoolIsPopulatedAsync() |
| { |
| // The pool could have been (partially) empty because of connection problems. So, we need to populate it again. |
| if (_poolSize <= NrConnections) return; |
| var poolState = Interlocked.CompareExchange(ref _poolState, PoolPopulationInProgress, PoolIdle); |
| if (poolState == PoolPopulationInProgress) return; |
| try |
| { |
| await PopulatePoolAsync().ConfigureAwait(false); |
| } |
| finally |
| { |
| // We need to remove the PoolPopulationInProgress flag again even if an exception occurred, so we don't block the pool population for ever |
| Interlocked.CompareExchange(ref _poolState, PoolIdle, PoolPopulationInProgress); |
| } |
| } |
| |
| private async Task PopulatePoolAsync() |
| { |
| var nrConnectionsToCreate = _poolSize - _connections.Count; |
| var connectionCreationTasks = new List<Task<Connection>>(nrConnectionsToCreate); |
| try |
| { |
| for (var i = 0; i < nrConnectionsToCreate; i++) |
| { |
| connectionCreationTasks.Add(CreateNewConnectionAsync()); |
| } |
| var createdConnections = await Task.WhenAll(connectionCreationTasks).ConfigureAwait(false); |
| _connections.AddRange(createdConnections); |
| } |
| catch(Exception) |
| { |
| // Dispose created connections if the connection establishment failed |
| foreach (var creationTask in connectionCreationTasks) |
| { |
| if (!creationTask.IsFaulted) |
| creationTask.Result?.Dispose(); |
| } |
| throw; |
| } |
| } |
| |
| private async Task<Connection> CreateNewConnectionAsync() |
| { |
| var newConnection = _connectionFactory.CreateConnection(); |
| await newConnection.ConnectAsync().ConfigureAwait(false); |
| return newConnection; |
| } |
| |
| private Connection GetConnectionFromPool() |
| { |
| var connections = _connections.Snapshot; |
| if (connections.Length == 0) throw new ServerUnavailableException(); |
| return TryGetAvailableConnection(connections); |
| } |
| |
| private Connection TryGetAvailableConnection(Connection[] connections) |
| { |
| var index = Interlocked.Increment(ref _connectionIndex); |
| ProtectIndexFromOverflowing(index); |
| |
| for (var i = 0; i < connections.Length; i++) |
| { |
| var connection = connections[(index + i) % connections.Length]; |
| if (connection.NrRequestsInFlight >= _maxInProcessPerConnection) continue; |
| if (!connection.IsOpen) |
| { |
| RemoveConnectionFromPool(connection); |
| continue; |
| } |
| return connection; |
| } |
| |
| if (connections.Length > 0) |
| { |
| throw new ConnectionPoolBusyException(_poolSize, _maxInProcessPerConnection); |
| } |
| else |
| { |
| throw new ServerUnavailableException(); |
| } |
| } |
| |
| private void ProtectIndexFromOverflowing(int currentIndex) |
| { |
| if (currentIndex > ConnectionIndexOverflowLimit) |
| Interlocked.Exchange(ref _connectionIndex, 0); |
| } |
| |
| private void RemoveConnectionFromPool(Connection connection) |
| { |
| if (_connections.TryRemove(connection)) |
| DefinitelyDestroyConnection(connection); |
| } |
| |
| private IConnection ProxiedConnection(Connection connection) |
| { |
| return new ProxyConnection(connection, ReturnConnectionIfOpen); |
| } |
| |
| private void ReturnConnectionIfOpen(Connection connection) |
| { |
| if (connection.IsOpen) return; |
| ConsiderUnavailable(); |
| } |
| |
| private void ConsiderUnavailable() |
| { |
| CloseAndRemoveAllConnectionsAsync().WaitUnwrap(); |
| } |
| |
| private async Task CloseAndRemoveAllConnectionsAsync() |
| { |
| foreach (var connection in _connections.RemoveAndGetAll()) |
| { |
| await connection.CloseAsync().ConfigureAwait(false); |
| DefinitelyDestroyConnection(connection); |
| } |
| } |
| |
| private void DefinitelyDestroyConnection(Connection connection) |
| { |
| connection.Dispose(); |
| } |
| |
| #region IDisposable Support |
| |
| private bool _disposed; |
| |
| public void Dispose() |
| { |
| Dispose(true); |
| GC.SuppressFinalize(this); |
| } |
| |
| protected virtual void Dispose(bool disposing) |
| { |
| if (!_disposed) |
| { |
| if (disposing) |
| CloseAndRemoveAllConnectionsAsync().WaitUnwrap(); |
| _disposed = true; |
| } |
| } |
| |
| #endregion |
| } |
| } |