HTRACE-308. Deserialize WriteSpans requests incrementally rather than all at once to optimize GC (cmccabe)
diff --git a/htrace-c/src/receiver/htraced.c b/htrace-c/src/receiver/htraced.c
index d92518d..3a2a091 100644
--- a/htrace-c/src/receiver/htraced.c
+++ b/htrace-c/src/receiver/htraced.c
@@ -71,7 +71,7 @@
* The maximum length of the message we will send to the server.
* This must be the same or shorter than MAX_HRPC_BODY_LENGTH in rpc.go.
*/
-#define MAX_HRPC_LEN (64ULL * 1024ULL * 1024ULL)
+#define MAX_HRPC_LEN (32ULL * 1024ULL * 1024ULL)
/**
* The maximum length of the prequel in a WriteSpans message.
@@ -490,8 +490,8 @@
#define DEFAULT_TRID_STR "DefaultTrid"
#define DEFAULT_TRID_STR_LEN (sizeof(DEFAULT_TRID_STR) - 1)
-#define SPANS_STR "Spans"
-#define SPANS_STR_LEN (sizeof(SPANS_STR) - 1)
+#define NUM_SPANS_STR "NumSpans"
+#define NUM_SPANS_STR_LEN (sizeof(NUM_SPANS_STR) - 1)
/**
* Write the prequel to the WriteSpans message.
@@ -511,10 +511,10 @@
if (!cmp_write_str(ctx, rcv->tracer->trid, strlen(rcv->tracer->trid))) {
return -1;
}
- if (!cmp_write_fixstr(ctx, SPANS_STR, SPANS_STR_LEN)) {
+ if (!cmp_write_fixstr(ctx, NUM_SPANS_STR, NUM_SPANS_STR_LEN)) {
return -1;
}
- if (!cmp_write_array(ctx, sbuf->num_spans)) {
+ if (!cmp_write_uint(ctx, sbuf->num_spans)) {
return -1;
}
return bctx.off;
diff --git a/htrace-htraced/go/Godeps/Godeps.json b/htrace-htraced/go/Godeps/Godeps.json
index 7c737fe..2db37be 100644
--- a/htrace-htraced/go/Godeps/Godeps.json
+++ b/htrace-htraced/go/Godeps/Godeps.json
@@ -24,7 +24,7 @@
},
{
"ImportPath": "github.com/ugorji/go/codec",
- "Rev": "1a8bf87a90ddcdc7deaa0038f127ac62135fdd58"
+ "Rev": "ea9cd21fa0bc41ee4bdd50ac7ed8cbc7ea2ed960"
}
]
}
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go b/htrace-htraced/go/src/org/apache/htrace/client/client.go
index 65b04e4..a2a6f8b 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/client.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go
@@ -142,26 +142,36 @@
return &span, nil
}
-func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error {
+func (hcl *Client) WriteSpans(spans []*common.Span) error {
if hcl.hrpcAddr == "" {
- return hcl.writeSpansHttp(req)
+ return hcl.writeSpansHttp(spans)
}
hcr, err := newHClient(hcl.hrpcAddr, hcl.testHooks)
if err != nil {
return err
}
defer hcr.Close()
- return hcr.writeSpans(req)
+ return hcr.writeSpans(spans)
}
-func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error {
+func (hcl *Client) writeSpansHttp(spans []*common.Span) error {
+ req := common.WriteSpansReq {
+ NumSpans: len(spans),
+ }
var w bytes.Buffer
enc := json.NewEncoder(&w)
err := enc.Encode(req)
if err != nil {
- return errors.New(fmt.Sprintf("Error serializing span: %s",
+ return errors.New(fmt.Sprintf("Error serializing WriteSpansReq: %s",
err.Error()))
}
+ for spanIdx := range(spans) {
+ err := enc.Encode(spans[spanIdx])
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error serializing span %d out " +
+ "of %d: %s", spanIdx, len(spans), err.Error()))
+ }
+ }
_, _, err = hcl.makeRestRequest("POST", "writeSpans", &w)
if err != nil {
return err
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
index 2fcd9a0..43f0c6c 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
@@ -41,20 +41,41 @@
testHooks *TestHooks
}
-func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) error {
- methodId := common.HrpcMethodNameToId(req.ServiceMethod)
+func (cdc *HrpcClientCodec) WriteRequest(rr *rpc.Request, msg interface{}) error {
+ methodId := common.HrpcMethodNameToId(rr.ServiceMethod)
if methodId == common.METHOD_ID_NONE {
return errors.New(fmt.Sprintf("HrpcClientCodec: Unknown method name %s",
- req.ServiceMethod))
+ rr.ServiceMethod))
}
mh := new(codec.MsgpackHandle)
mh.WriteExt = true
w := bytes.NewBuffer(make([]byte, 0, 2048))
+
+ var err error
enc := codec.NewEncoder(w, mh)
- err := enc.Encode(msg)
- if err != nil {
- return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+
- "message as msgpack: %s", err.Error()))
+ if methodId == common.METHOD_ID_WRITE_SPANS {
+ spans := msg.([]*common.Span)
+ req := &common.WriteSpansReq {
+ NumSpans: len(spans),
+ }
+ err = enc.Encode(req)
+ if err != nil {
+ return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+
+ "message as msgpack: %s", err.Error()))
+ }
+ for spanIdx := range(spans) {
+ err = enc.Encode(spans[spanIdx])
+ if err != nil {
+ return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+
+ "span %d out of %d as msgpack: %s", spanIdx, len(spans), err.Error()))
+ }
+ }
+ } else {
+ err = enc.Encode(msg)
+ if err != nil {
+ return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+
+ "message as msgpack: %s", err.Error()))
+ }
}
buf := w.Bytes()
if len(buf) > common.MAX_HRPC_BODY_LENGTH {
@@ -65,7 +86,7 @@
hdr := common.HrpcRequestHeader{
Magic: common.HRPC_MAGIC,
MethodId: methodId,
- Seq: req.Seq,
+ Seq: rr.Seq,
Length: uint32(len(buf)),
}
err = binary.Write(cdc.rwc, binary.LittleEndian, &hdr)
@@ -154,9 +175,9 @@
return &hcr, nil
}
-func (hcr *hClient) writeSpans(req *common.WriteSpansReq) error {
+func (hcr *hClient) writeSpans(spans []*common.Span) error {
resp := common.WriteSpansResp{}
- return hcr.rpcClient.Call(common.METHOD_NAME_WRITE_SPANS, req, &resp)
+ return hcr.rpcClient.Call(common.METHOD_NAME_WRITE_SPANS, spans, &resp)
}
func (hcr *hClient) Close() {
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 2ec5fe9..5f02db6 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -34,13 +34,13 @@
const MAX_HRPC_ERROR_LENGTH = 4 * 1024 * 1024
// Maximum length of HRPC message body
-const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024
+const MAX_HRPC_BODY_LENGTH = 32 * 1024 * 1024
// A request to write spans to htraced.
+// This request is followed by a sequence of spans.
type WriteSpansReq struct {
- Addr string `json:",omitempty"` // This gets filled in by the RPC layer.
DefaultTrid string `json:",omitempty"`
- Spans []*Span
+ NumSpans int
}
// Info returned by /server/version
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index 3a877f6..7b64914 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -114,9 +114,7 @@
allSpans := createRandomTestSpans(NUM_TEST_SPANS)
// Write half of the spans to htraced via the client.
- err = hcl.WriteSpans(&common.WriteSpansReq{
- Spans: allSpans[0 : NUM_TEST_SPANS/2],
- })
+ err = hcl.WriteSpans(allSpans[0 : NUM_TEST_SPANS/2])
if err != nil {
t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
err.Error())
@@ -209,9 +207,7 @@
NUM_TEST_SPANS := 100
allSpans := createRandomTestSpans(NUM_TEST_SPANS)
sort.Sort(allSpans)
- err = hcl.WriteSpans(&common.WriteSpansReq{
- Spans: allSpans,
- })
+ err = hcl.WriteSpans(allSpans)
if err != nil {
t.Fatalf("WriteSpans failed: %s\n", err.Error())
}
@@ -325,9 +321,7 @@
allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
go func(i int) {
- err = hcl.WriteSpans(&common.WriteSpansReq{
- Spans: allSpans[i : i+1],
- })
+ err = hcl.WriteSpans(allSpans[i : i+1])
if err != nil {
t.Fatalf("WriteSpans failed: %s\n", err.Error())
}
@@ -379,9 +373,7 @@
// Keep in mind that we only block until we have seen
// TEST_NUM_WRITESPANS I/O errors in the HRPC server-- after that,
// we let requests through so that the test can exit cleanly.
- hcl.WriteSpans(&common.WriteSpansReq{
- Spans: allSpans[i : i+1],
- })
+ hcl.WriteSpans(allSpans[i : i+1])
}(iter)
}
for {
@@ -398,6 +390,7 @@
htraceBld := &MiniHTracedBuilder{Name: "doWriteSpans",
Cnf: map[string]string{
conf.HTRACE_LOG_LEVEL: "INFO",
+ conf.HTRACE_NUM_HRPC_HANDLERS: "20",
},
WrittenSpans: common.NewSemaphore(int64(1 - N)),
}
@@ -416,7 +409,7 @@
// body length limit. TODO: a production-quality golang client would do
// this internally rather than needing us to do it here in the unit test.
bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5
- reqs := make([]*common.WriteSpansReq, 0, 4)
+ reqs := make([][]*common.Span, 0, 4)
curReq := -1
curReqLen := bodyLen
var curReqSpans uint32
@@ -429,7 +422,7 @@
span := allSpans[n]
if (curReqSpans >= maxSpansPerRpc) ||
(curReqLen >= bodyLen) {
- reqs = append(reqs, &common.WriteSpansReq{})
+ reqs = append(reqs, make([]*common.Span, 0, 16))
curReqLen = 0
curReq++
curReqSpans = 0
@@ -446,7 +439,7 @@
panic(fmt.Sprintf("Span too long at %d bytes\n", bufLen))
}
curReqLen += bufLen
- reqs[curReq].Spans = append(reqs[curReq].Spans, span)
+ reqs[curReq] = append(reqs[curReq], span)
curReqSpans++
}
ht.Store.lg.Infof("num spans: %d. num WriteSpansReq calls: %d\n", N, len(reqs))
@@ -465,13 +458,13 @@
// Write many random spans.
for reqIdx := range reqs {
- go func() {
- err = hcl.WriteSpans(reqs[reqIdx])
+ go func(i int) {
+ err = hcl.WriteSpans(reqs[i])
if err != nil {
panic(fmt.Sprintf("failed to send WriteSpans request %d: %s",
- reqIdx, err.Error()))
+ i, err.Error()))
}
- }()
+ }(reqIdx)
}
// Wait for all the spans to be written.
ht.Store.WrittenSpans.Wait()
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index 4fc400a..ebf3c47 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -478,9 +478,7 @@
// Create some random trace spans.
NUM_TEST_SPANS := 5
allSpans := createRandomTestSpans(NUM_TEST_SPANS)
- err = hcl.WriteSpans(&common.WriteSpansReq{
- Spans: allSpans,
- })
+ err = hcl.WriteSpans(allSpans)
if err != nil {
t.Fatalf("WriteSpans failed: %s\n", err.Error())
}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index a6f6751..ecd13d4 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -32,7 +32,6 @@
"net/rpc"
"org/apache/htrace/common"
"org/apache/htrace/conf"
- "reflect"
"sync"
"sync/atomic"
"time"
@@ -101,6 +100,13 @@
// The number of messages this connection has handled.
numHandled int
+
+ // The buffer for reading requests. These buffers are reused for multiple
+ // requests to avoid allocating memory.
+ buf []byte
+
+ // Configuration for msgpack decoding
+ msgpackHandle codec.MsgpackHandle
}
func asJson(val interface{}) string {
@@ -164,29 +170,55 @@
}
func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+ remoteAddr := cdc.conn.RemoteAddr().String()
if cdc.lg.TraceEnabled() {
cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n",
- cdc.conn.RemoteAddr(), cdc.length)
+ remoteAddr, cdc.length)
}
- mh := new(codec.MsgpackHandle)
- mh.WriteExt = true
- dec := codec.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)), mh)
- err := dec.Decode(body)
+ if cap(cdc.buf) < int(cdc.length) {
+ var pow uint
+ for pow=0;(1<<pow) < int(cdc.length);pow++ {
+ }
+ cdc.buf = make([]byte, 0, 1<<pow)
+ }
+ _, err := io.ReadFull(cdc.conn, cdc.buf[:cdc.length])
if err != nil {
return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte "+
"request body: %s", cdc.length, err.Error()))
}
- if cdc.lg.TraceEnabled() {
- cdc.lg.Tracef("%s: read %d-byte request body %s\n",
- cdc.conn.RemoteAddr(), cdc.length, asJson(&body))
- }
- val := reflect.ValueOf(body)
- addr := val.Elem().FieldByName("Addr")
- if addr.IsValid() {
- addr.SetString(cdc.conn.RemoteAddr().String())
- }
var zeroTime time.Time
cdc.conn.SetDeadline(zeroTime)
+
+ dec := codec.NewDecoderBytes(cdc.buf[:cdc.length], &cdc.msgpackHandle)
+ err = dec.Decode(body)
+ if cdc.lg.TraceEnabled() {
+ cdc.lg.Tracef("%s: read HRPC message: %s\n",
+ remoteAddr, asJson(&body))
+ }
+ req := body.(*common.WriteSpansReq)
+ if req == nil {
+ return nil
+ }
+ // We decode WriteSpans requests in a streaming fashion, to avoid overloading the garbage
+ // collector with a ton of trace spans all at once.
+ startTime := time.Now()
+ client, _, err := net.SplitHostPort(remoteAddr)
+ if err != nil {
+ return newIoErrorWarn(cdc, fmt.Sprintf("Failed to split host and port "+
+ "for %s: %s\n", remoteAddr, err.Error()))
+ }
+ hand := cdc.hsv.hand
+ ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
+ for spanIdx := 0; spanIdx < req.NumSpans; spanIdx++ {
+ var span *common.Span
+ err := dec.Decode(&span)
+ if err != nil {
+ return newIoErrorWarn(cdc, fmt.Sprintf("Failed to decode span %d " +
+ "out of %d: %s\n", spanIdx, req.NumSpans, err.Error()))
+ }
+ ing.IngestSpan(span)
+ }
+ ing.Close(startTime)
return nil
}
@@ -197,10 +229,8 @@
var err error
buf := EMPTY
if msg != nil {
- mh := new(codec.MsgpackHandle)
- mh.WriteExt = true
w := bytes.NewBuffer(make([]byte, 0, 128))
- enc := codec.NewEncoder(w, mh)
+ enc := codec.NewEncoder(w, &cdc.msgpackHandle)
err := enc.Encode(msg)
if err != nil {
return newIoErrorWarn(cdc, fmt.Sprintf("Failed to marshal "+
@@ -257,20 +287,8 @@
}
func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
- resp *common.WriteSpansResp) (err error) {
- startTime := time.Now()
- hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s). "+
- "defaultTrid = %s\n", len(req.Spans), req.DefaultTrid)
- client, _, err := net.SplitHostPort(req.Addr)
- if err != nil {
- return errors.New(fmt.Sprintf("Failed to split host and port "+
- "for %s: %s\n", req.Addr, err.Error()))
- }
- ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
- for spanIdx := range req.Spans {
- ing.IngestSpan(req.Spans[spanIdx])
- }
- ing.Close(startTime)
+ resp *common.WriteSpansResp) (err error) {
+ // Nothing to do here; WriteSpans is handled in ReadRequestBody.
return nil
}
@@ -303,6 +321,9 @@
hsv.cdcs <- &HrpcServerCodec{
lg: lg,
hsv: hsv,
+ msgpackHandle: codec.MsgpackHandle {
+ WriteExt: true,
+ },
}
}
var err error
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
index bad7889..6daf640 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -118,9 +118,7 @@
NUM_TEST_SPANS := 12
allSpans := createRandomTestSpans(NUM_TEST_SPANS)
- err = hcl.WriteSpans(&common.WriteSpansReq{
- Spans: allSpans,
- })
+ err = hcl.WriteSpans(allSpans)
if err != nil {
t.Fatalf("WriteSpans failed: %s\n", err.Error())
}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index da82912..74ec0cf 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -24,8 +24,6 @@
"encoding/json"
"fmt"
"github.com/gorilla/mux"
- "io"
- "io/ioutil"
"net"
"net/http"
"org/apache/htrace/common"
@@ -230,34 +228,32 @@
req.RemoteAddr, serr.Error()))
return
}
- var dec *json.Decoder
- if hand.lg.TraceEnabled() {
- b, err := ioutil.ReadAll(req.Body)
- if err != nil {
- writeError(hand.lg, w, http.StatusBadRequest,
- fmt.Sprintf("Error reading span data: %s", err.Error()))
- return
- }
- hand.lg.Tracef("writeSpansHandler: read %s\n", string(b))
- dec = json.NewDecoder(bytes.NewBuffer(b))
- } else {
- dec = json.NewDecoder(req.Body)
- }
+ dec := json.NewDecoder(req.Body)
var msg common.WriteSpansReq
err := dec.Decode(&msg)
- if (err != nil) && (err != io.EOF) {
+ if (err != nil) {
writeError(hand.lg, w, http.StatusBadRequest,
fmt.Sprintf("Error parsing WriteSpansReq: %s", err.Error()))
return
}
- hand.lg.Debugf("writeSpansHandler: received %d span(s). defaultTrid = %s\n",
- len(msg.Spans), msg.DefaultTrid)
-
+ if hand.lg.TraceEnabled() {
+ hand.lg.Tracef("%s: read WriteSpans REST message: %s\n",
+ req.RemoteAddr, asJson(&msg))
+ }
ing := hand.store.NewSpanIngestor(hand.lg, client, msg.DefaultTrid)
- for spanIdx := range msg.Spans {
- ing.IngestSpan(msg.Spans[spanIdx])
+ for spanIdx := 0; spanIdx < msg.NumSpans; spanIdx++ {
+ var span *common.Span
+ err := dec.Decode(&span)
+ if err != nil {
+ writeError(hand.lg, w, http.StatusBadRequest,
+ fmt.Sprintf("Failed to decode span %d out of %d: ",
+ spanIdx, msg.NumSpans, err.Error()))
+ return
+ }
+ ing.IngestSpan(span)
}
ing.Close(startTime)
+ return
}
type queryHandler struct {
diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
index 9837e94..2eff0a8 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
@@ -362,9 +362,7 @@
}
fmt.Printf("\n")
}
- err = hcl.WriteSpans(&common.WriteSpansReq{
- Spans: spans,
- })
+ err = hcl.WriteSpans(spans)
if err != nil {
fmt.Println(err.Error())
return EXIT_FAILURE
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
index 3206dd6..e5059f7 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
@@ -113,11 +113,11 @@
*/
final static String BUFFER_SIZE_KEY =
"htraced.receiver.buffer.size";
- final static int BUFFER_SIZE_DEFAULT = 48 * 1024 * 1024;
+ final static int BUFFER_SIZE_DEFAULT = 16 * 1024 * 1024;
static int BUFFER_SIZE_MIN = 4 * 1024 * 1024;
// The maximum buffer size should not be longer than
// PackedBuffer.MAX_HRPC_BODY_LENGTH.
- final static int BUFFER_SIZE_MAX = 63 * 1024 * 1024;
+ final static int BUFFER_SIZE_MAX = 32 * 1024 * 1024;
/**
* Set the fraction of the span buffer which needs to fill up before we
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
index f867ad7..dd0a4b9 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
@@ -78,7 +78,7 @@
private static final Log LOG = LogFactory.getLog(PackedBuffer.class);
private static final Charset UTF8 = StandardCharsets.UTF_8;
- private static final byte SPANS[] = "Spans".getBytes(UTF8);
+ private static final byte NUM_SPANS[] = "NumSpans".getBytes(UTF8);
private static final byte DEFAULT_PID[] = "DefaultPid".getBytes(UTF8);
private static final byte A[] = "a".getBytes(UTF8);
private static final byte B[] = "b".getBytes(UTF8);
@@ -401,9 +401,9 @@
packer.writePayload(DEFAULT_PID);
packer.packString(defaultPid);
}
- packer.packRawStringHeader(SPANS.length);
- packer.writePayload(SPANS);
- packer.packArrayHeader(numSpans);
+ packer.packRawStringHeader(NUM_SPANS.length);
+ packer.writePayload(NUM_SPANS);
+ packer.packInt(numSpans);
packer.flush();
success = true;
} finally {
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
index 377d19f..39e5f99 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
@@ -42,15 +42,12 @@
class RestBufferManager implements BufferManager {
private static final Log LOG = LogFactory.getLog(RestBufferManager.class);
private static final Charset UTF8 = Charset.forName("UTF-8");
- private static final byte COMMA_BYTE = (byte)0x2c;
private static final int MAX_PREQUEL_LENGTH = 512;
- private static final int MAX_EPILOGUE_LENGTH = 32;
private final Conf conf;
private final HttpClient httpClient;
private final String urlString;
private final ByteBuffer prequel;
private final ByteBuffer spans;
- private final ByteBuffer epilogue;
private int numSpans;
private static class RestBufferManagerContentProvider
@@ -122,7 +119,6 @@
conf.endpoint.getPort(), "/writeSpans").toString();
this.prequel = ByteBuffer.allocate(MAX_PREQUEL_LENGTH);
this.spans = ByteBuffer.allocate(conf.bufferSize);
- this.epilogue = ByteBuffer.allocate(MAX_EPILOGUE_LENGTH);
clear();
this.httpClient.start();
}
@@ -130,11 +126,10 @@
@Override
public void writeSpan(Span span) throws IOException {
byte[] spanJsonBytes = span.toString().getBytes(UTF8);
- if ((spans.capacity() - spans.position()) < (spanJsonBytes.length + 1)) {
- // Make sure we have enough space for the span JSON and a comma.
+ if ((spans.capacity() - spans.position()) < spanJsonBytes.length) {
+ // Make sure we have enough space for the span JSON.
throw new IOException("Not enough space remaining in span buffer.");
}
- spans.put(COMMA_BYTE);
spans.put(spanJsonBytes);
numSpans++;
}
@@ -151,16 +146,14 @@
@Override
public void prepare() throws IOException {
- String prequelString = "{\"Spans\":[";
+ StringBuilder bld = new StringBuilder();
+ bld.append("{\"NumSpans\":").append(numSpans).append("}");
+ String prequelString = bld.toString();
prequel.put(prequelString.getBytes(UTF8));
prequel.flip();
spans.flip();
- String epilogueString = "]}";
- epilogue.put(epilogueString.toString().getBytes(UTF8));
- epilogue.flip();
-
if (LOG.isTraceEnabled()) {
LOG.trace("Preparing to send " + contentLength() + " bytes of span " +
"data to " + conf.endpointStr + ", containing " + numSpans +
@@ -172,12 +165,11 @@
public void flush() throws IOException {
// Position the buffers at the beginning.
prequel.position(0);
- spans.position(spans.limit() == 0 ? 0 : 1); // Skip the first comma
- epilogue.position(0);
+ spans.position(0);
RestBufferManagerContentProvider contentProvider =
new RestBufferManagerContentProvider(
- new ByteBuffer[] { prequel, spans, epilogue });
+ new ByteBuffer[] { prequel, spans });
long rpcLength = contentProvider.getLength();
try {
Request request = httpClient.
@@ -206,7 +198,6 @@
public void clear() {
prequel.clear();
spans.clear();
- epilogue.clear();
numSpans = 0;
}