HTRACE-150. htraced: add HRPC endpoint for writeSpans (cmccabe)
diff --git a/htrace-core/src/go/src/org/apache/htrace/client/client.go b/htrace-core/src/go/src/org/apache/htrace/client/client.go
index 5e594d9..44e2f69 100644
--- a/htrace-core/src/go/src/org/apache/htrace/client/client.go
+++ b/htrace-core/src/go/src/org/apache/htrace/client/client.go
@@ -35,13 +35,24 @@
 // TODO: fancier APIs for streaming spans in the background, optimize TCP stuff
 
 func NewClient(cnf *conf.Config) (*Client, error) {
-	hcl := Client{restAddr: cnf.Get(conf.HTRACE_WEB_ADDRESS)}
+	hcl := Client{}
+	hcl.restAddr = cnf.Get(conf.HTRACE_WEB_ADDRESS)
+	if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
+		var err error
+		hcl.hcr, err = newHClient(cnf)
+		if err != nil {
+			return nil, err
+		}
+	}
 	return &hcl, nil
 }
 
 type Client struct {
 	// REST address of the htraced server.
 	restAddr string
+
+	// The HRPC client, or null if it is not enabled.
+	hcr *hClient
 }
 
 // Get the htraced server information.
@@ -77,12 +88,42 @@
 	return &span, nil
 }
 
