blob: f2e296dcac732cf19172ecaf37acabb0d9109ff5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "receiver/hrpc.h"
#include "util/log.h"
#include "util/string.h"
#include "util/time.h"
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#if defined(__OpenBSD__)
#include <sys/types.h>
#define be16toh(x) betoh16(x)
#define be32toh(x) betoh32(x)
#define be64toh(x) betoh64(x)
#elif defined(__NetBSD__) || defined(__FreeBSD__)
#include <sys/endian.h>
#else
#include <endian.h>
#endif
/**
* @file hrpc.c
*
* Implements sending messages via HRPC.
*/
/**
* Magic number placed in the magic field of every request header. When
* encoded little-endian its four bytes spell "HTRC".
*/
#define HRPC_MAGIC 0x43525448U
/**
* Largest error string (in bytes) accepted in a response. A larger advertised
* length is treated as a protocol error by hrpc_client_rcv_resp.
*/
#define MAX_HRPC_ERROR_LENGTH (4 * 1024 * 1024)
/**
* Largest response body (in bytes) accepted. A larger advertised length is
* treated as a protocol error by hrpc_client_rcv_resp.
*/
#define MAX_HRPC_BODY_LENGTH (64 * 1024 * 1024)
/**
* Default port handed to parse_endpoint() by hrpc_client_alloc.
* NOTE(review): presumably used when the endpoint string has no ":port";
* confirm against parse_endpoint.
*/
#define DEFAULT_HTRACED_HRPC_PORT 9075
/**
* Size of hrpc_client::addr_str. It holds a numeric IP address
* (INET6_ADDRSTRLEN) plus a ":port" suffix (sizeof counts the NUL) plus 2 spare
* bytes, presumably for IPv6 brackets.
*/
#define ADDR_STR_MAX (2 + INET6_ADDRSTRLEN + sizeof(":65536"))
/**
* State for one HRPC client. It holds at most one TCP connection at a time;
* the connection is opened lazily and dropped after any failed call.
*/
struct hrpc_client {
/**
* The HTrace log object used for all diagnostics.
*/
struct htrace_log *lg;
/**
* The tcp write timeout in milliseconds (applied as SO_SNDTIMEO).
*/
uint64_t write_timeo_ms;
/**
* The tcp read timeout in milliseconds (applied as SO_RCVTIMEO).
*/
uint64_t read_timeo_ms;
/**
* The hostname or IP address, as produced by parse_endpoint(). Malloced.
*/
char *host;
/**
* The port, as produced by parse_endpoint().
*/
int port;
/**
* The original endpoint string passed to hrpc_client_alloc. Malloced.
*/
char *endpoint;
/**
* Socket of current open connection, or -1 if there is no currently open
* connection.
*/
int sock;
/**
* The sequence number to assign to the next request. It starts at 0 (the
* struct is calloc'ed), is post-incremented per request, and is not reset
* when the connection is reopened.
*/
uint64_t seq;
/**
* Printable form of the remote address most recently tried by try_connect.
* Used only in log messages.
*/
char addr_str[ADDR_STR_MAX];
};
/**
* Fixed header sent before every request payload. It is packed, so it
* occupies exactly 20 bytes on the wire. Multi-byte fields are meant to be
* little-endian (see hrpc_client_send_req).
*/
struct hrpc_req_header {
/* Always HRPC_MAGIC. */
uint32_t magic;
/* Identifies the remote method being invoked. */
uint32_t method_id;
/* Per-client request sequence number; echoed back in the response. */
uint64_t seq;
/* Number of payload bytes that follow this header. */
uint32_t length;
} __attribute__((packed,aligned(4)));
/**
* Fixed header at the start of every response. It is followed by err_length
* bytes of error text and then length bytes of body. Fields are decoded from
* little-endian in hrpc_client_rcv_resp.
*/
struct hrpc_resp_header {
/* Must equal the seq of the request being answered. */
uint64_t seq;
/* Must equal the method_id of the request being answered. */
uint32_t method_id;
/* Length of the error string that follows; 0 means no error. */
uint32_t err_length;
/* Length of the response body that follows the error string. */
uint32_t length;
} __attribute__((packed,aligned(4)));
/* Forward declarations of file-local helpers defined later in this file. */
static int hrpc_client_open_conn(struct hrpc_client *hcli);
static int try_connect(struct hrpc_client *hcli, struct addrinfo *p);
static int set_socket_read_and_write_timeout(struct hrpc_client *hcli,
int sock);
static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id,
const void *buf1, size_t buf1_len,
const void *buf2, size_t buf2_len, uint64_t *seq);
static int hrpc_client_rcv_resp(struct hrpc_client *hcli, uint32_t method_id,
uint64_t seq, char **err, void **resp,
size_t *resp_len);
/**
 * Create an HRPC client for the given endpoint.
 *
 * No network activity happens here; the connection is opened on the first
 * hrpc_client_call().
 *
 * @param lg                Log sink for diagnostics.
 * @param write_timeo_ms    Socket send timeout, in milliseconds.
 * @param read_timeo_ms     Socket receive timeout, in milliseconds.
 * @param endpoint          "host" or "host:port" string; copied.
 *
 * @return The new client, or NULL on allocation or parse failure (logged).
 */
