/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "core/conf.h"
#include "core/htrace.h"
#include "core/htracer.h"
#include "core/span.h"
#include "receiver/hrpc.h"
#include "receiver/receiver.h"
#include "test/test.h"
#include "util/cmp.h"
#include "util/cmp_util.h"
#include "util/log.h"
#include "util/string.h"
#include "util/time.h"
#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/**
* @file htraced.c
*
* The htraced span receiver which implements sending spans on to the htraced
* daemon.
*
* We send spans via the HRPC protocol. HRPC consists of a simple fixed-size
* header specifying a magic number, the length, and a message type, followed by
* data in the msgpack serialization format. The message type will determine
 * the type of msgpack data. Message bodies are sent as maps, essentially
* making all fields optional and allowing the protocol to evolve. See hrpc.c
* and rpc.go for the implementation of HRPC.
*
* Spans are serialized immediately when they are added to the buffer. This is
* one of the advantages of msgpack-- it has a good streaming interface. We do
* not need to keep around the span objects after htraced_rcv_add_span.
*
* The htraced receiver keeps two equally sized buffers around internally.
* While we are writing spans to one buffer, we can be sending the data from the
* other buffer over the wire. The intention here is to avoid copies as much as
* possible. In general, what we send over the wire is exactly what is in the
* buffer, except that we have to add a short "prequel" to it containing the
* other WriteSpansReq fields.
*
* Note that we may change the serialization in the future if we discover better
* alternatives. Sending spans over HTTP as JSON will always be supported
* as a fallback.
*/
/**
* The maximum length of the message we will send to the server.
* This must be the same or shorter than MAX_HRPC_BODY_LENGTH in rpc.go.
*/
#define MAX_HRPC_LEN (32ULL * 1024ULL * 1024ULL)
/**
* The maximum length of the prequel in a WriteSpans message.
*/
#define MAX_WRITESPANS_PREQUEL_LEN 1024
/**
* The maximum length of the span data in a WriteSpans message.
*/
#define MAX_SPAN_DATA_LEN (MAX_HRPC_LEN - MAX_WRITESPANS_PREQUEL_LEN)
/**
* The minimum total buffer size to allow.
*
* This should allow at least a few spans to be buffered.
*/
#define HTRACED_MIN_BUFFER_SIZE (4ULL * 1024ULL * 1024ULL)
/**
* The maximum total buffer size to allow.
*/
#define HTRACED_MAX_BUFFER_SIZE (2ULL * MAX_SPAN_DATA_LEN)
/**
* The minimum number of milliseconds to allow for flush_interval_ms.
*/
#define HTRACED_FLUSH_INTERVAL_MS_MIN 30000LL
/**
* The maximum number of milliseconds to allow for flush_interval_ms.
* This is mainly to avoid overflow.
*/
#define HTRACED_FLUSH_INTERVAL_MS_MAX 86400000LL
/**
* The minimum number of milliseconds to allow for tcp write timeouts.
*/
#define HTRACED_WRITE_TIMEO_MS_MIN 50LL
/**
* The minimum number of milliseconds to allow for tcp read timeouts.
*/
#define HTRACED_READ_TIMEO_MS_MIN 50LL
/**
* The maximum number of times we will try to add a span to the circular buffer
* before giving up.
*/
#define HTRACED_MAX_ADD_TRIES 3
/**
* The maximum number of times to try to send some spans to the htraced daemon
* before giving up.
*/
#define HTRACED_MAX_SEND_TRIES 3
/**
* The number of milliseconds to sleep after failing to send some spans to the
* htraced daemon.
*/
#define HTRACED_SEND_RETRY_SLEEP_MS 5000
/**
* The number of buffers used by htraced.
*/
#define HTRACED_NUM_BUFS 2
/**
 * An HTraced send buffer.
 *
 * Serialized (msgpack) span data is appended at offset 'off' until the
 * buffer fills up or the transmitter thread sends and resets it.
 */
struct htraced_sbuf {
    /**
     * Current write offset within the buffer, in bytes.
     */
    uint64_t off;

    /**
     * Total capacity of 'buf', in bytes.
     */
    uint64_t len;

    /**
     * The number of spans currently serialized into the buffer.
     */
    uint64_t num_spans;