-func (hcl *Client) WriteSpan(span *common.Span) error {
-	buf, err := json.Marshal(span)
-	if err != nil {
-		return err
+func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error {
+	if hcl.hcr != nil {
+		return hcl.hcr.writeSpans(req)
+	} else {
+		return hcl.writeSpansHttp(req)
 	}
-	_, _, err = hcl.makeRestRequest("POST", "writeSpans", bytes.NewReader(buf))
+}
+
+func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error {
+	var w bytes.Buffer
+	var err error
+	for i := range req.Spans {
+		var buf []byte
+		buf, err = json.Marshal(req.Spans[i])
+		if err != nil {
+			return errors.New(fmt.Sprintf("Error serializing span: %s",
+				err.Error()))
+		}
+		_, err = w.Write(buf)
+		if err != nil {
+			return errors.New(fmt.Sprintf("Error writing span: %s",
+				err.Error()))
+		}
+		_, err = w.Write([]byte{'\n'})
+		//err = io.WriteString(&w, "\n")
+		if err != nil {
+			return errors.New(fmt.Sprintf("Error writing: %s",
+				err.Error()))
+		}
+	}
+	customHeaders := make(map[string]string)
+	if req.DefaultPid != "" {
+		customHeaders["htrace-pid"] = req.DefaultPid
+	}
+	_, _, err = hcl.makeRestRequest("POST", "writeSpans",
+		&w, customHeaders)
 	if err != nil {
 		return err
 	}
@@ -90,7 +131,6 @@
 }
 
 // Find the child IDs of a given span ID.
-// TODO: add offset as well as limit?
 func (hcl *Client) FindChildren(sid common.SpanId, lim int) ([]common.SpanId, error) {
 	buf, _, err := hcl.makeGetRequest(fmt.Sprintf("span/%016x/children?lim=%d",
 		uint64(sid), lim))
@@ -126,17 +166,24 @@
 	return spans, nil
 }
 
+var EMPTY = make(map[string]string)
+
 func (hcl *Client) makeGetRequest(reqName string) ([]byte, int, error) {
-	return hcl.makeRestRequest("GET", reqName, nil)
+	return hcl.makeRestRequest("GET", reqName, nil, EMPTY)
 }
 
 // Make a general JSON REST request.
 // Returns the request body, the response code, and the error.
 // Note: if the response code is non-zero, the error will also be non-zero.
-func (hcl *Client) makeRestRequest(reqType string, reqName string, reqBody io.Reader) ([]byte, int, error) {
-	url := fmt.Sprintf("http://%s/%s", hcl.restAddr, reqName)
+func (hcl *Client) makeRestRequest(reqType string, reqName string, reqBody io.Reader,
+	customHeaders map[string]string) ([]byte, int, error) {
+	url := fmt.Sprintf("http://%s/%s",
+		hcl.restAddr, reqName)
 	req, err := http.NewRequest(reqType, url, reqBody)
 	req.Header.Set("Content-Type", "application/json")
+	for k, v := range customHeaders {
+		req.Header.Set(k, v)
+	}
 	client := &http.Client{}
 	resp, err := client.Do(req)
 	if err != nil {
@@ -187,3 +234,11 @@
 		searchId = spans[len(spans)-1].Id + 1
 	}
 }
+
+func (hcl *Client) Close() {
+	if hcl.hcr != nil {
+		hcl.hcr.Close()
+	}
+	hcl.restAddr = ""
+	hcl.hcr = nil
+}
diff --git a/htrace-core/src/go/src/org/apache/htrace/client/hclient.go b/htrace-core/src/go/src/org/apache/htrace/client/hclient.go
new file mode 100644
index 0000000..1730c02
--- /dev/null
+++ b/htrace-core/src/go/src/org/apache/htrace/client/hclient.go
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+	"encoding/binary"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"net/rpc"
+	"org/apache/htrace/common"
+	"org/apache/htrace/conf"
+)
+
+type hClient struct {
+	rpcClient *rpc.Client
+}
+
+type HrpcClientCodec struct {
+	rwc    io.ReadWriteCloser
+	length uint32
+}
+
+func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) error {
+	methodId := common.HrpcMethodNameToId(req.ServiceMethod)
+	if methodId == common.METHOD_ID_NONE {
+		return errors.New(fmt.Sprintf("HrpcClientCodec: Unknown method name %s",
+			req.ServiceMethod))
+	}
+	buf, err := json.Marshal(msg)
+	if err != nil {
+		return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+
+			"message as JSON: %s", err.Error()))
+	}
+	if len(buf) > common.MAX_HRPC_BODY_LENGTH {
+		return errors.New(fmt.Sprintf("HrpcClientCodec: message body is %d "+
+			"bytes, but the maximum message size is %d bytes.",
+			len(buf), common.MAX_HRPC_BODY_LENGTH))
+	}
+	hdr := common.HrpcRequestHeader{
+		Magic:    common.HRPC_MAGIC,
+		MethodId: methodId,
+		Seq:      req.Seq,
+		Length:   uint32(len(buf)),
+	}
+	err = binary.Write(cdc.rwc, binary.BigEndian, &hdr)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Error writing header bytes: %s",
+			err.Error()))
+	}
+	_, err = cdc.rwc.Write(buf)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Error writing body bytes: %s",
+			err.Error()))
+	}
+	return nil
+}
+
+func (cdc *HrpcClientCodec) ReadResponseHeader(resp *rpc.Response) error {
+	hdr := common.HrpcResponseHeader{}
+	err := binary.Read(cdc.rwc, binary.BigEndian, &hdr)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Error reading response header "+
+			"bytes: %s", err.Error()))
+	}
+	resp.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
+	if resp.ServiceMethod == "" {
+		return errors.New(fmt.Sprintf("Error reading response header: "+
+			"invalid method ID %d.", hdr.MethodId))
+	}
+	resp.Seq = hdr.Seq
+	if hdr.ErrLength > 0 {
+		if hdr.ErrLength > common.MAX_HRPC_ERROR_LENGTH {
+			return errors.New(fmt.Sprintf("Error reading response header: "+
+				"error message was %d bytes long, but "+
+				"MAX_HRPC_ERROR_LENGTH is %d.",
+				hdr.ErrLength, common.MAX_HRPC_ERROR_LENGTH))
+		}
+		buf := make([]byte, hdr.ErrLength)
+		var nread int
+		nread, err = cdc.rwc.Read(buf)
+		if uint32(nread) != hdr.ErrLength {
+			return errors.New(fmt.Sprintf("Error reading response header: "+
+				"failed to read %d bytes of error message.", nread))
+		}
+		if err != nil {
+			return errors.New(fmt.Sprintf("Error reading response header: "+
+				"failed to read %d bytes of error message: %s",
+				nread, err.Error()))
+		}
+		resp.Error = string(buf)
+	} else {
+		resp.Error = ""
+	}
+	cdc.length = hdr.Length
+	return nil
+}
+
+func (cdc *HrpcClientCodec) ReadResponseBody(body interface{}) error {
+	dec := json.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length)))
+	err := dec.Decode(body)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Failed to read response body: %s",
+			err.Error()))
+	}
+	return nil
+}
+
+func (cdc *HrpcClientCodec) Close() error {
+	return cdc.rwc.Close()
+}
+
+func newHClient(cnf *conf.Config) (*hClient, error) {
+	hcr := hClient{}
+	addr := cnf.Get(conf.HTRACE_HRPC_ADDRESS)
+	conn, err := net.Dial("tcp", addr)
+	if err != nil {
+		return nil, errors.New(fmt.Sprintf("Error contacting the HRPC server "+
+			"at %s: %s", addr, err.Error()))
+	}
+	hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{rwc: conn})
+	return &hcr, nil
+}
+
+func (hcr *hClient) writeSpans(req *common.WriteSpansReq) error {
+	resp := common.WriteSpansResp{}
+	return hcr.rpcClient.Call(common.METHOD_NAME_WRITE_SPANS, req, &resp)
+}
+
+func (hcr *hClient) Close() {
+	hcr.rpcClient.Close()
+}
diff --git a/htrace-core/src/go/src/org/apache/htrace/common/rpc.go b/htrace-core/src/go/src/org/apache/htrace/common/rpc.go
new file mode 100644
index 0000000..cdf7e08
--- /dev/null
+++ b/htrace-core/src/go/src/org/apache/htrace/common/rpc.go
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+// The 4-byte magic number which is sent first in the HRPC header
+const HRPC_MAGIC = 0x48545243
+
+// Method ID codes.  Do not reorder these.
+const (
+	METHOD_ID_NONE        = 0
+	METHOD_ID_WRITE_SPANS = iota
+)
+
+const METHOD_NAME_WRITE_SPANS = "HrpcHandler.WriteSpans"
+
+// Maximum length of the error message passed in an HRPC response
+const MAX_HRPC_ERROR_LENGTH = 4 * 1024 * 1024
+
+// Maximum length of HRPC message body
+const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024
+
+// A request to write spans to htraced.
+type WriteSpansReq struct {
+	DefaultPid string
+	Spans      []*Span
+}
+
+// A response to a WriteSpansReq
+type WriteSpansResp struct {
+}
+
+// The header which is sent over the wire for HRPC
+type HrpcRequestHeader struct {
+	Magic    uint32
+	MethodId uint32
+	Seq      uint64
+	Length   uint32
+}
+
+// The response which is sent over the wire for HRPC
+type HrpcResponseHeader struct {
+	Seq       uint64
+	MethodId  uint32
+	ErrLength uint32
+	Length    uint32
+}
+
+func HrpcMethodIdToMethodName(id uint32) string {
+	switch id {
+	case METHOD_ID_WRITE_SPANS:
+		return METHOD_NAME_WRITE_SPANS
+	default:
+		return ""
+	}
+}
+
+func HrpcMethodNameToId(name string) uint32 {
+	switch name {
+	case METHOD_NAME_WRITE_SPANS:
+		return METHOD_ID_WRITE_SPANS
+	default:
+		return METHOD_ID_NONE
+	}
+}
diff --git a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
index ba63f2d..ccb09e0 100644
--- a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
@@ -47,6 +47,12 @@
 // The default port for the Htrace web address.
 const HTRACE_WEB_ADDRESS_DEFAULT_PORT = 9095
 
