blob: 75ff53544ea6892cd60d2bdfd156f5f3419fe93d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/util/net/socket.h"
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <unistd.h>
#include <cerrno>
#include <cinttypes>
#include <cstring>
#include <limits>
#include <ostream>
#include <string>
#include <utility>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/errno.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket_info.pb.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/slice.h"
// When non-empty, Socket::Connect() binds the socket to this address before
// connecting. The value is checked by a flag validator defined in this file.
DEFINE_string(local_ip_for_outbound_sockets, "",
"IP to bind to when making outgoing socket connections. "
"This must be an IP address of the form A.B.C.D, not a hostname. "
"Advanced parameter, subject to change.");
TAG_FLAG(local_ip_for_outbound_sockets, experimental);
// Test hook: when set, Socket::Recv() asks recv() for a randomly chosen
// smaller number of bytes than the caller requested.
DEFINE_bool(socket_inject_short_recvs, false,
"Inject short recv() responses which return less data than "
"requested");
TAG_FLAG(socket_inject_short_recvs, hidden);
TAG_FLAG(socket_inject_short_recvs, unsafe);
using std::string;
using std::numeric_limits;
using strings::Substitute;
namespace kudu {
namespace {
// Parses 'addr_str' as an IP address that must not carry a port.
//
// Returns InvalidArgument when the string cannot be parsed, or when the
// parsed address ends up with a non-zero port. On success the parsed address
// is moved into '*result' if 'result' is non-null; passing nullptr performs
// validation only.
Status ParseIpAddress(const string& addr_str, Sockaddr* result) {
  DCHECK(!addr_str.empty());
  Sockaddr parsed;
  const auto parse_status = parsed.ParseString(addr_str, 0);
  if (PREDICT_FALSE(!parse_status.ok())) {
    return Status::InvalidArgument(
        Substitute("$0: invalid local IP address", addr_str),
        parse_status.ToString());
  }
  // The default port handed to ParseString() is 0, so anything else means the
  // input string itself specified a port.
  if (PREDICT_FALSE(parsed.port() != 0)) {
    return Status::InvalidArgument(
        Substitute("$0: unexpected port with IP address", addr_str));
  }
  if (result != nullptr) {
    *result = std::move(parsed);
  }
  return Status::OK();
}
// Flag validator for --local_ip_for_outbound_sockets.
//
// Accepts the empty (default) value and any value ParseIpAddress() accepts.
// Any other value is logged at ERROR level and rejected.
bool ValidateLocalIpForOutboundSockets(
    const char* flagname, const string& value) {
  // The default value should pass the validation, so only non-empty values
  // are parsed.
  if (!value.empty()) {
    const auto status = ParseIpAddress(value, nullptr);
    if (!status.ok()) {
      LOG(ERROR) << Substitute("invalid local IP '$0' for --$1: $2",
                               value, flagname, status.ToString());
      return false;
    }
  }
  return true;
}
DEFINE_validator(local_ip_for_outbound_sockets,
                 &ValidateLocalIpForOutboundSockets);
} // anonymous namespace
namespace {
// tcp_info struct duplicated from linux/tcp.h.
//
// This allows us to decouple the compile-time Linux headers from the
// runtime Linux kernel. The compile-time headers (and kernel) might be
// older than the runtime kernel, in which case an ifdef-based approach
// wouldn't allow us to get all of the info available.
//
// NOTE: this struct has been annotated with some local notes about the
// contents of each field.
struct tcp_info { // NOLINT(readability-identifier-naming)
// Field order and widths matter: Socket::GetStats() passes this struct to
// getsockopt(TCP_INFO) and uses offsetof() on its members to decide which
// fields were filled in.
// Various state-tracking information.
// ------------------------------------------------------------
uint8_t tcpi_state;
uint8_t tcpi_ca_state;
uint8_t tcpi_retransmits;
uint8_t tcpi_probes;
uint8_t tcpi_backoff;
uint8_t tcpi_options;
uint8_t tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4;
// Single-bit flag; the remaining bits of its storage byte are unused here.
uint8_t tcpi_delivery_rate_app_limited:1;
// Configurations.
// ------------------------------------------------------------
uint32_t tcpi_rto;
uint32_t tcpi_ato;
uint32_t tcpi_snd_mss;
uint32_t tcpi_rcv_mss;
// Counts of packets in various states in the outbound queue.
// At first glance one might think these are monotonic counters, but
// in fact they are instantaneous counts of queued packets and thus
// not very useful for our purposes.
// ------------------------------------------------------------
// Number of packets outstanding that haven't been acked.
uint32_t tcpi_unacked;
// Number of packets outstanding that have been selective-acked.
uint32_t tcpi_sacked;
// Number of packets outstanding that have been deemed lost (a SACK arrived
// for a later packet)
uint32_t tcpi_lost;
// Number of packets in the queue that have been retransmitted.
uint32_t tcpi_retrans;
// The number of packets towards the highest SACKed sequence number
// (some measure of reordering, removed in later Linux versions by
// 737ff314563ca27f044f9a3a041e9d42491ef7ce)
uint32_t tcpi_fackets;
// Times when various events occurred.
// ------------------------------------------------------------
uint32_t tcpi_last_data_sent;
uint32_t tcpi_last_ack_sent; /* Not remembered, sorry. */
uint32_t tcpi_last_data_recv;
uint32_t tcpi_last_ack_recv;
// Path MTU.
uint32_t tcpi_pmtu;
// Receiver slow start threshold.
uint32_t tcpi_rcv_ssthresh;
// Smoothed RTT estimate and variance based on the time between sending data and receiving
// corresponding ACK. See https://tools.ietf.org/html/rfc2988 for details.
uint32_t tcpi_rtt;
uint32_t tcpi_rttvar;
// Slow start threshold.
uint32_t tcpi_snd_ssthresh;
// Sender congestion window (in number of MSS-sized packets)
uint32_t tcpi_snd_cwnd;
// Advertised MSS.
uint32_t tcpi_advmss;
// Amount of packet reordering allowed.
uint32_t tcpi_reordering;
// Receiver-side RTT estimate per the Dynamic Right Sizing algorithm:
//
// "A system that is only transmitting acknowledgements can still estimate the round-trip
// time by observing the time between when a byte is first acknowledged and the receipt of
// data that is at least one window beyond the sequence number that was acknowledged. If the
// sender is being throttled by the network, this estimate will be valid. However, if the
// sending application did not have any data to send, the measured time could be much larger
// than the actual round-trip time. Thus this measurement acts only as an upper-bound on the
// round-trip time and should be used only when it is the only source of round-trip time
// information."
uint32_t tcpi_rcv_rtt;
uint32_t tcpi_rcv_space;
// Total number of retransmitted packets.
uint32_t tcpi_total_retrans;
// Pacing-related metrics.
uint64_t tcpi_pacing_rate;
uint64_t tcpi_max_pacing_rate;
// Total bytes ACKed by remote peer.
uint64_t tcpi_bytes_acked; /* RFC4898 tcpEStatsAppHCThruOctetsAcked */
// Total bytes received (for which ACKs have been sent out).
uint64_t tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived */
// Segments sent and received.
uint32_t tcpi_segs_out; /* RFC4898 tcpEStatsPerfSegsOut */
uint32_t tcpi_segs_in; /* RFC4898 tcpEStatsPerfSegsIn */
// The following metrics are quite new and not in el7.
// ------------------------------------------------------------
uint32_t tcpi_notsent_bytes;
uint32_t tcpi_min_rtt;
uint32_t tcpi_data_segs_in; /* RFC4898 tcpEStatsDataSegsIn */
uint32_t tcpi_data_segs_out; /* RFC4898 tcpEStatsDataSegsOut */
// Calculated rate at which data was delivered.
uint64_t tcpi_delivery_rate;
// Timers for various states.
uint64_t tcpi_busy_time; /* Time (usec) busy sending data */
uint64_t tcpi_rwnd_limited; /* Time (usec) limited by receive window */
uint64_t tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */
};
} // anonymous namespace
// Creates a socket object that does not hold a file descriptor yet (fd_ is
// -1).
Socket::Socket() : fd_(-1) {}
// Wraps the given file descriptor 'fd'; the destructor will Close() it.
Socket::Socket(int fd) : fd_(fd) {}
// Move constructor: takes over the descriptor held by 'other', which is left
// without a descriptor (see Release()).
Socket::Socket(Socket&& other) noexcept : fd_(other.Release()) {}
// Replaces the held descriptor with 'fd'.
//
// The descriptor held so far (if any) is closed first; a failure to close it
// is deliberately ignored.
void Socket::Reset(int fd) {
  ignore_result(Close());
  fd_ = fd;
}
int Socket::Release() {
int fd = fd_;
fd_ = -1;
return fd;
}
// Closes the held descriptor, if any. A close failure cannot be reported from
// a destructor, so the resulting status is dropped.
Socket::~Socket() {
  ignore_result(Close());
}
// Closes the held file descriptor, if any.
//
// Returns OK when there is nothing to close or when close(2) succeeds, and a
// NetworkError carrying the errno otherwise. In every case this object no
// longer holds a descriptor once the call returns.
Status Socket::Close() {
  if (fd_ < 0) {
    return Status::OK();
  }
  // Forget the descriptor before closing it, and never retry the close.
  // On Linux, close(2) releases the descriptor even when it reports an error
  // (EINTR included), so retrying the call -- or keeping the stale number in
  // fd_ for a later Close() -- could close an unrelated descriptor that
  // another thread was handed in the meantime.
  const int fd = fd_;
  fd_ = -1;
  if (::close(fd) < 0) {
    const int err = errno;
    return Status::NetworkError("close error", ErrnoToString(err), err);
  }
  return Status::OK();
}
// Shuts down the read side, the write side, or both sides of the connection.
//
// 'shut_read' / 'shut_write' select which directions to shut down. When
// neither is requested the call is a no-op that returns OK. Returns a
// NetworkError carrying the errno if shutdown(2) fails.
Status Socket::Shutdown(bool shut_read, bool shut_write) {
  DCHECK_GE(fd_, 0);
  if (!shut_read && !shut_write) {
    // Nothing was requested. This must not reach shutdown(2): SHUT_RD is
    // defined as 0 on Linux and macOS, so an "empty" mode would silently shut
    // down the read side.
    return Status::OK();
  }
  int how;
  if (shut_read && shut_write) {
    how = SHUT_RDWR;
  } else if (shut_read) {
    how = SHUT_RD;
  } else {
    how = SHUT_WR;
  }
  if (::shutdown(fd_, how) < 0) {
    const int err = errno;
    return Status::NetworkError("shutdown error", ErrnoToString(err), err);
  }
  return Status::OK();
}
// Returns the held file descriptor (-1 when none is held). Ownership stays
// with this object.
int Socket::GetFd() const {
  return fd_;
}
// Returns true if 'err' is one of the errno values this class treats as
// temporary: EAGAIN, EWOULDBLOCK or EINTR.
bool Socket::IsTemporarySocketError(int err) {
  if (err == EAGAIN) {
    return true;
  }
  if (err == EWOULDBLOCK) {
    return true;
  }
  return err == EINTR;
}
#if defined(__linux__)
// Opens a new stream socket of the given address 'family' and makes it the
// descriptor held by this object (any previously held one is closed).
//
// The socket is created close-on-exec, and non-blocking when 'flags' contains
// FLAG_NONBLOCKING. Returns a NetworkError carrying the errno if socket(2)
// fails.
Status Socket::Init(int family, int flags) {
  const int nonblocking_flag = (flags & FLAG_NONBLOCKING) ? SOCK_NONBLOCK : 0;
  const int fd = ::socket(family, SOCK_STREAM | SOCK_CLOEXEC | nonblocking_flag, 0);
  // Save errno before Reset(): Reset() closes the previously held descriptor,
  // which may overwrite errno.
  const int err = errno;
  Reset(fd);
  if (fd < 0) {
    return Status::NetworkError("error opening socket", ErrnoToString(err), err);
  }
  return Status::OK();
}
#else
// Opens a new stream socket of the given address 'family' and makes it the
// descriptor held by this object (any previously held one is closed).
//
// After creation the socket is switched to non-blocking mode when 'flags'
// contains FLAG_NONBLOCKING, marked close-on-exec, and SO_NOSIGPIPE is set on
// it. Returns a NetworkError if socket(2) or any of these steps fails.
Status Socket::Init(int family, int flags) {
  const int fd = ::socket(family, SOCK_STREAM, 0);
  // Save errno before Reset(): Reset() closes the previously held descriptor,
  // which may overwrite errno.
  const int err = errno;
  Reset(fd);
  if (fd < 0) {
    return Status::NetworkError("error opening socket", ErrnoToString(err), err);
  }
  RETURN_NOT_OK(SetNonBlocking(flags & FLAG_NONBLOCKING));
  RETURN_NOT_OK(SetCloseOnExec());
  // Disable SIGPIPE.
  int set = 1;
  RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, SO_NOSIGPIPE, set),
                        "failed to set SO_NOSIGPIPE");
  return Status::OK();
}
#endif // defined(__linux__)
// Sets the TCP_NODELAY option on the socket to 1 when 'enabled' is true and
// to 0 otherwise. Returns the setsockopt error, prefixed with the option
// name, on failure.
Status Socket::SetNoDelay(bool enabled) {
  const int optval = enabled ? 1 : 0;
  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_NODELAY, optval),
                        "failed to set TCP_NODELAY");
  return Status::OK();
}
// Sets the TCP_CORK option to 1 when 'enabled' is true and to 0 otherwise.
//
// Only implemented on Linux; on other platforms this does nothing and
// returns OK.
Status Socket::SetTcpCork(bool enabled) {
#if defined(__linux__)
  const int optval = enabled ? 1 : 0;
  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_CORK, optval),
                        "failed to set TCP_CORK");