    /**
     * The buffer data.  This is a C99 flexible array member: the allocation
     * in htraced_sbuf_alloc reserves 'len' bytes of storage here, avoiding a
     * separate heap block and pointer dereference.
     */
    char buf[];
};
/**
 * A span receiver that writes spans to htraced.
 */
struct htraced_rcv {
    // Common receiver header.  Must be first so that pointers to this struct
    // can be cast to and from struct htrace_rcv.
    struct htrace_rcv base;

    /**
     * Nonzero if the receiver should shut down.
     * Protected by 'lock'.
     */
    int shutdown;

    /**
     * The htracer object associated with this receiver.
     */
    struct htracer *tracer;

    /**
     * Buffered span data becomes eligible to be sent even if there isn't much
     * in the buffer after this timeout elapses.
     */
    uint64_t flush_interval_ms;

    /**
     * The maximum number of bytes we will buffer before waking the sending
     * thread.  We may sometimes send slightly more than this amount if the
     * thread takes a while to wake up.
     */
    uint64_t send_threshold;

    /**
     * The HRPC client used to talk to the htraced daemon.
     */
    struct hrpc_client *hcli;

    /**
     * The monotonic-clock time at which we last did a send operation.
     * Protected by 'lock'.
     */
    uint64_t last_send_ms;

    /**
     * The index of the active buffer (the one writers append to).
     * Protected by 'lock'.
     */
    int active_buf;

    /**
     * The two send buffers.  Writers fill sbuf[active_buf]; the transmitter
     * thread sends the other one.
     */
    struct htraced_sbuf *sbuf[HTRACED_NUM_BUFS];

    /**
     * Lock protecting the buffers from concurrent writes.
     */
    pthread_mutex_t lock;

    /**
     * Condition variable used to wake up the background thread.
     */
    pthread_cond_t bg_cond;

    /**
     * Condition variable used to wake up flushing threads.
     */
    pthread_cond_t flush_cond;

    /**
     * Background transmitter thread.
     */
    pthread_t xmit_thread;
};
// Entry point of the background transmitter thread (definition below).
void* run_htraced_xmit_manager(void *data);

// Helpers for the transmitter thread (definitions below).
static int should_xmit(struct htraced_rcv *rcv, uint64_t now);
static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now);
/**
 * Check whether every send buffer is currently empty.
 * Must be called with the receiver lock held.
 *
 * @param rcv       The htraced receiver.
 *
 * @return          1 if no buffer contains data; 0 otherwise.
 */
static int htraced_sbufs_empty(struct htraced_rcv *rcv)
{
    int idx = 0;

    while (idx < HTRACED_NUM_BUFS) {
        if (rcv->sbuf[idx]->off != 0) {
            return 0;
        }
        idx++;
    }
    return 1;
}
/**
 * Allocate a send buffer with 'len' bytes of payload space.
 *
 * @param len       Payload capacity in bytes.
 *
 * @return          The new, empty buffer, or NULL on OOM.
 */
static struct htraced_sbuf *htraced_sbuf_alloc(uint64_t len)
{
    // The payload lives in the trailing 'buf' array of the structure, so a
    // single allocation of the header plus 'len' bytes suffices.  This avoids
    // an extra pointer dereference when accessing data in the sbuf.
    struct htraced_sbuf *sbuf =
        malloc(offsetof(struct htraced_sbuf, buf) + len);

    if (!sbuf) {
        return NULL;
    }
    sbuf->num_spans = 0;
    sbuf->len = len;
    sbuf->off = 0;
    return sbuf;
}
/**
 * Free a send buffer.  A NULL argument is a no-op (free(NULL) is defined).
 */
static void htraced_sbuf_free(struct htraced_sbuf *sbuf)
{
    free(sbuf);
}
/**
 * Get the number of unused payload bytes remaining in a send buffer.
 *
 * @param sbuf      The send buffer.
 *
 * @return          Bytes available between the write offset and capacity.
 */
static uint64_t htraced_sbuf_remaining(const struct htraced_sbuf *sbuf)
{
    uint64_t used = sbuf->off;

    return sbuf->len - used;
}
/**
 * Fetch an unsigned 64-bit configuration value, clamped to [min, max].
 * Clamping is logged at warning level.
 *
 * @param lg        Log to use for warnings.
 * @param cnf       The configuration.
 * @param prop      The configuration property name.
 * @param min       Minimum allowed value.
 * @param max       Maximum allowed value.
 *
 * @return          The configured value, clamped into the valid range.
 */
