HTRACE-164. htrace hrpc: use msgpack for serialization (cmccabe)
diff --git a/LICENSE.txt b/LICENSE.txt
index fa6051c..19dd8f7 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -276,3 +276,11 @@
Backbone.Marionette is a composite application library for Backbone.js.
It is MIT licensed:
https://github.com/marionettejs/backbone.marionette/blob/v2.4.1/license.txt
+
+CMP is an implementation of the MessagePack serialization format in
+C. It is licensed under the MIT license:
+https://github.com/camgunz/cmp/blob/master/LICENSE
+
+go-codec is an implementation of several serialization and deserialization
+codecs in Go. It is licensed under the MIT license:
+https://github.com/ugorji/go/blob/master/LICENSE
\ No newline at end of file
diff --git a/htrace-c/src/CMakeLists.txt b/htrace-c/src/CMakeLists.txt
index 3d51968..da08ee9 100644
--- a/htrace-c/src/CMakeLists.txt
+++ b/htrace-c/src/CMakeLists.txt
@@ -83,6 +83,8 @@
sampler/never.c
sampler/prob.c
sampler/sampler.c
+ util/cmp.c
+ util/cmp_util.c
util/htable.c
util/log.c
util/process_id.c
@@ -130,6 +132,10 @@
add_test(${utest} ${CMAKE_CURRENT_BINARY_DIR}/${utest} ${utest})
endmacro(add_utest)
+add_utest(cmp_util-unit
+ test/cmp_util-unit.c
+)
+
add_utest(conf-unit
test/conf-unit.c
)
diff --git a/htrace-c/src/core/conf.c b/htrace-c/src/core/conf.c
index a561906..d9eed2a 100644
--- a/htrace-c/src/core/conf.c
+++ b/htrace-c/src/core/conf.c
@@ -34,6 +34,7 @@
";" HTRACED_READ_TIMEO_MS_KEY "=60000"\
";" HTRACE_PROCESS_ID "=%{tname}/%{ip}"\
";" HTRACED_ADDRESS_KEY "=localhost:9095"\
+ ";" HTRACED_BUFFER_SEND_TRIGGER_FRACTION "=0.50"\
)
static int parse_key_value(char *str, char **key, char **val)
diff --git a/htrace-c/src/core/htrace.h b/htrace-c/src/core/htrace.h
index 4e2f1f9..816c222 100644
--- a/htrace-c/src/core/htrace.h
+++ b/htrace-c/src/core/htrace.h
@@ -135,6 +135,13 @@
#define HTRACED_BUFFER_SIZE_KEY "htraced.buffer.size"
/**
+ * The fraction of the buffer that needs to be full to trigger the spans to be
+ * sent from the htraced span receiver.
+ */
+#define HTRACED_BUFFER_SEND_TRIGGER_FRACTION \
+ "htraced.buffer.send.trigger.fraction"
+
+/**
* The process ID string to use.
*
* %{ip} will be replaced by an IP address;
diff --git a/htrace-c/src/core/span.c b/htrace-c/src/core/span.c
index 13ba3cf..790afa5 100644
--- a/htrace-c/src/core/span.c
+++ b/htrace-c/src/core/span.c
@@ -19,6 +19,7 @@
#include "core/span.h"
#include "receiver/receiver.h"
#include "sampler/sampler.h"
+#include "util/cmp.h"
#include "util/log.h"
#include "util/rand.h"
#include "util/string.h"
@@ -26,6 +27,7 @@
#include <inttypes.h>
#include <stdint.h>
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -127,7 +129,7 @@
* containing the span contents. With buf non-NULL, we will write the span
* contents to the provided buffer.
*
- * @param scope The scope
+ * @param span The span
* @param max The maximum number of bytes to write to buf.
* @param buf If non-NULL, where the string will be written.
*
@@ -147,7 +149,7 @@
ret += fwdprintf(&buf, &max, "{\"s\":\"%016" PRIx64 "\",\"b\":%" PRId64
",\"e\":%" PRId64",", span->span_id, span->begin_ms,
span->end_ms);
- if (span->desc) {
+ if (span->desc[0]) {
ret += fwdprintf(&buf, &max, "\"d\":\"%s\",", span->desc);
}
if (span->prid) {
@@ -174,14 +176,87 @@
return ret + 1;
}
-int span_json_size(const struct htrace_span *scope)
+int span_json_size(const struct htrace_span *span)
{
- return span_json_sprintf_impl(scope, 0, NULL);
+ return span_json_sprintf_impl(span, 0, NULL);
}
-void span_json_sprintf(const struct htrace_span *scope, int max, void *buf)
+void span_json_sprintf(const struct htrace_span *span, int max, void *buf)
{
- span_json_sprintf_impl(scope, max, buf);
+ span_json_sprintf_impl(span, max, buf);
+}
+
+int span_write_msgpack(const struct htrace_span *span, cmp_ctx_t *ctx)
+{
+ int i, num_parents;
+ uint16_t map_size =
+ 1 + // desc
+ 1 + // begin_ms
+ 1 + // end_ms
+ 1; // span_id
+
+ num_parents = span->num_parents;
+ if (span->prid) {
+ map_size++;
+ }
+ if (num_parents > 0) {
+ map_size++;
+ }
+ if (!cmp_write_map16(ctx, map_size)) {
+ return 0;
+ }
+ if (!cmp_write_fixstr(ctx, "d", 1)) {
+ return 0;
+ }
+ if (!cmp_write_str16(ctx, span->desc, strlen(span->desc))) {
+ return 0;
+ }
+ if (!cmp_write_fixstr(ctx, "b", 1)) {
+ return 0;
+ }
+ if (!cmp_write_u64(ctx, span->begin_ms)) {
+ return 0;
+ }
+ if (!cmp_write_fixstr(ctx, "e", 1)) {
+ return 0;
+ }
+ if (!cmp_write_u64(ctx, span->end_ms)) {
+ return 0;
+ }
+ if (!cmp_write_fixstr(ctx, "s", 1)) {
+ return 0;
+ }
+ if (!cmp_write_u64(ctx, span->span_id)) {
+ return 0;
+ }
+ if (span->prid) {
+ if (!cmp_write_fixstr(ctx, "r", 1)) {
+ return 0;
+ }
+ if (!cmp_write_str16(ctx, span->prid, strlen(span->prid))) {
+ return 0;
+ }
+ }
+ if (num_parents > 0) {
+ if (!cmp_write_fixstr(ctx, "p", 1)) {
+ return 0;
+ }
+ if (!cmp_write_array16(ctx, num_parents)) {
+ return 0;
+ }
+ if (num_parents == 1) {
+ if (!cmp_write_u64(ctx, span->parent.single)) {
+ return 0;
+ }
+ } else {
+ for (i = 0; i < num_parents; i++) {
+ if (!cmp_write_u64(ctx, span->parent.list[i])) {
+ return 0;
+ }
+ }
+ }
+ }
+ return 1;
}
// vim:ts=4:sw=4:et
diff --git a/htrace-c/src/core/span.h b/htrace-c/src/core/span.h
index b19bd94..710cff7 100644
--- a/htrace-c/src/core/span.h
+++ b/htrace-c/src/core/span.h
@@ -29,6 +29,7 @@
#include <stdint.h>
+struct cmp_ctx_s;
struct htracer;
struct htrace_span {
@@ -105,16 +106,6 @@
void htrace_span_sort_and_dedupe_parents(struct htrace_span *span);
/**
- * Escape a JSON string. Specifically, put backslashes before double quotes and
- * other backslashes.
- *
- * @param in The string to escape.
- *
- * @param out The escaped string. Malloced. NULL on OOM.
- */
-char *json_escape(const char *in);
-
-/**
* Get the buffer size that would be needed to serialize this span to a buffer.
*
* @param span The span.
@@ -133,6 +124,16 @@
*/
void span_json_sprintf(const struct htrace_span *span, int max, void *buf);
+/**
+ * Write a span to the provided CMP context.
+ *
+ * @param span The span.
+ * @param ctx The CMP context.
+ *
+ * @return 0 on failure; 1 on success.
+ */
+int span_write_msgpack(const struct htrace_span *span, struct cmp_ctx_s *ctx);
+
#endif
// vim: ts=4:sw=4:et
diff --git a/htrace-c/src/receiver/hrpc.c b/htrace-c/src/receiver/hrpc.c
index 32a1f4a..f2e296d 100644
--- a/htrace-c/src/receiver/hrpc.c
+++ b/htrace-c/src/receiver/hrpc.c
@@ -52,7 +52,7 @@
* Implements sending messages via HRPC.
*/
-#define HRPC_MAGIC 0x48545243U
+#define HRPC_MAGIC 0x43525448U
#define MAX_HRPC_ERROR_LENGTH (4 * 1024 * 1024)
@@ -130,7 +130,8 @@
static int set_socket_read_and_write_timeout(struct hrpc_client *hcli,
int sock);
static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id,
- const void *req, size_t req_len, uint64_t *seq);
+ const void *buf1, size_t buf1_len,
+ const void *buf2, size_t buf2_len, uint64_t *seq);
static int hrpc_client_rcv_resp(struct hrpc_client *hcli, uint32_t method_id,
uint64_t seq, char **err, void **resp,
size_t *resp_len);
@@ -185,7 +186,8 @@
}
int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id,
- const void *req, size_t req_len,
+ const void *buf1, size_t buf1_len,
+ const void *buf2, size_t buf2_len,
char **err, void **resp, size_t *resp_len)
{
uint64_t seq;
@@ -198,7 +200,8 @@
} else {
htrace_log(hcli->lg, "hrpc_client_call: connection was already open\n");
}
- if (!hrpc_client_send_req(hcli, method_id, req, req_len, &seq)) {
+ if (!hrpc_client_send_req(hcli, method_id,
+ buf1, buf1_len, buf2, buf2_len, &seq)) {
goto error;
}
htrace_log(hcli->lg, "hrpc_client_call: waiting for response\n");
@@ -338,20 +341,25 @@
}
static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id,
- const void *req, size_t req_len, uint64_t *seq)
+ const void *buf1, size_t buf1_len,
+ const void *buf2, size_t buf2_len, uint64_t *seq)
{
+ // We use writev (scatter/gather I/O) here in order to avoid sending
+ // multiple packets when TCP_NODELAY is turned on.
struct hrpc_req_header hdr;
- struct iovec iov[2];
+ struct iovec iov[3];
hdr.magic = htole64(HRPC_MAGIC);
hdr.method_id = htole32(method_id);
*seq = hcli->seq++;
hdr.seq = htole64(*seq);
- hdr.length = htole32(req_len);
+ hdr.length = htole32(buf1_len + buf2_len);
iov[0].iov_base = &hdr;
iov[0].iov_len = sizeof(hdr);
- iov[1].iov_base = (void*)req;
- iov[1].iov_len = req_len;
+ iov[1].iov_base = (void*)buf1;
+ iov[1].iov_len = buf1_len;
+ iov[2].iov_base = (void*)buf2;
+ iov[2].iov_len = buf2_len;
while (1) {
ssize_t res = writev(hcli->sock, iov, sizeof(iov)/sizeof(iov[0]));
diff --git a/htrace-c/src/receiver/hrpc.h b/htrace-c/src/receiver/hrpc.h
index 8ec20be..c7126b0 100644
--- a/htrace-c/src/receiver/hrpc.h
+++ b/htrace-c/src/receiver/hrpc.h
@@ -60,8 +60,10 @@
*
* @param hcli The HRPC client.
* @param method_id The method ID to use.
- * @param req The request buffer to send.
- * @param req_len The size of the request buffer to send.
+ * @param buf1 The first buffer to send.
+ * @param buf1_len The size of the first buffer to send.
+ * @param buf2 The second buffer to send.
+ * @param buf2_len The size of the second buffer to send.
* @param err (out param) Will be set to a malloced
* NULL-terminated string if the server returned an
* error response. NULL otherwise.
@@ -72,7 +74,8 @@
* @return 0 on failure, 1 on success.
*/
int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id,
- const void *req, size_t req_len,
+ const void *buf1, size_t buf1_len,
+ const void *buf2, size_t buf2_len,
char **err, void **resp, size_t *resp_len);
/**
diff --git a/htrace-c/src/receiver/htraced.c b/htrace-c/src/receiver/htraced.c
index 7e4af81..8f392b7 100644
--- a/htrace-c/src/receiver/htraced.c
+++ b/htrace-c/src/receiver/htraced.c
@@ -23,6 +23,8 @@
#include "receiver/hrpc.h"
#include "receiver/receiver.h"
#include "test/test.h"
+#include "util/cmp.h"
+#include "util/cmp_util.h"
#include "util/log.h"
#include "util/string.h"
#include "util/time.h"
@@ -30,6 +32,7 @@
#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
+#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
@@ -38,22 +41,59 @@
/**
* @file htraced.c
*
- * The htraced span receiver which implements sending spans on to htraced.
+ * The htraced span receiver which implements sending spans on to the htraced
+ * daemon.
+ *
+ * We send spans via the HRPC protocol. HRPC consists of a simple fixed-size
+ * header specifying a magic number, the length, and a message type, followed by
+ * data in the msgpack serialization format. The message type will determine
+ * the type of msgpack data. Messages bodies are sent as maps, essentially
+ * making all fields optional and allowing the protocol to evolve. See hrpc.c
+ * and rpc.go for the implementation of HRPC.
+ *
+ * Spans are serialized immediately when they are added to the buffer. This is
+ * one of the advantages of msgpack-- it has a good streaming interface. We do
+ * not need to keep around the span objects after htraced_rcv_add_span.
+ *
+ * The htraced receiver keeps two equally sized buffers around internally.
+ * While we are writing spans to one buffer, we can be sending the data from the
+ * other buffer over the wire. The intention here is to avoid copies as much as
+ * possible. In general, what we send over the wire is exactly what is in the
+ * buffer, except that we have to add a short "prequel" to it containing the
+ * other WriteSpansReq fields.
+ *
+ * Note that we may change the serialization in the future if we discover better
+ * alternatives. Sending spans over HTTP as JSON will always be supported
+ * as a fallback.
*/
/**
- * The minimum buffer size to allow for the htraced circular buffer.
+ * The maximum length of the message we will send to the server.
+ * This must be the same or shorter than MAX_HRPC_BODY_LENGTH in rpc.go.
+ */
+#define MAX_HRPC_LEN (64ULL * 1024ULL * 1024ULL)
+
+/**
+ * The maximum length of the prequel in a WriteSpans message.
+ */
+#define MAX_WRITESPANS_PREQUEL_LEN 1024
+
+/**
+ * The maximum length of the span data in a WriteSpans message.
+ */
+#define MAX_SPAN_DATA_LEN (MAX_HRPC_LEN - MAX_WRITESPANS_PREQUEL_LEN)
+
+/**
+ * The minimum total buffer size to allow.
*
- * This should hopefully allow at least a few spans to be buffered.
+ * This should allow at least a few spans to be buffered.
*/
#define HTRACED_MIN_BUFFER_SIZE (4ULL * 1024ULL * 1024ULL)
/**
- * The maximum buffer size to allow for the htraced circular buffer.
- * This is mainly to avoid overflow. Of course, you couldn't allocate a buffer
- * anywhere near this size anyway.
+ * The maximum total buffer size to allow.
*/
-#define HTRACED_MAX_BUFFER_SIZE 0x7ffffffffffffffLL
+#define HTRACED_MAX_BUFFER_SIZE (2ULL * MAX_SPAN_DATA_LEN)
/**
* The minimum number of milliseconds to allow for flush_interval_ms.
@@ -77,13 +117,6 @@
#define HTRACED_READ_TIMEO_MS_MIN 50LL
/**
- * The maximum size of the message we will send over the wire.
- * This also sets the size of the transmission buffer.
- * This constant must not be more than 2^^32 on 32-bit systems.
- */
-#define HTRACED_MAX_MSG_LEN (8ULL * 1024ULL * 1024ULL)
-
-/**
* The maximum number of times we will try to add a span to the circular buffer
* before giving up.
*/
@@ -101,6 +134,36 @@
*/
#define HTRACED_SEND_RETRY_SLEEP_MS 5000
+/**
+ * The number of buffers used by htraced.
+ */
+#define HTRACED_NUM_BUFS 2
+
+/**
+ * An HTraced send buffer.
+ */
+struct htraced_sbuf {
+ /**
+ * Current offset within the buffer.
+ */
+ uint64_t off;
+
+ /**
+ * Length of the buffer.
+ */
+ uint64_t len;
+
+ /**
+ * The number of spans in the buffer.
+ */
+ uint64_t num_spans;
+
+ /**
+ * The buffer data. This field actually has size 'len,' not size 1.
+ */
+ char buf[1];
+};
+
/*
* A span receiver that writes spans to htraced.
*/
@@ -136,45 +199,34 @@
struct hrpc_client *hcli;
/**
- * Length of the circular buffer.
- */
- uint64_t clen;
-
- /**
- * A circular buffer containing span data.
- */
- uint8_t *cbuf;
-
- /**
- * 'start' pointer of the circular buffer.
- */
- uint64_t cstart;
-
- /**
- * 'end' pointer of the circular buffer.
- */
- uint64_t cend;
-
- /**
* The monotonic-clock time at which we last did a send operation.
*/
uint64_t last_send_ms;
/**
- * RPC messages are copied into this buffer before being sent.
- * Its length is HTRACED_MAX_MSG_LEN.
+ * The index of the active buffer.
*/
- uint8_t *sbuf;
+ int active_buf;
/**
- * Lock protecting the circular buffer from concurrent writes.
+ * The two send buffers.
+ */
+ struct htraced_sbuf *sbuf[HTRACED_NUM_BUFS];
+
+ /**
+ * Lock protecting the buffers from concurrent writes.
*/
pthread_mutex_t lock;
/**
* Condition variable used to wake up the background thread.
*/
- pthread_cond_t cond;
+ pthread_cond_t bg_cond;
+
+ /**
+ * Condition variable used to wake up flushing threads.
+ */
+ pthread_cond_t flush_cond;
/**
* Background transmitter thread.
@@ -185,8 +237,44 @@
void* run_htraced_xmit_manager(void *data);
static int should_xmit(struct htraced_rcv *rcv, uint64_t now);
static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now);
-static uint64_t cbuf_used(const struct htraced_rcv *rcv);
-static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv);
+
+static int htraced_sbufs_empty(struct htraced_rcv *rcv)
+{
+ int i;
+ for (i = 0; i < HTRACED_NUM_BUFS; i++) {
+ if (rcv->sbuf[i]->off) {
+ return 0;
+ }
+ }
+ return 1;
+}
+
+static struct htraced_sbuf *htraced_sbuf_alloc(uint64_t len)
+{
+ struct htraced_sbuf *sbuf;
+
+ // The final field of the htraced_sbuf structure is declared as having size
+ // 1, but really it has size 'len'. This avoids a pointer dereference when
+ // accessing data in the sbuf.
+ sbuf = malloc(offsetof(struct htraced_sbuf, buf) + len);
+ if (!sbuf) {
+ return NULL;
+ }
+ sbuf->off = 0;
+ sbuf->len = len;
+ sbuf->num_spans = 0;
+ return sbuf;
+}
+
+static void htraced_sbuf_free(struct htraced_sbuf *sbuf)
+{
+ free(sbuf);
+}
+
+static uint64_t htraced_sbuf_remaining(const struct htraced_sbuf *sbuf)
+{
+ return sbuf->len - sbuf->off;
+}
static uint64_t htraced_get_bounded_u64(struct htrace_log *lg,
const struct htrace_conf *cnf, const char *prop,
@@ -207,13 +295,33 @@
return val;
}
+static double htraced_get_bounded_double(struct htrace_log *lg,
+ const struct htrace_conf *cnf, const char *prop,
+ double min, double max)
+{
+ double val = htrace_conf_get_double(lg, cnf, prop);
+ if (val < min) {
+ htrace_log(lg, "htraced_rcv_create: can't set %s to %g"
+ ". Using minimum value of %g instead.\n",
+ prop, val, min);
+ return min;
+ } else if (val > max) {
+ htrace_log(lg, "htraced_rcv_create: can't set %s to %g"
+ ". Using maximum value of %g instead.\n",
+ prop, val, max);
+ return max;
+ }
+ return val;
+}
+
static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
const struct htrace_conf *conf)
{
struct htraced_rcv *rcv;
const char *endpoint;
- int ret;
- uint64_t write_timeo_ms, read_timeo_ms;
+ int i, ret;
+ uint64_t write_timeo_ms, read_timeo_ms, buf_len;
+ double send_fraction;
endpoint = htrace_conf_get(conf, HTRACED_ADDRESS_KEY);
if (!endpoint) {
@@ -223,7 +331,7 @@
HTRACED_ADDRESS_KEY);
goto error;
}
- rcv = malloc(sizeof(*rcv));
+ rcv = calloc(1, sizeof(*rcv));
if (!rcv) {
htrace_log(tracer->lg, "htraced_rcv_create: OOM while "
"allocating htraced_rcv.\n");
@@ -247,68 +355,66 @@
if (!rcv->hcli) {
goto error_free_rcv;
}
- rcv->clen = htrace_conf_get_u64(tracer->lg, conf, HTRACED_BUFFER_SIZE_KEY);
- if (rcv->clen < HTRACED_MIN_BUFFER_SIZE) {
- htrace_log(tracer->lg, "htraced_rcv_create: invalid buffer size %" PRId64
- ". Setting the minimum buffer size of %llu"
- " instead.\n", rcv->clen, HTRACED_MIN_BUFFER_SIZE);
- rcv->clen = HTRACED_MIN_BUFFER_SIZE;
- } else if (rcv->clen > HTRACED_MAX_BUFFER_SIZE) {
- htrace_log(tracer->lg, "htraced_rcv_create: invalid buffer size %" PRId64
- ". Setting the maximum buffer size of %lld"
- " instead.\n", rcv->clen, HTRACED_MAX_BUFFER_SIZE);
- rcv->clen = HTRACED_MAX_BUFFER_SIZE;
+ buf_len = htraced_get_bounded_u64(tracer->lg, conf,
+ HTRACED_BUFFER_SIZE_KEY, HTRACED_MIN_BUFFER_SIZE,
+ HTRACED_MAX_BUFFER_SIZE) / 2;
+ for (i = 0; i < HTRACED_NUM_BUFS; i++) {
+ rcv->sbuf[i] = htraced_sbuf_alloc(buf_len);
+ if (!rcv->sbuf[i]) {
+ htrace_log(tracer->lg, "htraced_rcv_create: htraced_sbuf_alloc("
+ "buf_len=%"PRId64") failed: OOM.\n", buf_len);
+ goto error_free_bufs;
+ }
}
- rcv->cbuf = malloc(rcv->clen);
- if (!rcv->cbuf) {
- htrace_log(tracer->lg, "htraced_rcv_create: failed to malloc %"PRId64
- " bytes for the htraced circular buffer.\n", rcv->clen);
- goto error_free_hcli;
+ send_fraction = htraced_get_bounded_double(tracer->lg, conf,
+ HTRACED_BUFFER_SEND_TRIGGER_FRACTION, 0.1, 1.0);
+ rcv->send_threshold = buf_len * send_fraction;
+ if (rcv->send_threshold > buf_len) {
+ rcv->send_threshold = buf_len;
}
- // Send when the buffer gets 1/4 full.
- rcv->send_threshold = rcv->clen * 0.25;
- rcv->cstart = 0;
- rcv->cend = 0;
rcv->last_send_ms = monotonic_now_ms(tracer->lg);
- rcv->sbuf = malloc(HTRACED_MAX_MSG_LEN);
- if (!rcv->sbuf) {
- goto error_free_cbuf;
- }
ret = pthread_mutex_init(&rcv->lock, NULL);
if (ret) {
htrace_log(tracer->lg, "htraced_rcv_create: pthread_mutex_init "
"error %d: %s\n", ret, terror(ret));
- goto error_free_sbuf;
+ goto error_free_bufs;
}
- ret = pthread_cond_init(&rcv->cond, NULL);
+ ret = pthread_cond_init(&rcv->bg_cond, NULL);
if (ret) {
- htrace_log(tracer->lg, "htraced_rcv_create: pthread_cond_init "
- "error %d: %s\n", ret, terror(ret));
+ htrace_log(tracer->lg, "htraced_rcv_create: pthread_cond_init("
+ "bg_cond) error %d: %s\n", ret, terror(ret));
goto error_free_lock;
}
+ ret = pthread_cond_init(&rcv->flush_cond, NULL);
+ if (ret) {
+ htrace_log(tracer->lg, "htraced_rcv_create: pthread_cond_init("
+ "flush_cond) error %d: %s\n", ret, terror(ret));
+ goto error_free_bg_cond;
+ }
ret = pthread_create(&rcv->xmit_thread, NULL, run_htraced_xmit_manager, rcv);
if (ret) {
htrace_log(tracer->lg, "htraced_rcv_create: failed to create xmit thread: "
"error %d: %s\n", ret, terror(ret));
- goto error_free_cvar;
+ goto error_free_flush_cond;
}
htrace_log(tracer->lg, "Initialized htraced receiver for %s"
", flush_interval_ms=%" PRId64 ", send_threshold=%" PRId64
", write_timeo_ms=%" PRId64 ", read_timeo_ms=%" PRId64
- ", clen=%" PRId64 ".\n", hrpc_client_get_endpoint(rcv->hcli),
+ ", buf_len=%" PRId64 ".\n", hrpc_client_get_endpoint(rcv->hcli),
rcv->flush_interval_ms, rcv->send_threshold,
- write_timeo_ms, read_timeo_ms, rcv->clen);
+ write_timeo_ms, read_timeo_ms, buf_len);
return (struct htrace_rcv*)rcv;
-error_free_cvar:
- pthread_cond_destroy(&rcv->cond);
+error_free_flush_cond:
+ pthread_cond_destroy(&rcv->flush_cond);
+error_free_bg_cond:
+ pthread_cond_destroy(&rcv->bg_cond);
error_free_lock:
pthread_mutex_destroy(&rcv->lock);
-error_free_sbuf:
- free(rcv->sbuf);
-error_free_cbuf:
- free(rcv->cbuf);
-error_free_hcli:
+error_free_bufs:
+ for (i = 0; i < HTRACED_NUM_BUFS; i++) {
+ htraced_sbuf_free(rcv->sbuf[i]);
+ }
hrpc_client_free(rcv->hcli);
error_free_rcv:
free(rcv);
@@ -331,7 +437,7 @@
htraced_xmit(rcv, now);
}
if (rcv->shutdown) {
- while (cbuf_used(rcv) > 0) {
+ while (!htraced_sbufs_empty(rcv)) {
htraced_xmit(rcv, now);
}
break;
@@ -344,7 +450,7 @@
// buffered.
wakeup = now + (rcv->flush_interval_ms / 2);
ms_to_timespec(wakeup, &wakeup_ts);
- ret = pthread_cond_timedwait(&rcv->cond, &rcv->lock, &wakeup_ts);
+ ret = pthread_cond_timedwait(&rcv->bg_cond, &rcv->lock, &wakeup_ts);
if ((ret != 0) && (ret != ETIMEDOUT)) {
htrace_log(lg, "run_htraced_xmit_manager: pthread_cond_timedwait "
"error: %d (%s)\n", ret, terror(ret));
@@ -367,67 +473,107 @@
*/
static int should_xmit(struct htraced_rcv *rcv, uint64_t now)
{
- uint64_t used;
+ uint64_t off = rcv->sbuf[rcv->active_buf]->off;
- used = cbuf_used(rcv);
- if (used > rcv->send_threshold) {
+ if (off > rcv->send_threshold) {
// We have buffered a lot of bytes, so let's send.
return 1;
}
if (now - rcv->last_send_ms > rcv->flush_interval_ms) {
// It's been too long since the last transmission, so let's send.
- if (used > 0) {
+ if (off > 0) {
return 1;
}
}
return 0; // Let's wait.
}
+#define DEFAULT_PID_STR "DefaultPid"
+#define DEFAULT_PID_STR_LEN (sizeof(DEFAULT_PID_STR) - 1)
+#define SPANS_STR "Spans"
+#define SPANS_STR_LEN (sizeof(SPANS_STR) - 1)
+
+/**
+ * Write the prequel to the WriteSpans message.
+ */
+static int add_writespans_prequel(struct htraced_rcv *rcv,
+ struct htraced_sbuf *sbuf, uint8_t *prequel)
+{
+ struct cmp_bcopy_ctx bctx;
+ struct cmp_ctx_s *ctx = (struct cmp_ctx_s *)&bctx;
+ cmp_bcopy_ctx_init(&bctx, prequel, MAX_WRITESPANS_PREQUEL_LEN);
+ if (!cmp_write_fixmap(ctx, 2)) {
+ return -1;
+ }
+ if (!cmp_write_fixstr(ctx, DEFAULT_PID_STR, DEFAULT_PID_STR_LEN)) {
+ return -1;
+ }
+ if (!cmp_write_str(ctx, rcv->tracer->prid, strlen(rcv->tracer->prid))) {
+ return -1;
+ }
+ if (!cmp_write_fixstr(ctx, SPANS_STR, SPANS_STR_LEN)) {
+ return -1;
+ }
+ if (!cmp_write_array(ctx, sbuf->num_spans)) {
+ return -1;
+ }
+ return bctx.off;
+}
+
/**
* Send all the spans which we have buffered.
*
* @param rcv The htraced receiver.
- * @param slen The length of the buffer to send.
+ * @param sbuf The span buffer to send.
*
* @return 1 on success; 0 otherwise.
*/
-static int htraced_xmit_impl(struct htraced_rcv *rcv, int32_t slen)
+static int htraced_xmit_impl(struct htraced_rcv *rcv, struct htraced_sbuf *sbuf)
{
struct htrace_log *lg = rcv->tracer->lg;
- int res, retval = 0;
- char *prequel = NULL, *err = NULL, *resp = NULL;
+ uint8_t prequel[MAX_WRITESPANS_PREQUEL_LEN];
+ int prequel_len, ret;
+ char *err = NULL, *resp = NULL;
size_t resp_len = 0;
- res = hrpc_client_call(rcv->hcli, METHOD_ID_WRITE_SPANS,
- rcv->sbuf, slen, &err, (void**)&resp, &resp_len);
- if (!res) {
+ prequel_len = add_writespans_prequel(rcv, sbuf, prequel);
+ if (prequel_len < 0) {
+ htrace_log(lg, "htrace_xmit_impl: add_writespans_prequel failed.\n");
+ ret = 0;
+ goto done;
+ }
+ ret = hrpc_client_call(rcv->hcli, METHOD_ID_WRITE_SPANS,
+ prequel, prequel_len, sbuf->buf, sbuf->off,
+ &err, (void**)&resp, &resp_len);
+ if (!ret) {
htrace_log(lg, "htrace_xmit_impl: hrpc_client_call failed.\n");
- retval = 0;
+ goto done;
} else if (err) {
htrace_log(lg, "htrace_xmit_impl: server returned error: %s\n", err);
- retval = 0;
- } else {
- retval = 1;
+ ret = 0;
+ goto done;
}
- free(prequel);
+ ret = 1;
+done:
free(err);
free(resp);
- return retval;
+ return ret;
}
static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now)
{
- int32_t slen;
int tries = 0;
+ struct htraced_sbuf *sbuf;
- // Move span data from the circular buffer into the transmission buffer.
- slen = cbuf_to_sbuf(rcv);
+ // Flip to the other buffer.
+ sbuf = rcv->sbuf[rcv->active_buf];
+ rcv->active_buf = !rcv->active_buf;
// Release the lock while doing network I/O, so that we don't block threads
// adding spans.
pthread_mutex_unlock(&rcv->lock);
while (1) {
- int retry, success = htraced_xmit_impl(rcv, slen);
+ int retry, success = htraced_xmit_impl(rcv, sbuf);
if (success) {
break;
}
@@ -440,175 +586,100 @@
break;
}
}
+ sbuf->off = 0;
+ sbuf->num_spans = 0;
pthread_mutex_lock(&rcv->lock);
rcv->last_send_ms = now;
-}
-
-/**
- * Move data from the circular buffer into the transmission buffer, advancing
- * the circular buffer's start offset.
- *
- * This function must be called with the lock held.
- *
- * Note that we rely on HTRACED_MAX_MSG_LEN being < 4GB in this function for
- * correctness on 32-bit systems.
- *
- * @param rcv The htraced receiver.
- *
- * @return The amount of data copied.
- */
-static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv)
-{
- const char * const SUFFIX = "]}";
- int SUFFIX_LEN = sizeof(SUFFIX) - 1;
- int rem = HTRACED_MAX_MSG_LEN - SUFFIX_LEN;
- size_t amt;
- char *sbuf = (char*)rcv->sbuf;
-
- fwdprintf(&sbuf, &rem, "{\"DefaultPid\":\"%s\",\"Spans\":[",
- rcv->tracer->prid);
- if (rcv->cstart < rcv->cend) {
- amt = rcv->cend - rcv->cstart;
- if (amt > rem) {
- amt = rem;
- }
- memcpy(sbuf, rcv->cbuf + rcv->cstart, amt);
- sbuf += amt;
- rem -= amt;
- rcv->cstart += amt;
- } else {
- amt = rcv->clen - rcv->cstart;
- if (amt > rem) {
- amt = rem;
- }
- memcpy(sbuf, rcv->cbuf + rcv->cstart, amt);
- sbuf += amt;
- rem -= amt;
- rcv->cstart += amt;
- if (rem > 0) {
- amt = rcv->cend;
- if (amt > rem) {
- amt = rem;
- }
- memcpy(sbuf, rcv->cbuf, amt);
- sbuf += amt;
- rem -= amt;
- rcv->cstart = amt;
- }
- }
- // overwrite last comma
- rem++;
- sbuf--;
- rem += SUFFIX_LEN;
- fwdprintf(&sbuf, &rem, "%s", SUFFIX);
- return HTRACED_MAX_MSG_LEN - rem;
-}
-
-/**
- * Returns the current number of bytes used in the htraced circular buffer.
- * Must be called under the lock.
- *
- * @param rcv The htraced receiver.
- *
- * @return The number of bytes used.
- */
-static uint64_t cbuf_used(const struct htraced_rcv *rcv)
-{
- if (rcv->cstart <= rcv->cend) {
- return rcv->cend - rcv->cstart;
- }
- return rcv->clen - (rcv->cstart - rcv->cend);
+ pthread_cond_broadcast(&rcv->flush_cond);
}
static void htraced_rcv_add_span(struct htrace_rcv *r,
struct htrace_span *span)
{
- int json_len, tries, retry;
- uint64_t used, rem;
+ int tries, retry;
+ uint64_t rem, off;
struct htraced_rcv *rcv = (struct htraced_rcv *)r;
+ struct htraced_sbuf *sbuf;
struct htrace_log *lg = rcv->tracer->lg;
+ struct cmp_counter_ctx cctx;
+ struct cmp_bcopy_ctx bctx;
+ uint64_t msgpack_len;
- json_len = span_json_size(span);
+ // Determine the length of the span when serialized to msgpack.
+ cmp_counter_ctx_init(&cctx);
+ if (!span_write_msgpack(span, (cmp_ctx_t*)&cctx)) {
+ htrace_log(lg, "htraced_rcv_add_span: span_write_msgpack failed.\n");
+ return;
+ }
+ msgpack_len = cctx.count;
+
+ // Try to get enough space in the current buffer.
tries = 0;
do {
pthread_mutex_lock(&rcv->lock);
- used = cbuf_used(rcv);
- if (used + json_len >= rcv->clen) {
- pthread_cond_signal(&rcv->cond);
+ sbuf = rcv->sbuf[rcv->active_buf];
+ rem = htraced_sbuf_remaining(sbuf);
+ if (rem < msgpack_len) {
+ pthread_cond_signal(&rcv->bg_cond);
pthread_mutex_unlock(&rcv->lock);
tries++;
retry = tries < HTRACED_MAX_ADD_TRIES;
htrace_log(lg, "htraced_rcv_add_span: not enough space in the "
- "circular buffer. Have %" PRId64 ", need %d"
- ". %s...\n", (rcv->clen - used), json_len,
+ "current buffer. Have %" PRId64 ", need %"
+ PRId64 ". %s...\n", rem, msgpack_len,
(retry ? "Retrying" : "Giving up"));
if (retry) {
pthread_yield();
continue;
}
+ pthread_mutex_unlock(&rcv->lock);
return;
}
} while (0);
// OK, now we have the lock, and we know that there is enough space in the
- // circular buffer.
- rem = rcv->clen - rcv->cend;
- if (rem < json_len) {
- // Handle a 'torn write' where the circular buffer loops around to the
- // beginning in the middle of the write.
- char *temp = malloc(json_len);
- if (!temp) {
- htrace_log(lg, "htraced_rcv_add_span: failed to malloc %d byte "
- "buffer for torn write.\n", json_len);
- goto done;
- }
- span_json_sprintf(span, json_len, temp);
- temp[json_len - 1] = ',';
- memcpy(rcv->cbuf + rcv->cend, temp, rem);
- memcpy(rcv->cbuf, temp + rem, json_len - rem);
- rcv->cend = json_len - rem;
- free(temp);
- } else {
- span_json_sprintf(span, json_len, rcv->cbuf + rcv->cend);
- rcv->cbuf[rcv->cend + json_len - 1] = ',';
- rcv->cend += json_len;
+ // current buffer.
+ off = sbuf->off;
+ cmp_bcopy_ctx_init(&bctx, sbuf->buf + off, msgpack_len);
+ bctx.base.write = cmp_bcopy_write_nocheck_fn;
+ span_write_msgpack(span, (cmp_ctx_t*)&bctx);
+ off += msgpack_len;
+ sbuf->off = off;
+ sbuf->num_spans++;
+ if (off > rcv->send_threshold) {
+ pthread_cond_signal(&rcv->bg_cond);
}
- used += json_len;
- if (used > rcv->send_threshold) {
- pthread_cond_signal(&rcv->cond);
- }
-done:
pthread_mutex_unlock(&rcv->lock);
}
static void htraced_rcv_flush(struct htrace_rcv *r)
{
struct htraced_rcv *rcv = (struct htraced_rcv *)r;
+ uint64_t now;
+ // Note: This assumes that we only flush one buffer at once, and
+ // that we flush buffers in order. If we revisit those assumptions we'll
+ // need to change this.
+ // The SpanReceiver flush is only used for testing anyway.
+ pthread_mutex_lock(&rcv->lock);
+ now = monotonic_now_ms(rcv->tracer->lg);
while (1) {
- pthread_mutex_lock(&rcv->lock);
- if (cbuf_used(rcv) == 0) {
- // If the buffer is empty, we're done.
- // Note that there is no guarantee that we'll ever be done if spans
- // are being added continuously throughout the flush. This is OK,
- // since flush() is actually only used by unit tests.
- // We could do something more clever here, but it would be a lot more
- // complex.
- pthread_mutex_unlock(&rcv->lock);
+ if (rcv->last_send_ms >= now) {
break;
}
- // Get the xmit thread to send what it can, by resetting the "last send
- // time" to the oldest possible monotonic time.
+ if (htraced_sbufs_empty(rcv)) {
+ break;
+ }
rcv->last_send_ms = 0;
- pthread_cond_signal(&rcv->cond);
- pthread_mutex_unlock(&rcv->lock);
+ pthread_cond_wait(&rcv->flush_cond, &rcv->lock);
}
+ pthread_mutex_unlock(&rcv->lock);
}
static void htraced_rcv_free(struct htrace_rcv *r)
{
struct htraced_rcv *rcv = (struct htraced_rcv *)r;
struct htrace_log *lg;
- int ret;
+ int i, ret;
if (!rcv) {
return;
@@ -618,24 +689,30 @@
hrpc_client_get_endpoint(rcv->hcli));
pthread_mutex_lock(&rcv->lock);
rcv->shutdown = 1;
- pthread_cond_signal(&rcv->cond);
+ pthread_cond_signal(&rcv->bg_cond);
pthread_mutex_unlock(&rcv->lock);
ret = pthread_join(rcv->xmit_thread, NULL);
if (ret) {
htrace_log(lg, "htraced_rcv_free: pthread_join "
"error %d: %s\n", ret, terror(ret));
}
- free(rcv->cbuf);
- free(rcv->sbuf);
+ for (i = 0; i < HTRACED_NUM_BUFS; i++) {
+ htraced_sbuf_free(rcv->sbuf[i]);
+ }
hrpc_client_free(rcv->hcli);
ret = pthread_mutex_destroy(&rcv->lock);
if (ret) {
htrace_log(lg, "htraced_rcv_free: pthread_mutex_destroy "
"error %d: %s\n", ret, terror(ret));
}
- ret = pthread_cond_destroy(&rcv->cond);
+ ret = pthread_cond_destroy(&rcv->bg_cond);
if (ret) {
- htrace_log(lg, "htraced_rcv_free: pthread_cond_destroy "
+ htrace_log(lg, "htraced_rcv_free: pthread_cond_destroy(bg_cond) "
+ "error %d: %s\n", ret, terror(ret));
+ }
+ ret = pthread_cond_destroy(&rcv->flush_cond);
+ if (ret) {
+ htrace_log(lg, "htraced_rcv_free: pthread_cond_destroy(flush_cond) "
"error %d: %s\n", ret, terror(ret));
}
free(rcv);
diff --git a/htrace-c/src/test/cmp_util-unit.c b/htrace-c/src/test/cmp_util-unit.c
new file mode 100644
index 0000000..bfe4edc
--- /dev/null
+++ b/htrace-c/src/test/cmp_util-unit.c
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/span.h"
+#include "test/span_util.h"
+#include "test/test.h"
+#include "util/cmp.h"
+#include "util/cmp_util.h"
+
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define NUM_TEST_SPANS 3
+#define TEST_BUF_LENGTH (8UL * 1024UL * 1024UL)
+
+static struct htrace_span **setup_test_spans(void)
+{
+ struct htrace_span **spans =
+ xcalloc(sizeof(struct htrace_span*) * NUM_TEST_SPANS);
+
+ spans[0] = xcalloc(sizeof(struct htrace_span));
+ spans[0]->desc = xstrdup("FirstSpan");
+ spans[0]->begin_ms = 1927;
+ spans[0]->end_ms = 2000;
+ spans[0]->span_id = 1;
+
+ spans[1] = xcalloc(sizeof(struct htrace_span));
+ spans[1]->desc = xstrdup("SecondSpan");
+ spans[1]->begin_ms = 1950;
+ spans[1]->end_ms = 2000;
+ spans[1]->span_id = 0xffffffffffffffffULL;
+ spans[1]->prid = xstrdup("SecondSpanProc");
+ spans[1]->num_parents = 1;
+ spans[1]->parent.single = 1;
+
+ spans[2] = xcalloc(sizeof(struct htrace_span));
+ spans[2]->desc = xstrdup("ThirdSpan");
+ spans[2]->begin_ms = 1969;
+ spans[2]->end_ms = 1997;
+ spans[2]->span_id = 0xcfcfcfcfcfcfcfcfULL;
+ spans[2]->prid = xstrdup("ThirdSpanProc");
+ spans[2]->num_parents = 2;
+ spans[2]->parent.list = xcalloc(sizeof(uint64_t) * 2);
+ spans[2]->parent.list[0] = 1;
+ spans[2]->parent.list[1] = 0xffffffffffffffffULL;
+
+ return spans;
+}
+
+static int serialize_test_spans(struct htrace_span **test_spans, cmp_ctx_t *ctx)
+{
+ int i;
+ for (i = 0; i < NUM_TEST_SPANS; i++) {
+ EXPECT_INT_EQ(1, span_write_msgpack(test_spans[i], ctx));
+ }
+ return EXIT_SUCCESS;
+}
+
+static int test_serialize_spans(struct htrace_span **test_spans)
+{
+ int i;
+ struct htrace_span *span;
+ struct cmp_counter_ctx cctx;
+ struct cmp_bcopy_ctx bctx;
+ char *buf;
+ char err[1024];
+ size_t err_len = sizeof(err);
+
+ cmp_counter_ctx_init(&cctx);
+ EXPECT_INT_ZERO(serialize_test_spans(test_spans, (cmp_ctx_t *)&cctx));
+
+ buf = xcalloc(TEST_BUF_LENGTH);
+ cmp_bcopy_ctx_init(&bctx, buf, TEST_BUF_LENGTH);
+ EXPECT_INT_ZERO(serialize_test_spans(test_spans, (cmp_ctx_t *)&bctx));
+ EXPECT_UINT64_EQ(cctx.count, bctx.off);
+ EXPECT_UINT64_EQ(TEST_BUF_LENGTH, bctx.len);
+
+ bctx.off = 0;
+ for (i = 0; i < NUM_TEST_SPANS; i++) {
+ span = span_read_msgpack((cmp_ctx_t*)&bctx, err, err_len);
+ EXPECT_STR_EQ("", err);
+ EXPECT_NONNULL(span);
+ EXPECT_INT_ZERO(span_compare(test_spans[i], span));
+ htrace_span_free(span);
+ }
+
+ free(buf);
+ return EXIT_SUCCESS;
+}
+
+int main(void)
+{
+ int i;
+ struct htrace_span **test_spans;
+
+ test_spans = setup_test_spans();
+ EXPECT_NONNULL(test_spans);
+ EXPECT_INT_ZERO(test_serialize_spans(test_spans));
+ for (i = 0; i < NUM_TEST_SPANS; i++) {
+ htrace_span_free(test_spans[i]);
+ }
+ free(test_spans);
+ return EXIT_SUCCESS;
+}
+
+// vim: ts=4:sw=4:tw=79:et
diff --git a/htrace-c/src/test/span_util.c b/htrace-c/src/test/span_util.c
index fc4b041..8a615b1 100644
--- a/htrace-c/src/test/span_util.c
+++ b/htrace-c/src/test/span_util.c
@@ -18,6 +18,7 @@
#include "core/span.h"
#include "test/span_util.h"
+#include "util/cmp.h"
#include "util/log.h"
#include <errno.h>
@@ -291,4 +292,194 @@
return compare_parents(a, b);
}
+static int span_read_key_str(struct cmp_ctx_s *ctx, char *out,
+ uint32_t out_len, char *err, size_t err_len)
+{
+ uint32_t size = 0;
+
+ err[0] = '\0';
+ if (!cmp_read_str_size(ctx, &size)) {
+ snprintf(err, err_len, "span_read_key_str: cmp_read_str_size failed.");
+ return 0;
+ }
+ if (size >= out_len) {
+ snprintf(err, err_len, "span_read_key_str: size of key string was "
+ "%"PRId32", but we can only handle key strings less than "
+ "%"PRId32" bytes.", size, out_len);
+ return 0;
+ }
+ if (!ctx->read(ctx, out, size)) {
+ snprintf(err, err_len, "span_read_key_str: ctx->read failed for "
+ "%"PRId32"-byte key string.", size);
+ return 0;
+ }
+ out[size] = '\0';
+ return 1;
+}
+
+static char *cmp_read_malloced_string(struct cmp_ctx_s *ctx, const char *what,
+ char *err, size_t err_len)
+{
+ uint32_t size = 0;
+ char *str;
+
+ err[0] = '\0';
+ if (!cmp_read_str_size(ctx, &size)) {
+ snprintf(err, err_len, "cmp_read_malloced_string: cmp_read_str_size "
+ "failed for %s.", what);
+ return NULL;
+ }
+ str = malloc(size + 1);
+ if (!str) {
+ snprintf(err, err_len, "cmp_read_malloced_string: failed to malloc "
+ "failed for %d-byte string for %s.", size + 1, what);
+ return NULL;
+ }
+ if (!ctx->read(ctx, str, size)) {
+ snprintf(err, err_len, "cmp_read_malloced_string: failed to read "
+ "%"PRId32"-byte string for %s.", size, what);
+ free(str);
+ return NULL;
+ }
+ str[size] = '\0';
+ return str;
+}
+
+static void span_parse_msgpack_parents(struct cmp_ctx_s *ctx,
+ struct htrace_span *span, char *err, size_t err_len)
+{
+ uint32_t i, size;
+
+ err[0] = '\0';
+ if (span->num_parents > 1) {
+ free(span->parent.list);
+ span->parent.list = NULL;
+ }
+ span->parent.single = 0;
+ span->num_parents = 0;
+ if (!cmp_read_array(ctx, &size)) {
+ snprintf(err, err_len, "span_parse_msgpack_parents: cmp_read_array "
+ "failed.");
+ return;
+ }
+ if (size == 1) {
+ if (!cmp_read_u64(ctx, &span->parent.single)) {
+ snprintf(err, err_len, "span_parse_msgpack_parents: cmp_read_u64 "
+ "for single child ID failed");
+ return;
+ }
+ } else if (size > 1) {
+ span->parent.list = malloc(sizeof(uint64_t) * size);
+ if (!span->parent.list) {
+ snprintf(err, err_len, "span_parse_msgpack_parents: failed to "
+ "malloc %"PRId32"-entry parent array.", size);
+ return;
+ }
+ for (i = 0; i < size; i++) {
+ if (!cmp_read_u64(ctx, &span->parent.list[i])) {
+ snprintf(err, err_len, "span_parse_msgpack_parents: cmp_read_u64 "
+ "for child %d ID failed", i);
+ free(span->parent.list);
+ span->parent.list = NULL;
+ return;
+ }
+ }
+ }
+ span->num_parents = size;
+}
+
+struct htrace_span *span_read_msgpack(struct cmp_ctx_s *ctx,
+ char *err, size_t err_len)
+{
+ struct htrace_span *span = NULL;
+ uint32_t map_size = 0;
+ char key[8];
+
+ err[0] = '\0';
+ span = calloc(1, sizeof(*span));
+ if (!span) {
+ snprintf(err, err_len, "span_read_msgpack: OOM allocating "
+ "htrace_span.");
+ goto error;
+ }
+ if (!cmp_read_map(ctx, &map_size)) {
+ snprintf(err, err_len, "span_read_msgpack: cmp_read_map failed to "
+ "read enclosing map object.\n");
+ goto error;
+ }
+ while (map_size > 0) {
+ if (!span_read_key_str(ctx, key, sizeof(key), err, err_len)) {
+ goto error;
+ }
+ switch (key[0]) {
+ case 'd':
+ if (span->desc) {
+ free(span->desc);
+ }
+ span->desc = cmp_read_malloced_string(ctx, "description",
+ err, err_len);
+ if (err[0]) {
+ goto error;
+ }
+ break;
+ case 'b':
+ if (!cmp_read_u64(ctx, &span->begin_ms)) {
+ snprintf(err, err_len, "span_read_msgpack: cmp_read_u64 "
+ "failed for span->begin_ms.");
+ goto error;
+ }
+ break;
+ case 'e':
+ if (!cmp_read_u64(ctx, &span->end_ms)) {
+ snprintf(err, err_len, "span_read_msgpack: cmp_read_u64 "
+ "failed for span->end_ms.");
+ goto error;
+ }
+ break;
+ case 's':
+ if (!cmp_read_u64(ctx, &span->span_id)) {
+ snprintf(err, err_len, "span_read_msgpack: cmp_read_u64 "
+ "failed for span->span_id");
+ goto error;
+ }
+ break;
+ case 'r':
+ if (span->prid) {
+ free(span->prid);
+ }
+ span->prid = cmp_read_malloced_string(ctx, "process_id",
+ err, err_len);
+ if (err[0]) {
+ goto error;
+ }
+ break;
+ case 'p':
+ span_parse_msgpack_parents(ctx, span, err, err_len);
+ if (err[0]) {
+ goto error;
+ }
+ break;
+ default:
+ snprintf(err, err_len, "span_read_msgpack: can't understand key "
+ "'%s'.\n", key);
+ goto error;
+ }
+ map_size--;
+ }
+ // Description cannot be NULL.
+ if (!span->desc) {
+ span->desc = strdup("");
+ if (!span->desc) {
+ snprintf(err, err_len, "span_read_msgpack: OOM allocating empty "
+ "description string.");
+ goto error;
+ }
+ }
+ return span;
+
+error:
+ htrace_span_free(span);
+ return NULL;
+}
+
// vim:ts=4:sw=4:et
diff --git a/htrace-c/src/test/span_util.h b/htrace-c/src/test/span_util.h
index 393e213..8ab197f 100644
--- a/htrace-c/src/test/span_util.h
+++ b/htrace-c/src/test/span_util.h
@@ -22,6 +22,7 @@
#include <stdint.h>
#include <unistd.h> /* for size_t */
+struct cmp_ctx_s;
struct htrace_span;
/**
@@ -61,6 +62,19 @@
*/
int span_compare(struct htrace_span *a, struct htrace_span *b);
+/**
+ * Read a span from the provided CMP context.
+ *
+ * @param ctx The CMP context.
+ * @param err (out param) On error, where the error message will be
+ * written. Will be set to the empty string on success.
+ * @param err_len The length of the error buffer. Must be nonzero.
+ *
+ * @return The span on success; NULL otherwise.
+ */
+struct htrace_span *span_read_msgpack(struct cmp_ctx_s *ctx,
+ char *err, size_t err_len);
+
#endif
// vim:ts=4:sw=4:et
diff --git a/htrace-c/src/util/cmp.c b/htrace-c/src/util/cmp.c
new file mode 100644
index 0000000..9d3101f
--- /dev/null
+++ b/htrace-c/src/util/cmp.c
@@ -0,0 +1,2674 @@
+/*
+ * The MIT License (MIT)
+ *
+ * Copyright (c) 2014 Charles Gunyon
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "cmp.h"
+
+#include <stddef.h>
+#include <stdint.h>
+#include <unistd.h>
+
+static const uint32_t version = 10;
+static const uint32_t mp_version = 5;
+
+enum {
+ POSITIVE_FIXNUM_MARKER = 0x00,
+ FIXMAP_MARKER = 0x80,
+ FIXARRAY_MARKER = 0x90,
+ FIXSTR_MARKER = 0xA0,
+ NIL_MARKER = 0xC0,
+ FALSE_MARKER = 0xC2,
+ TRUE_MARKER = 0xC3,
+ BIN8_MARKER = 0xC4,
+ BIN16_MARKER = 0xC5,
+ BIN32_MARKER = 0xC6,
+ EXT8_MARKER = 0xC7,
+ EXT16_MARKER = 0xC8,
+ EXT32_MARKER = 0xC9,
+ FLOAT_MARKER = 0xCA,
+ DOUBLE_MARKER = 0xCB,
+ U8_MARKER = 0xCC,
+ U16_MARKER = 0xCD,
+ U32_MARKER = 0xCE,
+ U64_MARKER = 0xCF,
+ S8_MARKER = 0xD0,
+ S16_MARKER = 0xD1,
+ S32_MARKER = 0xD2,
+ S64_MARKER = 0xD3,
+ FIXEXT1_MARKER = 0xD4,
+ FIXEXT2_MARKER = 0xD5,
+ FIXEXT4_MARKER = 0xD6,
+ FIXEXT8_MARKER = 0xD7,
+ FIXEXT16_MARKER = 0xD8,
+ STR8_MARKER = 0xD9,
+ STR16_MARKER = 0xDA,
+ STR32_MARKER = 0xDB,
+ ARRAY16_MARKER = 0xDC,
+ ARRAY32_MARKER = 0xDD,
+ MAP16_MARKER = 0xDE,
+ MAP32_MARKER = 0xDF,
+ NEGATIVE_FIXNUM_MARKER = 0xE0
+};
+
+enum {
+ FIXARRAY_SIZE = 0xF,
+ FIXMAP_SIZE = 0xF,
+ FIXSTR_SIZE = 0x1F
+};
+
+enum {
+ ERROR_NONE,
+ STR_DATA_LENGTH_TOO_LONG_ERROR,
+ BIN_DATA_LENGTH_TOO_LONG_ERROR,
+ ARRAY_LENGTH_TOO_LONG_ERROR,
+ MAP_LENGTH_TOO_LONG_ERROR,
+ INPUT_VALUE_TOO_LARGE_ERROR,
+ FIXED_VALUE_WRITING_ERROR,
+ TYPE_MARKER_READING_ERROR,
+ TYPE_MARKER_WRITING_ERROR,
+ DATA_READING_ERROR,
+ DATA_WRITING_ERROR,
+ EXT_TYPE_READING_ERROR,
+ EXT_TYPE_WRITING_ERROR,
+ INVALID_TYPE_ERROR,
+ LENGTH_READING_ERROR,
+ LENGTH_WRITING_ERROR,
+ ERROR_MAX
+};
+
+const char *cmp_error_messages[ERROR_MAX + 1] = {
+ "No Error",
+ "Specified string data length is too long (> 0xFFFFFFFF)",
+ "Specified binary data length is too long (> 0xFFFFFFFF)",
+ "Specified array length is too long (> 0xFFFFFFFF)",
+ "Specified map length is too long (> 0xFFFFFFFF)",
+ "Input value is too large",
+ "Error writing fixed value",
+ "Error reading type marker",
+ "Error writing type marker",
+ "Error reading packed data",
+ "Error writing packed data",
+ "Error reading ext type",
+ "Error writing ext type",
+ "Invalid type",
+ "Error reading size",
+ "Error writing size",
+ "Max Error"
+};
+
+static const int32_t _i = 1;
+#define is_bigendian() ((*(char *)&_i) == 0)
+
+static uint16_t be16(uint16_t x) {
+ char *b = (char *)&x;
+
+ if (!is_bigendian()) {
+ char swap = 0;
+
+ swap = b[0];
+ b[0] = b[1];
+ b[1] = swap;
+ }
+
+ return x;
+}
+
+static uint32_t be32(uint32_t x) {
+ char *b = (char *)&x;
+
+ if (!is_bigendian()) {
+ char swap = 0;
+
+ swap = b[0];
+ b[0] = b[3];
+ b[3] = swap;
+
+ swap = b[1];
+ b[1] = b[2];
+ b[2] = swap;
+ }
+
+ return x;
+}
+
+static uint64_t be64(uint64_t x) {
+ char *b = (char *)&x;
+
+ if (!is_bigendian()) {
+ char swap = 0;
+
+ swap = b[0];
+ b[0] = b[7];
+ b[7] = swap;
+
+ swap = b[1];
+ b[1] = b[6];
+ b[6] = swap;
+
+ swap = b[2];
+ b[2] = b[5];
+ b[5] = swap;
+
+ swap = b[3];
+ b[3] = b[4];
+ b[4] = swap;
+ }
+
+ return x;
+}
+
+static float befloat(float x) {
+ char *b = (char *)&x;
+
+ if (!is_bigendian()) {
+ char swap = 0;
+
+ swap = b[0];
+ b[0] = b[3];
+ b[3] = swap;
+
+ swap = b[1];
+ b[1] = b[2];
+ b[2] = swap;
+ }
+
+ return x;
+}
+
+static double bedouble(double x) {
+ char *b = (char *)&x;
+
+ if (!is_bigendian()) {
+ char swap = 0;
+
+ swap = b[0];
+ b[0] = b[7];
+ b[7] = swap;
+
+ swap = b[1];
+ b[1] = b[6];
+ b[6] = swap;
+
+ swap = b[2];
+ b[2] = b[5];
+ b[5] = swap;
+
+ swap = b[3];
+ b[3] = b[4];
+ b[4] = swap;
+ }
+
+ return x;
+}
+
+static bool read_byte(cmp_ctx_t *ctx, uint8_t *x) {
+ return ctx->read(ctx, x, sizeof(uint8_t));
+}
+
+static bool write_byte(cmp_ctx_t *ctx, uint8_t x) {
+ return (ctx->write(ctx, &x, sizeof(uint8_t)) == (sizeof(uint8_t)));
+}
+
+static bool read_type_marker(cmp_ctx_t *ctx, uint8_t *marker) {
+ if (read_byte(ctx, marker))
+ return true;
+
+ ctx->error = TYPE_MARKER_READING_ERROR;
+ return false;
+}
+
+static bool write_type_marker(cmp_ctx_t *ctx, uint8_t marker) {
+ if (write_byte(ctx, marker))
+ return true;
+
+ ctx->error = TYPE_MARKER_WRITING_ERROR;
+ return false;
+}
+
+static bool write_fixed_value(cmp_ctx_t *ctx, uint8_t value) {
+ if (write_byte(ctx, value))
+ return true;
+
+ ctx->error = FIXED_VALUE_WRITING_ERROR;
+ return false;
+}
+
+void cmp_init(cmp_ctx_t *ctx, void *buf, cmp_reader read, cmp_writer write) {
+ ctx->error = ERROR_NONE;
+ ctx->buf = buf;
+ ctx->read = read;
+ ctx->write = write;
+}
+
+uint32_t cmp_version(void) {
+ return version;
+}
+
+uint32_t cmp_mp_version(void) {
+ return mp_version;
+}
+
+const char* cmp_strerror(cmp_ctx_t *ctx) {
+ if (ctx->error > ERROR_NONE && ctx->error < ERROR_MAX)
+ return cmp_error_messages[ctx->error];
+
+ return "";
+}
+
+bool cmp_write_pfix(cmp_ctx_t *ctx, uint8_t c) {
+ if (c <= 0x7F)
+ return write_fixed_value(ctx, c);
+
+ ctx->error = INPUT_VALUE_TOO_LARGE_ERROR;
+ return false;
+}
+
+bool cmp_write_nfix(cmp_ctx_t *ctx, int8_t c) {
+ if (c >= -32 && c <= -1)
+ return write_fixed_value(ctx, c);
+
+ ctx->error = INPUT_VALUE_TOO_LARGE_ERROR;
+ return false;
+}
+
+bool cmp_write_sfix(cmp_ctx_t *ctx, int8_t c) {
+ if (c >= 0)
+ return cmp_write_pfix(ctx, c);
+ if (c >= -32 && c <= -1)
+ return cmp_write_nfix(ctx, c);
+
+ ctx->error = INPUT_VALUE_TOO_LARGE_ERROR;
+ return false;
+}
+
+bool cmp_write_s8(cmp_ctx_t *ctx, int8_t c) {
+ if (!write_type_marker(ctx, S8_MARKER))
+ return false;
+
+ return ctx->write(ctx, &c, sizeof(int8_t));
+}
+
+bool cmp_write_s16(cmp_ctx_t *ctx, int16_t s) {
+ if (!write_type_marker(ctx, S16_MARKER))
+ return false;
+
+ s = be16(s);
+
+ return ctx->write(ctx, &s, sizeof(int16_t));
+}
+
+bool cmp_write_s32(cmp_ctx_t *ctx, int32_t i) {
+ if (!write_type_marker(ctx, S32_MARKER))
+ return false;
+
+ i = be32(i);
+
+ return ctx->write(ctx, &i, sizeof(int32_t));
+}
+
+bool cmp_write_s64(cmp_ctx_t *ctx, int64_t l) {
+ if (!write_type_marker(ctx, S64_MARKER))
+ return false;
+
+ l = be64(l);
+
+ return ctx->write(ctx, &l, sizeof(int64_t));
+}
+
+bool cmp_write_sint(cmp_ctx_t *ctx, int64_t d) {
+ if (d >= 0)
+ return cmp_write_uint(ctx, d);
+ if (d >= -32)
+ return cmp_write_nfix(ctx, d);
+ if (d >= -128)
+ return cmp_write_s8(ctx, d);
+ if (d >= -32768)
+ return cmp_write_s16(ctx, d);
+ if (d >= (-2147483647 - 1))
+ return cmp_write_s32(ctx, (int32_t) d);
+
+ return cmp_write_s64(ctx, d);
+}
+
+bool cmp_write_ufix(cmp_ctx_t *ctx, uint8_t c) {
+ return cmp_write_pfix(ctx, c);
+}
+
+bool cmp_write_u8(cmp_ctx_t *ctx, uint8_t c) {
+ if (!write_type_marker(ctx, U8_MARKER))
+ return false;
+
+ return ctx->write(ctx, &c, sizeof(uint8_t));
+}
+
+bool cmp_write_u16(cmp_ctx_t *ctx, uint16_t s) {
+ if (!write_type_marker(ctx, U16_MARKER))
+ return false;
+
+ s = be16(s);
+
+ return ctx->write(ctx, &s, sizeof(uint16_t));
+}
+
+bool cmp_write_u32(cmp_ctx_t *ctx, uint32_t i) {
+ if (!write_type_marker(ctx, U32_MARKER))
+ return false;
+
+ i = be32(i);
+
+ return ctx->write(ctx, &i, sizeof(uint32_t));
+}
+
+bool cmp_write_u64(cmp_ctx_t *ctx, uint64_t l) {
+ if (!write_type_marker(ctx, U64_MARKER))
+ return false;
+
+ l = be64(l);
+
+ return ctx->write(ctx, &l, sizeof(uint64_t));
+}
+
+bool cmp_write_uint(cmp_ctx_t *ctx, uint64_t u) {
+ if (u <= 0x7F)
+ return cmp_write_pfix(ctx, u);
+ if (u <= 0xFF)
+ return cmp_write_u8(ctx, u);
+ if (u <= 0xFFFF)
+ return cmp_write_u16(ctx, u);
+ if (u <= 0xFFFFFFFF)
+ return cmp_write_u32(ctx, (uint32_t) u);
+
+ return cmp_write_u64(ctx, u);
+}
+
+bool cmp_write_float(cmp_ctx_t *ctx, float f) {
+ if (!write_type_marker(ctx, FLOAT_MARKER))
+ return false;
+
+ f = befloat(f);
+
+ return ctx->write(ctx, &f, sizeof(float));
+}
+
+bool cmp_write_double(cmp_ctx_t *ctx, double d) {
+ if (!write_type_marker(ctx, DOUBLE_MARKER))
+ return false;
+
+ d = bedouble(d);
+
+ return ctx->write(ctx, &d, sizeof(double));
+}
+
+bool cmp_write_nil(cmp_ctx_t *ctx) {
+ return write_type_marker(ctx, NIL_MARKER);
+}
+
+bool cmp_write_true(cmp_ctx_t *ctx) {
+ return write_type_marker(ctx, TRUE_MARKER);
+}
+
+bool cmp_write_false(cmp_ctx_t *ctx) {
+ return write_type_marker(ctx, FALSE_MARKER);
+}
+
+bool cmp_write_bool(cmp_ctx_t *ctx, bool b) {
+ if (b)
+ return cmp_write_true(ctx);
+
+ return cmp_write_false(ctx);
+}
+
+bool cmp_write_u8_as_bool(cmp_ctx_t *ctx, uint8_t b) {
+ if (b)
+ return cmp_write_true(ctx);
+
+ return cmp_write_false(ctx);
+}
+
+bool cmp_write_fixstr_marker(cmp_ctx_t *ctx, uint8_t size) {
+ if (size <= FIXSTR_SIZE)
+ return write_fixed_value(ctx, FIXSTR_MARKER | size);
+
+ ctx->error = INPUT_VALUE_TOO_LARGE_ERROR;
+ return false;
+}
+
+bool cmp_write_fixstr(cmp_ctx_t *ctx, const char *data, uint8_t size) {
+ if (!cmp_write_fixstr_marker(ctx, size))
+ return false;
+
+ if (size == 0)
+ return true;
+
+ if (ctx->write(ctx, data, size))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_str8_marker(cmp_ctx_t *ctx, uint8_t size) {
+ if (!write_type_marker(ctx, STR8_MARKER))
+ return false;
+
+ if (ctx->write(ctx, &size, sizeof(uint8_t)))
+ return true;
+
+ ctx->error = LENGTH_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_str8(cmp_ctx_t *ctx, const char *data, uint8_t size) {
+ if (!cmp_write_str8_marker(ctx, size))
+ return false;
+
+ if (size == 0)
+ return true;
+
+ if (ctx->write(ctx, data, size))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_str16_marker(cmp_ctx_t *ctx, uint16_t size) {
+ if (!write_type_marker(ctx, STR16_MARKER))
+ return false;
+
+ size = be16(size);
+
+ if (ctx->write(ctx, &size, sizeof(uint16_t)))
+ return true;
+
+ ctx->error = LENGTH_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_str16(cmp_ctx_t *ctx, const char *data, uint16_t size) {
+ if (!cmp_write_str16_marker(ctx, size))
+ return false;
+
+ if (size == 0)
+ return true;
+
+ if (ctx->write(ctx, data, size))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_str32_marker(cmp_ctx_t *ctx, uint32_t size) {
+ if (!write_type_marker(ctx, STR32_MARKER))
+ return false;
+
+ size = be32(size);
+
+ if (ctx->write(ctx, &size, sizeof(uint32_t)))
+ return true;
+
+ ctx->error = LENGTH_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_str32(cmp_ctx_t *ctx, const char *data, uint32_t size) {
+ if (!cmp_write_str32_marker(ctx, size))
+ return false;
+
+ if (size == 0)
+ return true;
+
+ if (ctx->write(ctx, data, size))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_str_marker(cmp_ctx_t *ctx, uint32_t size) {
+ if (size <= FIXSTR_SIZE)
+ return cmp_write_fixstr_marker(ctx, size);
+ if (size <= 0xFF)
+ return cmp_write_str8_marker(ctx, size);
+ if (size <= 0xFFFF)
+ return cmp_write_str16_marker(ctx, size);
+
+ return cmp_write_str32_marker(ctx, size);
+}
+
+bool cmp_write_str(cmp_ctx_t *ctx, const char *data, uint32_t size) {
+ if (size <= FIXSTR_SIZE)
+ return cmp_write_fixstr(ctx, data, size);
+ if (size <= 0xFF)
+ return cmp_write_str8(ctx, data, size);
+ if (size <= 0xFFFF)
+ return cmp_write_str16(ctx, data, size);
+
+ return cmp_write_str32(ctx, data, size);
+}
+
+bool cmp_write_bin8_marker(cmp_ctx_t *ctx, uint8_t size) {
+ if (!write_type_marker(ctx, BIN8_MARKER))
+ return false;
+
+ if (ctx->write(ctx, &size, sizeof(uint8_t)))
+ return true;
+
+ ctx->error = LENGTH_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_bin8(cmp_ctx_t *ctx, const void *data, uint8_t size) {
+ if (!cmp_write_bin8_marker(ctx, size))
+ return false;
+
+ if (size == 0)
+ return true;
+
+ if (ctx->write(ctx, data, size))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_bin16_marker(cmp_ctx_t *ctx, uint16_t size) {
+ if (!write_type_marker(ctx, BIN16_MARKER))
+ return false;
+
+ size = be16(size);
+
+ if (ctx->write(ctx, &size, sizeof(uint16_t)))
+ return true;
+
+ ctx->error = LENGTH_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_bin16(cmp_ctx_t *ctx, const void *data, uint16_t size) {
+ if (!cmp_write_bin16_marker(ctx, size))
+ return false;
+
+ if (size == 0)
+ return true;
+
+ if (ctx->write(ctx, data, size))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_bin32_marker(cmp_ctx_t *ctx, uint32_t size) {
+ if (!write_type_marker(ctx, BIN32_MARKER))
+ return false;
+
+ size = be32(size);
+
+ if (ctx->write(ctx, &size, sizeof(uint32_t)))
+ return true;
+
+ ctx->error = LENGTH_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_bin32(cmp_ctx_t *ctx, const void *data, uint32_t size) {
+ if (!cmp_write_bin32_marker(ctx, size))
+ return false;
+
+ if (size == 0)
+ return true;
+
+ if (ctx->write(ctx, data, size))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_bin_marker(cmp_ctx_t *ctx, uint32_t size) {
+ if (size <= 0xFF)
+ return cmp_write_bin8_marker(ctx, size);
+ if (size <= 0xFFFF)
+ return cmp_write_bin16_marker(ctx, size);
+
+ return cmp_write_bin32_marker(ctx, size);
+}
+
+bool cmp_write_bin(cmp_ctx_t *ctx, const void *data, uint32_t size) {
+ if (size <= 0xFF)
+ return cmp_write_bin8(ctx, data, size);
+ if (size <= 0xFFFF)
+ return cmp_write_bin16(ctx, data, size);
+
+ return cmp_write_bin32(ctx, data, size);
+}
+
+bool cmp_write_fixarray(cmp_ctx_t *ctx, uint8_t size) {
+ if (size <= FIXARRAY_SIZE)
+ return write_fixed_value(ctx, FIXARRAY_MARKER | size);
+
+ ctx->error = INPUT_VALUE_TOO_LARGE_ERROR;
+ return false;
+}
+
+bool cmp_write_array16(cmp_ctx_t *ctx, uint16_t size) {
+ if (!write_type_marker(ctx, ARRAY16_MARKER))
+ return false;
+
+ size = be16(size);
+
+ if (ctx->write(ctx, &size, sizeof(uint16_t)))
+ return true;
+
+ ctx->error = LENGTH_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_array32(cmp_ctx_t *ctx, uint32_t size) {
+ if (!write_type_marker(ctx, ARRAY32_MARKER))
+ return false;
+
+ size = be32(size);
+
+ if (ctx->write(ctx, &size, sizeof(uint32_t)))
+ return true;
+
+ ctx->error = LENGTH_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_array(cmp_ctx_t *ctx, uint32_t size) {
+ if (size <= FIXARRAY_SIZE)
+ return cmp_write_fixarray(ctx, size);
+ if (size <= 0xFFFF)
+ return cmp_write_array16(ctx, size);
+
+ return cmp_write_array32(ctx, size);
+}
+
+bool cmp_write_fixmap(cmp_ctx_t *ctx, uint8_t size) {
+ if (size <= FIXMAP_SIZE)
+ return write_fixed_value(ctx, FIXMAP_MARKER | size);
+
+ ctx->error = INPUT_VALUE_TOO_LARGE_ERROR;
+ return false;
+}
+
+bool cmp_write_map16(cmp_ctx_t *ctx, uint16_t size) {
+ if (!write_type_marker(ctx, MAP16_MARKER))
+ return false;
+
+ size = be16(size);
+
+ if (ctx->write(ctx, &size, sizeof(uint16_t)))
+ return true;
+
+ ctx->error = LENGTH_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_map32(cmp_ctx_t *ctx, uint32_t size) {
+ if (!write_type_marker(ctx, MAP32_MARKER))
+ return false;
+
+ size = be32(size);
+
+ if (ctx->write(ctx, &size, sizeof(uint32_t)))
+ return true;
+
+ ctx->error = LENGTH_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_map(cmp_ctx_t *ctx, uint32_t size) {
+ if (size <= FIXMAP_SIZE)
+ return cmp_write_fixmap(ctx, size);
+ if (size <= 0xFFFF)
+ return cmp_write_map16(ctx, size);
+
+ return cmp_write_map32(ctx, size);
+}
+
+bool cmp_write_fixext1_marker(cmp_ctx_t *ctx, int8_t type) {
+ if (!write_type_marker(ctx, FIXEXT1_MARKER))
+ return false;
+
+ if (ctx->write(ctx, &type, sizeof(int8_t)))
+ return true;
+
+ ctx->error = EXT_TYPE_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_fixext1(cmp_ctx_t *ctx, int8_t type, const void *data) {
+ if (!cmp_write_fixext1_marker(ctx, type))
+ return false;
+
+ if (ctx->write(ctx, data, 1))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_fixext2_marker(cmp_ctx_t *ctx, int8_t type) {
+ if (!write_type_marker(ctx, FIXEXT2_MARKER))
+ return false;
+
+ if (ctx->write(ctx, &type, sizeof(int8_t)))
+ return true;
+
+ ctx->error = EXT_TYPE_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_fixext2(cmp_ctx_t *ctx, int8_t type, const void *data) {
+ if (!cmp_write_fixext2_marker(ctx, type))
+ return false;
+
+ if (ctx->write(ctx, data, 2))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_fixext4_marker(cmp_ctx_t *ctx, int8_t type) {
+ if (!write_type_marker(ctx, FIXEXT4_MARKER))
+ return false;
+
+ if (ctx->write(ctx, &type, sizeof(int8_t)))
+ return true;
+
+ ctx->error = EXT_TYPE_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_fixext4(cmp_ctx_t *ctx, int8_t type, const void *data) {
+ if (!cmp_write_fixext4_marker(ctx, type))
+ return false;
+
+ if (ctx->write(ctx, data, 4))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_fixext8_marker(cmp_ctx_t *ctx, int8_t type) {
+ if (!write_type_marker(ctx, FIXEXT8_MARKER))
+ return false;
+
+ if (ctx->write(ctx, &type, sizeof(int8_t)))
+ return true;
+
+ ctx->error = EXT_TYPE_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_fixext8(cmp_ctx_t *ctx, int8_t type, const void *data) {
+ if (!cmp_write_fixext8_marker(ctx, type))
+ return false;
+
+ if (ctx->write(ctx, data, 8))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_fixext16_marker(cmp_ctx_t *ctx, int8_t type) {
+ if (!write_type_marker(ctx, FIXEXT16_MARKER))
+ return false;
+
+ if (ctx->write(ctx, &type, sizeof(int8_t)))
+ return true;
+
+ ctx->error = EXT_TYPE_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_fixext16(cmp_ctx_t *ctx, int8_t type, const void *data) {
+ if (!cmp_write_fixext16_marker(ctx, type))
+ return false;
+
+ if (ctx->write(ctx, data, 16))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_ext8_marker(cmp_ctx_t *ctx, int8_t type, uint8_t size) {
+ if (!write_type_marker(ctx, EXT8_MARKER))
+ return false;
+
+ if (!ctx->write(ctx, &size, sizeof(uint8_t))) {
+ ctx->error = LENGTH_WRITING_ERROR;
+ return false;
+ }
+
+ if (ctx->write(ctx, &type, sizeof(int8_t)))
+ return true;
+
+ ctx->error = EXT_TYPE_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_ext8(cmp_ctx_t *ctx, int8_t tp, uint8_t sz, const void *data) {
+ if (!cmp_write_ext8_marker(ctx, tp, sz))
+ return false;
+
+ if (ctx->write(ctx, data, sz))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_ext16_marker(cmp_ctx_t *ctx, int8_t type, uint16_t size) {
+ if (!write_type_marker(ctx, EXT16_MARKER))
+ return false;
+
+ size = be16(size);
+
+ if (!ctx->write(ctx, &size, sizeof(uint16_t))) {
+ ctx->error = LENGTH_WRITING_ERROR;
+ return false;
+ }
+
+ if (ctx->write(ctx, &type, sizeof(int8_t)))
+ return true;
+
+ ctx->error = EXT_TYPE_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_ext16(cmp_ctx_t *ctx, int8_t tp, uint16_t sz, const void *data) {
+ if (!cmp_write_ext16_marker(ctx, tp, sz))
+ return false;
+
+ if (ctx->write(ctx, data, sz))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_ext32_marker(cmp_ctx_t *ctx, int8_t type, uint32_t size) {
+ if (!write_type_marker(ctx, EXT32_MARKER))
+ return false;
+
+ size = be32(size);
+
+ if (!ctx->write(ctx, &size, sizeof(uint32_t))) {
+ ctx->error = LENGTH_WRITING_ERROR;
+ return false;
+ }
+
+ if (ctx->write(ctx, &type, sizeof(int8_t)))
+ return true;
+
+ ctx->error = EXT_TYPE_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_ext32(cmp_ctx_t *ctx, int8_t tp, uint32_t sz, const void *data) {
+ if (!cmp_write_ext32_marker(ctx, tp, sz))
+ return false;
+
+ if (ctx->write(ctx, data, sz))
+ return true;
+
+ ctx->error = DATA_WRITING_ERROR;
+ return false;
+}
+
+bool cmp_write_ext_marker(cmp_ctx_t *ctx, int8_t tp, uint32_t sz) {
+ if (sz == 1)
+ return cmp_write_fixext1_marker(ctx, tp);
+ if (sz == 2)
+ return cmp_write_fixext2_marker(ctx, tp);
+ if (sz == 4)
+ return cmp_write_fixext4_marker(ctx, tp);
+ if (sz == 8)
+ return cmp_write_fixext8_marker(ctx, tp);
+ if (sz == 16)
+ return cmp_write_fixext16_marker(ctx, tp);
+ if (sz <= 0xFF)
+ return cmp_write_ext8_marker(ctx, tp, sz);
+ if (sz <= 0xFFFF)
+ return cmp_write_ext16_marker(ctx, tp, sz);
+
+ return cmp_write_ext32_marker(ctx, tp, sz);
+}
+
+bool cmp_write_ext(cmp_ctx_t *ctx, int8_t tp, uint32_t sz, const void *data) {
+ if (sz == 1)
+ return cmp_write_fixext1(ctx, tp, data);
+ if (sz == 2)
+ return cmp_write_fixext2(ctx, tp, data);
+ if (sz == 4)
+ return cmp_write_fixext4(ctx, tp, data);
+ if (sz == 8)
+ return cmp_write_fixext8(ctx, tp, data);
+ if (sz == 16)
+ return cmp_write_fixext16(ctx, tp, data);
+ if (sz <= 0xFF)
+ return cmp_write_ext8(ctx, tp, sz, data);
+ if (sz <= 0xFFFF)
+ return cmp_write_ext16(ctx, tp, sz, data);
+
+ return cmp_write_ext32(ctx, tp, sz, data);
+}
+
+bool cmp_write_object(cmp_ctx_t *ctx, cmp_object_t *obj) {
+ switch(obj->type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ return cmp_write_pfix(ctx, obj->as.u8);
+ case CMP_TYPE_FIXMAP:
+ return cmp_write_fixmap(ctx, obj->as.map_size);
+ case CMP_TYPE_FIXARRAY:
+ return cmp_write_fixarray(ctx, obj->as.array_size);
+ case CMP_TYPE_FIXSTR:
+ return cmp_write_fixstr_marker(ctx, obj->as.str_size);
+ case CMP_TYPE_NIL:
+ return cmp_write_nil(ctx);
+ case CMP_TYPE_BOOLEAN:
+ if (obj->as.boolean)
+ return cmp_write_true(ctx);
+ return cmp_write_false(ctx);
+ case CMP_TYPE_BIN8:
+ return cmp_write_bin8_marker(ctx, obj->as.bin_size);
+ case CMP_TYPE_BIN16:
+ return cmp_write_bin16_marker(ctx, obj->as.bin_size);
+ case CMP_TYPE_BIN32:
+ return cmp_write_bin32_marker(ctx, obj->as.bin_size);
+ case CMP_TYPE_EXT8:
+ return cmp_write_ext8_marker(ctx, obj->as.ext.type, obj->as.ext.size);
+ case CMP_TYPE_EXT16:
+ return cmp_write_ext16_marker(ctx, obj->as.ext.type, obj->as.ext.size);
+ case CMP_TYPE_EXT32:
+ return cmp_write_ext32_marker(ctx, obj->as.ext.type, obj->as.ext.size);
+ case CMP_TYPE_FLOAT:
+ return cmp_write_float(ctx, obj->as.flt);
+ case CMP_TYPE_DOUBLE:
+ return cmp_write_double(ctx, obj->as.dbl);
+ case CMP_TYPE_UINT8:
+ return cmp_write_u8(ctx, obj->as.u8);
+ case CMP_TYPE_UINT16:
+ return cmp_write_u16(ctx, obj->as.u16);
+ case CMP_TYPE_UINT32:
+ return cmp_write_u32(ctx, obj->as.u32);
+ case CMP_TYPE_UINT64:
+ return cmp_write_u64(ctx, obj->as.u64);
+ case CMP_TYPE_SINT8:
+ return cmp_write_s8(ctx, obj->as.s8);
+ case CMP_TYPE_SINT16:
+ return cmp_write_s16(ctx, obj->as.s16);
+ case CMP_TYPE_SINT32:
+ return cmp_write_s32(ctx, obj->as.s32);
+ case CMP_TYPE_SINT64:
+ return cmp_write_s64(ctx, obj->as.s64);
+ case CMP_TYPE_FIXEXT1:
+ return cmp_write_fixext1_marker(ctx, obj->as.ext.type);
+ case CMP_TYPE_FIXEXT2:
+ return cmp_write_fixext2_marker(ctx, obj->as.ext.type);
+ case CMP_TYPE_FIXEXT4:
+ return cmp_write_fixext4_marker(ctx, obj->as.ext.type);
+ case CMP_TYPE_FIXEXT8:
+ return cmp_write_fixext8_marker(ctx, obj->as.ext.type);
+ case CMP_TYPE_FIXEXT16:
+ return cmp_write_fixext16_marker(ctx, obj->as.ext.type);
+ case CMP_TYPE_STR8:
+ return cmp_write_str8_marker(ctx, obj->as.str_size);
+ case CMP_TYPE_STR16:
+ return cmp_write_str16_marker(ctx, obj->as.str_size);
+ case CMP_TYPE_STR32:
+ return cmp_write_str32_marker(ctx, obj->as.str_size);
+ case CMP_TYPE_ARRAY16:
+ return cmp_write_array16(ctx, obj->as.array_size);
+ case CMP_TYPE_ARRAY32:
+ return cmp_write_array32(ctx, obj->as.array_size);
+ case CMP_TYPE_MAP16:
+ return cmp_write_map16(ctx, obj->as.map_size);
+ case CMP_TYPE_MAP32:
+ return cmp_write_map32(ctx, obj->as.map_size);
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ return cmp_write_nfix(ctx, obj->as.s8);
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_pfix(cmp_ctx_t *ctx, uint8_t *c) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_POSITIVE_FIXNUM) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *c = obj.as.u8;
+ return true;
+}
+
+bool cmp_read_nfix(cmp_ctx_t *ctx, int8_t *c) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_NEGATIVE_FIXNUM) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *c = obj.as.s8;
+ return true;
+}
+
+bool cmp_read_sfix(cmp_ctx_t *ctx, int8_t *c) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ *c = obj.as.s8;
+ return true;
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_s8(cmp_ctx_t *ctx, int8_t *c) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_SINT8) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *c = obj.as.s8;
+ return true;
+}
+
+bool cmp_read_s16(cmp_ctx_t *ctx, int16_t *s) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_SINT16) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *s = obj.as.s16;
+ return true;
+}
+
+bool cmp_read_s32(cmp_ctx_t *ctx, int32_t *i) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_SINT32) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *i = obj.as.s32;
+ return true;
+}
+
+bool cmp_read_s64(cmp_ctx_t *ctx, int64_t *l) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_SINT64) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *l = obj.as.s64;
+ return true;
+}
+
+bool cmp_read_char(cmp_ctx_t *ctx, int8_t *c) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ case CMP_TYPE_SINT8:
+ *c = obj.as.s8;
+ return true;
+ case CMP_TYPE_UINT8:
+ if (obj.as.u8 <= 127) {
+ *c = obj.as.u8;
+ return true;
+ }
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_short(cmp_ctx_t *ctx, int16_t *s) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ case CMP_TYPE_SINT8:
+ *s = obj.as.s8;
+ return true;
+ case CMP_TYPE_UINT8:
+ *s = obj.as.u8;
+ return true;
+ case CMP_TYPE_SINT16:
+ *s = obj.as.s16;
+ return true;
+ case CMP_TYPE_UINT16:
+ if (obj.as.u16 <= 32767) {
+ *s = obj.as.u16;
+ return true;
+ }
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_int(cmp_ctx_t *ctx, int32_t *i) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ case CMP_TYPE_SINT8:
+ *i = obj.as.s8;
+ return true;
+ case CMP_TYPE_UINT8:
+ *i = obj.as.u8;
+ return true;
+ case CMP_TYPE_SINT16:
+ *i = obj.as.s16;
+ return true;
+ case CMP_TYPE_UINT16:
+ *i = obj.as.u16;
+ return true;
+ case CMP_TYPE_SINT32:
+ *i = obj.as.s32;
+ return true;
+ case CMP_TYPE_UINT32:
+ if (obj.as.u32 <= 2147483647) {
+ *i = obj.as.u32;
+ return true;
+ }
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_long(cmp_ctx_t *ctx, int64_t *d) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ case CMP_TYPE_SINT8:
+ *d = obj.as.s8;
+ return true;
+ case CMP_TYPE_UINT8:
+ *d = obj.as.u8;
+ return true;
+ case CMP_TYPE_SINT16:
+ *d = obj.as.s16;
+ return true;
+ case CMP_TYPE_UINT16:
+ *d = obj.as.u16;
+ return true;
+ case CMP_TYPE_SINT32:
+ *d = obj.as.s32;
+ return true;
+ case CMP_TYPE_UINT32:
+ *d = obj.as.u32;
+ return true;
+ case CMP_TYPE_SINT64:
+ *d = obj.as.s64;
+ return true;
+ case CMP_TYPE_UINT64:
+ if (obj.as.u64 <= 9223372036854775807) {
+ *d = obj.as.u64;
+ return true;
+ }
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_sinteger(cmp_ctx_t *ctx, int64_t *d) {
+ return cmp_read_long(ctx, d);
+}
+
+bool cmp_read_ufix(cmp_ctx_t *ctx, uint8_t *c) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_NEGATIVE_FIXNUM) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *c = obj.as.u8;
+ return true;
+}
+
+bool cmp_read_u8(cmp_ctx_t *ctx, uint8_t *c) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_UINT8) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *c = obj.as.u8;
+ return true;
+}
+
+bool cmp_read_u16(cmp_ctx_t *ctx, uint16_t *s) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_UINT16) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *s = obj.as.u16;
+ return true;
+}
+
+bool cmp_read_u32(cmp_ctx_t *ctx, uint32_t *i) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_UINT32) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *i = obj.as.u32;
+ return true;
+}
+
+bool cmp_read_u64(cmp_ctx_t *ctx, uint64_t *l) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_UINT64) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *l = obj.as.u64;
+ return true;
+}
+
+bool cmp_read_uchar(cmp_ctx_t *ctx, uint8_t *c) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_UINT8:
+ *c = obj.as.u8;
+ return true;
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_ushort(cmp_ctx_t *ctx, uint16_t *s) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_UINT8:
+ *s = obj.as.u8;
+ return true;
+ case CMP_TYPE_UINT16:
+ *s = obj.as.u16;
+ return true;
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_uint(cmp_ctx_t *ctx, uint32_t *i) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_UINT8:
+ *i = obj.as.u8;
+ return true;
+ case CMP_TYPE_UINT16:
+ *i = obj.as.u16;
+ return true;
+ case CMP_TYPE_UINT32:
+ *i = obj.as.u32;
+ return true;
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_ulong(cmp_ctx_t *ctx, uint64_t *u) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_UINT8:
+ *u = obj.as.u8;
+ return true;
+ case CMP_TYPE_UINT16:
+ *u = obj.as.u16;
+ return true;
+ case CMP_TYPE_UINT32:
+ *u = obj.as.u32;
+ return true;
+ case CMP_TYPE_UINT64:
+ *u = obj.as.u64;
+ return true;
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_uinteger(cmp_ctx_t *ctx, uint64_t *d) {
+ return cmp_read_ulong(ctx, d);
+}
+
+bool cmp_read_float(cmp_ctx_t *ctx, float *f) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_FLOAT) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *f = obj.as.flt;
+
+ return true;
+}
+
+bool cmp_read_double(cmp_ctx_t *ctx, double *d) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_DOUBLE) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *d = obj.as.dbl;
+
+ return true;
+}
+
+bool cmp_read_nil(cmp_ctx_t *ctx) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type == CMP_TYPE_NIL)
+ return true;
+
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+}
+
+bool cmp_read_bool(cmp_ctx_t *ctx, bool *b) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_BOOLEAN) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ if (obj.as.boolean)
+ *b = true;
+ else
+ *b = false;
+
+ return true;
+}
+
+bool cmp_read_bool_as_u8(cmp_ctx_t *ctx, uint8_t *b) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_BOOLEAN) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ if (obj.as.boolean)
+ *b = 1;
+ else
+ *b = 0;
+
+ return true;
+}
+
+bool cmp_read_str_size(cmp_ctx_t *ctx, uint32_t *size) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_FIXSTR:
+ case CMP_TYPE_STR8:
+ case CMP_TYPE_STR16:
+ case CMP_TYPE_STR32:
+ *size = obj.as.str_size;
+ return true;
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_str(cmp_ctx_t *ctx, char *data, uint32_t *size) {
+ uint32_t str_size = 0;
+
+ if (!cmp_read_str_size(ctx, &str_size))
+ return false;
+
+ if ((str_size + 1) > *size) {
+ *size = str_size;
+ ctx->error = STR_DATA_LENGTH_TOO_LONG_ERROR;
+ return false;
+ }
+
+ if (!ctx->read(ctx, data, str_size)) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+
+ data[str_size] = 0;
+
+ *size = str_size;
+ return true;
+}
+
+bool cmp_read_bin_size(cmp_ctx_t *ctx, uint32_t *size) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_BIN8:
+ case CMP_TYPE_BIN16:
+ case CMP_TYPE_BIN32:
+ *size = obj.as.bin_size;
+ return true;
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_bin(cmp_ctx_t *ctx, void *data, uint32_t *size) {
+ uint32_t bin_size = 0;
+
+ if (!cmp_read_bin_size(ctx, &bin_size))
+ return false;
+
+ if (bin_size > *size) {
+ ctx->error = BIN_DATA_LENGTH_TOO_LONG_ERROR;
+ return false;
+ }
+
+ if (!ctx->read(ctx, data, bin_size)) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+
+ *size = bin_size;
+ return true;
+}
+
+bool cmp_read_array(cmp_ctx_t *ctx, uint32_t *size) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_FIXARRAY:
+ case CMP_TYPE_ARRAY16:
+ case CMP_TYPE_ARRAY32:
+ *size = obj.as.array_size;
+ return true;
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_map(cmp_ctx_t *ctx, uint32_t *size) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_FIXMAP:
+ case CMP_TYPE_MAP16:
+ case CMP_TYPE_MAP32:
+ *size = obj.as.map_size;
+ return true;
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_fixext1_marker(cmp_ctx_t *ctx, int8_t *type) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_FIXEXT1) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *type = obj.as.ext.type;
+ return true;
+}
+
+bool cmp_read_fixext1(cmp_ctx_t *ctx, int8_t *type, void *data) {
+ if (!cmp_read_fixext1_marker(ctx, type))
+ return false;
+
+ if (ctx->read(ctx, data, 1))
+ return true;
+
+ ctx->error = DATA_READING_ERROR;
+ return false;
+}
+
+bool cmp_read_fixext2_marker(cmp_ctx_t *ctx, int8_t *type) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_FIXEXT2) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *type = obj.as.ext.type;
+ return true;
+}
+
+bool cmp_read_fixext2(cmp_ctx_t *ctx, int8_t *type, void *data) {
+ if (!cmp_read_fixext2_marker(ctx, type))
+ return false;
+
+ if (ctx->read(ctx, data, 2))
+ return true;
+
+ ctx->error = DATA_READING_ERROR;
+ return false;
+}
+
+bool cmp_read_fixext4_marker(cmp_ctx_t *ctx, int8_t *type) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_FIXEXT4) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *type = obj.as.ext.type;
+ return true;
+}
+
+bool cmp_read_fixext4(cmp_ctx_t *ctx, int8_t *type, void *data) {
+ if (!cmp_read_fixext4_marker(ctx, type))
+ return false;
+
+ if (ctx->read(ctx, data, 4))
+ return true;
+
+ ctx->error = DATA_READING_ERROR;
+ return false;
+}
+
+bool cmp_read_fixext8_marker(cmp_ctx_t *ctx, int8_t *type) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_FIXEXT8) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *type = obj.as.ext.type;
+ return true;
+}
+
+bool cmp_read_fixext8(cmp_ctx_t *ctx, int8_t *type, void *data) {
+ if (!cmp_read_fixext8_marker(ctx, type))
+ return false;
+
+ if (ctx->read(ctx, data, 8))
+ return true;
+
+ ctx->error = DATA_READING_ERROR;
+ return false;
+}
+
+bool cmp_read_fixext16_marker(cmp_ctx_t *ctx, int8_t *type) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_FIXEXT16) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *type = obj.as.ext.type;
+ return true;
+}
+
+bool cmp_read_fixext16(cmp_ctx_t *ctx, int8_t *type, void *data) {
+ if (!cmp_read_fixext16_marker(ctx, type))
+ return false;
+
+ if (ctx->read(ctx, data, 16))
+ return true;
+
+ ctx->error = DATA_READING_ERROR;
+ return false;
+}
+
+bool cmp_read_ext8_marker(cmp_ctx_t *ctx, int8_t *type, uint8_t *size) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_EXT8) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *type = obj.as.ext.type;
+ *size = obj.as.ext.size;
+
+ return true;
+}
+
+bool cmp_read_ext8(cmp_ctx_t *ctx, int8_t *type, uint8_t *size, void *data) {
+ if (!cmp_read_ext8_marker(ctx, type, size))
+ return false;
+
+ if (ctx->read(ctx, data, *size))
+ return true;
+
+ ctx->error = DATA_READING_ERROR;
+ return false;
+}
+
+bool cmp_read_ext16_marker(cmp_ctx_t *ctx, int8_t *type, uint16_t *size) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_EXT16) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *type = obj.as.ext.type;
+ *size = obj.as.ext.size;
+
+ return true;
+}
+
+bool cmp_read_ext16(cmp_ctx_t *ctx, int8_t *type, uint16_t *size, void *data) {
+ if (!cmp_read_ext16_marker(ctx, type, size))
+ return false;
+
+ if (ctx->read(ctx, data, *size))
+ return true;
+
+ ctx->error = DATA_READING_ERROR;
+ return false;
+}
+
+bool cmp_read_ext32_marker(cmp_ctx_t *ctx, int8_t *type, uint32_t *size) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ if (obj.type != CMP_TYPE_EXT32) {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ *type = obj.as.ext.type;
+ *size = obj.as.ext.size;
+
+ return true;
+}
+
+bool cmp_read_ext32(cmp_ctx_t *ctx, int8_t *type, uint32_t *size, void *data) {
+ if (!cmp_read_ext32_marker(ctx, type, size))
+ return false;
+
+ if (ctx->read(ctx, data, *size))
+ return true;
+
+ ctx->error = DATA_READING_ERROR;
+ return false;
+}
+
+bool cmp_read_ext_marker(cmp_ctx_t *ctx, int8_t *type, uint32_t *size) {
+ cmp_object_t obj;
+
+ if (!cmp_read_object(ctx, &obj))
+ return false;
+
+ switch (obj.type) {
+ case CMP_TYPE_FIXEXT1:
+ case CMP_TYPE_FIXEXT2:
+ case CMP_TYPE_FIXEXT4:
+ case CMP_TYPE_FIXEXT8:
+ case CMP_TYPE_FIXEXT16:
+ case CMP_TYPE_EXT8:
+ case CMP_TYPE_EXT16:
+ case CMP_TYPE_EXT32:
+ *type = obj.as.ext.type;
+ *size = obj.as.ext.size;
+ return true;
+ default:
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+}
+
+bool cmp_read_ext(cmp_ctx_t *ctx, int8_t *type, uint32_t *size, void *data) {
+ if (!cmp_read_ext_marker(ctx, type, size))
+ return false;
+
+ if (ctx->read(ctx, data, *size))
+ return true;
+
+ ctx->error = DATA_READING_ERROR;
+ return false;
+}
+
+bool cmp_read_object(cmp_ctx_t *ctx, cmp_object_t *obj) {
+ uint8_t type_marker = 0;
+
+ if (!read_type_marker(ctx, &type_marker))
+ return false;
+
+ if (type_marker <= 0x7F) {
+ obj->type = CMP_TYPE_POSITIVE_FIXNUM;
+ obj->as.u8 = type_marker;
+ }
+ else if (type_marker <= 0x8F) {
+ obj->type = CMP_TYPE_FIXMAP;
+ obj->as.map_size = type_marker & FIXMAP_SIZE;
+ }
+ else if (type_marker <= 0x9F) {
+ obj->type = CMP_TYPE_FIXARRAY;
+ obj->as.array_size = type_marker & FIXARRAY_SIZE;
+ }
+ else if (type_marker <= 0xBF) {
+ obj->type = CMP_TYPE_FIXSTR;
+ obj->as.str_size = type_marker & FIXSTR_SIZE;
+ }
+ else if (type_marker == NIL_MARKER) {
+ obj->type = CMP_TYPE_NIL;
+ obj->as.u8 = 0;
+ }
+ else if (type_marker == FALSE_MARKER) {
+ obj->type = CMP_TYPE_BOOLEAN;
+ obj->as.boolean = false;
+ }
+ else if (type_marker == TRUE_MARKER) {
+ obj->type = CMP_TYPE_BOOLEAN;
+ obj->as.boolean = true;
+ }
+ else if (type_marker == BIN8_MARKER) {
+ obj->type = CMP_TYPE_BIN8;
+ if (!ctx->read(ctx, &obj->as.u8, sizeof(uint8_t))) {
+ ctx->error = LENGTH_READING_ERROR;
+ return false;
+ }
+ obj->as.bin_size = obj->as.u8;
+ }
+ else if (type_marker == BIN16_MARKER) {
+ obj->type = CMP_TYPE_BIN16;
+ if (!ctx->read(ctx, &obj->as.u16, sizeof(uint16_t))) {
+ ctx->error = LENGTH_READING_ERROR;
+ return false;
+ }
+ obj->as.bin_size = be16(obj->as.u16);
+ }
+ else if (type_marker == BIN32_MARKER) {
+ obj->type = CMP_TYPE_BIN32;
+ if (!ctx->read(ctx, &obj->as.u32, sizeof(uint32_t))) {
+ ctx->error = LENGTH_READING_ERROR;
+ return false;
+ }
+ obj->as.bin_size = be32(obj->as.u32);
+ }
+ else if (type_marker == EXT8_MARKER) {
+ uint8_t ext_size;
+ int8_t ext_type;
+
+ obj->type = CMP_TYPE_EXT8;
+ if (!ctx->read(ctx, &ext_size, sizeof(uint8_t))) {
+ ctx->error = LENGTH_READING_ERROR;
+ return false;
+ }
+ if (!ctx->read(ctx, &ext_type, sizeof(int8_t))) {
+ ctx->error = EXT_TYPE_READING_ERROR;
+ return false;
+ }
+ obj->as.ext.size = ext_size;
+ obj->as.ext.type = ext_type;
+ }
+ else if (type_marker == EXT16_MARKER) {
+ int8_t ext_type;
+ uint16_t ext_size;
+
+ obj->type = CMP_TYPE_EXT16;
+ if (!ctx->read(ctx, &ext_size, sizeof(uint16_t))) {
+ ctx->error = LENGTH_READING_ERROR;
+ return false;
+ }
+ if (!ctx->read(ctx, &ext_type, sizeof(int8_t))) {
+ ctx->error = EXT_TYPE_READING_ERROR;
+ return false;
+ }
+ obj->as.ext.size = be16(ext_size);
+ obj->as.ext.type = ext_type;
+ }
+ else if (type_marker == EXT32_MARKER) {
+ int8_t ext_type;
+ uint32_t ext_size;
+
+ obj->type = CMP_TYPE_EXT32;
+ if (!ctx->read(ctx, &ext_size, sizeof(uint32_t))) {
+ ctx->error = LENGTH_READING_ERROR;
+ return false;
+ }
+ if (!ctx->read(ctx, &ext_type, sizeof(int8_t))) {
+ ctx->error = EXT_TYPE_READING_ERROR;
+ return false;
+ }
+ obj->as.ext.size = be32(ext_size);
+ obj->as.ext.type = ext_type;
+ }
+ else if (type_marker == FLOAT_MARKER) {
+ obj->type = CMP_TYPE_FLOAT;
+ if (!ctx->read(ctx, &obj->as.flt, sizeof(float))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.flt = befloat(obj->as.flt);
+ }
+ else if (type_marker == DOUBLE_MARKER) {
+ obj->type = CMP_TYPE_DOUBLE;
+ if (!ctx->read(ctx, &obj->as.dbl, sizeof(double))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.dbl = bedouble(obj->as.dbl);
+ }
+ else if (type_marker == U8_MARKER) {
+ obj->type = CMP_TYPE_UINT8;
+ if (!ctx->read(ctx, &obj->as.u8, sizeof(uint8_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ }
+ else if (type_marker == U16_MARKER) {
+ obj->type = CMP_TYPE_UINT16;
+ if (!ctx->read(ctx, &obj->as.u16, sizeof(uint16_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.u16 = be16(obj->as.u16);
+ }
+ else if (type_marker == U32_MARKER) {
+ obj->type = CMP_TYPE_UINT32;
+ if (!ctx->read(ctx, &obj->as.u32, sizeof(uint32_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.u32 = be32(obj->as.u32);
+ }
+ else if (type_marker == U64_MARKER) {
+ obj->type = CMP_TYPE_UINT64;
+ if (!ctx->read(ctx, &obj->as.u64, sizeof(uint64_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.u64 = be64(obj->as.u64);
+ }
+ else if (type_marker == S8_MARKER) {
+ obj->type = CMP_TYPE_SINT8;
+ if (!ctx->read(ctx, &obj->as.s8, sizeof(int8_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ }
+ else if (type_marker == S16_MARKER) {
+ obj->type = CMP_TYPE_SINT16;
+ if (!ctx->read(ctx, &obj->as.s16, sizeof(int16_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.s16 = be16(obj->as.s16);
+ }
+ else if (type_marker == S32_MARKER) {
+ obj->type = CMP_TYPE_SINT32;
+ if (!ctx->read(ctx, &obj->as.s32, sizeof(int32_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.s32 = be32(obj->as.s32);
+ }
+ else if (type_marker == S64_MARKER) {
+ obj->type = CMP_TYPE_SINT64;
+ if (!ctx->read(ctx, &obj->as.s64, sizeof(int64_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.s64 = be64(obj->as.s64);
+ }
+ else if (type_marker == FIXEXT1_MARKER) {
+ obj->type = CMP_TYPE_FIXEXT1;
+ if (!ctx->read(ctx, &obj->as.ext.type, sizeof(int8_t))) {
+ ctx->error = EXT_TYPE_READING_ERROR;
+ return false;
+ }
+ obj->as.ext.size = 1;
+ }
+ else if (type_marker == FIXEXT2_MARKER) {
+ obj->type = CMP_TYPE_FIXEXT2;
+ if (!ctx->read(ctx, &obj->as.ext.type, sizeof(int8_t))) {
+ ctx->error = EXT_TYPE_READING_ERROR;
+ return false;
+ }
+ obj->as.ext.size = 2;
+ }
+ else if (type_marker == FIXEXT4_MARKER) {
+ obj->type = CMP_TYPE_FIXEXT4;
+ if (!ctx->read(ctx, &obj->as.ext.type, sizeof(int8_t))) {
+ ctx->error = EXT_TYPE_READING_ERROR;
+ return false;
+ }
+ obj->as.ext.size = 4;
+ }
+ else if (type_marker == FIXEXT8_MARKER) {
+ obj->type = CMP_TYPE_FIXEXT8;
+ if (!ctx->read(ctx, &obj->as.ext.type, sizeof(int8_t))) {
+ ctx->error = EXT_TYPE_READING_ERROR;
+ return false;
+ }
+ obj->as.ext.size = 8;
+ }
+ else if (type_marker == FIXEXT16_MARKER) {
+ obj->type = CMP_TYPE_FIXEXT16;
+ if (!ctx->read(ctx, &obj->as.ext.type, sizeof(int8_t))) {
+ ctx->error = EXT_TYPE_READING_ERROR;
+ return false;
+ }
+ obj->as.ext.size = 16;
+ }
+ else if (type_marker == STR8_MARKER) {
+ obj->type = CMP_TYPE_STR8;
+ if (!ctx->read(ctx, &obj->as.u8, sizeof(uint8_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.str_size = obj->as.u8;
+ }
+ else if (type_marker == STR16_MARKER) {
+ obj->type = CMP_TYPE_STR16;
+ if (!ctx->read(ctx, &obj->as.u16, sizeof(uint16_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.str_size = be16(obj->as.u16);
+ }
+ else if (type_marker == STR32_MARKER) {
+ obj->type = CMP_TYPE_STR32;
+ if (!ctx->read(ctx, &obj->as.u32, sizeof(uint32_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.str_size = be32(obj->as.u32);
+ }
+ else if (type_marker == ARRAY16_MARKER) {
+ obj->type = CMP_TYPE_ARRAY16;
+ if (!ctx->read(ctx, &obj->as.u16, sizeof(uint16_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.array_size = be16(obj->as.u16);
+ }
+ else if (type_marker == ARRAY32_MARKER) {
+ obj->type = CMP_TYPE_ARRAY32;
+ if (!ctx->read(ctx, &obj->as.u32, sizeof(uint32_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.array_size = be32(obj->as.u32);
+ }
+ else if (type_marker == MAP16_MARKER) {
+ obj->type = CMP_TYPE_MAP16;
+ if (!ctx->read(ctx, &obj->as.u16, sizeof(uint16_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.map_size = be16(obj->as.u16);
+ }
+ else if (type_marker == MAP32_MARKER) {
+ obj->type = CMP_TYPE_MAP32;
+ if (!ctx->read(ctx, &obj->as.u32, sizeof(uint32_t))) {
+ ctx->error = DATA_READING_ERROR;
+ return false;
+ }
+ obj->as.map_size = be32(obj->as.u32);
+ }
+ else if (type_marker >= NEGATIVE_FIXNUM_MARKER) {
+ obj->type = CMP_TYPE_NEGATIVE_FIXNUM;
+ obj->as.s8 = type_marker;
+ }
+ else {
+ ctx->error = INVALID_TYPE_ERROR;
+ return false;
+ }
+
+ return true;
+}
+
+bool cmp_object_is_char(cmp_object_t *obj) {
+ switch (obj->type) {
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ case CMP_TYPE_SINT8:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_is_short(cmp_object_t *obj) {
+ switch (obj->type) {
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ case CMP_TYPE_SINT8:
+ case CMP_TYPE_SINT16:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_is_int(cmp_object_t *obj) {
+ switch (obj->type) {
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ case CMP_TYPE_SINT8:
+ case CMP_TYPE_SINT16:
+ case CMP_TYPE_SINT32:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_is_long(cmp_object_t *obj) {
+ switch (obj->type) {
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ case CMP_TYPE_SINT8:
+ case CMP_TYPE_SINT16:
+ case CMP_TYPE_SINT32:
+ case CMP_TYPE_SINT64:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_is_sinteger(cmp_object_t *obj) {
+ return cmp_object_is_long(obj);
+}
+
+bool cmp_object_is_uchar(cmp_object_t *obj) {
+ switch (obj->type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_UINT8:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_is_ushort(cmp_object_t *obj) {
+ switch (obj->type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_UINT8:
+ return true;
+ case CMP_TYPE_UINT16:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_is_uint(cmp_object_t *obj) {
+ switch (obj->type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_UINT8:
+ case CMP_TYPE_UINT16:
+ case CMP_TYPE_UINT32:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_is_ulong(cmp_object_t *obj) {
+ switch (obj->type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_UINT8:
+ case CMP_TYPE_UINT16:
+ case CMP_TYPE_UINT32:
+ case CMP_TYPE_UINT64:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_is_uinteger(cmp_object_t *obj) {
+ return cmp_object_is_ulong(obj);
+}
+
+bool cmp_object_is_float(cmp_object_t *obj) {
+ if (obj->type == CMP_TYPE_FLOAT)
+ return true;
+
+ return false;
+}
+
+bool cmp_object_is_double(cmp_object_t *obj) {
+ if (obj->type == CMP_TYPE_DOUBLE)
+ return true;
+
+ return false;
+}
+
+bool cmp_object_is_nil(cmp_object_t *obj) {
+ if (obj->type == CMP_TYPE_NIL)
+ return true;
+
+ return false;
+}
+
+bool cmp_object_is_bool(cmp_object_t *obj) {
+ if (obj->type == CMP_TYPE_BOOLEAN)
+ return true;
+
+ return false;
+}
+
+bool cmp_object_is_str(cmp_object_t *obj) {
+ switch (obj->type) {
+ case CMP_TYPE_FIXSTR:
+ case CMP_TYPE_STR8:
+ case CMP_TYPE_STR16:
+ case CMP_TYPE_STR32:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_is_bin(cmp_object_t *obj) {
+ switch (obj->type) {
+ case CMP_TYPE_BIN8:
+ case CMP_TYPE_BIN16:
+ case CMP_TYPE_BIN32:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_is_array(cmp_object_t *obj) {
+ switch (obj->type) {
+ case CMP_TYPE_FIXARRAY:
+ case CMP_TYPE_ARRAY16:
+ case CMP_TYPE_ARRAY32:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_is_map(cmp_object_t *obj) {
+ switch (obj->type) {
+ case CMP_TYPE_FIXMAP:
+ case CMP_TYPE_MAP16:
+ case CMP_TYPE_MAP32:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_is_ext(cmp_object_t *obj) {
+ switch (obj->type) {
+ case CMP_TYPE_FIXEXT1:
+ case CMP_TYPE_FIXEXT2:
+ case CMP_TYPE_FIXEXT4:
+ case CMP_TYPE_FIXEXT8:
+ case CMP_TYPE_FIXEXT16:
+ case CMP_TYPE_EXT8:
+ case CMP_TYPE_EXT16:
+ case CMP_TYPE_EXT32:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_as_char(cmp_object_t *obj, int8_t *c) {
+ switch (obj->type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ case CMP_TYPE_SINT8:
+ *c = obj->as.s8;
+ return true;
+ case CMP_TYPE_UINT8:
+ if (obj->as.u8 <= 127) {
+ *c = obj->as.s8;
+ return true;
+ }
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_as_short(cmp_object_t *obj, int16_t *s) {
+ switch (obj->type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ case CMP_TYPE_SINT8:
+ *s = obj->as.s8;
+ return true;
+ case CMP_TYPE_UINT8:
+ *s = obj->as.u8;
+ return true;
+ case CMP_TYPE_SINT16:
+ *s = obj->as.s16;
+ return true;
+ case CMP_TYPE_UINT16:
+ if (obj->as.u16 <= 32767) {
+ *s = obj->as.u16;
+ return true;
+ }
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_as_int(cmp_object_t *obj, int32_t *i) {
+ switch (obj->type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ case CMP_TYPE_SINT8:
+ *i = obj->as.s8;
+ return true;
+ case CMP_TYPE_UINT8:
+ *i = obj->as.u8;
+ return true;
+ case CMP_TYPE_SINT16:
+ *i = obj->as.s16;
+ return true;
+ case CMP_TYPE_UINT16:
+ *i = obj->as.u16;
+ return true;
+ case CMP_TYPE_SINT32:
+ *i = obj->as.s32;
+ return true;
+ case CMP_TYPE_UINT32:
+ if (obj->as.u32 <= 2147483647) {
+ *i = obj->as.u32;
+ return true;
+ }
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_as_long(cmp_object_t *obj, int64_t *d) {
+ switch (obj->type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_NEGATIVE_FIXNUM:
+ case CMP_TYPE_SINT8:
+ *d = obj->as.s8;
+ return true;
+ case CMP_TYPE_UINT8:
+ *d = obj->as.u8;
+ return true;
+ case CMP_TYPE_SINT16:
+ *d = obj->as.s16;
+ return true;
+ case CMP_TYPE_UINT16:
+ *d = obj->as.u16;
+ return true;
+ case CMP_TYPE_SINT32:
+ *d = obj->as.s32;
+ return true;
+ case CMP_TYPE_UINT32:
+ *d = obj->as.u32;
+ return true;
+ case CMP_TYPE_SINT64:
+ *d = obj->as.s64;
+ return true;
+ case CMP_TYPE_UINT64:
+ if (obj->as.u64 <= 9223372036854775807) {
+ *d = obj->as.u64;
+ return true;
+ }
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_as_sinteger(cmp_object_t *obj, int64_t *d) {
+ return cmp_object_as_long(obj, d);
+}
+
+bool cmp_object_as_uchar(cmp_object_t *obj, uint8_t *c) {
+ switch (obj->type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_UINT8:
+ *c = obj->as.u8;
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_as_ushort(cmp_object_t *obj, uint16_t *s) {
+ switch (obj->type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_UINT8:
+ *s = obj->as.u8;
+ return true;
+ case CMP_TYPE_UINT16:
+ *s = obj->as.u16;
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_as_uint(cmp_object_t *obj, uint32_t *i) {
+ switch (obj->type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_UINT8:
+ *i = obj->as.u8;
+ return true;
+ case CMP_TYPE_UINT16:
+ *i = obj->as.u16;
+ return true;
+ case CMP_TYPE_UINT32:
+ *i = obj->as.u32;
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_as_ulong(cmp_object_t *obj, uint64_t *u) {
+ switch (obj->type) {
+ case CMP_TYPE_POSITIVE_FIXNUM:
+ case CMP_TYPE_UINT8:
+ *u = obj->as.u8;
+ return true;
+ case CMP_TYPE_UINT16:
+ *u = obj->as.u16;
+ return true;
+ case CMP_TYPE_UINT32:
+ *u = obj->as.u32;
+ return true;
+ case CMP_TYPE_UINT64:
+ *u = obj->as.u64;
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_as_uinteger(cmp_object_t *obj, uint64_t *d) {
+ return cmp_object_as_ulong(obj, d);
+}
+
+bool cmp_object_as_float(cmp_object_t *obj, float *f) {
+ if (obj->type == CMP_TYPE_FLOAT) {
+ *f = obj->as.flt;
+ return true;
+ }
+
+ return false;
+}
+
+bool cmp_object_as_double(cmp_object_t *obj, double *d) {
+ if (obj->type == CMP_TYPE_DOUBLE) {
+ *d = obj->as.dbl;
+ return true;
+ }
+
+ return false;
+}
+
+bool cmp_object_as_bool(cmp_object_t *obj, bool *b) {
+ if (obj->type == CMP_TYPE_BOOLEAN) {
+ if (obj->as.boolean)
+ *b = true;
+ else
+ *b = false;
+
+ return true;
+ }
+
+ return false;
+}
+
+bool cmp_object_as_str(cmp_object_t *obj, uint32_t *size) {
+ switch (obj->type) {
+ case CMP_TYPE_FIXSTR:
+ case CMP_TYPE_STR8:
+ case CMP_TYPE_STR16:
+ case CMP_TYPE_STR32:
+ *size = obj->as.str_size;
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_as_bin(cmp_object_t *obj, uint32_t *size) {
+ switch (obj->type) {
+ case CMP_TYPE_BIN8:
+ case CMP_TYPE_BIN16:
+ case CMP_TYPE_BIN32:
+ *size = obj->as.bin_size;
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_as_array(cmp_object_t *obj, uint32_t *size) {
+ switch (obj->type) {
+ case CMP_TYPE_FIXARRAY:
+ case CMP_TYPE_ARRAY16:
+ case CMP_TYPE_ARRAY32:
+ *size = obj->as.array_size;
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_as_map(cmp_object_t *obj, uint32_t *size) {
+ switch (obj->type) {
+ case CMP_TYPE_FIXMAP:
+ case CMP_TYPE_MAP16:
+ case CMP_TYPE_MAP32:
+ *size = obj->as.map_size;
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool cmp_object_as_ext(cmp_object_t *obj, int8_t *type, uint32_t *size) {
+ switch (obj->type) {
+ case CMP_TYPE_FIXEXT1:
+ case CMP_TYPE_FIXEXT2:
+ case CMP_TYPE_FIXEXT4:
+ case CMP_TYPE_FIXEXT8:
+ case CMP_TYPE_FIXEXT16:
+ case CMP_TYPE_EXT8:
+ case CMP_TYPE_EXT16:
+ case CMP_TYPE_EXT32:
+ *type = obj->as.ext.type;
+ *size = obj->as.ext.size;
+ return true;
+ default:
+ return false;
+ }
+}
+
+/* vi: set et ts=2 sw=2: */
+
diff --git a/htrace-c/src/util/cmp.h b/htrace-c/src/util/cmp.h
new file mode 100644
index 0000000..c94efd0
--- /dev/null
+++ b/htrace-c/src/util/cmp.h
@@ -0,0 +1,441 @@
+/*
+ * The MIT License (MIT)
+ *
+ * Copyright (c) 2014 Charles Gunyon
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#ifndef CMP_H__
+#define CMP_H__
+
+#include <stdint.h> /* for uint32_t, etc. */
+#include <unistd.h> /* for size_t */
+
+// Define bool, true, false, etc. to avoid adding a dependency on C99
+// that we don't really need.
+#undef bool
+#define bool int
+#undef true
+#define true 1
+#undef false
+#define false 0
+
+struct cmp_ctx_s;
+
+typedef bool (*cmp_reader)(struct cmp_ctx_s *ctx, void *data, size_t limit);
+typedef size_t (*cmp_writer)(struct cmp_ctx_s *ctx, const void *data,
+ size_t count);
+
+enum {
+ CMP_TYPE_POSITIVE_FIXNUM, /* 0 */
+ CMP_TYPE_FIXMAP, /* 1 */
+ CMP_TYPE_FIXARRAY, /* 2 */
+ CMP_TYPE_FIXSTR, /* 3 */
+ CMP_TYPE_NIL, /* 4 */
+ CMP_TYPE_BOOLEAN, /* 5 */
+ CMP_TYPE_BIN8, /* 6 */
+ CMP_TYPE_BIN16, /* 7 */
+ CMP_TYPE_BIN32, /* 8 */
+ CMP_TYPE_EXT8, /* 9 */
+ CMP_TYPE_EXT16, /* 10 */
+ CMP_TYPE_EXT32, /* 11 */
+ CMP_TYPE_FLOAT, /* 12 */
+ CMP_TYPE_DOUBLE, /* 13 */
+ CMP_TYPE_UINT8, /* 14 */
+ CMP_TYPE_UINT16, /* 15 */
+ CMP_TYPE_UINT32, /* 16 */
+ CMP_TYPE_UINT64, /* 17 */
+ CMP_TYPE_SINT8, /* 18 */
+ CMP_TYPE_SINT16, /* 19 */
+ CMP_TYPE_SINT32, /* 20 */
+ CMP_TYPE_SINT64, /* 21 */
+ CMP_TYPE_FIXEXT1, /* 22 */
+ CMP_TYPE_FIXEXT2, /* 23 */
+ CMP_TYPE_FIXEXT4, /* 24 */
+ CMP_TYPE_FIXEXT8, /* 25 */
+ CMP_TYPE_FIXEXT16, /* 26 */
+ CMP_TYPE_STR8, /* 27 */
+ CMP_TYPE_STR16, /* 28 */
+ CMP_TYPE_STR32, /* 29 */
+ CMP_TYPE_ARRAY16, /* 30 */
+ CMP_TYPE_ARRAY32, /* 31 */
+ CMP_TYPE_MAP16, /* 32 */
+ CMP_TYPE_MAP32, /* 33 */
+ CMP_TYPE_NEGATIVE_FIXNUM /* 34 */
+};
+
+typedef struct cmp_ext_s {
+ int8_t type;
+ uint32_t size;
+} cmp_ext_t;
+
+union cmp_object_data_u {
+ bool boolean;
+ uint8_t u8;
+ uint16_t u16;
+ uint32_t u32;
+ uint64_t u64;
+ int8_t s8;
+ int16_t s16;
+ int32_t s32;
+ int64_t s64;
+ float flt;
+ double dbl;
+ uint32_t array_size;
+ uint32_t map_size;
+ uint32_t str_size;
+ uint32_t bin_size;
+ cmp_ext_t ext;
+};
+
+typedef struct cmp_ctx_s {
+ uint8_t error;
+ void *buf;
+ cmp_reader read;
+ cmp_writer write;
+} cmp_ctx_t;
+
+typedef struct cmp_object_s {
+ uint8_t type;
+ union cmp_object_data_u as;
+} cmp_object_t;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * ============================================================================
+ * === Main API
+ * ============================================================================
+ */
+
+/* Initializes a CMP context */
+void cmp_init(cmp_ctx_t *ctx, void *buf, cmp_reader read, cmp_writer write);
+
+/* Returns CMP's version */
+uint32_t cmp_version(void);
+
+/* Returns the MessagePack version employed by CMP */
+uint32_t cmp_mp_version(void);
+
+/* Returns a string description of a CMP context's error */
+const char* cmp_strerror(cmp_ctx_t *ctx);
+
+/* Writes a signed integer to the backend */
+bool cmp_write_sint(cmp_ctx_t *ctx, int64_t d);
+
+/* Writes an unsigned integer to the backend */
+bool cmp_write_uint(cmp_ctx_t *ctx, uint64_t u);
+
+/* Writes a single-precision float to the backend */
+bool cmp_write_float(cmp_ctx_t *ctx, float f);
+
+/* Writes a double-precision float to the backend */
+bool cmp_write_double(cmp_ctx_t *ctx, double d);
+
+/* Writes NULL to the backend */
+bool cmp_write_nil(cmp_ctx_t *ctx);
+
+/* Writes true to the backend */
+bool cmp_write_true(cmp_ctx_t *ctx);
+
+/* Writes false to the backend */
+bool cmp_write_false(cmp_ctx_t *ctx);
+
+/* Writes a boolean value to the backend */
+bool cmp_write_bool(cmp_ctx_t *ctx, bool b);
+
+/*
+ * Writes an unsigned char's value to the backend as a boolean. This is useful
+ * if you are using a different boolean type in your application.
+ */
+bool cmp_write_u8_as_bool(cmp_ctx_t *ctx, uint8_t b);
+
+/*
+ * Writes a string to the backend; according to the MessagePack spec, this must
+ * be encoded using UTF-8, but CMP leaves that job up to the programmer.
+ */
+bool cmp_write_str(cmp_ctx_t *ctx, const char *data, uint32_t size);
+
+/*
+ * Writes the string marker to the backend. This is useful if you are writing
+ * data in chunks instead of a single shot.
+ */
+bool cmp_write_str_marker(cmp_ctx_t *ctx, uint32_t size);
+
+/* Writes binary data to the backend */
+bool cmp_write_bin(cmp_ctx_t *ctx, const void *data, uint32_t size);
+
+/*
+ * Writes the binary data marker to the backend. This is useful if you are
+ * writing data in chunks instead of a single shot.
+ */
+bool cmp_write_bin_marker(cmp_ctx_t *ctx, uint32_t size);
+
+/* Writes an array to the backend. */
+bool cmp_write_array(cmp_ctx_t *ctx, uint32_t size);
+
+/* Writes a map to the backend. */
+bool cmp_write_map(cmp_ctx_t *ctx, uint32_t size);
+
+/* Writes an extended type to the backend */
+bool cmp_write_ext(cmp_ctx_t *ctx, int8_t type, uint32_t size,
+ const void *data);
+
+/*
+ * Writes the extended type marker to the backend. This is useful if you want
+ * to write the type's data in chunks instead of a single shot.
+ */
+bool cmp_write_ext_marker(cmp_ctx_t *ctx, int8_t type, uint32_t size);
+
+/* Writes an object to the backend */
+bool cmp_write_object(cmp_ctx_t *ctx, cmp_object_t *obj);
+
+/* Reads a signed integer that fits inside a signed char */
+bool cmp_read_char(cmp_ctx_t *ctx, int8_t *c);
+
+/* Reads a signed integer that fits inside a signed short */
+bool cmp_read_short(cmp_ctx_t *ctx, int16_t *s);
+
+/* Reads a signed integer that fits inside a signed int */
+bool cmp_read_int(cmp_ctx_t *ctx, int32_t *i);
+
+/* Reads a signed integer that fits inside a signed long */
+bool cmp_read_long(cmp_ctx_t *ctx, int64_t *d);
+
+/* Reads a signed integer */
+bool cmp_read_sinteger(cmp_ctx_t *ctx, int64_t *d);
+
+/* Reads an unsigned integer that fits inside an unsigned char */
+bool cmp_read_uchar(cmp_ctx_t *ctx, uint8_t *c);
+
+/* Reads an unsigned integer that fits inside an unsigned short */
+bool cmp_read_ushort(cmp_ctx_t *ctx, uint16_t *s);
+
+/* Reads an unsigned integer that fits inside an unsigned int */
+bool cmp_read_uint(cmp_ctx_t *ctx, uint32_t *i);
+
+/* Reads an unsigned integer that fits inside an unsigned long */
+bool cmp_read_ulong(cmp_ctx_t *ctx, uint64_t *u);
+
+/* Reads an unsigned integer */
+bool cmp_read_uinteger(cmp_ctx_t *ctx, uint64_t *u);
+
+/* Reads a single-precision float from the backend */
+bool cmp_read_float(cmp_ctx_t *ctx, float *f);
+
+/* Reads a double-precision float from the backend */
+bool cmp_read_double(cmp_ctx_t *ctx, double *d);
+
+/* "Reads" (more like "skips") a NULL value from the backend */
+bool cmp_read_nil(cmp_ctx_t *ctx);
+
+/* Reads a boolean from the backend */
+bool cmp_read_bool(cmp_ctx_t *ctx, bool *b);
+
+/*
+ * Reads a boolean as an unsigned char from the backend; this is useful if your
+ * application uses a different boolean type.
+ */
+bool cmp_read_bool_as_u8(cmp_ctx_t *ctx, uint8_t *b);
+
+/* Reads a string's size from the backend */
+bool cmp_read_str_size(cmp_ctx_t *ctx, uint32_t *size);
+
+/*
+ * Reads a string from the backend; according to the spec, the string's data
+ * ought to be encoded using UTF-8,
+ */
+bool cmp_read_str(cmp_ctx_t *ctx, char *data, uint32_t *size);
+
+/* Reads the size of packed binary data from the backend */
+bool cmp_read_bin_size(cmp_ctx_t *ctx, uint32_t *size);
+
+/* Reads packed binary data from the backend */
+bool cmp_read_bin(cmp_ctx_t *ctx, void *data, uint32_t *size);
+
+/* Reads an array from the backend */
+bool cmp_read_array(cmp_ctx_t *ctx, uint32_t *size);
+
+/* Reads a map from the backend */
+bool cmp_read_map(cmp_ctx_t *ctx, uint32_t *size);
+
+/* Reads the extended type's marker from the backend */
+bool cmp_read_ext_marker(cmp_ctx_t *ctx, int8_t *type, uint32_t *size);
+
+/* Reads an extended type from the backend */
+bool cmp_read_ext(cmp_ctx_t *ctx, int8_t *type, uint32_t *size, void *data);
+
+/* Reads an object from the backend */
+bool cmp_read_object(cmp_ctx_t *ctx, cmp_object_t *obj);
+
+/*
+ * ============================================================================
+ * === Specific API
+ * ============================================================================
+ */
+
+bool cmp_write_pfix(cmp_ctx_t *ctx, uint8_t c);
+bool cmp_write_nfix(cmp_ctx_t *ctx, int8_t c);
+
+bool cmp_write_sfix(cmp_ctx_t *ctx, int8_t c);
+bool cmp_write_s8(cmp_ctx_t *ctx, int8_t c);
+bool cmp_write_s16(cmp_ctx_t *ctx, int16_t s);
+bool cmp_write_s32(cmp_ctx_t *ctx, int32_t i);
+bool cmp_write_s64(cmp_ctx_t *ctx, int64_t l);
+
+bool cmp_write_ufix(cmp_ctx_t *ctx, uint8_t c);
+bool cmp_write_u8(cmp_ctx_t *ctx, uint8_t c);
+bool cmp_write_u16(cmp_ctx_t *ctx, uint16_t s);
+bool cmp_write_u32(cmp_ctx_t *ctx, uint32_t i);
+bool cmp_write_u64(cmp_ctx_t *ctx, uint64_t l);
+
+bool cmp_write_fixstr_marker(cmp_ctx_t *ctx, uint8_t size);
+bool cmp_write_fixstr(cmp_ctx_t *ctx, const char *data, uint8_t size);
+bool cmp_write_str8_marker(cmp_ctx_t *ctx, uint8_t size);
+bool cmp_write_str8(cmp_ctx_t *ctx, const char *data, uint8_t size);
+bool cmp_write_str16_marker(cmp_ctx_t *ctx, uint16_t size);
+bool cmp_write_str16(cmp_ctx_t *ctx, const char *data, uint16_t size);
+bool cmp_write_str32_marker(cmp_ctx_t *ctx, uint32_t size);
+bool cmp_write_str32(cmp_ctx_t *ctx, const char *data, uint32_t size);
+
+bool cmp_write_bin8_marker(cmp_ctx_t *ctx, uint8_t size);
+bool cmp_write_bin8(cmp_ctx_t *ctx, const void *data, uint8_t size);
+bool cmp_write_bin16_marker(cmp_ctx_t *ctx, uint16_t size);
+bool cmp_write_bin16(cmp_ctx_t *ctx, const void *data, uint16_t size);
+bool cmp_write_bin32_marker(cmp_ctx_t *ctx, uint32_t size);
+bool cmp_write_bin32(cmp_ctx_t *ctx, const void *data, uint32_t size);
+
+bool cmp_write_fixarray(cmp_ctx_t *ctx, uint8_t size);
+bool cmp_write_array16(cmp_ctx_t *ctx, uint16_t size);
+bool cmp_write_array32(cmp_ctx_t *ctx, uint32_t size);
+
+bool cmp_write_fixmap(cmp_ctx_t *ctx, uint8_t size);
+bool cmp_write_map16(cmp_ctx_t *ctx, uint16_t size);
+bool cmp_write_map32(cmp_ctx_t *ctx, uint32_t size);
+
+bool cmp_write_fixext1_marker(cmp_ctx_t *ctx, int8_t type);
+bool cmp_write_fixext1(cmp_ctx_t *ctx, int8_t type, const void *data);
+bool cmp_write_fixext2_marker(cmp_ctx_t *ctx, int8_t type);
+bool cmp_write_fixext2(cmp_ctx_t *ctx, int8_t type, const void *data);
+bool cmp_write_fixext4_marker(cmp_ctx_t *ctx, int8_t type);
+bool cmp_write_fixext4(cmp_ctx_t *ctx, int8_t type, const void *data);
+bool cmp_write_fixext8_marker(cmp_ctx_t *ctx, int8_t type);
+bool cmp_write_fixext8(cmp_ctx_t *ctx, int8_t type, const void *data);
+bool cmp_write_fixext16_marker(cmp_ctx_t *ctx, int8_t type);
+bool cmp_write_fixext16(cmp_ctx_t *ctx, int8_t type, const void *data);
+
+bool cmp_write_ext8_marker(cmp_ctx_t *ctx, int8_t type, uint8_t size);
+bool cmp_write_ext8(cmp_ctx_t *ctx, int8_t type, uint8_t size,
+ const void *data);
+bool cmp_write_ext16_marker(cmp_ctx_t *ctx, int8_t type, uint16_t size);
+bool cmp_write_ext16(cmp_ctx_t *ctx, int8_t type, uint16_t size,
+ const void *data);
+bool cmp_write_ext32_marker(cmp_ctx_t *ctx, int8_t type, uint32_t size);
+bool cmp_write_ext32(cmp_ctx_t *ctx, int8_t type, uint32_t size,
+ const void *data);
+
+bool cmp_read_pfix(cmp_ctx_t *ctx, uint8_t *c);
+bool cmp_read_nfix(cmp_ctx_t *ctx, int8_t *c);
+
+bool cmp_read_sfix(cmp_ctx_t *ctx, int8_t *c);
+bool cmp_read_s8(cmp_ctx_t *ctx, int8_t *c);
+bool cmp_read_s16(cmp_ctx_t *ctx, int16_t *s);
+bool cmp_read_s32(cmp_ctx_t *ctx, int32_t *i);
+bool cmp_read_s64(cmp_ctx_t *ctx, int64_t *l);
+
+bool cmp_read_ufix(cmp_ctx_t *ctx, uint8_t *c);
+bool cmp_read_u8(cmp_ctx_t *ctx, uint8_t *c);
+bool cmp_read_u16(cmp_ctx_t *ctx, uint16_t *s);
+bool cmp_read_u32(cmp_ctx_t *ctx, uint32_t *i);
+bool cmp_read_u64(cmp_ctx_t *ctx, uint64_t *l);
+
+bool cmp_read_fixext1_marker(cmp_ctx_t *ctx, int8_t *type);
+bool cmp_read_fixext1(cmp_ctx_t *ctx, int8_t *type, void *data);
+bool cmp_read_fixext2_marker(cmp_ctx_t *ctx, int8_t *type);
+bool cmp_read_fixext2(cmp_ctx_t *ctx, int8_t *type, void *data);
+bool cmp_read_fixext4_marker(cmp_ctx_t *ctx, int8_t *type);
+bool cmp_read_fixext4(cmp_ctx_t *ctx, int8_t *type, void *data);
+bool cmp_read_fixext8_marker(cmp_ctx_t *ctx, int8_t *type);
+bool cmp_read_fixext8(cmp_ctx_t *ctx, int8_t *type, void *data);
+bool cmp_read_fixext16_marker(cmp_ctx_t *ctx, int8_t *type);
+bool cmp_read_fixext16(cmp_ctx_t *ctx, int8_t *type, void *data);
+
+bool cmp_read_ext8_marker(cmp_ctx_t *ctx, int8_t *type, uint8_t *size);
+bool cmp_read_ext8(cmp_ctx_t *ctx, int8_t *type, uint8_t *size, void *data);
+bool cmp_read_ext16_marker(cmp_ctx_t *ctx, int8_t *type, uint16_t *size);
+bool cmp_read_ext16(cmp_ctx_t *ctx, int8_t *type, uint16_t *size, void *data);
+bool cmp_read_ext32_marker(cmp_ctx_t *ctx, int8_t *type, uint32_t *size);
+bool cmp_read_ext32(cmp_ctx_t *ctx, int8_t *type, uint32_t *size, void *data);
+
+/*
+ * ============================================================================
+ * === Object API
+ * ============================================================================
+ */
+
+bool cmp_object_is_char(cmp_object_t *obj);
+bool cmp_object_is_short(cmp_object_t *obj);
+bool cmp_object_is_int(cmp_object_t *obj);
+bool cmp_object_is_long(cmp_object_t *obj);
+bool cmp_object_is_sinteger(cmp_object_t *obj);
+bool cmp_object_is_uchar(cmp_object_t *obj);
+bool cmp_object_is_ushort(cmp_object_t *obj);
+bool cmp_object_is_uint(cmp_object_t *obj);
+bool cmp_object_is_ulong(cmp_object_t *obj);
+bool cmp_object_is_uinteger(cmp_object_t *obj);
+bool cmp_object_is_float(cmp_object_t *obj);
+bool cmp_object_is_double(cmp_object_t *obj);
+bool cmp_object_is_nil(cmp_object_t *obj);
+bool cmp_object_is_bool(cmp_object_t *obj);
+bool cmp_object_is_str(cmp_object_t *obj);
+bool cmp_object_is_bin(cmp_object_t *obj);
+bool cmp_object_is_array(cmp_object_t *obj);
+bool cmp_object_is_map(cmp_object_t *obj);
+bool cmp_object_is_ext(cmp_object_t *obj);
+
+bool cmp_object_as_char(cmp_object_t *obj, int8_t *c);
+bool cmp_object_as_short(cmp_object_t *obj, int16_t *s);
+bool cmp_object_as_int(cmp_object_t *obj, int32_t *i);
+bool cmp_object_as_long(cmp_object_t *obj, int64_t *d);
+bool cmp_object_as_sinteger(cmp_object_t *obj, int64_t *d);
+bool cmp_object_as_uchar(cmp_object_t *obj, uint8_t *c);
+bool cmp_object_as_ushort(cmp_object_t *obj, uint16_t *s);
+bool cmp_object_as_uint(cmp_object_t *obj, uint32_t *i);
+bool cmp_object_as_ulong(cmp_object_t *obj, uint64_t *u);
+bool cmp_object_as_uinteger(cmp_object_t *obj, uint64_t *u);
+bool cmp_object_as_float(cmp_object_t *obj, float *f);
+bool cmp_object_as_double(cmp_object_t *obj, double *d);
+bool cmp_object_as_bool(cmp_object_t *obj, bool *b);
+bool cmp_object_as_str(cmp_object_t *obj, uint32_t *size);
+bool cmp_object_as_bin(cmp_object_t *obj, uint32_t *size);
+bool cmp_object_as_array(cmp_object_t *obj, uint32_t *size);
+bool cmp_object_as_map(cmp_object_t *obj, uint32_t *size);
+bool cmp_object_as_ext(cmp_object_t *obj, int8_t *type, uint32_t *size);
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#endif /* CMP_H__ */
+
+/* vi: set et ts=2 sw=2: */
+
diff --git a/htrace-c/src/util/cmp_util.c b/htrace-c/src/util/cmp_util.c
new file mode 100644
index 0000000..ec6d1e5
--- /dev/null
+++ b/htrace-c/src/util/cmp_util.c
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/cmp_util.h"
+
+#include <stdint.h>
+#include <string.h>
+
+/**
+ * @file cmp_util.c
+ *
+ * Utilities for using the CMP library.
+ */
+
+static size_t cmp_counter_write_fn(struct cmp_ctx_s *c, const void *data,
+ size_t count)
+{
+ struct cmp_counter_ctx *ctx = (struct cmp_counter_ctx *)c;
+ ctx->count += count;
+ return count;
+}
+
+void cmp_counter_ctx_init(struct cmp_counter_ctx *ctx)
+{
+ cmp_init(&ctx->base, NULL, NULL, cmp_counter_write_fn);
+ ctx->count = 0;
+}
+
+static size_t cmp_bcopy_write_fn(struct cmp_ctx_s *c, const void *data,
+ size_t count)
+{
+ struct cmp_bcopy_ctx *ctx = (struct cmp_bcopy_ctx *)c;
+ uint64_t rem, o = ctx->off;
+
+ rem = ctx->len - o;
+ if (rem < count) {
+ count = rem;
+ }
+ memcpy(((uint8_t*)ctx->base.buf) + o, data, count);
+ ctx->off = o + count;
+ return count;
+}
+
+size_t cmp_bcopy_write_nocheck_fn(struct cmp_ctx_s *c, const void *data,
+ size_t count)
+{
+ struct cmp_bcopy_ctx *ctx = (struct cmp_bcopy_ctx *)c;
+ uint64_t o = ctx->off;
+ memcpy(((uint8_t*)ctx->base.buf) + o, data, count);
+ ctx->off = o + count;
+ return count;
+}
+
+static int cmp_bcopy_reader(struct cmp_ctx_s *c, void *data, size_t limit)
+{
+ struct cmp_bcopy_ctx *ctx = (struct cmp_bcopy_ctx *)c;
+ size_t count, o = ctx->off;
+
+ count = ctx->len - o;
+ if (count > limit) {
+ count = limit;
+ }
+ memcpy(data, ((uint8_t*)ctx->base.buf) + o, count);
+ ctx->off = o + count;
+ return count;
+}
+
+void cmp_bcopy_ctx_init(struct cmp_bcopy_ctx *ctx, void *buf, uint64_t len)
+{
+ cmp_init(&ctx->base, buf, cmp_bcopy_reader, cmp_bcopy_write_fn);
+ ctx->off = 0;
+ ctx->len = len;
+}
+
+// vim:ts=4:sw=4:et
diff --git a/htrace-c/src/util/cmp_util.h b/htrace-c/src/util/cmp_util.h
new file mode 100644
index 0000000..0164244
--- /dev/null
+++ b/htrace-c/src/util/cmp_util.h
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef APACHE_HTRACE_UTIL_CMP_UTIL_H
+#define APACHE_HTRACE_UTIL_CMP_UTIL_H
+
+/**
+ * @file cmp_util.h
+ *
+ * Utilities for using the CMP library.
+ *
+ * This is an internal header, not intended for external use.
+ */
+
+#include "util/cmp.h"
+
+#include <stdint.h> /* for uint64_t */
+
+/**
+ * CMP context for counting the number of bytes in the serialized form.
+ */
+struct cmp_counter_ctx {
+ cmp_ctx_t base;
+ uint64_t count;
+};
+
+/**
+ * Initialize a CMP counter ctx.
+ * This doesn't allocate any memory.
+ *
+ * @param ctx The context to initialize.
+ */
+void cmp_counter_ctx_init(struct cmp_counter_ctx *ctx);
+
+/**
+ * CMP context for counting the number of bytes in the serialized form.
+ */
+struct cmp_bcopy_ctx {
+ cmp_ctx_t base;
+ uint64_t off;
+ uint64_t len;
+};
+
+/**
+ * Initialize a CMP writer ctx.
+ * This doesn't allocate any memory.
+ *
+ * @param ctx The context to initialize.
+ */
+void cmp_bcopy_ctx_init(struct cmp_bcopy_ctx *ctx, void *buf, uint64_t len);
+
+/**
+ * A version of the bcopy write function that does not perform bounds checking.
+ * Useful if the serialized size has already been determined.
+ */
+size_t cmp_bcopy_write_nocheck_fn(struct cmp_ctx_s *c, const void *data,
+ size_t count);
+
+#endif
+
+// vim: ts=4:sw=4:tw=79:et
diff --git a/htrace-htraced/src/go/Godeps/Godeps.json b/htrace-htraced/src/go/Godeps/Godeps.json
index 10c8e5d..0677fd2 100644
--- a/htrace-htraced/src/go/Godeps/Godeps.json
+++ b/htrace-htraced/src/go/Godeps/Godeps.json
@@ -21,6 +21,10 @@
{
"ImportPath": "github.com/jmhodges/levigo",
"Rev": "2c43dde93d0e056173706534afd514fcbc1dd578"
+ },
+ {
+ "ImportPath": "github.com/ugorji/go",
+ "Rev": "08bbe4aa39b9f189f4e294b5c8408b5fa5787bb2"
}
]
}
diff --git a/htrace-htraced/src/go/gobuild.sh b/htrace-htraced/src/go/gobuild.sh
index 1a4e5f1..81c9f7d 100755
--- a/htrace-htraced/src/go/gobuild.sh
+++ b/htrace-htraced/src/go/gobuild.sh
@@ -47,6 +47,11 @@
cd "${GOBIN}" || die "failed to cd to ${SCRIPT_DIR}"
export GOPATH="${GOBIN}:${SCRIPT_DIR}"
+# Use the unsafe package when possible to get greater speed. For example,
+# go-codec can bypass the overhead of converting between []byte and string in
+# some cases when using unsafe.
+TAGS="-tags unsafe"
+
# Check for go
which go &> /dev/null
if [ $? -ne 0 ]; then
@@ -100,12 +105,12 @@
# Inject the release and git version into the htraced ldflags.
FLAGS="-X main.RELEASE_VERSION ${RELEASE_VERSION} -X main.GIT_VERSION ${GIT_VERSION}"
- go install -ldflags "${FLAGS}" -v org/apache/htrace/... "$@"
+ go install ${TAGS} -ldflags "${FLAGS}" -v org/apache/htrace/... "$@"
;;
bench)
- go test org/apache/htrace/... -test.bench=. "$@"
+ go test org/apache/htrace/... ${TAGS} -test.bench=. "$@"
;;
*)
- go ${ACTION} org/apache/htrace/... "$@"
+ go ${ACTION} org/apache/htrace/... ${TAGS} "$@"
;;
esac
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go b/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go
index 5406d73..608dd59 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go
@@ -20,10 +20,11 @@
package client
import (
+ "bytes"
"encoding/binary"
- "encoding/json"
"errors"
"fmt"
+ "github.com/ugorji/go/codec"
"io"
"net"
"net/rpc"
@@ -45,11 +46,16 @@
return errors.New(fmt.Sprintf("HrpcClientCodec: Unknown method name %s",
req.ServiceMethod))
}
- buf, err := json.Marshal(msg)
+ mh := new(codec.MsgpackHandle)
+ mh.WriteExt = true
+ w := bytes.NewBuffer(make([]byte, 0, 2048))
+ enc := codec.NewEncoder(w, mh)
+ err := enc.Encode(msg)
if err != nil {
return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+
- "message as JSON: %s", err.Error()))
+ "message as msgpack: %s", err.Error()))
}
+ buf := w.Bytes()
if len(buf) > common.MAX_HRPC_BODY_LENGTH {
return errors.New(fmt.Sprintf("HrpcClientCodec: message body is %d "+
"bytes, but the maximum message size is %d bytes.",
@@ -115,7 +121,9 @@
}
func (cdc *HrpcClientCodec) ReadResponseBody(body interface{}) error {
- dec := json.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length)))
+ mh := new(codec.MsgpackHandle)
+ mh.WriteExt = true
+ dec := codec.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length)), mh)
err := dec.Decode(body)
if err != nil {
return errors.New(fmt.Sprintf("Failed to read response body: %s",
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/src/go/src/org/apache/htrace/common/rpc.go
index cdf7e08..fe50a44 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/common/rpc.go
@@ -20,7 +20,7 @@
package common
// The 4-byte magic number which is sent first in the HRPC header
-const HRPC_MAGIC = 0x48545243
+const HRPC_MAGIC = 0x43525448
// Method ID codes. Do not reorder these.
const (
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go
index 54ca780..a53380e 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go
@@ -21,10 +21,12 @@
import (
"bufio"
+ "bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
+ "github.com/ugorji/go/codec"
"io"
"net"
"net/rpc"
@@ -111,7 +113,9 @@
cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n",
cdc.length, cdc.conn.RemoteAddr())
}
- dec := json.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)))
+ mh := new(codec.MsgpackHandle)
+ mh.WriteExt = true
+ dec := codec.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)), mh)
err := dec.Decode(body)
if err != nil {
return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to read request "+
@@ -130,11 +134,16 @@
var err error
buf := EMPTY
if msg != nil {
- buf, err = json.Marshal(msg)
+ mh := new(codec.MsgpackHandle)
+ mh.WriteExt = true
+ w := bytes.NewBuffer(make([]byte, 0, 128))
+ enc := codec.NewEncoder(w, mh)
+ err := enc.Encode(msg)
if err != nil {
return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to marshal "+
"response message: %s", err.Error()))
}
+ buf = w.Bytes()
}
hdr := common.HrpcResponseHeader{}
hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod)