blob: d6b469a96365f6def3b13afc6957f05a84209bc7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TcpConn.hpp"
#include <gfcpp/DistributedSystem.hpp>
#include <gfcpp/SystemProperties.hpp>
#include <gfcpp/Log.hpp>
#include <memory.h>
#include <ace/INET_Addr.h>
#include <ace/SOCK_IO.h>
#include <ace/SOCK_Connector.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/OS.h>
using namespace apache::geode::client;
int TcpConn::m_chunkSize = TcpConn::setChunkSize();
void TcpConn::clearNagle(ACE_SOCKET sock) {
int32_t val = 1;
#ifdef WIN32
const char *param = (const char *)&val;
#else
const void *param = (const void *)&val;
#endif
int32_t plen = sizeof(val);
if (0 != setsockopt(sock, IPPROTO_TCP, 1, param, plen)) {
int32_t lastError = ACE_OS::last_error();
LOGERROR("Failed to set TCP_NODELAY on socket. Errno: %d: %s", lastError,
ACE_OS::strerror(lastError));
}
}
int32_t TcpConn::maxSize(ACE_SOCKET sock, int32_t flag, int32_t size) {
int32_t val = 0;
#ifdef _WIN32
const char *cparam = (const char *)&val;
char *param = (char *)&val;
#else
const void *cparam = (const void *)&val;
void *param = (void *)&val;
#endif
socklen_t plen = sizeof(val);
socklen_t clen = sizeof(val);
static int32_t max = 32000;
if (m_maxBuffSizePool <= 0) {
SystemProperties *props = DistributedSystem::getSystemProperties();
if (props != NULL) {
max = props->maxSocketBufferSize();
}
} else {
max = m_maxBuffSizePool;
}
int32_t inc = 32120;
val = size - (3 * inc);
if (val < 0) val = 0;
if (size == 0) size = max;
int32_t red = 0;
int32_t lastRed = -1;
while (lastRed != red) {
lastRed = red;
val += inc;
if (0 != setsockopt(sock, SOL_SOCKET, flag, cparam, clen)) {
int32_t lastError = ACE_OS::last_error();
LOGERROR("Failed to set socket options. Errno: %d : %s ", lastError,
ACE_OS::strerror(lastError));
}
if (0 != getsockopt(sock, SOL_SOCKET, flag, param, &plen)) {
int32_t lastError = ACE_OS::last_error();
LOGERROR(
"Failed to get buffer size for flag %d on socket. Errno: %d : %s",
flag, lastError, ACE_OS::strerror(lastError));
}
#ifdef _LINUX
val /= 2;
#endif
if ((val >= max) || (val >= size)) continue;
red = val;
}
return val;
}
void TcpConn::createSocket(ACE_SOCKET sock) {
LOGDEBUG("Creating plain socket stream");
m_io = new ACE_SOCK_Stream((ACE_HANDLE)sock);
// m_io->enable(ACE_NONBLOCK);
}
void TcpConn::init() {
/* adongre
* CID 28736: Improper use of negative value (NEGATIVE_RETURNS)
* Function "socket(2, 1, 0)" returns a negative number.
* Assigning: unsigned variable "sock" = "socket".
*
* CID 28737: Unsigned compared against 0 (NO_EFFECT)
* This less-than-zero comparison of an unsigned value is never true. "sock <
* 0U".
*/
ACE_SOCKET sock = socket(AF_INET, SOCK_STREAM, 0);
// if ( sock < 0 ) {
if (sock == -1) {
int32_t lastError = ACE_OS::last_error();
LOGERROR("Failed to create socket. Errno: %d: %s", lastError,
ACE_OS::strerror(lastError));
char msg[256];
ACE_OS::snprintf(msg, 256, "TcpConn::connect failed with errno: %d: %s",
lastError, ACE_OS::strerror(lastError));
throw GeodeIOException(msg);
}
clearNagle(sock);
static int32_t readSize = 0;
static int32_t writeSize = 0;
int32_t originalReadSize = readSize;
readSize = maxSize(sock, SO_SNDBUF, readSize);
if (originalReadSize != readSize) {
// This should get logged once at startup and again only if it changes
LOGINFO("Using socket send buffer size of %d.", readSize);
}
int32_t originalWriteSize = writeSize;
writeSize = maxSize(sock, SO_RCVBUF, writeSize);
if (originalWriteSize != writeSize) {
// This should get logged once at startup and again only if it changes
LOGINFO("Using socket receive buffer size of %d.", writeSize);
}
createSocket(sock);
connect();
}
TcpConn::TcpConn() : m_io(NULL), m_waitSeconds(0), m_maxBuffSizePool(0) {}
TcpConn::TcpConn(const char *ipaddr, uint32_t waitSeconds,
int32_t maxBuffSizePool)
: m_io(NULL),
m_addr(ipaddr),
m_waitSeconds(waitSeconds),
m_maxBuffSizePool(maxBuffSizePool) {}
TcpConn::TcpConn(const char *hostname, int32_t port, uint32_t waitSeconds,
int32_t maxBuffSizePool)
: m_io(NULL),
m_addr(port, hostname),
m_waitSeconds(waitSeconds),
m_maxBuffSizePool(maxBuffSizePool) {}
void TcpConn::listen(const char *hostname, int32_t port, uint32_t waitSeconds) {
ACE_INET_Addr addr(port, hostname);
listen(addr, waitSeconds);
}
void TcpConn::listen(const char *ipaddr, uint32_t waitSeconds) {
ACE_INET_Addr addr(ipaddr);
listen(addr, waitSeconds);
}
void TcpConn::listen(ACE_INET_Addr addr, uint32_t waitSeconds) {
GF_DEV_ASSERT(m_io != NULL);
ACE_SOCK_Acceptor listener(addr, 1);
int32_t retVal = 0;
if (waitSeconds > 0) {
ACE_Time_Value wtime(waitSeconds);
retVal = listener.accept(*m_io, 0, &wtime);
} else {
retVal = listener.accept(*m_io, 0);
}
if (retVal == -1) {
char msg[256];
int32_t lastError = ACE_OS::last_error();
if (lastError == ETIME || lastError == ETIMEDOUT) {
/* adongre
* Coverity - II
* CID 29270: Calling risky function (SECURE_CODING)[VERY RISKY]. Using
* "sprintf" can cause a
* buffer overflow when done incorrectly. Because sprintf() assumes an
* arbitrarily long string,
* callers must be careful not to overflow the actual space of the
* destination.
* Use snprintf() instead, or correct precision specifiers.
* Fix : using ACE_OS::snprintf
*/
ACE_OS::snprintf(
msg, 256,
"TcpConn::listen Attempt to listen timed out after %d seconds.",
waitSeconds);
throw TimeoutException(msg);
}
ACE_OS::snprintf(msg, 256, "TcpConn::listen failed with errno: %d: %s",
lastError, ACE_OS::strerror(lastError));
throw GeodeIOException(msg);
}
}
void TcpConn::connect(const char *hostname, int32_t port,
uint32_t waitSeconds) {
ACE_INET_Addr addr(port, hostname);
m_addr = addr;
m_waitSeconds = waitSeconds;
connect();
}
void TcpConn::connect(const char *ipaddr, uint32_t waitSeconds) {
ACE_INET_Addr addr(ipaddr);
m_addr = addr;
m_waitSeconds = waitSeconds;
connect();
}
void TcpConn::connect() {
GF_DEV_ASSERT(m_io != NULL);
ACE_INET_Addr ipaddr = m_addr;
uint32_t waitSeconds = m_waitSeconds;
ACE_OS::signal(SIGPIPE, SIG_IGN); // Ignore broken pipe
// passing waittime as microseconds
if (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis()) {
waitSeconds = waitSeconds * 1000;
} else {
waitSeconds = waitSeconds * (1000 * 1000);
}
LOGFINER("Connecting plain socket stream to %s:%d waiting %d micro sec",
ipaddr.get_host_name(), ipaddr.get_port_number(), waitSeconds);
ACE_SOCK_Connector conn;
int32_t retVal = 0;
if (waitSeconds > 0) {
// passing waittime as microseconds
ACE_Time_Value wtime(0, waitSeconds);
retVal = conn.connect(*m_io, ipaddr, &wtime);
} else {
retVal = conn.connect(*m_io, ipaddr);
}
if (retVal == -1) {
char msg[256];
int32_t lastError = ACE_OS::last_error();
if (lastError == ETIME || lastError == ETIMEDOUT) {
ACE_OS::snprintf(
msg, 256,
"TcpConn::connect Attempt to connect timed out after %d seconds.",
waitSeconds);
// this is only called by constructor, so we must delete m_io
GF_SAFE_DELETE(m_io);
throw TimeoutException(msg);
}
ACE_OS::snprintf(msg, 256, "TcpConn::connect failed with errno: %d: %s",
lastError, ACE_OS::strerror(lastError));
// this is only called by constructor, so we must delete m_io
GF_SAFE_DELETE(m_io);
throw GeodeIOException(msg);
}
int rc = this->m_io->enable(ACE_NONBLOCK);
if (-1 == rc) {
char msg[250];
int32_t lastError = ACE_OS::last_error();
ACE_OS::snprintf(msg, 256, "TcpConn::NONBLOCK: %d: %s", lastError,
ACE_OS::strerror(lastError));
LOGINFO(msg);
}
}
void TcpConn::close() {
if (m_io != NULL) {
m_io->close();
GF_SAFE_DELETE(m_io);
}
}
int32_t TcpConn::receive(char *buff, int32_t len, uint32_t waitSeconds,
uint32_t waitMicroSeconds) {
return socketOp(SOCK_READ, buff, len, waitSeconds);
}
int32_t TcpConn::send(const char *buff, int32_t len, uint32_t waitSeconds,
uint32_t waitMicroSeconds) {
return socketOp(SOCK_WRITE, const_cast<char *>(buff), len, waitSeconds);
}
int32_t TcpConn::socketOp(TcpConn::SockOp op, char *buff, int32_t len,
uint32_t waitSeconds) {
{
/*{
ACE_HANDLE handle = m_io->get_handle();
int val = ACE::get_flags (handle);
if (ACE_BIT_DISABLED (val, ACE_NONBLOCK))
{
//ACE::set_flags (handle, ACE_NONBLOCK);
LOGINFO("Flag is not set");
}else
{
LOGINFO("Flag is set");
}
}*/
GF_DEV_ASSERT(m_io != NULL);
GF_DEV_ASSERT(buff != NULL);
#if GF_DEVEL_ASSERTS == 1
if (len <= 0) {
LOGERROR(
"TcpConn::socketOp called with a length of %d specified. "
"No operation performed.",
len);
GF_DEV_ASSERT(false);
}
#endif
ACE_Time_Value waitTime(0, waitSeconds /*now its in microSeconds*/);
ACE_Time_Value endTime(ACE_OS::gettimeofday() + waitTime);
ACE_Time_Value sleepTime(0, 100);
size_t readLen = 0;
ssize_t retVal;
bool errnoSet = false;
int32_t sendlen = len;
int32_t totalsend = 0;
while (len > 0 && waitTime > ACE_Time_Value::zero) {
if (len > m_chunkSize) {
sendlen = m_chunkSize;
len -= m_chunkSize;
} else {
sendlen = len;
len = 0;
}
do {
if (op == SOCK_READ) {
retVal = m_io->recv_n(buff, sendlen, &waitTime, &readLen);
} else {
retVal = m_io->send_n(buff, sendlen, &waitTime, &readLen);
}
sendlen -= static_cast<int32_t>(readLen);
totalsend += static_cast<int32_t>(readLen);
if (retVal < 0) {
int32_t lastError = ACE_OS::last_error();
if (lastError == EAGAIN) {
ACE_OS::sleep(sleepTime);
} else {
errnoSet = true;
break;
}
} else if (retVal == 0 && readLen == 0) {
ACE_OS::last_error(EPIPE);
errnoSet = true;
break;
}
buff += readLen;
if (sendlen == 0) break;
waitTime = endTime - ACE_OS::gettimeofday();
if (waitTime <= ACE_Time_Value::zero) break;
} while (sendlen > 0);
if (errnoSet) break;
}
if (len > 0 && !errnoSet) {
ACE_OS::last_error(ETIME);
}
GF_DEV_ASSERT(len >= 0);
return totalsend;
}
}
// Return the local port for this TCP connection.
uint16_t TcpConn::getPort() {
GF_DEV_ASSERT(m_io != NULL);
ACE_INET_Addr localAddr;
m_io->get_local_addr(*(ACE_Addr *)&localAddr);
return localAddr.get_port_number();
}