blob: fcb38da36d0fc7fae23657bae5544efa87e04fda [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
module thrift.transport.socket;
import core.stdc.errno: ECONNRESET;
import core.thread : Thread;
import core.time : dur, Duration;
import std.array : empty;
import std.conv : text, to;
import std.exception : enforce;
import std.socket;
import thrift.base;
import thrift.transport.base;
import thrift.internal.socket;
/**
 * Common parts of a socket TTransport implementation, regardless of how the
 * actual I/O is performed (sync/async).
 *
 * The class stores the wrapped socket, the address it was asked to connect
 * to, and the configured send/receive timeouts. Performing the I/O itself
 * (open/close/read/write) is left to subclasses.
 */
abstract class TSocketBase : TBaseTransport {
  /**
   * Constructor that takes an already created, connected (!) socket.
   *
   * The socket options are applied to the passed socket right away via
   * setSocketOpts().
   *
   * Params:
   *   socket = Already created, connected socket object.
   */
  this(Socket socket) {
    socket_ = socket;
    setSocketOpts();
  }

  /**
   * Creates a new unconnected socket that will connect to the given host
   * on the given port.
   *
   * No socket is created here; only the target address is remembered.
   *
   * Params:
   *   host = Remote host.
   *   port = Remote port.
   */
  this(string host, ushort port) {
    host_ = host;
    port_ = port;
  }

  /**
   * Checks whether the socket is connected.
   *
   * The transport counts as open as long as it holds a socket object.
   */
  override bool isOpen() @property {
    return socket_ !is null;
  }

  /**
   * Writes as much data to the socket as there can be in a single OS call.
   *
   * Params:
   *   buf = Data to write.
   *
   * Returns: The actual number of bytes written. Never more than buf.length.
   */
  abstract size_t writeSome(in ubyte[] buf) out (written) {
    // DMD @@BUG@@: Enabling this e.g. fails the contract in the
    // async_test_server, because buf.length evaluates to 0 here, even though
    // in the method body it correctly is 27 (equal to the return value).
    version (none) assert(written <= buf.length, text("Implementation wrote " ~
      "more data than requested to?! (", written, " vs. ", buf.length, ")"));
  } body {
    assert(0, "DMD bug? – Why would contracts work for interfaces, but not " ~
      "for abstract methods? " ~
      "(Error: function […] in and out contracts require function body");
  }

  /**
   * Returns the actual address of the peer the socket is connected to.
   *
   * In contrast, the host and port properties contain the address used to
   * establish the connection, and are not updated after the connection.
   *
   * The address is looked up once and cached in peerAddress_ afterwards.
   *
   * The socket must be open when calling this.
   *
   * Throws: TTransportException (NOT_OPEN) if the socket is not open.
   */
  Address getPeerAddress() {
    enforce(isOpen, new TTransportException("Cannot get peer host for " ~
      "closed socket.", TTransportException.Type.NOT_OPEN));

    if (!peerAddress_) {
      peerAddress_ = socket_.remoteAddress();
      assert(peerAddress_);
    }

    return peerAddress_;
  }

  /**
   * The host the socket is connected to or will connect to. Null if an
   * already connected socket was used to construct the object.
   */
  string host() const @property {
    return host_;
  }

  /**
   * The port the socket is connected to or will connect to. Zero if an
   * already connected socket was used to construct the object.
   */
  ushort port() const @property {
    return port_;
  }

  /// The socket send timeout.
  Duration sendTimeout() const @property {
    return sendTimeout_;
  }

  /// Ditto
  void sendTimeout(Duration value) @property {
    sendTimeout_ = value;
  }

  /// The socket receiving timeout. Values smaller than 500 ms are not
  /// supported on Windows.
  Duration recvTimeout() const @property {
    return recvTimeout_;
  }

  /// Ditto
  void recvTimeout(Duration value) @property {
    recvTimeout_ = value;
  }

  /**
   * Returns the OS handle of the underlying socket.
   *
   * Should not usually be used directly, but access to it can be necessary
   * to interface with C libraries.
   *
   * The transport must currently hold a socket when calling this.
   *
   * Throws: TTransportException (NOT_OPEN) if there is no underlying socket.
   */
  typeof(socket_.handle()) socketHandle() @property {
    // Without this check, asking a closed transport for its handle would
    // dereference a null socket_ instead of reporting a usable error.
    enforce(socket_ !is null, new TTransportException(
      "Cannot get the handle of a closed socket.",
      TTransportException.Type.NOT_OPEN));
    return socket_.handle();
  }

protected:
  /**
   * Sets the needed socket options.
   *
   * Turns lingering off and tries to enable TCP_NODELAY on socket_. A
   * failure to set the linger option is logged; a failure to set
   * TCP_NODELAY is ignored on purpose (see below).
   */
  void setSocketOpts() {
    try {
      alias SocketOptionLevel.SOCKET lvlSock;
      Linger l;
      l.on = 0;
      l.time = 0;
      socket_.setOption(lvlSock, SocketOption.LINGER, l);
    } catch (SocketException e) {
      logError("Could not set socket option: %s", e);
    }

    // Just try to disable Nagle's algorithm – this will fail if we are passed
    // in a non-TCP socket via the Socket-accepting constructor.
    try {
      socket_.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, true);
    } catch (SocketException e) {}
  }

  /// Remote host.
  string host_;

  /// Remote port.
  ushort port_;

  /// Timeout for sending.
  Duration sendTimeout_;

  /// Timeout for receiving.
  Duration recvTimeout_;

  /// Cached peer address.
  Address peerAddress_;

  /// Cached peer host name.
  string peerHost_;

  /// Cached peer port.
  ushort peerPort_;

  /// Wrapped socket object.
  Socket socket_;
}
/**
 * Socket implementation of the TTransport interface.
 *
 * Due to the limitations of std.socket, currently only TCP/IP sockets are
 * supported (i.e. Unix domain sockets are not).
 */