static uint64_t htraced_get_bounded_u64(struct htrace_log *lg,
                const struct htrace_conf *cnf, const char *prop,
                uint64_t min, uint64_t max)
{
    uint64_t val = htrace_conf_get_u64(lg, cnf, prop);

    // FIX: these values are uint64_t, so print them with PRIu64.  The old
    // code used PRId64, a signed/unsigned format mismatch (undefined
    // behavior per C11 7.21.6.1).
    if (val < min) {
        htrace_log(lg, "htraced_rcv_create: can't set %s to %"PRIu64
                   ". Using minimum value of %"PRIu64 " instead.\n",
                   prop, val, min);
        return min;
    } else if (val > max) {
        htrace_log(lg, "htraced_rcv_create: can't set %s to %"PRIu64
                   ". Using maximum value of %"PRIu64 " instead.\n",
                   prop, val, max);
        return max;
    }
    return val;
}
/**
 * Fetch a double configuration value, clamped to [min, max].
 * Clamping is logged at warning level.
 *
 * @param lg        Log to use for warnings.
 * @param cnf       The configuration.
 * @param prop      The configuration property name.
 * @param min       Minimum allowed value.
 * @param max       Maximum allowed value.
 *
 * @return          The configured value, clamped into the valid range.
 */
static double htraced_get_bounded_double(struct htrace_log *lg,
                const struct htrace_conf *cnf, const char *prop,
                double min, double max)
{
    const double val = htrace_conf_get_double(lg, cnf, prop);

    if (val < min) {
        htrace_log(lg, "htraced_rcv_create: can't set %s to %g"
                   ". Using minimum value of %g instead.\n",
                   prop, val, min);
        return min;
    }
    if (val > max) {
        htrace_log(lg, "htraced_rcv_create: can't set %s to %g"
                   ". Using maximum value of %g instead.\n",
                   prop, val, max);
        return max;
    }
    return val;
}
static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
const struct htrace_conf *conf)
{
struct htraced_rcv *rcv;
const char *endpoint;
int i, ret;
uint64_t write_timeo_ms, read_timeo_ms, buf_len;
double send_fraction;
endpoint = htrace_conf_get(conf, HTRACED_ADDRESS_KEY);
if (!endpoint) {
htrace_log(tracer->lg, "htraced_rcv_create: no value found for %s. "
"You must set this configuration key to the "
"hostname:port identifying the htraced server.\n",
HTRACED_ADDRESS_KEY);
goto error;
}
rcv = calloc(1, sizeof(*rcv));
if (!rcv) {
htrace_log(tracer->lg, "htraced_rcv_create: OOM while "
"allocating htraced_rcv.\n");
goto error;
}
rcv->base.ty = &g_htraced_rcv_ty;
rcv->shutdown = 0;
rcv->tracer = tracer;
rcv->flush_interval_ms = htraced_get_bounded_u64(tracer->lg, conf,
HTRACED_FLUSH_INTERVAL_MS_KEY, HTRACED_FLUSH_INTERVAL_MS_MIN,
HTRACED_FLUSH_INTERVAL_MS_MAX);
write_timeo_ms = htraced_get_bounded_u64(tracer->lg, conf,
HTRACED_WRITE_TIMEO_MS_KEY, HTRACED_WRITE_TIMEO_MS_MIN,
0x7fffffffffffffffULL);
read_timeo_ms = htraced_get_bounded_u64(tracer->lg, conf,
HTRACED_READ_TIMEO_MS_KEY, HTRACED_READ_TIMEO_MS_MIN,
0x7fffffffffffffffULL);
rcv->hcli = hrpc_client_alloc(tracer->lg, write_timeo_ms,
read_timeo_ms, endpoint);
if (!rcv->hcli) {
goto error_free_rcv;
}
buf_len = htraced_get_bounded_u64(tracer->lg, conf,
HTRACED_BUFFER_SIZE_KEY, HTRACED_MIN_BUFFER_SIZE,
HTRACED_MAX_BUFFER_SIZE) / 2;
for (i = 0; i < HTRACED_NUM_BUFS; i++) {
rcv->sbuf[i] = htraced_sbuf_alloc(buf_len);
if (!rcv->sbuf[i]) {
htrace_log(tracer->lg, "htraced_rcv_create: htraced_sbuf_alloc("
"buf_len=%"PRId64") failed: OOM.\n", buf_len);
goto error_free_bufs;
}
}
send_fraction = htraced_get_bounded_double(tracer->lg, conf,
HTRACED_BUFFER_SEND_TRIGGER_FRACTION, 0.1, 1.0);
rcv->send_threshold = buf_len * send_fraction;
if (rcv->send_threshold > buf_len) {
rcv->send_threshold = buf_len;
}
rcv->last_send_ms = monotonic_now_ms(tracer->lg);
ret = pthread_mutex_init(&rcv->lock, NULL);
if (ret) {
htrace_log(tracer->lg, "htraced_rcv_create: pthread_mutex_init "
"error %d: %s\n", ret, terror(ret));
goto error_free_bufs;
}
ret = pthread_cond_init(&rcv->bg_cond, NULL);
if (ret) {
htrace_log(tracer->lg, "htraced_rcv_create: pthread_cond_init("
"bg_cond) error %d: %s\n", ret, terror(ret));
goto error_free_lock;
}
ret = pthread_cond_init(&rcv->flush_cond, NULL);
if (ret) {
htrace_log(tracer->lg, "htraced_rcv_create: pthread_cond_init("
"flush_cond) error %d: %s\n", ret, terror(ret));
goto error_free_bg_cond;
}
ret = pthread_create(&rcv->xmit_thread, NULL, run_htraced_xmit_manager, rcv);
if (ret) {
htrace_log(tracer->lg, "htraced_rcv_create: failed to create xmit thread: "
"error %d: %s\n", ret, terror(ret));
goto error_free_flush_cond;
}
htrace_log(tracer->lg, "Initialized htraced receiver for %s"
", flush_interval_ms=%" PRId64 ", send_threshold=%" PRId64
", write_timeo_ms=%" PRId64 ", read_timeo_ms=%" PRId64
", buf_len=%" PRId64 ".\n", hrpc_client_get_endpoint(rcv->hcli),
rcv->flush_interval_ms, rcv->send_threshold,
write_timeo_ms, read_timeo_ms, buf_len);
return (struct htrace_rcv*)rcv;
error_free_flush_cond:
pthread_cond_destroy(&rcv->flush_cond);
error_free_bg_cond:
pthread_cond_destroy(&rcv->bg_cond);
error_free_lock:
pthread_mutex_destroy(&rcv->lock);
error_free_bufs:
for (i = 0; i < HTRACED_NUM_BUFS; i++) {
htraced_sbuf_free(rcv->sbuf[i]);
}
hrpc_client_free(rcv->hcli);
error_free_rcv:
free(rcv);
error:
return NULL;
}
/**
 * The entry point of the background transmitter thread.
 *
 * Loops until shutdown: transmits whenever should_xmit() says to, then waits
 * on bg_cond for up to half the flush interval.  On shutdown, drains any
 * remaining buffered spans before exiting.
 *
 * @param data      The receiver (struct htraced_rcv *), as passed to
 *                  pthread_create.
 *
 * @return          NULL.
 */