#endif // defined(__linux__)
  // TODO(unknown): Use TCP_NOPUSH for OSX if perf becomes an issue.
  return Status::OK();
}
// Sets ('enabled' == true) or clears ('enabled' == false) O_NONBLOCK in the
// descriptor's file status flags, leaving the other status flags untouched.
//
// Returns a NetworkError carrying the errno if either fcntl(2) call fails.
Status Socket::SetNonBlocking(bool enabled) {
  const int old_flags = ::fcntl(fd_, F_GETFL, 0);
  if (old_flags == -1) {
    const int err = errno;
    return Status::NetworkError(
        StringPrintf("Failed to get file status flags on fd %d", fd_),
        ErrnoToString(err), err);
  }

  const int new_flags = enabled ? (old_flags | O_NONBLOCK)
                                : (old_flags & ~O_NONBLOCK);
  if (::fcntl(fd_, F_SETFL, new_flags) != -1) {
    return Status::OK();
  }

  // errno is saved before any message formatting takes place.
  const int err = errno;
  if (!enabled) {
    return Status::NetworkError(
        StringPrintf("Failed to clear O_NONBLOCK on fd %d", fd_),
        ErrnoToString(err), err);
  }
  return Status::NetworkError(
      StringPrintf("Failed to set O_NONBLOCK on fd %d", fd_),
      ErrnoToString(err), err);
}
// Reports through '*is_nonblock' whether O_NONBLOCK is set in the
// descriptor's file status flags.
//
// Returns a NetworkError carrying the errno if fcntl(F_GETFL) fails, in which
// case '*is_nonblock' is not written.
Status Socket::IsNonBlocking(bool* is_nonblock) const {
  const int status_flags = ::fcntl(fd_, F_GETFL, 0);
  if (status_flags != -1) {
    *is_nonblock = (status_flags & O_NONBLOCK) != 0;
    return Status::OK();
  }
  const int err = errno;
  return Status::NetworkError(
      StringPrintf("Failed to get file status flags on fd %d", fd_),
      ErrnoToString(err), err);
}
// Adds FD_CLOEXEC to the descriptor's file descriptor flags.
//
// If either fcntl(2) call fails, the held descriptor is dropped via Reset(-1)
// and a NetworkError carrying the errno is returned.
Status Socket::SetCloseOnExec() {
  const int fd_flags = fcntl(fd_, F_GETFD, 0);
  if (fd_flags == -1) {
    // Save errno first: Reset() closes the descriptor.
    const int err = errno;
    Reset(-1);
    return Status::NetworkError("fcntl(F_GETFD) error", ErrnoToString(err), err);
  }

  const int rc = fcntl(fd_, F_SETFD, fd_flags | FD_CLOEXEC);
  if (rc == -1) {
    const int err = errno;
    Reset(-1);
    return Status::NetworkError("fcntl(F_SETFD) error", ErrnoToString(err), err);
  }
  return Status::OK();
}
// Applies 'timeout' to the SO_SNDTIMEO socket option via SetTimeout().
Status Socket::SetSendTimeout(const MonoDelta& timeout) {
  return SetTimeout(SO_SNDTIMEO, "SO_SNDTIMEO", timeout);
}
// Applies 'timeout' to the SO_RCVTIMEO socket option via SetTimeout().
Status Socket::SetRecvTimeout(const MonoDelta& timeout) {
  return SetTimeout(SO_RCVTIMEO, "SO_RCVTIMEO", timeout);
}
// Sets the SO_REUSEADDR socket option to 1 when 'flag' is true and to 0
// otherwise. Returns the setsockopt error, prefixed with the option name, on
// failure.
Status Socket::SetReuseAddr(bool flag) {
  const int optval = flag ? 1 : 0;
  RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, SO_REUSEADDR, optval),
                        "failed to set SO_REUSEADDR");
  return Status::OK();
}
// Sets the SO_REUSEPORT socket option to 1 when 'flag' is true and to 0
// otherwise.
//
// When the platform headers do not define SO_REUSEPORT, NotSupported is
// returned without touching the socket.
Status Socket::SetReusePort(bool flag) {
#ifdef SO_REUSEPORT
  const int optval = flag ? 1 : 0;
  RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, SO_REUSEPORT, optval),
                        "failed to set SO_REUSEPORT");
  return Status::OK();
