blob: 7e4af81a2d81eee41023a8057a5c88690031ec1c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "core/conf.h"
#include "core/htrace.h"
#include "core/htracer.h"
#include "core/span.h"
#include "receiver/hrpc.h"
#include "receiver/receiver.h"
#include "test/test.h"
#include "util/log.h"
#include "util/string.h"
#include "util/time.h"
#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/**
* @file htraced.c
*
* The htraced span receiver which implements sending spans on to htraced.
*/
/**
* The minimum buffer size to allow for the htraced circular buffer.
*
* This should hopefully allow at least a few spans to be buffered.
*/
#define HTRACED_MIN_BUFFER_SIZE (4ULL * 1024ULL * 1024ULL)
/**
* The maximum buffer size to allow for the htraced circular buffer.
* This is mainly to avoid overflow. Of course, you couldn't allocate a buffer
* anywhere near this size anyway.
*/
#define HTRACED_MAX_BUFFER_SIZE 0x7ffffffffffffffLL
/**
* The minimum number of milliseconds to allow for flush_interval_ms.
*/
#define HTRACED_FLUSH_INTERVAL_MS_MIN 30000LL
/**
* The maximum number of milliseconds to allow for flush_interval_ms.
* This is mainly to avoid overflow.
*/
#define HTRACED_FLUSH_INTERVAL_MS_MAX 86400000LL
/**
* The minimum number of milliseconds to allow for tcp write timeouts.
*/
#define HTRACED_WRITE_TIMEO_MS_MIN 50LL
/**
* The minimum number of milliseconds to allow for tcp read timeouts.
*/
#define HTRACED_READ_TIMEO_MS_MIN 50LL
/**
* The maximum size of the message we will send over the wire.
* This also sets the size of the transmission buffer.
* This constant must not be more than 2^^32 on 32-bit systems.
*/
#define HTRACED_MAX_MSG_LEN (8ULL * 1024ULL * 1024ULL)
/**
* The maximum number of times we will try to add a span to the circular buffer
* before giving up.
*/
#define HTRACED_MAX_ADD_TRIES 3
/**
* The maximum number of times to try to send some spans to the htraced daemon
* before giving up.
*/
#define HTRACED_MAX_SEND_TRIES 3
/**
* The number of milliseconds to sleep after failing to send some spans to the
* htraced daemon.
*/
#define HTRACED_SEND_RETRY_SLEEP_MS 5000
/*
* A span receiver that writes spans to htraced.
*/
struct htraced_rcv {
struct htrace_rcv base;
/**
* Nonzero if the receiver should shut down.
*/
int shutdown;
/**
* The htracer object associated with this receciver.
*/
struct htracer *tracer;
/**
* Buffered span data becomes eligible to be sent even if there isn't much
* in the buffer after this timeout elapses.
*/
uint64_t flush_interval_ms;
/**
* The maximum number of bytes we will buffer before waking the sending
* thread. We may sometimes send slightly more than this amount if the
* thread takes a while to wake up.
*/
uint64_t send_threshold;
/**
* The HRPC client.
*/
struct hrpc_client *hcli;
/**
* Length of the circular buffer.
*/
uint64_t clen;
/**
* A circular buffer containing span data.
*/
uint8_t *cbuf;
/**
* 'start' pointer of the circular buffer.
*/
uint64_t cstart;
/**
* 'end' pointer of the circular buffer.
*/
uint64_t cend;
/**
* The monotonic-clock time at which we last did a send operation.
*/
uint64_t last_send_ms;
/**
* RPC messages are copied into this buffer before being sent.
* Its length is HTRACED_MAX_MSG_LEN.
*/
uint8_t *sbuf;
/**
* Lock protecting the circular buffer from concurrent writes.
*/
pthread_mutex_t lock;
/**
* Condition variable used to wake up the background thread.
*/
pthread_cond_t cond;
/**
* Background transmitter thread.
*/
pthread_t xmit_thread;
};
void* run_htraced_xmit_manager(void *data);
static int should_xmit(struct htraced_rcv *rcv, uint64_t now);
static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now);
static uint64_t cbuf_used(const struct htraced_rcv *rcv);
static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv);
static uint64_t htraced_get_bounded_u64(struct htrace_log *lg,
const struct htrace_conf *cnf, const char *prop,
uint64_t min, uint64_t max)
{
uint64_t val = htrace_conf_get_u64(lg, cnf, prop);
if (val < min) {
htrace_log(lg, "htraced_rcv_create: can't set %s to %"PRId64
". Using minimum value of %"PRId64 " instead.\n",
prop, val, min);
return min;
} else if (val > max) {
htrace_log(lg, "htraced_rcv_create: can't set %s to %"PRId64
". Using maximum value of %"PRId64 " instead.\n",
prop, val, max);
return max;
}
return val;
}
static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
const struct htrace_conf *conf)
{
struct htraced_rcv *rcv;
const char *endpoint;
int ret;
uint64_t write_timeo_ms, read_timeo_ms;
endpoint = htrace_conf_get(conf, HTRACED_ADDRESS_KEY);
if (!endpoint) {
htrace_log(tracer->lg, "htraced_rcv_create: no value found for %s. "
"You must set this configuration key to the "
"hostname:port identifying the htraced server.\n",
HTRACED_ADDRESS_KEY);
goto error;
}
rcv = malloc(sizeof(*rcv));
if (!rcv) {
htrace_log(tracer->lg, "htraced_rcv_create: OOM while "
"allocating htraced_rcv.\n");
goto error;
}
rcv->base.ty = &g_htraced_rcv_ty;
rcv->shutdown = 0;
rcv->tracer = tracer;
rcv->flush_interval_ms = htraced_get_bounded_u64(tracer->lg, conf,
HTRACED_FLUSH_INTERVAL_MS_KEY, HTRACED_FLUSH_INTERVAL_MS_MIN,
HTRACED_FLUSH_INTERVAL_MS_MAX);
write_timeo_ms = htraced_get_bounded_u64(tracer->lg, conf,
HTRACED_WRITE_TIMEO_MS_KEY, HTRACED_WRITE_TIMEO_MS_MIN,
0x7fffffffffffffffULL);
read_timeo_ms = htraced_get_bounded_u64(tracer->lg, conf,
HTRACED_READ_TIMEO_MS_KEY, HTRACED_READ_TIMEO_MS_MIN,
0x7fffffffffffffffULL);
rcv->hcli = hrpc_client_alloc(tracer->lg, write_timeo_ms,
read_timeo_ms, endpoint);
if (!rcv->hcli) {
goto error_free_rcv;
}
rcv->clen = htrace_conf_get_u64(tracer->lg, conf, HTRACED_BUFFER_SIZE_KEY);
if (rcv->clen < HTRACED_MIN_BUFFER_SIZE) {
htrace_log(tracer->lg, "htraced_rcv_create: invalid buffer size %" PRId64
". Setting the minimum buffer size of %llu"
" instead.\n", rcv->clen, HTRACED_MIN_BUFFER_SIZE);
rcv->clen = HTRACED_MIN_BUFFER_SIZE;
} else if (rcv->clen > HTRACED_MAX_BUFFER_SIZE) {
htrace_log(tracer->lg, "htraced_rcv_create: invalid buffer size %" PRId64
". Setting the maximum buffer size of %lld"
" instead.\n", rcv->clen, HTRACED_MAX_BUFFER_SIZE);
rcv->clen = HTRACED_MAX_BUFFER_SIZE;
}
rcv->cbuf = malloc(rcv->clen);
if (!rcv->cbuf) {
htrace_log(tracer->lg, "htraced_rcv_create: failed to malloc %"PRId64
" bytes for the htraced circular buffer.\n", rcv->clen);
goto error_free_hcli;
}
// Send when the buffer gets 1/4 full.
rcv->send_threshold = rcv->clen * 0.25;
rcv->cstart = 0;
rcv->cend = 0;
rcv->last_send_ms = monotonic_now_ms(tracer->lg);
rcv->sbuf = malloc(HTRACED_MAX_MSG_LEN);
if (!rcv->sbuf) {
goto error_free_cbuf;
}
ret = pthread_mutex_init(&rcv->lock, NULL);
if (ret) {
htrace_log(tracer->lg, "htraced_rcv_create: pthread_mutex_init "
"error %d: %s\n", ret, terror(ret));
goto error_free_sbuf;
}
ret = pthread_cond_init(&rcv->cond, NULL);
if (ret) {
htrace_log(tracer->lg, "htraced_rcv_create: pthread_cond_init "
"error %d: %s\n", ret, terror(ret));
goto error_free_lock;
}
ret = pthread_create(&rcv->xmit_thread, NULL, run_htraced_xmit_manager, rcv);
if (ret) {
htrace_log(tracer->lg, "htraced_rcv_create: failed to create xmit thread: "
"error %d: %s\n", ret, terror(ret));
goto error_free_cvar;
}
htrace_log(tracer->lg, "Initialized htraced receiver for %s"
", flush_interval_ms=%" PRId64 ", send_threshold=%" PRId64
", write_timeo_ms=%" PRId64 ", read_timeo_ms=%" PRId64
", clen=%" PRId64 ".\n", hrpc_client_get_endpoint(rcv->hcli),
rcv->flush_interval_ms, rcv->send_threshold,
write_timeo_ms, read_timeo_ms, rcv->clen);
return (struct htrace_rcv*)rcv;
error_free_cvar:
pthread_cond_destroy(&rcv->cond);
error_free_lock:
pthread_mutex_destroy(&rcv->lock);
error_free_sbuf:
free(rcv->sbuf);
error_free_cbuf:
free(rcv->cbuf);
error_free_hcli:
hrpc_client_free(rcv->hcli);
error_free_rcv:
free(rcv);
error:
return NULL;
}
void* run_htraced_xmit_manager(void *data)
{
struct htraced_rcv *rcv = data;
struct htrace_log *lg = rcv->tracer->lg;
uint64_t now, wakeup;
struct timespec wakeup_ts;
int ret;
pthread_mutex_lock(&rcv->lock);
while (1) {
now = monotonic_now_ms(lg);
while (should_xmit(rcv, now)) {
htraced_xmit(rcv, now);
}
if (rcv->shutdown) {
while (cbuf_used(rcv) > 0) {
htraced_xmit(rcv, now);
}
break;
}
// Wait for one of a few things to happen:
// * Shutdown
// * The wakeup timer to elapse, leading us to check if we should send
// because of send_timeo_ms.
// * A writer to signal that we should wake up because enough bytes are
// buffered.
wakeup = now + (rcv->flush_interval_ms / 2);
ms_to_timespec(wakeup, &wakeup_ts);
ret = pthread_cond_timedwait(&rcv->cond, &rcv->lock, &wakeup_ts);
if ((ret != 0) && (ret != ETIMEDOUT)) {
htrace_log(lg, "run_htraced_xmit_manager: pthread_cond_timedwait "
"error: %d (%s)\n", ret, terror(ret));
}
}
pthread_mutex_unlock(&rcv->lock);
htrace_log(lg, "run_htraced_xmit_manager: shutting down the transmission "
"manager thread.\n");
return NULL;
}
/**
* Determine if the xmit manager should send.
* This function must be called with the lock held.
*
* @param rcv The htraced receiver.
* @param now The current time in milliseconds.
*
* @return nonzero if we should send now.
*/
static int should_xmit(struct htraced_rcv *rcv, uint64_t now)
{
uint64_t used;
used = cbuf_used(rcv);
if (used > rcv->send_threshold) {
// We have buffered a lot of bytes, so let's send.
return 1;
}
if (now - rcv->last_send_ms > rcv->flush_interval_ms) {
// It's been too long since the last transmission, so let's send.
if (used > 0) {
return 1;
}
}
return 0; // Let's wait.
}
/**
* Send all the spans which we have buffered.
*
* @param rcv The htraced receiver.
* @param slen The length of the buffer to send.
*
* @return 1 on success; 0 otherwise.
*/
static int htraced_xmit_impl(struct htraced_rcv *rcv, int32_t slen)
{
struct htrace_log *lg = rcv->tracer->lg;
int res, retval = 0;
char *prequel = NULL, *err = NULL, *resp = NULL;
size_t resp_len = 0;
res = hrpc_client_call(rcv->hcli, METHOD_ID_WRITE_SPANS,
rcv->sbuf, slen, &err, (void**)&resp, &resp_len);
if (!res) {
htrace_log(lg, "htrace_xmit_impl: hrpc_client_call failed.\n");
retval = 0;
} else if (err) {
htrace_log(lg, "htrace_xmit_impl: server returned error: %s\n", err);
retval = 0;
} else {
retval = 1;
}
free(prequel);
free(err);
free(resp);
return retval;
}
static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now)
{
int32_t slen;
int tries = 0;
// Move span data from the circular buffer into the transmission buffer.
slen = cbuf_to_sbuf(rcv);
// Release the lock while doing network I/O, so that we don't block threads
// adding spans.
pthread_mutex_unlock(&rcv->lock);
while (1) {
int retry, success = htraced_xmit_impl(rcv, slen);
if (success) {
break;
}
tries++;
retry = (tries < HTRACED_MAX_SEND_TRIES);
htrace_log(rcv->tracer->lg, "htraced_xmit(%s) failed on try %d. %s\n",
hrpc_client_get_endpoint(rcv->hcli), tries,
(retry ? "Retrying after a delay." : "Giving up."));
if (!retry) {
break;
}
}
pthread_mutex_lock(&rcv->lock);
rcv->last_send_ms = now;
}
/**
* Move data from the circular buffer into the transmission buffer, advancing
* the circular buffer's start offset.
*
* This function must be called with the lock held.
*
* Note that we rely on HTRACED_MAX_MSG_LEN being < 4GB in this function for
* correctness on 32-bit systems.
*
* @param rcv The htraced receiver.
*
* @return The amount of data copied.
*/
static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv)
{
const char * const SUFFIX = "]}";
int SUFFIX_LEN = sizeof(SUFFIX) - 1;
int rem = HTRACED_MAX_MSG_LEN - SUFFIX_LEN;
size_t amt;
char *sbuf = (char*)rcv->sbuf;
fwdprintf(&sbuf, &rem, "{\"DefaultPid\":\"%s\",\"Spans\":[",
rcv->tracer->prid);
if (rcv->cstart < rcv->cend) {
amt = rcv->cend - rcv->cstart;
if (amt > rem) {
amt = rem;
}
memcpy(sbuf, rcv->cbuf + rcv->cstart, amt);
sbuf += amt;
rem -= amt;
rcv->cstart += amt;
} else {
amt = rcv->clen - rcv->cstart;
if (amt > rem) {
amt = rem;
}
memcpy(sbuf, rcv->cbuf + rcv->cstart, amt);
sbuf += amt;
rem -= amt;
rcv->cstart += amt;
if (rem > 0) {
amt = rcv->cend;
if (amt > rem) {
amt = rem;
}
memcpy(sbuf, rcv->cbuf, amt);
sbuf += amt;
rem -= amt;
rcv->cstart = amt;
}
}
// overwrite last comma
rem++;
sbuf--;
rem += SUFFIX_LEN;
fwdprintf(&sbuf, &rem, "%s", SUFFIX);
return HTRACED_MAX_MSG_LEN - rem;
}
/**
* Returns the current number of bytes used in the htraced circular buffer.
* Must be called under the lock.
*
* @param rcv The htraced receiver.
*
* @return The number of bytes used.
*/
static uint64_t cbuf_used(const struct htraced_rcv *rcv)
{
if (rcv->cstart <= rcv->cend) {
return rcv->cend - rcv->cstart;
}
return rcv->clen - (rcv->cstart - rcv->cend);
}
static void htraced_rcv_add_span(struct htrace_rcv *r,
struct htrace_span *span)
{
int json_len, tries, retry;
uint64_t used, rem;
struct htraced_rcv *rcv = (struct htraced_rcv *)r;
struct htrace_log *lg = rcv->tracer->lg;
json_len = span_json_size(span);
tries = 0;
do {
pthread_mutex_lock(&rcv->lock);
used = cbuf_used(rcv);
if (used + json_len >= rcv->clen) {
pthread_cond_signal(&rcv->cond);
pthread_mutex_unlock(&rcv->lock);
tries++;
retry = tries < HTRACED_MAX_ADD_TRIES;
htrace_log(lg, "htraced_rcv_add_span: not enough space in the "
"circular buffer. Have %" PRId64 ", need %d"
". %s...\n", (rcv->clen - used), json_len,
(retry ? "Retrying" : "Giving up"));
if (retry) {
pthread_yield();
continue;
}
return;
}
} while (0);
// OK, now we have the lock, and we know that there is enough space in the
// circular buffer.
rem = rcv->clen - rcv->cend;
if (rem < json_len) {
// Handle a 'torn write' where the circular buffer loops around to the
// beginning in the middle of the write.
char *temp = malloc(json_len);
if (!temp) {
htrace_log(lg, "htraced_rcv_add_span: failed to malloc %d byte "
"buffer for torn write.\n", json_len);
goto done;
}
span_json_sprintf(span, json_len, temp);
temp[json_len - 1] = ',';
memcpy(rcv->cbuf + rcv->cend, temp, rem);
memcpy(rcv->cbuf, temp + rem, json_len - rem);
rcv->cend = json_len - rem;
free(temp);
} else {
span_json_sprintf(span, json_len, rcv->cbuf + rcv->cend);
rcv->cbuf[rcv->cend + json_len - 1] = ',';
rcv->cend += json_len;
}
used += json_len;
if (used > rcv->send_threshold) {
pthread_cond_signal(&rcv->cond);
}
done:
pthread_mutex_unlock(&rcv->lock);
}
static void htraced_rcv_flush(struct htrace_rcv *r)
{
struct htraced_rcv *rcv = (struct htraced_rcv *)r;
while (1) {
pthread_mutex_lock(&rcv->lock);
if (cbuf_used(rcv) == 0) {
// If the buffer is empty, we're done.
// Note that there is no guarantee that we'll ever be done if spans
// are being added continuously throughout the flush. This is OK,
// since flush() is actually only used by unit tests.
// We could do something more clever here, but it would be a lot more
// complex.
pthread_mutex_unlock(&rcv->lock);
break;
}
// Get the xmit thread to send what it can, by resetting the "last send
// time" to the oldest possible monotonic time.
rcv->last_send_ms = 0;
pthread_cond_signal(&rcv->cond);
pthread_mutex_unlock(&rcv->lock);
}
}
static void htraced_rcv_free(struct htrace_rcv *r)
{
struct htraced_rcv *rcv = (struct htraced_rcv *)r;
struct htrace_log *lg;
int ret;
if (!rcv) {
return;
}
lg = rcv->tracer->lg;
htrace_log(lg, "Shutting down htraced receiver for %s\n",
hrpc_client_get_endpoint(rcv->hcli));
pthread_mutex_lock(&rcv->lock);
rcv->shutdown = 1;
pthread_cond_signal(&rcv->cond);
pthread_mutex_unlock(&rcv->lock);
ret = pthread_join(rcv->xmit_thread, NULL);
if (ret) {
htrace_log(lg, "htraced_rcv_free: pthread_join "
"error %d: %s\n", ret, terror(ret));
}
free(rcv->cbuf);
free(rcv->sbuf);
hrpc_client_free(rcv->hcli);
ret = pthread_mutex_destroy(&rcv->lock);
if (ret) {
htrace_log(lg, "htraced_rcv_free: pthread_mutex_destroy "
"error %d: %s\n", ret, terror(ret));
}
ret = pthread_cond_destroy(&rcv->cond);
if (ret) {
htrace_log(lg, "htraced_rcv_free: pthread_cond_destroy "
"error %d: %s\n", ret, terror(ret));
}
free(rcv);
}
const struct htrace_rcv_ty g_htraced_rcv_ty = {
"htraced",
htraced_rcv_create,
htraced_rcv_add_span,
htraced_rcv_flush,
htraced_rcv_free,
};
// vim:ts=4:sw=4:et