void* run_htraced_xmit_manager(void *data)
{
    struct htraced_rcv *rcv = data;
    struct htrace_log *lg = rcv->tracer->lg;
    uint64_t now, wakeup;
    struct timespec wakeup_ts;
    int ret;

    pthread_mutex_lock(&rcv->lock);
    while (1) {
        now = monotonic_now_ms(lg);
        // htraced_xmit drops and re-acquires rcv->lock around the network
        // I/O, so other threads can keep adding spans while we send.
        while (should_xmit(rcv, now)) {
            htraced_xmit(rcv, now);
        }
        if (rcv->shutdown) {
            // Drain everything still buffered before exiting.
            while (!htraced_sbufs_empty(rcv)) {
                htraced_xmit(rcv, now);
            }
            break;
        }
        // Wait for one of a few things to happen:
        // * Shutdown
        // * The wakeup timer to elapse, leading us to check if we should send
        //      because of send_timeo_ms.
        // * A writer to signal that we should wake up because enough bytes are
        //      buffered.
        wakeup = now + (rcv->flush_interval_ms / 2);
        // NOTE(review): 'wakeup' is derived from the monotonic clock, but
        // pthread_cond_timedwait measures against the condvar's clock
        // (CLOCK_REALTIME unless a condattr changed it).  Presumably
        // ms_to_timespec and the condvar are configured consistently --
        // confirm against util/time.c.
        ms_to_timespec(wakeup, &wakeup_ts);
        ret = pthread_cond_timedwait(&rcv->bg_cond, &rcv->lock, &wakeup_ts);
        if ((ret != 0) && (ret != ETIMEDOUT)) {
            htrace_log(lg, "run_htraced_xmit_manager: pthread_cond_timedwait "
                       "error: %d (%s)\n", ret, terror(ret));
        }
    }
    pthread_mutex_unlock(&rcv->lock);
    htrace_log(lg, "run_htraced_xmit_manager: shutting down the transmission "
               "manager thread.\n");
    return NULL;
}
/**
* Determine if the xmit manager should send.
* This function must be called with the lock held.
*
* @param rcv The htraced receiver.
* @param now The current time in milliseconds.
*
* @return nonzero if we should send now.
*/
static int should_xmit(struct htraced_rcv *rcv, uint64_t now)
{
    const struct htraced_sbuf *active = rcv->sbuf[rcv->active_buf];
    uint64_t buffered = active->off;

    if (buffered > rcv->send_threshold) {
        // Plenty of bytes buffered: send now.
        return 1;
    }
    if ((buffered > 0) && (now - rcv->last_send_ms > rcv->flush_interval_ms)) {
        // Some data has been waiting longer than the flush interval.
        return 1;
    }
    // Nothing pressing; keep waiting.
    return 0;
}
#define DEFAULT_TRID_STR "DefaultTrid"
#define DEFAULT_TRID_STR_LEN (sizeof(DEFAULT_TRID_STR) - 1)
#define NUM_SPANS_STR "NumSpans"
#define NUM_SPANS_STR_LEN (sizeof(NUM_SPANS_STR) - 1)
/**
* Write the prequel to the WriteSpans message.
*/
static int add_writespans_prequel(struct htraced_rcv *rcv,
                struct htraced_sbuf *sbuf, uint8_t *prequel)
{
    struct cmp_bcopy_ctx bctx;
    struct cmp_ctx_s *ctx = (struct cmp_ctx_s *)&bctx;
    int ok;