#else
  return Status::NotSupported("failed to set SO_REUSEPORT: protocol not available");
#endif
}
// Convenience wrapper: enables SO_REUSEADDR, binds the socket to 'sockaddr'
// and starts listening with a backlog of 'listen_queue_size'.
//
// Stops at, and returns, the first failing step.
Status Socket::BindAndListen(const Sockaddr& sockaddr,
                             int listen_queue_size) {
  RETURN_NOT_OK(SetReuseAddr(true));
  RETURN_NOT_OK(Bind(sockaddr));
  RETURN_NOT_OK(Listen(listen_queue_size));
  return Status::OK();
}
// Marks the socket as a listening socket with a backlog of
// 'listen_queue_size'.
//
// Returns a NetworkError carrying the errno if listen(2) fails, so callers
// can inspect posix_code() as with the other socket operations in this file.
Status Socket::Listen(int listen_queue_size) {
  if (::listen(fd_, listen_queue_size) != 0) {
    const int err = errno;
    return Status::NetworkError("listen() error", ErrnoToString(err), err);
  }
  return Status::OK();
}
// Stores the local address the socket is bound to, as reported by
// getsockname(2), into '*cur_addr'.
//
// Returns a NetworkError carrying the errno on failure, in which case
// '*cur_addr' is not written.
Status Socket::GetSocketAddress(Sockaddr* cur_addr) const {
  struct sockaddr_storage storage;
  socklen_t storage_len = sizeof(storage);
  DCHECK_GE(fd_, 0);
  const int rc = ::getsockname(
      fd_, reinterpret_cast<struct sockaddr*>(&storage), &storage_len);
  if (rc == -1) {
    const int err = errno;
    return Status::NetworkError("getsockname error", ErrnoToString(err), err);
  }
  *cur_addr = Sockaddr(reinterpret_cast<struct sockaddr&>(storage), storage_len);
  return Status::OK();
}
// Stores the address of the connected peer, as reported by getpeername(2),
// into '*cur_addr'.
//
// Returns a NetworkError carrying the errno on failure, in which case
// '*cur_addr' is not written.
Status Socket::GetPeerAddress(Sockaddr* cur_addr) const {
  struct sockaddr_storage storage;
  socklen_t storage_len = sizeof(storage);
  DCHECK_GE(fd_, 0);
  const int rc = ::getpeername(
      fd_, reinterpret_cast<struct sockaddr*>(&storage), &storage_len);
  if (rc == -1) {
    const int err = errno;
    return Status::NetworkError("getpeername error", ErrnoToString(err), err);
  }
  *cur_addr = Sockaddr(reinterpret_cast<const sockaddr&>(storage), storage_len);
  return Status::OK();
}
bool Socket::IsLoopbackConnection() const {
Sockaddr local, remote;
if (!GetSocketAddress(&local).ok()) return false;
if (!GetPeerAddress(&remote).ok()) return false;
// Check if remote address is in 127.0.0.0/8 subnet.
if (remote.IsAnyLocalAddress()) {
return true;
}
// Compare local and remote addresses without comparing ports.
local.set_port(0);
remote.set_port(0);
return local == remote;
}
// Binds the socket to 'bind_addr'.
//
// Returns a NetworkError carrying the errno if bind(2) fails. When the
// failure is EADDRINUSE for an IP address with a non-zero port,
// TryRunLsof() is additionally invoked with the address.
Status Socket::Bind(const Sockaddr& bind_addr) {
  DCHECK_GE(fd_, 0);
  const int rc = ::bind(fd_, bind_addr.addr(), bind_addr.addrlen());
  if (PREDICT_FALSE(rc != 0)) {
    const int err = errno;
    Status bind_status = Status::NetworkError(
        Substitute("error binding socket to $0: $1",
                   bind_addr.ToString(), ErrnoToString(err)),
        Slice(), err);
    if (bind_status.IsNetworkError() && bind_addr.is_ip() &&
        bind_status.posix_code() == EADDRINUSE && bind_addr.port() != 0) {
      TryRunLsof(bind_addr);
    }
    return bind_status;
  }
  return Status::OK();
}
// Accepts one pending connection on this listening socket.
//
// On success 'new_conn' is reset to hold the accepted descriptor and
// '*remote' receives the peer address. The accepted socket is close-on-exec,
// and non-blocking when 'flags' contains FLAG_NONBLOCKING. The accept call is
// retried when interrupted by a signal (EINTR); any other failure is returned
// as a NetworkError carrying the errno.
Status Socket::Accept(Socket* new_conn, Sockaddr* remote, int flags) {
  TRACE_EVENT0("net", "Socket::Accept");
  struct sockaddr_storage peer_storage;
  socklen_t peer_len = sizeof(peer_storage);
  DCHECK_GE(fd_, 0);
  int conn_fd = -1;
#if defined(__linux__)
  // accept4() applies both descriptor properties atomically with the accept.
  const int accept_flags =
      SOCK_CLOEXEC | ((flags & FLAG_NONBLOCKING) ? SOCK_NONBLOCK : 0);
  RETRY_ON_EINTR(conn_fd,
                 accept4(fd_, reinterpret_cast<struct sockaddr*>(&peer_storage),
                         &peer_len, accept_flags));
  if (conn_fd < 0) {
    const int err = errno;
    return Status::NetworkError("accept4(2) error", ErrnoToString(err), err);
  }
  new_conn->Reset(conn_fd);
#else
  RETRY_ON_EINTR(conn_fd,
                 accept(fd_, reinterpret_cast<struct sockaddr*>(&peer_storage),
                        &peer_len));
  if (conn_fd < 0) {
    const int err = errno;
    return Status::NetworkError("accept(2) error", ErrnoToString(err), err);
  }
  new_conn->Reset(conn_fd);
  // Without accept4(), the descriptor properties are applied after the fact.
  RETURN_NOT_OK(new_conn->SetNonBlocking(flags & FLAG_NONBLOCKING));
  RETURN_NOT_OK(new_conn->SetCloseOnExec());
#endif // defined(__linux__)
  *remote = Sockaddr(reinterpret_cast<const sockaddr&>(peer_storage), peer_len);
  TRACE_EVENT_INSTANT1("net", "Accepted", TRACE_EVENT_SCOPE_THREAD,
                       "remote", remote->ToString());
  return Status::OK();
}
// Binds the socket to the address given by --local_ip_for_outbound_sockets.
//
// Returns the parse error if the flag value is not a valid port-less IP
// address, or the Bind() error otherwise.
Status Socket::BindForOutgoingConnection() {
  Sockaddr local_addr;
  RETURN_NOT_OK(ParseIpAddress(FLAGS_local_ip_for_outbound_sockets, &local_addr));
  return Bind(local_addr);
}
// Connects the socket to 'remote'.
//
// If --local_ip_for_outbound_sockets is set, the socket is first bound to
// that address. The connect(2) call is retried when interrupted by a signal
// (EINTR); any other failure is returned as a NetworkError carrying the
// errno.
Status Socket::Connect(const Sockaddr& remote) {
  TRACE_EVENT1("net", "Socket::Connect",
               "remote", remote.ToString());
  const bool bind_locally = !FLAGS_local_ip_for_outbound_sockets.empty();
  if (PREDICT_FALSE(bind_locally)) {
    RETURN_NOT_OK(BindForOutgoingConnection());
  }
  DCHECK_GE(fd_, 0);
  int rc;
  RETRY_ON_EINTR(rc, ::connect(fd_, remote.addr(), remote.addrlen()));
  if (rc >= 0) {
    return Status::OK();
  }
  const int err = errno;
  return Status::NetworkError("connect(2) error", ErrnoToString(err), err);
}
// Fetches the socket's pending error via getsockopt(SO_ERROR).
//
// Returns OK when no error is pending, a NetworkError built from the pending
// error code otherwise, or a NetworkError describing the getsockopt failure
// if the query itself fails.
Status Socket::GetSockError() const {
  int pending_err = 0;
  socklen_t pending_err_len = sizeof(pending_err);
  DCHECK_GE(fd_, 0);
  const int rc =
      ::getsockopt(fd_, SOL_SOCKET, SO_ERROR, &pending_err, &pending_err_len);
  if (rc != 0) {
    const int err = errno;
    return Status::NetworkError("getsockopt(SO_ERROR) failed", ErrnoToString(err), err);
  }
  if (pending_err == 0) {
    return Status::OK();
  }
  return Status::NetworkError(ErrnoToString(pending_err), Slice(), pending_err);
}
// Sends up to 'amt' bytes from 'buf' with a single send(2) call (retried only
// when interrupted by a signal) and stores the number of bytes actually sent
// in '*nwritten'.
//
// Returns a NetworkError with EINVAL if 'amt' is not positive, or a
// NetworkError carrying the errno if send(2) fails; '*nwritten' is not
// written in either case.
Status Socket::Write(const uint8_t* buf, int32_t amt, int32_t* nwritten) {
  if (amt <= 0) {
    return Status::NetworkError(
        StringPrintf("invalid send of %" PRId32 " bytes", amt),
        Slice(), EINVAL);
  }
  DCHECK_GE(fd_, 0);
  int sent;
  // MSG_NOSIGNAL is passed so that the send does not raise SIGPIPE.
  RETRY_ON_EINTR(sent, ::send(fd_, buf, amt, MSG_NOSIGNAL));
  if (sent >= 0) {
    *nwritten = sent;
    return Status::OK();
  }
  const int err = errno;
  return Status::NetworkError("write error", ErrnoToString(err), err);
}
// Sends the 'iov_len' buffers described by 'iov' with a single sendmsg(2)
// call (retried only when interrupted by a signal) and stores the number of
// bytes actually sent in '*nwritten'.
//
// Returns a NetworkError with EINVAL if 'iov_len' is not positive, or a
// NetworkError carrying the errno if sendmsg(2) fails; '*nwritten' is not
// written in either case.
Status Socket::Writev(const struct ::iovec* iov,
                      int iov_len,
                      int64_t* nwritten) {
  if (PREDICT_FALSE(iov_len <= 0)) {
    return Status::NetworkError(
        StringPrintf("writev: invalid io vector length of %d", iov_len),
        Slice(), EINVAL);
  }
  DCHECK_GE(fd_, 0);

  // Only the scatter/gather fields are used; everything else stays zeroed.
  struct msghdr header = {};
  header.msg_iov = const_cast<iovec*>(iov);
  header.msg_iovlen = iov_len;

  ssize_t sent;
  RETRY_ON_EINTR(sent, ::sendmsg(fd_, &header, MSG_NOSIGNAL));
  if (PREDICT_FALSE(sent < 0)) {
    const int err = errno;
    return Status::NetworkError("sendmsg error", ErrnoToString(err), err);
  }
  *nwritten = sent;
  return Status::OK();
}
// Mostly follows writen() from Stevens (2004) or Kerrisk (2010).
//
// Writes all 'buflen' bytes of 'buf', looping over Write() until everything
// is sent or 'deadline' passes. Before each attempt the send timeout of the
// socket is set to the time remaining until the deadline.
//
// '*nwritten' always holds the number of bytes sent so far when this returns,
// including on error paths and for a zero-length request.
//
// Returns TimedOut if the deadline passes or a write times out, IOError if a
// write reports zero bytes, and the (prefixed) Write() error otherwise.
Status Socket::BlockingWrite(const uint8_t* buf, size_t buflen, size_t* nwritten,
                             const MonoTime& deadline) {
  DCHECK_LE(buflen, numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported";
  DCHECK(nwritten);
  size_t tot_written = 0;
  // Give the out-parameter a defined value up front: the loop below may
  // return (deadline already passed) or not run at all (buflen == 0) before
  // ever reaching the assignment inside it.
  *nwritten = 0;
  while (tot_written < buflen) {
    int32_t inc_num_written = 0;
    const int32_t num_to_write = static_cast<int32_t>(buflen - tot_written);
    const MonoDelta timeout = deadline - MonoTime::Now();
    if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) {
      return Status::TimedOut(Substitute("sent $0 of $1 requested bytes",
                                         tot_written, buflen));
    }
    RETURN_NOT_OK(SetSendTimeout(timeout));
    const Status s = Write(buf, num_to_write, &inc_num_written);
    // 'inc_num_written' stays 0 when Write() fails, so this accounting is
    // safe to do before looking at the status.
    tot_written += inc_num_written;
    buf += inc_num_written;
    *nwritten = tot_written;
    if (PREDICT_FALSE(!s.ok())) {
      // Continue silently when the syscall is interrupted.
      if (s.posix_code() == EINTR) {
        continue;
      }
      // With a send timeout configured, EAGAIN means the timeout expired.
      if (s.posix_code() == EAGAIN) {
        return Status::TimedOut(Substitute("sent $0 of $1 requested bytes",
                                           tot_written, buflen));
      }
      return s.CloneAndPrepend("BlockingWrite error");
    }
    if (PREDICT_FALSE(inc_num_written == 0)) {
      // Shouldn't happen on Linux with a blocking socket. Maybe other Unices.
      break;
    }
  }
  if (PREDICT_FALSE(tot_written < buflen)) {
    return Status::IOError("Wrote zero bytes on a BlockingWrite() call",
                           StringPrintf("Transferred %zu of %zu bytes", tot_written, buflen));
  }
  return Status::OK();
}
// Receives up to 'amt' bytes into 'buf' with a single recv(2) call (retried
// only when interrupted by a signal) and stores the number of bytes received
// in '*nread'.
//
// Returns a NetworkError with EINVAL if 'amt' is not positive, a NetworkError
// with ESHUTDOWN if the peer closed the connection (EOF), or a NetworkError
// carrying the errno of the failed recv(2). '*nread' is not written on error.
Status Socket::Recv(uint8_t* buf, int32_t amt, int32_t* nread) {
  if (PREDICT_FALSE(amt <= 0)) {
    return Status::NetworkError(
        StringPrintf("invalid recv of %d bytes", amt), Slice(), EINVAL);
  }
  // The recv() call can return fewer than the requested number of bytes.
  // Especially when 'amt' is small, this is very unlikely to happen in
  // the context of unit tests. So, we provide an injection hook which
  // simulates the same behavior.
  if (PREDICT_FALSE(FLAGS_socket_inject_short_recvs && amt > 1)) {
    Random r(GetRandomSeed32());
    amt = 1 + r.Uniform(amt - 1);
  }
  DCHECK_GE(fd_, 0);
  int res;
  RETRY_ON_EINTR(res, recv(fd_, buf, amt, 0));
  if (res <= 0) {
    // Save errno right away: looking up the peer address below issues
    // another system call (getpeername) which may overwrite it, and callers
    // rely on the posix code to tell timeouts from hard failures.
    const int err = errno;
    Sockaddr remote;
    const Status get_addr_status = GetPeerAddress(&remote);
    const string remote_str =
        get_addr_status.ok() ? remote.ToString() : "unknown peer";
    if (res == 0) {
      return Status::NetworkError(
          Substitute("recv got EOF from $0", remote_str), Slice(), ESHUTDOWN);
    }
    return Status::NetworkError(
        Substitute("recv error from $0", remote_str), ErrnoToString(err), err);
  }
  *nread = res;
  return Status::OK();
}
// Mostly follows readn() from Stevens (2004) or Kerrisk (2010).
// One place where we deviate: we consider EOF a failure if < amt bytes are read.
//
// Reads exactly 'amt' bytes into 'buf', looping over Recv() until everything
// has arrived or 'deadline' passes. Before each attempt the receive timeout
// of the socket is set to the time remaining until the deadline.
//
// '*nread' always holds the number of bytes received so far when this
// returns, including on error paths and for a zero-length request.
//
// Returns TimedOut if the deadline passes or a read times out, IOError if a
// read reports zero bytes, and the (prefixed) Recv() error otherwise.
Status Socket::BlockingRecv(uint8_t* buf, size_t amt, size_t* nread, const MonoTime& deadline) {
  DCHECK_LE(amt, numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported";
  DCHECK(nread);
  size_t tot_read = 0;
  // Give the out-parameter a defined value up front: the loop below may
  // return (deadline already passed) or not run at all (amt == 0) before
  // ever reaching the assignment inside it.
  *nread = 0;
  while (tot_read < amt) {
    int32_t inc_num_read = 0;
    const int32_t num_to_read = static_cast<int32_t>(amt - tot_read);
    const MonoDelta timeout = deadline - MonoTime::Now();
    if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) {
      return Status::TimedOut(Substitute("received $0 of $1 requested bytes",
                                         tot_read, amt));
    }
    RETURN_NOT_OK(SetRecvTimeout(timeout));
    const Status s = Recv(buf, num_to_read, &inc_num_read);
    // 'inc_num_read' stays 0 when Recv() fails, so this accounting is safe to
    // do before looking at the status.
    tot_read += inc_num_read;
    buf += inc_num_read;
    *nread = tot_read;
    if (PREDICT_FALSE(!s.ok())) {
      // Continue silently when the syscall is interrupted.
      if (s.posix_code() == EINTR) {
        continue;
      }
      // With a receive timeout configured, EAGAIN means the timeout expired.
      if (s.posix_code() == EAGAIN) {
        return Status::TimedOut(Substitute("received $0 of $1 requested bytes",
                                           tot_read, amt));
      }
      return s.CloneAndPrepend("BlockingRecv error");
    }
    if (PREDICT_FALSE(inc_num_read == 0)) {
      // EOF.
      break;
    }
  }
  if (PREDICT_FALSE(tot_read < amt)) {
    return Status::IOError("Read zero bytes on a blocking Recv() call",
                           StringPrintf("Transferred %zu of %zu bytes", tot_read, amt));
  }
  return Status::OK();
}
#if defined(__linux__)
// Fills 'pb' with statistics about the socket on a best-effort basis.
//
// TCP-level numbers come from getsockopt(TCP_INFO) and the queue sizes from
// the TIOCOUTQ / FIONREAD ioctls. A failing system call simply leaves the
// corresponding fields of 'pb' unset. The only error returned is NotSupported,
// when TCP_INFO succeeds but returns too little data to cover the baseline
// fields.
Status Socket::GetStats(SocketStatsPB* pb) const {
  DCHECK_GE(fd_, 0);

// True when the length reported by the kernel covers 'field_name' entirely.
#define HAS_FIELD(field_name) \
  (info_len >= offsetof(tcp_info, field_name) + sizeof(info.field_name))

  // Fetch TCP_INFO statistics from the kernel.
  tcp_info info = {};
  socklen_t info_len = sizeof(info);
  const int info_rc = getsockopt(fd_, IPPROTO_TCP, TCP_INFO, &info, &info_len);
  if (info_rc == 0) {
    if (!HAS_FIELD(tcpi_total_retrans)) {
      // All the fields up through tcpi_total_retrans were present since very old
      // kernel versions, beyond our minimal supported. So, we can just bail if we
      // don't get sufficient data back.
      return Status::NotSupported("bad length returned for TCP_INFO");
    }
    pb->set_rtt(info.tcpi_rtt);
    pb->set_rttvar(info.tcpi_rttvar);
    pb->set_snd_cwnd(info.tcpi_snd_cwnd);
    pb->set_total_retrans(info.tcpi_total_retrans);

    // The following fields were added later in kernel development history.
    // In RHEL6 they were backported starting in 6.8. Even though they were
    // backported all together as a group, we'll just be safe and check for
    // each individually.
    if (HAS_FIELD(tcpi_pacing_rate)) {
      pb->set_pacing_rate(info.tcpi_pacing_rate);
    }
    if (HAS_FIELD(tcpi_max_pacing_rate)) {
      pb->set_max_pacing_rate(info.tcpi_max_pacing_rate);
    }
    if (HAS_FIELD(tcpi_bytes_acked)) {
      pb->set_bytes_acked(info.tcpi_bytes_acked);
    }
    if (HAS_FIELD(tcpi_bytes_received)) {
      pb->set_bytes_received(info.tcpi_bytes_received);
    }
    if (HAS_FIELD(tcpi_segs_out)) {
      pb->set_segs_out(info.tcpi_segs_out);
    }
    if (HAS_FIELD(tcpi_segs_in)) {
      pb->set_segs_in(info.tcpi_segs_in);
    }

    // Calculate sender bandwidth based on the same logic used by the 'ss' utility.
    if (info.tcpi_rtt > 0 && info.tcpi_snd_mss && info.tcpi_snd_cwnd) {
      // Units:
      //   rtt = usec
      //   cwnd = number of MSS-size packets
      //   mss = bytes / packet
      //
      // Dimensional analysis:
      //   packets * bytes/packet * usecs/sec / usec -> bytes/sec
      static constexpr int kUsecsPerSec = 1000000;
      pb->set_send_bytes_per_sec(static_cast<int64_t>(info.tcpi_snd_cwnd) *
                                 info.tcpi_snd_mss * kUsecsPerSec / info.tcpi_rtt);
    }
  }
#undef HAS_FIELD

  // Fetch the queue sizes.
  int send_queue_len = 0;
  if (ioctl(fd_, TIOCOUTQ, &send_queue_len) == 0) {
    pb->set_send_queue_bytes(send_queue_len);
  }
  int recv_queue_len = 0;
  if (ioctl(fd_, FIONREAD, &recv_queue_len) == 0) {
    pb->set_receive_queue_bytes(recv_queue_len);
  }
  return Status::OK();
}
#endif // #if defined(__linux__) ...
// Records the socket's TCP maximum segment size, as reported by
// getsockopt(TCP_MAXSEG), in the 'tcp' sub-message of 'pb'.
//
// Returns a NetworkError carrying the errno if the query fails, in which case
// 'pb' is left untouched.
Status Socket::GetTransportDetails(TransportDetailsPB* pb) const {
  DCHECK(pb);
  DCHECK_GE(fd_, 0);
  int32_t mss = 0;
  socklen_t mss_len = sizeof(mss);
  const int rc = ::getsockopt(fd_, IPPROTO_TCP, TCP_MAXSEG, &mss, &mss_len);
  if (rc != 0) {
    const int err = errno;
    return Status::NetworkError(
        "getsockopt(TCP_MAXSEG) failed", ErrnoToString(err), err);
  }
  pb->mutable_tcp()->set_max_segment_size(mss);
  return Status::OK();
}
// Sets the SOL_SOCKET-level timeout option 'opt' to 'timeout'.
//
// 'optname' is the human-readable option name used in the error message.
// Returns InvalidArgument for a negative timeout, or the prefixed setsockopt
// error on failure.
Status Socket::SetTimeout(int opt, const char* optname, const MonoDelta& timeout) {
  const bool is_negative = timeout.ToNanoseconds() < 0;
  if (PREDICT_FALSE(is_negative)) {
    return Status::InvalidArgument("Timeout specified as negative to SetTimeout",
                                   timeout.ToString());
  }
  // The option value is passed to the kernel as a struct timeval.
  struct timeval tv;
  timeout.ToTimeVal(&tv);
  RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, opt, tv),
                        Substitute("failed to set socket option $0 to $1",
                                   optname, timeout.ToString()));
  return Status::OK();
}
// Configures and enables TCP keepalive on the socket.
//
// 'idle_time_s' is applied to TCP_KEEPIDLE (TCP_KEEPALIVE on non-Linux
// platforms), 'retry_time_s' to TCP_KEEPINTVL and 'num_retries' to
// TCP_KEEPCNT; SO_KEEPALIVE is switched on last. All three values are
// expected to be positive (checked in debug builds). Returns the prefixed
// error of the first option that cannot be set.
Status Socket::SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries) {
  static const char* const kErrFormat = "failed to set socket option $0 to $1";

  DCHECK_GT(idle_time_s, 0);
#if defined(__linux__)
  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_KEEPIDLE, idle_time_s),
                        Substitute(kErrFormat, "TCP_KEEPIDLE", idle_time_s));
#else
  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_KEEPALIVE, idle_time_s),
                        Substitute(kErrFormat, "TCP_KEEPALIVE", idle_time_s));
#endif

  DCHECK_GT(retry_time_s, 0);
  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_KEEPINTVL, retry_time_s),
                        Substitute(kErrFormat, "TCP_KEEPINTVL", retry_time_s));

  DCHECK_GT(num_retries, 0);
  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_KEEPCNT, num_retries),
                        Substitute(kErrFormat, "TCP_KEEPCNT", num_retries));

  // Only turn keepalive on once all of its parameters are in place.
  RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, SO_KEEPALIVE, 1),
                        "failed to enable TCP KeepAlive socket option");
  return Status::OK();
}
// Thin typed wrapper around setsockopt(2): sets 'option' at protocol 'level'
// to 'value', passing sizeof(T) as the option length.
//
// Returns a NetworkError carrying the errno on failure.
template<typename T>
Status Socket::SetSockOpt(int level, int option, const T& value) {
  const int rc = ::setsockopt(fd_, level, option, &value, sizeof(value));
  if (rc != -1) {
    return Status::OK();
  }
  const int err = errno;
  return Status::NetworkError(ErrnoToString(err), Slice(), err);
}
} // namespace kudu