class TSocket : TSocketBase {
  ///
  this(Socket socket) {
    super(socket);
  }

  ///
  this(string host, ushort port) {
    super(host, port);
  }

  /**
   * Connects the socket.
   *
   * Does nothing if the transport is already open. Otherwise the host is
   * resolved and every returned address is tried in order until one
   * connection attempt succeeds.
   *
   * Throws: TTransportException (NOT_OPEN) if the host is empty, the port is
   *   zero, the host cannot be resolved, or none of the resolved addresses
   *   could be connected to. In the last case the individual errors are
   *   attached via a TCompoundOperationException.
   */
  override void open() {
    if (isOpen) return;

    enforce(!host_.empty, new TTransportException(
      "Cannot open socket to null host.", TTransportException.Type.NOT_OPEN));
    enforce(port_ != 0, new TTransportException(
      "Cannot open socket to port zero.", TTransportException.Type.NOT_OPEN));

    Address[] addrs;
    try {
      addrs = getAddress(host_, port_);
    } catch (SocketException e) {
      throw new TTransportException("Could not resolve given host string.",
        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
    }

    Exception[] errors;
    foreach (addr; addrs) {
      try {
        auto candidate = new TcpSocket(addr.addressFamily);
        // setSocketOpts() operates on socket_, so it has to be assigned
        // before the connection is established.
        socket_ = candidate;

        // If configuring or connecting fails, release the OS handle right
        // away and make sure the transport does not report itself as open
        // with an unconnected socket. This also covers exceptions other than
        // SocketException, which propagate to the caller.
        scope (failure) {
          candidate.close();
          socket_ = null;
        }

        setSocketOpts();
        socket_.connect(addr);
        break;
      } catch (SocketException e) {
        errors ~= e;
      }
    }

    // One error per address means that not a single attempt succeeded (this
    // is also the case if the resolver returned no addresses at all).
    if (errors.length == addrs.length) {
      socket_ = null;

      // Need to throw a TTransportException to abide the TTransport API.
      import std.algorithm, std.range;
      throw new TTransportException(
        text("Failed to connect to ", host_, ":", port_, "."),
        TTransportException.Type.NOT_OPEN,
        __FILE__, __LINE__,
        new TCompoundOperationException(
          text(
            "All addresses tried failed (",
            joiner(map!q{text(a[0], `: "`, a[1].msg, `"`)}(zip(addrs, errors)), ", "),
            ")."
          ),
          errors
        )
      );
    }
  }

  /**
   * Closes the socket.
   *
   * Does nothing if the transport is not open. The cached peer address is
   * dropped as well, so that a later connection does not report the address
   * of the previous one.
   */
  override void close() {
    if (!isOpen) return;

    socket_.close();
    socket_ = null;
    peerAddress_ = null;
  }

  /**
   * Checks whether at least one byte can be received, without consuming it.
   *
   * Returns: false if the transport is not open or the peek returned no
   *   data, true if data is available.
   *
   * Throws: TTransportException (UNKNOWN) if peeking into the socket fails.
   */
  override bool peek() {
    if (!isOpen) return false;

    ubyte buf;
    auto r = socket_.receive((&buf)[0 .. 1], SocketFlags.PEEK);
    if (r == -1) {
      auto lastErrno = getSocketErrno();
      static if (connresetOnPeerShutdown) {
        // Where connresetOnPeerShutdown is set, ECONNRESET is treated as
        // the peer having shut down rather than as an error.
        if (lastErrno == ECONNRESET) {
          close();
          return false;
        }
      }
      throw new TTransportException("Peeking into socket failed: " ~
        socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
    }
    return (r > 0);
  }

  /**
   * Receives data from the socket into the given buffer.
   *
   * An interrupted call is retried, for at most maxRecvRetries + 1 attempts
   * in total.
   *
   * Params:
   *   buf = Buffer to receive into.
   *
   * Returns: The number of bytes received.
   *
   * Throws: TTransportException with type NOT_OPEN if the transport is not
   *   open or the error indicates a closed socket (the transport is closed
   *   in that case), TIMED_OUT on a receive timeout, UNKNOWN otherwise.
   */
  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN));