struct hrpc_client *hrpc_client_alloc(struct htrace_log *lg,
                                      uint64_t write_timeo_ms,
                                      uint64_t read_timeo_ms,
                                      const char *endpoint)
{
    struct hrpc_client *cli = calloc(1, sizeof(*cli));

    if (cli == NULL) {
        htrace_log(lg, "Failed to allocate memory for the HRPC client.\n");
        return NULL;
    }
    cli->lg = lg;
    cli->write_timeo_ms = write_timeo_ms;
    cli->read_timeo_ms = read_timeo_ms;
    cli->sock = -1;  /* not connected yet */

    cli->endpoint = strdup(endpoint);
    if (cli->endpoint == NULL) {
        htrace_log(lg, "Failed to allocate memory for the endpoint string.\n");
        goto fail;
    }
    if (!parse_endpoint(lg, endpoint, DEFAULT_HTRACED_HRPC_PORT,
                        &cli->host, &cli->port)) {
        goto fail;
    }
    return cli;

fail:
    /* calloc zeroed the struct, so any field not yet set is NULL here. */
    free(cli->host);
    free(cli->endpoint);
    free(cli);
    return NULL;
}
/**
 * Release an HRPC client, closing its connection if one is open.
 *
 * @param hcli      The client to free; NULL is accepted and ignored.
 */
void hrpc_client_free(struct hrpc_client *hcli)
{
    if (hcli) {
        if (hcli->sock >= 0) {
            close(hcli->sock);
            hcli->sock = -1;
        }
        free(hcli->host);
        free(hcli->endpoint);
        free(hcli);
    }
}
/**
 * Perform one synchronous HRPC request/response exchange.
 *
 * Opens the connection first if none is open. The request payload is buf1
 * followed by buf2. On any transport or protocol failure the socket is
 * closed, so the next call reconnects.
 *
 * @param hcli      The HRPC client.
 * @param method_id Method identifier placed in the request header.
 * @param buf1      First part of the request payload (may be NULL if 0 len).
 * @param buf1_len  Length of buf1.
 * @param buf2      Second part of the request payload (may be NULL if 0 len).
 * @param buf2_len  Length of buf2.
 * @param err       (out) NULL, or a malloc'd NUL-terminated error string
 *                  sent by the server. Caller frees.
 * @param resp      (out) NULL, or the malloc'd response body. Caller frees.
 * @param resp_len  (out) Length of *resp.
 *
 * @return 1 if a response was received (check *err for a server-side
 *         error), 0 on failure. On failure all out-parameters are
 *         NULL/0.
 */