+// The web address to start the REST server on.
+const HTRACE_HRPC_ADDRESS = "hrpc.address"
+
+// The default port for the Htrace HRPC address.
+const HTRACE_HRPC_ADDRESS_DEFAULT_PORT = 9075
+
 // The directories to put the data store into.  Separated by PATH_LIST_SEP.
 const HTRACE_DATA_STORE_DIRECTORIES = "data.store.directories"
 
@@ -69,7 +75,8 @@
 
 // Default values for HTrace configuration keys.
 var DEFAULTS = map[string]string{
-	HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT),
+	HTRACE_WEB_ADDRESS:  fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT),
+	HTRACE_HRPC_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_HRPC_ADDRESS_DEFAULT_PORT),
 	HTRACE_DATA_STORE_DIRECTORIES: PATH_SEP + "tmp" + PATH_SEP + "htrace1" +
 		PATH_LIST_SEP + PATH_SEP + "tmp" + PATH_SEP + "htrace2",
 	HTRACE_DATA_STORE_CLEAR:            "false",
diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go b/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go
index 0317237..38cdb58 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go
@@ -21,6 +21,7 @@
 
 import (
 	"bufio"
+	"bytes"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -203,8 +204,17 @@
 		return EXIT_FAILURE
 	}
 	defer file.Close()
