blob: 286183ad602d0fbfddecc63be6e66e31d1de30b2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <ignite/common/utils.h>
#include <ignite/network/utils.h>
#include "network/sockets.h"
#include "network/linux_async_client_pool.h"
namespace ignite
{
namespace network
{
LinuxAsyncClientPool::LinuxAsyncClientPool() :
stopping(true),
asyncHandler(0),
workerThread(*this),
idGen(0),
clientsCs(),
clientIdMap()
{
// No-op.
}
LinuxAsyncClientPool::~LinuxAsyncClientPool()
{
InternalStop();
}
void LinuxAsyncClientPool::Start(const std::vector<TcpRange> &addrs, uint32_t connLimit)
{
if (!stopping)
throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Client pool is already started");
idGen = 0;
stopping = false;
try
{
workerThread.Start0(connLimit, addrs);
}
catch (...)
{
Stop();
throw;
}
}
void LinuxAsyncClientPool::Stop()
{
InternalStop();
}
bool LinuxAsyncClientPool::Send(uint64_t id, const DataBuffer &data)
{
if (stopping)
return false;
SP_LinuxAsyncClient client = FindClient(id);
if (!client.IsValid())
return false;
return client.Get()->Send(data);
}
void LinuxAsyncClientPool::Close(uint64_t id, const IgniteError *err)
{
if (stopping)
return;
SP_LinuxAsyncClient client = FindClient(id);
if (client.IsValid() && !client.Get()->IsClosed())
client.Get()->Shutdown(err);
}
void LinuxAsyncClientPool::CloseAndRelease(uint64_t id, const IgniteError *err)
{
if (stopping)
return;
SP_LinuxAsyncClient client;
{
common::concurrent::CsLockGuard lock(clientsCs);
std::map<uint64_t, SP_LinuxAsyncClient>::iterator it = clientIdMap.find(id);
if (it == clientIdMap.end())
return;
client = it->second;
clientIdMap.erase(it);
}
bool closed = client.Get()->Close();
if (closed)
{
IgniteError err0(client.Get()->GetCloseError());
if (err0.GetCode() == IgniteError::IGNITE_SUCCESS)
err0 = IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, "Connection closed by server");
if (!err)
err = &err0;
HandleConnectionClosed(id, err);
}
}
bool LinuxAsyncClientPool::AddClient(SP_LinuxAsyncClient &client)
{
if (stopping)
return false;
LinuxAsyncClient& clientRef = *client.Get();
{
common::concurrent::CsLockGuard lock(clientsCs);
uint64_t id = ++idGen;
clientRef.SetId(id);
clientIdMap[id] = client;
}
HandleConnectionSuccess(clientRef.GetAddress(), clientRef.GetId());
return true;
}
void LinuxAsyncClientPool::HandleConnectionError(const EndPoint &addr, const IgniteError &err)
{
AsyncHandler* asyncHandler0 = asyncHandler;
if (asyncHandler0)
asyncHandler0->OnConnectionError(addr, err);
}
void LinuxAsyncClientPool::HandleConnectionSuccess(const EndPoint &addr, uint64_t id)
{
AsyncHandler* asyncHandler0 = asyncHandler;
if (asyncHandler0)
asyncHandler0->OnConnectionSuccess(addr, id);
}
void LinuxAsyncClientPool::HandleConnectionClosed(uint64_t id, const IgniteError *err)
{
AsyncHandler* asyncHandler0 = asyncHandler;
if (asyncHandler0)
asyncHandler0->OnConnectionClosed(id, err);
}
void LinuxAsyncClientPool::HandleMessageReceived(uint64_t id, const DataBuffer &msg)
{
AsyncHandler* asyncHandler0 = asyncHandler;
if (asyncHandler0)
asyncHandler0->OnMessageReceived(id, msg);
}
void LinuxAsyncClientPool::HandleMessageSent(uint64_t id)
{
AsyncHandler* asyncHandler0 = asyncHandler;
if (asyncHandler0)
asyncHandler0->OnMessageSent(id);
}
void LinuxAsyncClientPool::InternalStop()
{
stopping = true;
workerThread.Stop();
{
common::concurrent::CsLockGuard lock(clientsCs);
std::map<uint64_t, SP_LinuxAsyncClient>::iterator it;
for (it = clientIdMap.begin(); it != clientIdMap.end(); ++it)
{
LinuxAsyncClient& client = *it->second.Get();
IgniteError err(IgniteError::IGNITE_ERR_GENERIC, "Client stopped");
HandleConnectionClosed(client.GetId(), &err);
}
clientIdMap.clear();
}
}
SP_LinuxAsyncClient LinuxAsyncClientPool::FindClient(uint64_t id) const
{
common::concurrent::CsLockGuard lock(clientsCs);
std::map<uint64_t, SP_LinuxAsyncClient>::const_iterator it = clientIdMap.find(id);
if (it == clientIdMap.end())
return SP_LinuxAsyncClient();
return it->second;
}
}
}