int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id,
                     const void *buf1, size_t buf1_len,
                     const void *buf2, size_t buf2_len,
                     char **err, void **resp, size_t *resp_len)
{
    uint64_t seq;

    // Give the out-parameters defined values up front. Otherwise a failure
    // to connect or send would leave them uninitialized, and a caller that
    // frees them would invoke undefined behaviour.
    *err = NULL;
    *resp = NULL;
    *resp_len = 0;

    if (hcli->sock < 0) {
        if (!hrpc_client_open_conn(hcli)) {
            goto error;
        }
        htrace_log(hcli->lg, "hrpc_client_call: successfully opened connection\n");
    } else {
        htrace_log(hcli->lg, "hrpc_client_call: connection was already open\n");
    }
    if (!hrpc_client_send_req(hcli, method_id,
                              buf1, buf1_len, buf2, buf2_len, &seq)) {
        goto error;
    }
    htrace_log(hcli->lg, "hrpc_client_call: waiting for response\n");
    if (!hrpc_client_rcv_resp(hcli, method_id, seq, err, resp, resp_len)) {
        goto error;
    }
    return 1;

error:
    // The stream may be mid-message, so it cannot be reused; drop it.
    if (hcli->sock >= 0) {
        close(hcli->sock);
        hcli->sock = -1;
    }
    return 0;
}
/**
 * Resolve hcli->host and connect to the first address that accepts.
 *
 * @param hcli      The HRPC client; hcli->sock is set on success.
 *
 * @return 1 on success, 0 on failure (logged).
 */
static int hrpc_client_open_conn(struct hrpc_client *hcli)
{
    struct addrinfo hints = {
        .ai_family = AF_UNSPEC,
        .ai_socktype = SOCK_STREAM,
    };
    struct addrinfo *candidates = NULL, *cur;
    int gai_err, fd = -1;

    gai_err = getaddrinfo(hcli->host, NULL, &hints, &candidates);
    if (gai_err != 0) {
        htrace_log(hcli->lg, "hrpc_client_open_conn: "
                   "getaddrinfo(%s) error %d: %s\n",
                   hcli->host, gai_err, gai_strerror(gai_err));
        return 0;
    }

    /* Walk the resolver results in order; stop at the first success. */
    cur = candidates;
    while (cur != NULL) {
        fd = try_connect(hcli, cur);
        if (fd >= 0) {
            break;
        }
        cur = cur->ai_next;
    }
    freeaddrinfo(candidates);

    if (cur == NULL) {
        htrace_log(hcli->lg, "hrpc_client_open_conn(%s): failed to connect.\n",
                   hcli->host);
        return 0;
    }
    hcli->sock = fd;
    return 1;
}
/**
 * Store hcli->port (in network byte order) into a resolved socket address.
 *
 * @param hcli      The HRPC client.
 * @param addr      Address to patch in place.
 * @param ai_family Address family of addr (AF_INET or AF_INET6).
 *
 * @return 1 on success, 0 for an unsupported family (logged).
 */
static int set_port(struct hrpc_client *hcli, struct sockaddr *addr,
                    int ai_family)
{
    uint16_t nport = htons(hcli->port);

    if (ai_family == AF_INET) {
        ((struct sockaddr_in *)addr)->sin_port = nport;
        return 1;
    }
    if (ai_family == AF_INET6) {
        ((struct sockaddr_in6 *)addr)->sin6_port = nport;
        return 1;
    }
    htrace_log(hcli->lg, "try_connect(%s): set_port %d failed: unknown "
               "ai_family %d\n", hcli->addr_str, hcli->port, ai_family);
    return 0;
}
/**
 * Try to open a TCP connection to one resolved address.
 *
 * The steps are:
 * - record a printable "ip:port" ("[ip]:port" for IPv6) in hcli->addr_str
 *   for use in log messages;
 * - write hcli->port into p->ai_addr;
 * - create a close-on-exec socket and apply the configured read/write
 *   timeouts;
 * - connect.
 *
 * @param hcli      The HRPC client.
 * @param p         The address to try. p->ai_addr is modified in place.
 *
 * @return The connected socket, or -1 on failure (logged).
 */