-	in := bufio.NewReader(file)
-	dec := json.NewDecoder(in)
+	return doLoadSpans(hcl, bufio.NewReader(file))
+}
+
+func doLoadSpanJson(hcl *htrace.Client, spanJson string) int {
+	return doLoadSpans(hcl, bytes.NewBufferString(spanJson))
+}
+
+func doLoadSpans(hcl *htrace.Client, reader io.Reader) int {
+	dec := json.NewDecoder(reader)
+	spans := make([]*common.Span, 0, 32)
+	var err error
 	for {
 		var span common.Span
 		if err = dec.Decode(&span); err != nil {
@@ -214,26 +224,20 @@
 			fmt.Printf("Failed to decode JSON: %s\n", err.Error())
 			return EXIT_FAILURE
 		}
-		if *verbose {
-			fmt.Printf("wrote %s\n", span.ToJson())
-		}
-		if err = hcl.WriteSpan(&span); err != nil {
-			fmt.Println(err.Error())
-			return EXIT_FAILURE
-		}
+		spans = append(spans, &span)
 	}
-	return EXIT_SUCCESS
-}
-
-func doLoadSpanJson(hcl *htrace.Client, spanJson string) int {
-	spanBytes := []byte(spanJson)
-	var span common.Span
-	err := json.Unmarshal(spanBytes, &span)
-	if err != nil {
-		fmt.Printf("Error parsing provided JSON: %s\n", err.Error())
-		return EXIT_FAILURE
+	if *verbose {
+		fmt.Printf("Writing ")
+		prefix := ""
+		for i := range spans {
+			fmt.Printf("%s%s", prefix, spans[i].ToJson())
+			prefix = ", "
+		}
+		fmt.Printf("\n")
 	}
-	err = hcl.WriteSpan(&span)
+	err = hcl.WriteSpans(&common.WriteSpansReq{
+		Spans: spans,
+	})
 	if err != nil {
 		fmt.Println(err.Error())
 		return EXIT_FAILURE
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go
index f9e66ce..218c1c8 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go
@@ -79,10 +79,12 @@
 	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
 
 	// Write half of the spans to htraced via the client.
-	for i := 0; i < NUM_TEST_SPANS/2; i++ {
-		if err := hcl.WriteSpan(allSpans[i]); err != nil {
-			t.Fatalf("WriteSpan(%d) failed: %s\n", i, err.Error())
-		}
+	err = hcl.WriteSpans(&common.WriteSpansReq{
+		Spans: allSpans[0 : NUM_TEST_SPANS/2],
+	})
+	if err != nil {
+		t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
+			err.Error())
 	}
 
 	// Look up the first half of the spans.  They should be found.
@@ -165,11 +167,11 @@
 	NUM_TEST_SPANS := 100
 	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
 	sort.Sort(allSpans)
-	for i := range allSpans {
-		err = hcl.WriteSpan(allSpans[i])
-		if err != nil {
-			t.Fatalf("failed to write span %d: %s", i, err.Error())
-		}
+	err = hcl.WriteSpans(&common.WriteSpansReq{
+		Spans: allSpans,
+	})
+	if err != nil {
+		t.Fatalf("WriteSpans failed: %s\n", err.Error())
 	}
 	out := make(chan *common.Span, 50)
 	var dumpErr error
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
index 77497c5..79a7c4f 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -374,10 +374,11 @@
 	// Create some random trace spans.
 	NUM_TEST_SPANS := 5
 	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
-	for i := 0; i < NUM_TEST_SPANS; i++ {
-		if err := hcl.WriteSpan(allSpans[i]); err != nil {
-			t.Fatalf("WriteSpan(%d) failed: %s\n", i, err.Error())
-		}
+	err = hcl.WriteSpans(&common.WriteSpansReq{
+		Spans: allSpans,
+	})
+	if err != nil {
+		t.Fatalf("WriteSpans failed: %s\n", err.Error())
 	}
 
 	// Look up the spans we wrote.
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go
new file mode 100644
index 0000000..9696cbc
--- /dev/null
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"bufio"
+	"encoding/binary"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"net/rpc"
+	"org/apache/htrace/common"
+	"org/apache/htrace/conf"
+)
+
+// Handles HRPC calls
+type HrpcHandler struct {
+	lg    *common.Logger
+	store *dataStore
+}
+
+// The HRPC server
+type HrpcServer struct {
+	*rpc.Server
+	hand     *HrpcHandler
+	listener net.Listener
+}
+
+// Codec which encodes HRPC data via JSON
+type HrpcServerCodec struct {
+	rwc    io.ReadWriteCloser
+	length uint32
+}
+
+func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
+	hdr := common.HrpcRequestHeader{}
+	err := binary.Read(cdc.rwc, binary.BigEndian, &hdr)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Error reading header bytes: %s", err.Error()))
+	}
+	if hdr.Magic != common.HRPC_MAGIC {
+		return errors.New(fmt.Sprintf("Invalid request header: expected "+
+			"magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic))
+	}
+	if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
+		return errors.New(fmt.Sprintf("Length prefix was too long.  Maximum "+
+			"length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH, hdr.Length))
+	}
+	req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
+	if req.ServiceMethod == "" {
+		return errors.New(fmt.Sprintf("Unknown MethodID code 0x%04x",
+			hdr.MethodId))
+	}
+	req.Seq = hdr.Seq
+	cdc.length = hdr.Length
+	return nil
+}
+
+func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+	dec := json.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length)))
+	err := dec.Decode(body)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Failed to read request body: %s",
+			err.Error()))
+	}
+	return nil
+}
+
+var EMPTY []byte = make([]byte, 0)
+
+func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error {
+	var err error
+	buf := EMPTY
+	if msg != nil {
+		buf, err = json.Marshal(msg)
+		if err != nil {
+			return errors.New(fmt.Sprintf("Failed to marshal response message: %s",
+				err.Error()))
+		}
+	}
+	hdr := common.HrpcResponseHeader{}
+	hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod)
+	hdr.Seq = resp.Seq
+	hdr.ErrLength = uint32(len(resp.Error))
+	hdr.Length = uint32(len(buf))
+	writer := bufio.NewWriterSize(cdc.rwc, 256)
+	err = binary.Write(writer, binary.BigEndian, &hdr)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Failed to write response header: %s",
+			err.Error()))
+	}
+	if hdr.ErrLength > 0 {
+		_, err = io.WriteString(writer, resp.Error)
+		if err != nil {
+			return errors.New(fmt.Sprintf("Failed to write error string: %s",
+				err.Error()))
+		}
+	}
+	if hdr.Length > 0 {
+		var length int
+		length, err = writer.Write(buf)
+		if err != nil {
+			return errors.New(fmt.Sprintf("Failed to write response "+
+				"message: %s", err.Error()))
+		}
+		if uint32(length) != hdr.Length {
+			return errors.New(fmt.Sprintf("Failed to write all of response "+
+				"message: %s", err.Error()))
+		}
+	}
+	err = writer.Flush()
+	if err != nil {
+		return errors.New(fmt.Sprintf("Failed to write the response bytes: "+
+			"%s", err.Error()))
+	}
+	return nil
+}
+
+func (cdc *HrpcServerCodec) Close() error {
+	return cdc.rwc.Close()
+}
+
+func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
+	resp *common.WriteSpansResp) (err error) {
+	hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s).  "+
+		"defaultPid = %s\n", len(req.Spans), req.DefaultPid)
+	for i := range req.Spans {
+		span := req.Spans[i]
+		if span.ProcessId == "" {
+			span.ProcessId = req.DefaultPid
+		}
+		hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
+		hand.store.WriteSpan(span)
+	}
+	return nil
+}
+
+func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) {
+	lg := common.NewLogger("hrpc", cnf)
+	hsv := &HrpcServer{
+		Server: rpc.NewServer(),
+		hand: &HrpcHandler{
+			lg:    lg,
+			store: store,
+		},
+	}
+	var err error
+	hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
+	if err != nil {
+		return nil, err
+	}
+	hsv.Server.Register(hsv.hand)
+	go hsv.run()
+	lg.Infof("Started HRPC server on %s...\n", hsv.listener.Addr().String())
+	return hsv, nil
+}
+
+func (hsv *HrpcServer) run() {
+	lg := hsv.hand.lg
+	for {
+		conn, err := hsv.listener.Accept()
+		if err != nil {
+			lg.Errorf("HRPC Accept error: %s\n", err.Error())
+			continue
+		}
+		go hsv.ServeCodec(&HrpcServerCodec{
+			rwc: conn,
+		})
+	}
+}
+
+func (hsv *HrpcServer) Addr() net.Addr {
+	return hsv.listener.Addr()
+}
+
+func (hsv *HrpcServer) Close() {
+	hsv.listener.Close()
+}
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
index c3432b4..64da457 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
@@ -78,12 +78,26 @@
 		lg.Errorf("Error creating REST server: %s\n", err.Error())
 		os.Exit(1)
 	}