    cmp_bcopy_ctx_init(&bctx, prequel, MAX_WRITESPANS_PREQUEL_LEN);
    // Serialize a two-entry msgpack map: DefaultTrid -> tracer id and
    // NumSpans -> the span count.  && short-circuits, so serialization stops
    // at the first failed write, exactly as the step-by-step version did.
    ok = cmp_write_fixmap(ctx, 2) &&
        cmp_write_fixstr(ctx, DEFAULT_TRID_STR, DEFAULT_TRID_STR_LEN) &&
        cmp_write_str(ctx, rcv->tracer->trid, strlen(rcv->tracer->trid)) &&
        cmp_write_fixstr(ctx, NUM_SPANS_STR, NUM_SPANS_STR_LEN) &&
        cmp_write_uint(ctx, sbuf->num_spans);
    if (!ok) {
        return -1;
    }
    // Return the number of prequel bytes written.
    return bctx.off;
}
/**
* Send all the spans which we have buffered.
*
* @param rcv The htraced receiver.
* @param sbuf The span buffer to send.
*
* @return 1 on success; 0 otherwise.
*/
static int htraced_xmit_impl(struct htraced_rcv *rcv, struct htraced_sbuf *sbuf)
{
    struct htrace_log *lg = rcv->tracer->lg;
    uint8_t prequel[MAX_WRITESPANS_PREQUEL_LEN];
    int prequel_len, ret;
    char *err = NULL, *resp = NULL;
    size_t resp_len = 0;

    // FIX: the log prefixes below previously said "htrace_xmit_impl", which
    // does not match this function's name; corrected for grep-ability.
    prequel_len = add_writespans_prequel(rcv, sbuf, prequel);
    if (prequel_len < 0) {
        htrace_log(lg, "htraced_xmit_impl: add_writespans_prequel failed.\n");
        ret = 0;
        goto done;
    }
    // Send the prequel followed by the already-serialized span data; the
    // call returns any server-side error string and response via out-params
    // that we own and must free.
    ret = hrpc_client_call(rcv->hcli, METHOD_ID_WRITE_SPANS,
                           prequel, prequel_len, sbuf->buf, sbuf->off,
                           &err, (void**)&resp, &resp_len);
    if (!ret) {
        htrace_log(lg, "htraced_xmit_impl: hrpc_client_call failed.\n");
        goto done;
    } else if (err) {
        htrace_log(lg, "htraced_xmit_impl: server returned error: %s\n", err);
        ret = 0;
        goto done;
    }
    ret = 1;
done:
    free(err);
    free(resp);
    return ret;
}
/**
 * Transmit the spans in the active buffer to the htraced daemon.
 *
 * Must be called with the receiver lock held; the lock is dropped during the
 * network I/O and re-acquired before returning.  The buffer is reset whether
 * or not transmission succeeded, so after HTRACED_MAX_SEND_TRIES failures the
 * buffered spans are dropped.
 *
 * @param rcv       The htraced receiver.
 * @param now       The monotonic time, in ms, recorded as the last send time.
 */