static int try_connect(struct hrpc_client *hcli, struct addrinfo *p)
{
    int e, sock = -1;
    char ip[INET6_ADDRSTRLEN];

    e = getnameinfo(p->ai_addr, p->ai_addrlen,
                    ip, sizeof(ip), NULL, 0, NI_NUMERICHOST);
    if (e) {
        htrace_log(hcli->lg, "try_connect: getnameinfo failed. error "
                   "%d: %s\n", e, gai_strerror(e));
        // Must be -1, never 0: 0 is a valid descriptor, and the caller
        // treats any value >= 0 as a connected socket.
        return -1;
    }
    // Bracket IPv6 literals so the port separator is unambiguous in logs
    // (ADDR_STR_MAX reserves room for the two brackets).
    if (p->ai_family == AF_INET6) {
        snprintf(hcli->addr_str, ADDR_STR_MAX, "[%s]:%d", ip, hcli->port);
    } else {
        snprintf(hcli->addr_str, ADDR_STR_MAX, "%s:%d", ip, hcli->port);
    }
    if (!set_port(hcli, p->ai_addr, p->ai_family)) {
        goto error;
    }
    sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
    if (sock < 0) {
        e = errno;
        htrace_log(hcli->lg, "try_connect(%s): failed to create new "
                   "socket: error %d (%s)\n", hcli->addr_str, e, terror(e));
        goto error;
    }
    if (fcntl(sock, F_SETFD, FD_CLOEXEC) < 0) {
        e = errno;
        htrace_log(hcli->lg, "try_connect(%s): fcntl(FD_CLOEXEC) "
                   "failed: error %d (%s)\n", hcli->addr_str, e, terror(e));
        goto error;
    }
    // Timeouts are set before connect(), so they are already in effect for
    // the connect attempt itself.
    if (!set_socket_read_and_write_timeout(hcli, sock)) {
        goto error;
    }
    if (connect(sock, p->ai_addr, p->ai_addrlen) < 0) {
        e = errno;
        htrace_log(hcli->lg, "try_connect(%s): connect "
                   "failed: error %d (%s)\n", hcli->addr_str, e, terror(e));
        goto error;
    }
    return sock;

error:
    if (sock >= 0) {
        close(sock);
    }
    return -1;
}
static int set_socket_read_and_write_timeout(struct hrpc_client *hcli,
int sock)
{
struct timeval tv;
ms_to_timeval(hcli->read_timeo_ms, &tv);
if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
int e = errno;
htrace_log(hcli->lg, "setsockopt(%d, SO_RCVTIMEO, %"PRId64") failed: "
"error %d (%s)\n", sock, hcli->read_timeo_ms, e, terror(e));
return 0;
}
ms_to_timeval(hcli->write_timeo_ms, &tv);
if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) {
int e = errno;
htrace_log(hcli->lg, "setsockopt(%d, SO_SNDTIMEO, %"PRId64") failed: "
"error %d (%s)\n", sock, hcli->write_timeo_ms, e, terror(e));
return 0;
}
return 1;
}
/**
 * Account for n bytes having been written from the front of an iovec array.
 *
 * Entries that were fully written are skipped by advancing *iov and
 * decrementing *iovcnt. A partially written entry has its base advanced and
 * its length reduced, so the next writev() resumes exactly where this one
 * stopped. Leading zero-length entries are also dropped, so *iovcnt reaches
 * 0 exactly when everything has been written.
 *
 * @param iov       (in/out) Pointer to the first not-yet-written entry.
 * @param iovcnt    (in/out) Number of entries remaining at *iov.
 * @param n         Bytes reported written by writev().
 *
 * @return 1 on success, or 0 if n exceeds the bytes remaining.
 */