    typeof(getSocketErrno()) lastErrno;

    // The counter is wider than maxRecvRetries_ on purpose: a ushort counter
    // would wrap around for maxRecvRetries_ == ushort.max and never end the
    // loop while the call keeps getting interrupted.
    for (uint attempt = 0; attempt <= maxRecvRetries_; ++attempt) {
      auto r = socket_.receive(cast(void[])buf);

      // If recv went fine, immediately return.
      if (r >= 0) return r;

      // Something went wrong, find out how to handle it.
      lastErrno = getSocketErrno();

      if (lastErrno == INTERRUPTED_ERRNO) {
        // If the syscall was interrupted, just try again.
        continue;
      }

      static if (connresetOnPeerShutdown) {
        // Where connresetOnPeerShutdown is set, ECONNRESET is reported as
        // a zero-length read instead of an error.
        if (lastErrno == ECONNRESET) {
          return 0;
        }
      }

      // Not an error which is handled in a special way, just leave the loop.
      break;
    }

    if (isSocketCloseErrno(lastErrno)) {
      close();
      throw new TTransportException("Receiving failed, closing socket: " ~
        socketErrnoString(lastErrno), TTransportException.Type.NOT_OPEN);
    } else if (lastErrno == TIMEOUT_ERRNO) {
      throw new TTransportException(TTransportException.Type.TIMED_OUT);
    } else {
      throw new TTransportException("Receiving from socket failed: " ~
        socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
    }
  }

  /**
   * Writes the whole buffer to the socket, calling writeSome() repeatedly
   * until everything has been sent.
   *
   * Params:
   *   buf = Data to write.
   *
   * Throws: TTransportException (TIMED_OUT) if writeSome() reports that no
   *   data could be written, plus anything writeSome() throws.
   */
  override void write(in ubyte[] buf) {
    size_t sent;
    while (sent < buf.length) {
      auto b = writeSome(buf[sent .. $]);
      if (b == 0) {
        // This should only happen if the timeout set with SO_SNDTIMEO expired.
        throw new TTransportException("send() timeout expired.",
          TTransportException.Type.TIMED_OUT);
      }
      sent += b;
    }
    assert(sent == buf.length);
  }