static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now)
{
    int tries = 0;
    struct htraced_sbuf *sbuf;

    // Flip to the other buffer.  Writers will fill the new active buffer
    // while we transmit the old one.
    sbuf = rcv->sbuf[rcv->active_buf];
    rcv->active_buf = !rcv->active_buf;
    // Release the lock while doing network I/O, so that we don't block threads
    // adding spans.  Only this single transmitter thread touches the inactive
    // buffer, so no lock is needed for 'sbuf' here.
    pthread_mutex_unlock(&rcv->lock);
    while (1) {
        int retry, success = htraced_xmit_impl(rcv, sbuf);
        if (success) {
            break;
        }
        tries++;
        retry = (tries < HTRACED_MAX_SEND_TRIES);
        // NOTE(review): the message promises "Retrying after a delay", but no
        // sleep happens here, and HTRACED_SEND_RETRY_SLEEP_MS is defined yet
        // never used -- confirm whether a sleep was intended between tries.
        htrace_log(rcv->tracer->lg, "htraced_xmit(%s) failed on try %d. %s\n",
                   hrpc_client_get_endpoint(rcv->hcli), tries,
                   (retry ? "Retrying after a delay." : "Giving up."));
        if (!retry) {
            break;
        }
    }
    // Reset the buffer unconditionally; on final failure the spans are lost.
    sbuf->off = 0;
    sbuf->num_spans = 0;
    pthread_mutex_lock(&rcv->lock);
    rcv->last_send_ms = now;
    // Wake any threads blocked in htraced_rcv_flush().
    pthread_cond_broadcast(&rcv->flush_cond);
}
static void htraced_rcv_add_span(struct htrace_rcv *r,
struct htrace_span *span)
{
int tries, retry;
uint64_t rem, off;
struct htraced_rcv *rcv = (struct htraced_rcv *)r;
struct htraced_sbuf *sbuf;
struct htrace_log *lg = rcv->tracer->lg;
struct cmp_counter_ctx cctx;
struct cmp_bcopy_ctx bctx;
uint64_t msgpack_len;
// Determine the length of the span when serialized to msgpack.
cmp_counter_ctx_init(&cctx);
if (!span_write_msgpack(span, (cmp_ctx_t*)&cctx)) {
htrace_log(lg, "htraced_rcv_add_span: span_write_msgpack failed.\n");
return;
}
msgpack_len = cctx.count;
// Try to get enough space in the current buffer.
tries = 0;
do {
pthread_mutex_lock(&rcv->lock);
sbuf = rcv->sbuf[rcv->active_buf];
rem = htraced_sbuf_remaining(sbuf);
if (rem < msgpack_len) {
pthread_cond_signal(&rcv->bg_cond);
pthread_mutex_unlock(&rcv->lock);
tries++;
retry = tries < HTRACED_MAX_ADD_TRIES;
htrace_log(lg, "htraced_rcv_add_span: not enough space in the "
"current buffer. Have %" PRId64 ", need %"
PRId64 ". %s...\n", rem, msgpack_len,
(retry ? "Retrying" : "Giving up"));
if (retry) {
pthread_yield();
continue;
}
pthread_mutex_unlock(&rcv->lock);
return;
}
} while (0);
// OK, now we have the lock, and we know that there is enough space in the
// current buffer.
off = sbuf->off;
cmp_bcopy_ctx_init(&bctx, sbuf->buf + off, msgpack_len);
bctx.base.write = cmp_bcopy_write_nocheck_fn;
span_write_msgpack(span, (cmp_ctx_t*)&bctx);
off += msgpack_len;
sbuf->off = off;
sbuf->num_spans++;
if (off > rcv->send_threshold) {
pthread_cond_signal(&rcv->bg_cond);
}
pthread_mutex_unlock(&rcv->lock);
}
/**
 * Block until all spans buffered at the time of the call have been sent.
 *
 * @param r         The htraced receiver.
 */