static int hrpc_iov_consume(struct iovec **iov, int *iovcnt, size_t n)
{
    struct iovec *cur = *iov;
    int cnt = *iovcnt;

    while (n > 0) {
        if (cnt <= 0) {
            return 0;
        }
        if (n < cur->iov_len) {
            cur->iov_base = (char *)cur->iov_base + n;
            cur->iov_len -= n;
            n = 0;
        } else {
            n -= cur->iov_len;
            cur++;
            cnt--;
        }
    }
    while (cnt > 0 && cur->iov_len == 0) {
        cur++;
        cnt--;
    }
    *iov = cur;
    *iovcnt = cnt;
    return 1;
}

/**
 * Send one request (header + buf1 + buf2) on the open connection.
 *
 * writev (scatter/gather I/O) is used so that the header and payload do not
 * go out as separate packets when TCP_NODELAY is turned on.
 *
 * @param hcli      The HRPC client; hcli->sock must be connected.
 * @param method_id Method identifier for the header.
 * @param buf1      First payload part (may be NULL if buf1_len is 0).
 * @param buf1_len  Length of buf1.
 * @param buf2      Second payload part (may be NULL if buf2_len is 0).
 * @param buf2_len  Length of buf2.
 * @param seq       (out) The sequence number assigned to this request.
 *
 * @return 1 once every byte has been written, 0 on failure (logged).
 */
static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id,
                                const void *buf1, size_t buf1_len,
                                const void *buf2, size_t buf2_len,
                                uint64_t *seq)
{
    struct hrpc_req_header hdr;
    struct iovec iov[3];
    struct iovec *cur = iov;
    int cnt = (int)(sizeof(iov) / sizeof(iov[0]));

    // The on-the-wire length field is 32 bits. Refuse rather than truncate.
    if (buf1_len > UINT32_MAX || buf2_len > UINT32_MAX - buf1_len) {
        htrace_log(hcli->lg, "hrpc_client_send_req: request payload too "
                   "large (%"PRIu64" + %"PRIu64" bytes).\n",
                   (uint64_t)buf1_len, (uint64_t)buf2_len);
        return 0;
    }

    // magic is a 32-bit field. htole64 here would truncate it to 0 on
    // big-endian hosts.
    hdr.magic = htole32(HRPC_MAGIC);
    hdr.method_id = htole32(method_id);
    *seq = hcli->seq++;
    hdr.seq = htole64(*seq);
    hdr.length = htole32((uint32_t)(buf1_len + buf2_len));
    iov[0].iov_base = &hdr;
    iov[0].iov_len = sizeof(hdr);
    iov[1].iov_base = (void*)buf1;  // iovec is not const-qualified
    iov[1].iov_len = buf1_len;
    iov[2].iov_base = (void*)buf2;
    iov[2].iov_len = buf2_len;

    while (cnt > 0) {
        ssize_t res = writev(hcli->sock, cur, cnt);
        if (res < 0) {
            int e = errno;
            if (e == EINTR) {
                continue;
            }
            htrace_log(hcli->lg, "hrpc_client_send_req: writev error: "
                       "error %d: %s\n", e, terror(e));
            return 0;
        }
        // Every remaining entry is non-empty, so 0 means no progress; bail
        // out instead of spinning forever.
        if (res == 0) {
            htrace_log(hcli->lg, "hrpc_client_send_req: writev made no "
                       "progress.\n");
            return 0;
        }
        if (!hrpc_iov_consume(&cur, &cnt, (size_t)res)) {
            htrace_log(hcli->lg, "hrpc_client_send_req: unexpectedly "
                       "large writev return.\n");
            return 0;
        }
    }
    return 1;
}
/**
 * Read up to amt bytes from fd into buf.
 *
 * Short reads are retried until amt bytes have arrived. Reads interrupted
 * by a signal (EINTR) are restarted.
 *
 * @param fd        File descriptor to read from.
 * @param buf       Destination buffer of at least amt bytes.
 * @param amt       Number of bytes wanted.
 *
 * @return The number of bytes read, which is less than amt only if EOF
 *         was reached first. A negative errno value is returned if read()
 *         failed.
 */