+	var hsv *HrpcServer
+	if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
+		hsv, err = CreateHrpcServer(cnf, store)
+		if err != nil {
+			lg.Errorf("Error creating HRPC server: %s\n", err.Error())
+			os.Exit(1)
+		}
+	} else {
+		lg.Infof("Not starting HRPC server because no value was given for %s.\n",
+			conf.HTRACE_HRPC_ADDRESS)
+	}
 	naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS)
 	if naddr != "" {
 		notif := StartupNotification{
 			HttpAddr:  rsv.Addr().String(),
 			ProcessId: os.Getpid(),
 		}
+		if hsv != nil {
+			notif.HrpcAddr = hsv.Addr().String()
+		}
 		err = sendStartupNotification(naddr, &notif)
 		if err != nil {
 			fmt.Fprintf(os.Stderr, "Failed to send startup notification: "+
@@ -100,6 +114,7 @@
 // Used by unit tests.
 type StartupNotification struct {
 	HttpAddr  string
+	HrpcAddr  string
 	ProcessId int
 }
 
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go
index be7e284..a54f2cb 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -62,6 +62,7 @@
 	DataDirs            []string
 	Store               *dataStore
 	Rsv                 *RestServer
+	Hsv                 *HrpcServer
 	Lg                  *common.Logger
 	KeepDataDirsOnClose bool
 }
@@ -70,6 +71,7 @@
 	var err error
 	var store *dataStore
 	var rsv *RestServer
+	var hsv *HrpcServer
 	if bld.Name == "" {
 		bld.Name = "HTraceTest"
 	}
@@ -90,7 +92,8 @@
 	}
 	bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] =
 		strings.Join(bld.DataDirs, conf.PATH_LIST_SEP)
