blob: 67529c0eb97d8e54b6c1c4ee5e052dcfbbaf4808 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <netdb.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <cstring>
#include <ignite/network/utils.h>
#include <ignite/common/utils.h>
#include "network/linux_async_worker_thread.h"
#include "network/linux_async_client_pool.h"
namespace
{
ignite::common::FibonacciSequence<10> fibonacci10;
}
namespace ignite
{
namespace network
{
LinuxAsyncWorkerThread::LinuxAsyncWorkerThread(LinuxAsyncClientPool &clientPool) :
clientPool(clientPool),
stopping(true),
epoll(-1),
stopEvent(-1),
nonConnected(),
currentConnection(),
currentClient(),
failedAttempts(0),
lastConnectionTime(),
minAddrs(0)
{
memset(&lastConnectionTime, 0, sizeof(lastConnectionTime));
}
LinuxAsyncWorkerThread::~LinuxAsyncWorkerThread()
{
Stop();
}
void LinuxAsyncWorkerThread::Start0(size_t limit, const std::vector<TcpRange> &addrs)
{
epoll = epoll_create(1);
if (epoll < 0)
common::ThrowLastSystemError("Failed to create epoll instance");
stopEvent = eventfd(0, EFD_NONBLOCK);
if (stopEvent < 0)
{
std::string msg = common::GetLastSystemError("Failed to create stop event instance");
close(stopEvent);
common::ThrowSystemError(msg);
}
epoll_event event;
memset(&event, 0, sizeof(event));
event.events = EPOLLIN;
int res = epoll_ctl(epoll, EPOLL_CTL_ADD, stopEvent, &event);
if (res < 0)
{
std::string msg = common::GetLastSystemError("Failed to create stop event instance");
close(stopEvent);
close(epoll);
common::ThrowSystemError(msg);
}
stopping = false;
failedAttempts = 0;
nonConnected = addrs;
currentConnection.reset();
currentClient = SP_LinuxAsyncClient();
if (!limit || limit > addrs.size())
minAddrs = 0;
else
minAddrs = addrs.size() - limit;
Thread::Start();
}
void LinuxAsyncWorkerThread::Stop()
{
if (stopping)
return;
stopping = true;
int64_t value = 1;
ssize_t res = write(stopEvent, &value, sizeof(value));
IGNITE_UNUSED(res);
assert(res == sizeof(value));
Thread::Join();
close(stopEvent);
close(epoll);
nonConnected.clear();
currentConnection.reset();
}
void LinuxAsyncWorkerThread::Run()
{
while (!stopping)
{
HandleNewConnections();
if (stopping)
break;
HandleConnectionEvents();
}
}
void LinuxAsyncWorkerThread::HandleNewConnections()
{
if (!ShouldInitiateNewConnection())
return;
if (CalculateConnectionTimeout() > 0)
return;
addrinfo* addr = 0;
if (currentConnection.get())
addr = currentConnection->Next();
if (!addr)
{
size_t idx = rand() % nonConnected.size();
const TcpRange& range = nonConnected.at(idx);
currentConnection.reset(new ConnectingContext(range));
addr = currentConnection->Next();
if (!addr)
{
currentConnection.reset();
ReportConnectionError(EndPoint(), "Can not resolve a single address from range: " + range.ToString());
++failedAttempts;
return;
}
}
// Create a SOCKET for connecting to server
int socketFd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (SOCKET_ERROR == socketFd)
{
ReportConnectionError(currentConnection->GetAddress(),
"Socket creation failed: " + sockets::GetLastSocketErrorMessage());
return;
}
sockets::TrySetSocketOptions(socketFd, LinuxAsyncClient::BUFFER_SIZE, true, true, true);
bool success = sockets::SetNonBlockingMode(socketFd, true);
if (!success)
{
ReportConnectionError(currentConnection->GetAddress(),
"Can not make non-blocking socket: " + sockets::GetLastSocketErrorMessage());
return;
}
currentClient = currentConnection->ToClient(socketFd);
bool ok = currentClient.Get()->StartMonitoring(epoll);
if (!ok)
common::ThrowLastSystemError("Can not add file descriptor to epoll");
// Connect to server.
int res = connect(socketFd, addr->ai_addr, addr->ai_addrlen);
if (SOCKET_ERROR == res)
{
int lastError = errno;
clock_gettime(CLOCK_MONOTONIC, &lastConnectionTime);
if (lastError != EWOULDBLOCK && lastError != EINPROGRESS)
{
HandleConnectionFailed("Failed to establish connection with the host: " +
sockets::GetSocketErrorMessage(lastError));
return;
}
}
}
void LinuxAsyncWorkerThread::HandleConnectionEvents()
{
enum { MAX_EVENTS = 16 };
epoll_event events[MAX_EVENTS];
int timeout = CalculateConnectionTimeout();
int res = epoll_wait(epoll, events, MAX_EVENTS, timeout);
if (res <= 0)
return;
for (int i = 0; i < res; ++i)
{
epoll_event& currentEvent = events[i];
LinuxAsyncClient* client = static_cast<LinuxAsyncClient*>(currentEvent.data.ptr);
if (!client)
continue;
if (client == currentClient.Get())
{
if (currentEvent.events & (EPOLLRDHUP | EPOLLERR))
{
HandleConnectionFailed("Can not establish connection");
continue;
}
HandleConnectionSuccess(client);
}
if (currentEvent.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP))
{
HandleConnectionClosed(client);
continue;
}
if (currentEvent.events & EPOLLIN)
{
DataBuffer msg = client->Receive();
if (msg.IsEmpty())
{
HandleConnectionClosed(client);
continue;
}
clientPool.HandleMessageReceived(client->GetId(), msg);
}
if (currentEvent.events & EPOLLOUT)
{
bool ok = client->ProcessSent();
if (!ok)
{
HandleConnectionClosed(client);
continue;
}
}
}
}
void LinuxAsyncWorkerThread::ReportConnectionError(const EndPoint& addr, const std::string& msg)
{
IgniteError err(IgniteError::IGNITE_ERR_NETWORK_FAILURE, msg.c_str());
clientPool.HandleConnectionError(addr, err);
}
void LinuxAsyncWorkerThread::HandleConnectionFailed(const std::string& msg)
{
LinuxAsyncClient* client = currentClient.Get();
assert(client != 0);
client->StopMonitoring();
client->Close();
ReportConnectionError(client->GetAddress(), msg);
currentClient = SP_LinuxAsyncClient();
++failedAttempts;
}
void LinuxAsyncWorkerThread::HandleConnectionClosed(LinuxAsyncClient *client)
{
client->StopMonitoring();
nonConnected.push_back(client->GetRange());
clientPool.CloseAndRelease(client->GetId(), 0);
}
void LinuxAsyncWorkerThread::HandleConnectionSuccess(LinuxAsyncClient* client)
{
nonConnected.erase(std::find(nonConnected.begin(), nonConnected.end(), client->GetRange()));
clientPool.AddClient(currentClient);
currentClient = SP_LinuxAsyncClient();
currentConnection.reset();
failedAttempts = 0;
clock_gettime(CLOCK_MONOTONIC, &lastConnectionTime);
}
int LinuxAsyncWorkerThread::CalculateConnectionTimeout() const
{
if (!ShouldInitiateNewConnection())
return -1;
if (lastConnectionTime.tv_sec == 0)
return 0;
int timeout = fibonacci10.GetValue(failedAttempts) * 1000;
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
int passed = (now.tv_sec - lastConnectionTime.tv_sec) * 1000 +
(now.tv_nsec - lastConnectionTime.tv_nsec) / 1000000;
timeout -= passed;
if (timeout < 0)
timeout = 0;
return timeout;
}
bool LinuxAsyncWorkerThread::ShouldInitiateNewConnection() const
{
return !currentClient.Get() && nonConnected.size() > minAddrs;
}
}
}