static int safe_read(int fd, void *buf, size_t amt)
{
    uint8_t *dst = buf;
    int total = 0;

    for (;;) {
        int got = read(fd, dst + total, amt - total);
        if (got > 0) {
            total += got;
            if (total >= amt) {
                return total;
            }
            continue;
        }
        if (got == 0) {
            return total;   /* EOF */
        }
        if (errno != EINTR) {
            return -errno;
        }
    }
}
static int hrpc_client_rcv_resp(struct hrpc_client *hcli, uint32_t method_id,
uint64_t seq, char **err_out, void **resp_out,
size_t *resp_len)
{
int res;
struct hrpc_resp_header hdr;
uint64_t resp_seq;
uint32_t resp_method_id, err_length, length;
char *err = NULL, *resp = NULL;
res = safe_read(hcli->sock, &hdr, sizeof(hdr));
if (res < 0) {
htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error reading "
"response header: %d (%s)\n", hcli->addr_str, -res,
terror(-res));
goto error;
}
if (res != sizeof(hdr)) {
htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): unexpected EOF "
"reading response header.\n", hcli->addr_str);
goto error;
}
resp_seq = le64toh(hdr.seq);
if (resp_seq != seq) {
htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): expected sequence "
"ID 0x%"PRIx64", but got sequence ID 0x%"PRId64".\n",
hcli->addr_str, seq, resp_seq);
goto error;
}
resp_method_id = le32toh(hdr.method_id);
if (resp_method_id != method_id) {
htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): expected method "
"ID 0x%"PRIx32", but got method ID 0x%"PRId32".\n",
hcli->addr_str, method_id, resp_method_id);
goto error;
}
err_length = le32toh(hdr.err_length);
if (err_length > MAX_HRPC_ERROR_LENGTH) {
htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error length was "
"%"PRId32", but the maximum error length is %"PRId32".",
hcli->addr_str, err_length, MAX_HRPC_ERROR_LENGTH);
goto error;
}
if (err_length > 0) {
err = malloc(err_length + 1);
res = safe_read(hcli->sock, err, err_length);
if (res < 0) {
htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error reading "
"error string: %d (%s)\n", hcli->addr_str, -res,
terror(-res));
goto error;
}
if (res != err_length) {
htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): unexpected EOF "
"reading error string.\n", hcli->addr_str);
goto error;
}
err[err_length] = '\0';
}
length = le32toh(hdr.length);
if (length > MAX_HRPC_BODY_LENGTH) {
htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): body length was "
"%"PRId32", but the maximum body length is %"PRId32".",
hcli->addr_str, length, MAX_HRPC_BODY_LENGTH);
goto error;
}
if (length > 0) {
resp = malloc(length);
res = safe_read(hcli->sock, resp, length);
if (res < 0) {
htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error reading "
"body: %d (%s)\n", hcli->addr_str, -res, terror(-res));
goto error;
}
if (res != length) {
htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): unexpected EOF "
"reading body.\n", hcli->addr_str);
goto error;
}
}
*err_out = err;
*resp_out = resp;
*resp_len = length;
return 1;
error:
free(err);
free(resp);
*err_out = NULL;
*resp_out = NULL;
*resp_len = 0;
return 0;
}
/**
 * Return the endpoint string this client was created with.
 *
 * @param hcli      The HRPC client.
 *
 * @return The client's copy of the endpoint; owned by hcli and valid until
 *         hrpc_client_free().
 */
const char *hrpc_client_get_endpoint(struct hrpc_client *hcli)
{
    const char *ep = hcli->endpoint;

    return ep;
}
// vim:ts=4:sw=4:et