static void htraced_rcv_flush(struct htrace_rcv *r)
{
    struct htraced_rcv *rcv = (struct htraced_rcv *)r;
    uint64_t now;

    // Note: This assumes that we only flush one buffer at once, and
    // that we flush buffers in order. If we revisit those assumptions we'll
    // need to change this.
    // The SpanReceiver flush is only used for testing anyway.
    pthread_mutex_lock(&rcv->lock);
    now = monotonic_now_ms(rcv->tracer->lg);
    while (1) {
        if (rcv->last_send_ms >= now) {
            // A send completed after we started flushing; done.
            break;
        }
        if (htraced_sbufs_empty(rcv)) {
            // Nothing left to send.
            break;
        }
        // Zeroing last_send_ms makes should_xmit() treat the buffered data
        // as overdue, so the transmitter sends it on its next pass.
        rcv->last_send_ms = 0;
        // IMPROVEMENT: wake the transmitter immediately.  Previously the
        // flush relied on the transmitter's timed wakeup, which could take
        // up to flush_interval_ms / 2 (at least 15 seconds).
        pthread_cond_signal(&rcv->bg_cond);
        pthread_cond_wait(&rcv->flush_cond, &rcv->lock);
    }
    pthread_mutex_unlock(&rcv->lock);
}
/**
 * Shut down and free an htraced receiver.
 *
 * Sets the shutdown flag, wakes the transmitter thread (which drains any
 * remaining buffered spans before exiting -- see run_htraced_xmit_manager),
 * joins it, and then releases all resources.  A NULL argument is a no-op.
 *
 * @param r         The receiver to free.
 */
static void htraced_rcv_free(struct htrace_rcv *r)
{
    struct htraced_rcv *rcv = (struct htraced_rcv *)r;
    struct htrace_log *lg;
    int i, ret;

    if (!rcv) {
        return;
    }
    lg = rcv->tracer->lg;
    htrace_log(lg, "Shutting down htraced receiver for %s\n",
               hrpc_client_get_endpoint(rcv->hcli));
    // Set the shutdown flag under the lock and wake the transmitter.
    pthread_mutex_lock(&rcv->lock);
    rcv->shutdown = 1;
    pthread_cond_signal(&rcv->bg_cond);
    pthread_mutex_unlock(&rcv->lock);
    // Wait for the transmitter to drain the buffers and exit; after the join
    // no other thread touches rcv, so teardown below needs no lock.
    ret = pthread_join(rcv->xmit_thread, NULL);
    if (ret) {
        htrace_log(lg, "htraced_rcv_free: pthread_join "
                   "error %d: %s\n", ret, terror(ret));
    }
    for (i = 0; i < HTRACED_NUM_BUFS; i++) {
        htraced_sbuf_free(rcv->sbuf[i]);
    }
    hrpc_client_free(rcv->hcli);
    ret = pthread_mutex_destroy(&rcv->lock);
    if (ret) {
        htrace_log(lg, "htraced_rcv_free: pthread_mutex_destroy "
                   "error %d: %s\n", ret, terror(ret));
    }
    ret = pthread_cond_destroy(&rcv->bg_cond);
    if (ret) {
        htrace_log(lg, "htraced_rcv_free: pthread_cond_destroy(bg_cond) "
                   "error %d: %s\n", ret, terror(ret));
    }
    ret = pthread_cond_destroy(&rcv->flush_cond);
    if (ret) {
        htrace_log(lg, "htraced_rcv_free: pthread_cond_destroy(flush_cond) "
                   "error %d: %s\n", ret, terror(ret));
    }
    free(rcv);
}
/**
 * The receiver type table for htraced, referenced via rcv->base.ty.
 * Positional initializers; the entries are the functions defined above.
 */
const struct htrace_rcv_ty g_htraced_rcv_ty = {
    "htraced",              // receiver type name
    htraced_rcv_create,
    htraced_rcv_add_span,
    htraced_rcv_flush,
    htraced_rcv_free,
};
// vim:ts=4:sw=4:et