  /**
   * Writes as much data to the socket as a single send call accepts.
   *
   * Params:
   *   buf = Data to write.
   *
   * Returns: The number of bytes written, or 0 if the call would have
   *   blocked.
   *
   * Throws: TTransportException with type NOT_OPEN if the transport is not
   *   open or the error indicates a closed socket (the transport is closed
   *   in that case), UNKNOWN for any other failure.
   */
  override size_t writeSome(in ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = socket_.send(buf);

    // Everything went well, just return the number of bytes written.
    if (r > 0) return r;

    // Handle error conditions.
    if (r < 0) {
      auto lastErrno = getSocketErrno();

      if (lastErrno == WOULD_BLOCK_ERRNO) {
        // Not an exceptional error per se – even with blocking sockets,
        // EAGAIN apparently is returned sometimes on out-of-resource
        // conditions (see the C++ implementation for details). Also, this
        // allows using TSocket with non-blocking sockets e.g. in
        // TNonblockingServer.
        return 0;
      }

      auto type = TTransportException.Type.UNKNOWN;
      if (isSocketCloseErrno(lastErrno)) {
        type = TTransportException.Type.NOT_OPEN;
        close();
      }

      throw new TTransportException("Sending to socket failed: " ~
        socketErrnoString(lastErrno), type);
    }

    // send() should never return 0.
    throw new TTransportException("Sending to socket failed (0 bytes written).",
      TTransportException.Type.UNKNOWN);
  }

  /// Stores the send timeout and applies it to the socket, if there is one.
  override void sendTimeout(Duration value) @property {
    super.sendTimeout(value);
    setTimeout(SocketOption.SNDTIMEO, value);
  }

  /// Stores the receive timeout and applies it to the socket, if there is one.
  override void recvTimeout(Duration value) @property {
    super.recvTimeout(value);
    setTimeout(SocketOption.RCVTIMEO, value);
  }

  /**
   * Maximum number of retries for receiving from socket on read() in case of
   * EAGAIN/EINTR.
   */
  ushort maxRecvRetries() @property const {
    return maxRecvRetries_;
  }

  /// Ditto
  void maxRecvRetries(ushort value) @property {
    maxRecvRetries_ = value;
  }

  /// Ditto
  enum DEFAULT_MAX_RECV_RETRIES = 5;

protected:
  /**
   * Applies the base class socket options and, in addition, the currently
   * configured send and receive timeouts.
   */
  override void setSocketOpts() {
    super.setSocketOpts();
    setTimeout(SocketOption.SNDTIMEO, sendTimeout_);
    setTimeout(SocketOption.RCVTIMEO, recvTimeout_);
  }

  /**
   * Sets a send or receive timeout on the underlying socket. If there is no
   * socket yet, nothing is set.
   *
   * Params:
   *   type = SocketOption.SNDTIMEO or SocketOption.RCVTIMEO.
   *   value = The timeout to set.
   *
   * Throws: TTransportException (UNKNOWN) if the option could not be set.
   */
  void setTimeout(SocketOption type, Duration value) {
    assert(type == SocketOption.SNDTIMEO || type == SocketOption.RCVTIMEO);
    version (Win32) {
      // Only warn; the requested value is still passed on unchanged below.
      if (value > dur!"hnsecs"(0) && value < dur!"msecs"(500)) {
        logError(
          "Socket %s timeout of %s ms might be raised to 500 ms on Windows.",
          (type == SocketOption.SNDTIMEO) ? "send" : "receive",
          value.total!"msecs"
        );
      }
    }

    if (socket_) {
      try {
        socket_.setOption(SocketOptionLevel.SOCKET, type, value);
      } catch (SocketException e) {
        throw new TTransportException(
          "Could not set timeout.",
          TTransportException.Type.UNKNOWN,
          __FILE__,
          __LINE__,
          e
        );
      }
    }
  }

  /// Maximum number of recv() retries.
  ushort maxRecvRetries_ = DEFAULT_MAX_RECV_RETRIES;
}