blob: 18846c7e20ad64e65c26efe9e6af7227652abc6b [file] [log] [blame]
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
namespace Apache.IoTDB
{
public class ConcurrentClientQueue
{
public ConcurrentQueue<Client> ClientQueue { get; }
public ConcurrentClientQueue(List<Client> clients)
{
ClientQueue = new ConcurrentQueue<Client>(clients);
}
public ConcurrentClientQueue()
{
ClientQueue = new ConcurrentQueue<Client>();
}
public void Add(Client client) => Return(client);
public void Return(Client client)
{
Monitor.Enter(ClientQueue);
ClientQueue.Enqueue(client);
#if DEBUG
Console.WriteLine($"Thread{Thread.CurrentThread.ManagedThreadId} put back {client}");
#endif
Monitor.Pulse(ClientQueue);
Monitor.Exit(ClientQueue);
Thread.Sleep(0);
}
int _ref = 0;
public void AddRef()
{
lock (this)
{
_ref++;
}
}
public int GetRef()
{
return _ref;
}
public void RemoveRef()
{
lock (this)
{
_ref--;
}
}
public int Timeout { get; set; } = 10;
public Client Take()
{
Client client = null;
Monitor.Enter(ClientQueue);
if (ClientQueue.IsEmpty)
{
#if DEBUG
Console.WriteLine($"Thread{Thread.CurrentThread.ManagedThreadId} the connection pool is empty, wait timeout:{Timeout}");
#endif
Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout));
}
if (!ClientQueue.TryDequeue(out client))
{
#if DEBUG
Console.WriteLine($"Thread{Thread.CurrentThread.ManagedThreadId} obtaining a client from the client pool failed, wait and try again");
#endif
}
else
{
#if DEBUG
Console.WriteLine($"Thread{Thread.CurrentThread.ManagedThreadId} take away {client}");
#endif
}
Monitor.Exit(ClientQueue);
if (client == null)
{
throw new TimeoutException($"Connection pool is empty and wait time out({Timeout}s)!");
}
return client;
}
}
}