-	bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0" // use a random port for the REST server
+	bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0"  // use a random port for the REST server
+	bld.Cnf[conf.HTRACE_HRPC_ADDRESS] = ":0" // use a random port for the HRPC server
 	bld.Cnf[conf.HTRACE_LOG_LEVEL] = "TRACE"
 	cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS}
 	cnf, err := cnfBld.Build()
@@ -123,6 +126,11 @@
 	if err != nil {
 		return nil, err
 	}
+	hsv, err = CreateHrpcServer(cnf, store)
+	if err != nil {
+		return nil, err
+	}
+
 	lg.Infof("Created MiniHTraced %s\n", bld.Name)
 	return &MiniHTraced{
 		Name:                bld.Name,
@@ -130,6 +138,7 @@
 		DataDirs:            bld.DataDirs,
 		Store:               store,
 		Rsv:                 rsv,
+		Hsv:                 hsv,
 		Lg:                  lg,
 		KeepDataDirsOnClose: bld.KeepDataDirsOnClose,
 	}, nil
@@ -137,7 +146,8 @@
 
 // Return a Config object that clients can use to connect to this MiniHTraceD.
 func (ht *MiniHTraced) ClientConf() *conf.Config {
-	return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String())
+	return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(),
+		conf.HTRACE_HRPC_ADDRESS, ht.Hsv.Addr().String())
 }
 
 func (ht *MiniHTraced) Close() {