HTRACE-150. htraced: add HRPC endpoint for writeSpans (cmccabe)
diff --git a/htrace-core/src/go/src/org/apache/htrace/client/client.go b/htrace-core/src/go/src/org/apache/htrace/client/client.go
index 5e594d9..44e2f69 100644
--- a/htrace-core/src/go/src/org/apache/htrace/client/client.go
+++ b/htrace-core/src/go/src/org/apache/htrace/client/client.go
@@ -35,13 +35,24 @@
// TODO: fancier APIs for streaming spans in the background, optimize TCP stuff
func NewClient(cnf *conf.Config) (*Client, error) {
- hcl := Client{restAddr: cnf.Get(conf.HTRACE_WEB_ADDRESS)}
+ hcl := Client{}
+ hcl.restAddr = cnf.Get(conf.HTRACE_WEB_ADDRESS)
+ if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
+ var err error
+ hcl.hcr, err = newHClient(cnf)
+ if err != nil {
+ return nil, err
+ }
+ }
return &hcl, nil
}
type Client struct {
// REST address of the htraced server.
restAddr string
+
+ // The HRPC client, or null if it is not enabled.
+ hcr *hClient
}
// Get the htraced server information.
@@ -77,12 +88,42 @@
return &span, nil
}
-func (hcl *Client) WriteSpan(span *common.Span) error {
- buf, err := json.Marshal(span)
- if err != nil {
- return err
+func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error {
+ if hcl.hcr != nil {
+ return hcl.hcr.writeSpans(req)
+ } else {
+ return hcl.writeSpansHttp(req)
}
- _, _, err = hcl.makeRestRequest("POST", "writeSpans", bytes.NewReader(buf))
+}
+
+func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error {
+ var w bytes.Buffer
+ var err error
+ for i := range req.Spans {
+ var buf []byte
+ buf, err = json.Marshal(req.Spans[i])
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error serializing span: %s",
+ err.Error()))
+ }
+ _, err = w.Write(buf)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error writing span: %s",
+ err.Error()))
+ }
+ _, err = w.Write([]byte{'\n'})
+ //err = io.WriteString(&w, "\n")
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error writing: %s",
+ err.Error()))
+ }
+ }
+ customHeaders := make(map[string]string)
+ if req.DefaultPid != "" {
+ customHeaders["htrace-pid"] = req.DefaultPid
+ }
+ _, _, err = hcl.makeRestRequest("POST", "writeSpans",
+ &w, customHeaders)
if err != nil {
return err
}
@@ -90,7 +131,6 @@
}
// Find the child IDs of a given span ID.
-// TODO: add offset as well as limit?
func (hcl *Client) FindChildren(sid common.SpanId, lim int) ([]common.SpanId, error) {
buf, _, err := hcl.makeGetRequest(fmt.Sprintf("span/%016x/children?lim=%d",
uint64(sid), lim))
@@ -126,17 +166,24 @@
return spans, nil
}
+var EMPTY = make(map[string]string)
+
func (hcl *Client) makeGetRequest(reqName string) ([]byte, int, error) {
- return hcl.makeRestRequest("GET", reqName, nil)
+ return hcl.makeRestRequest("GET", reqName, nil, EMPTY)
}
// Make a general JSON REST request.
// Returns the request body, the response code, and the error.
// Note: if the response code is non-zero, the error will also be non-zero.
-func (hcl *Client) makeRestRequest(reqType string, reqName string, reqBody io.Reader) ([]byte, int, error) {
- url := fmt.Sprintf("http://%s/%s", hcl.restAddr, reqName)
+func (hcl *Client) makeRestRequest(reqType string, reqName string, reqBody io.Reader,
+ customHeaders map[string]string) ([]byte, int, error) {
+ url := fmt.Sprintf("http://%s/%s",
+ hcl.restAddr, reqName)
req, err := http.NewRequest(reqType, url, reqBody)
req.Header.Set("Content-Type", "application/json")
+ for k, v := range customHeaders {
+ req.Header.Set(k, v)
+ }
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
@@ -187,3 +234,11 @@
searchId = spans[len(spans)-1].Id + 1
}
}
+
+func (hcl *Client) Close() {
+ if hcl.hcr != nil {
+ hcl.hcr.Close()
+ }
+ hcl.restAddr = ""
+ hcl.hcr = nil
+}
diff --git a/htrace-core/src/go/src/org/apache/htrace/client/hclient.go b/htrace-core/src/go/src/org/apache/htrace/client/hclient.go
new file mode 100644
index 0000000..1730c02
--- /dev/null
+++ b/htrace-core/src/go/src/org/apache/htrace/client/hclient.go
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+ "encoding/binary"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/rpc"
+ "org/apache/htrace/common"
+ "org/apache/htrace/conf"
+)
+
+type hClient struct {
+ rpcClient *rpc.Client
+}
+
+type HrpcClientCodec struct {
+ rwc io.ReadWriteCloser
+ length uint32
+}
+
+func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) error {
+ methodId := common.HrpcMethodNameToId(req.ServiceMethod)
+ if methodId == common.METHOD_ID_NONE {
+ return errors.New(fmt.Sprintf("HrpcClientCodec: Unknown method name %s",
+ req.ServiceMethod))
+ }
+ buf, err := json.Marshal(msg)
+ if err != nil {
+ return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+
+ "message as JSON: %s", err.Error()))
+ }
+ if len(buf) > common.MAX_HRPC_BODY_LENGTH {
+ return errors.New(fmt.Sprintf("HrpcClientCodec: message body is %d "+
+ "bytes, but the maximum message size is %d bytes.",
+ len(buf), common.MAX_HRPC_BODY_LENGTH))
+ }
+ hdr := common.HrpcRequestHeader{
+ Magic: common.HRPC_MAGIC,
+ MethodId: methodId,
+ Seq: req.Seq,
+ Length: uint32(len(buf)),
+ }
+ err = binary.Write(cdc.rwc, binary.BigEndian, &hdr)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error writing header bytes: %s",
+ err.Error()))
+ }
+ _, err = cdc.rwc.Write(buf)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error writing body bytes: %s",
+ err.Error()))
+ }
+ return nil
+}
+
+func (cdc *HrpcClientCodec) ReadResponseHeader(resp *rpc.Response) error {
+ hdr := common.HrpcResponseHeader{}
+ err := binary.Read(cdc.rwc, binary.BigEndian, &hdr)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error reading response header "+
+ "bytes: %s", err.Error()))
+ }
+ resp.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
+ if resp.ServiceMethod == "" {
+ return errors.New(fmt.Sprintf("Error reading response header: "+
+ "invalid method ID %d.", hdr.MethodId))
+ }
+ resp.Seq = hdr.Seq
+ if hdr.ErrLength > 0 {
+ if hdr.ErrLength > common.MAX_HRPC_ERROR_LENGTH {
+ return errors.New(fmt.Sprintf("Error reading response header: "+
+ "error message was %d bytes long, but "+
+ "MAX_HRPC_ERROR_LENGTH is %d.",
+ hdr.ErrLength, common.MAX_HRPC_ERROR_LENGTH))
+ }
+ buf := make([]byte, hdr.ErrLength)
+ var nread int
+ nread, err = cdc.rwc.Read(buf)
+ if uint32(nread) != hdr.ErrLength {
+ return errors.New(fmt.Sprintf("Error reading response header: "+
+ "failed to read %d bytes of error message.", nread))
+ }
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error reading response header: "+
+ "failed to read %d bytes of error message: %s",
+ nread, err.Error()))
+ }
+ resp.Error = string(buf)
+ } else {
+ resp.Error = ""
+ }
+ cdc.length = hdr.Length
+ return nil
+}
+
+func (cdc *HrpcClientCodec) ReadResponseBody(body interface{}) error {
+ dec := json.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length)))
+ err := dec.Decode(body)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to read response body: %s",
+ err.Error()))
+ }
+ return nil
+}
+
+func (cdc *HrpcClientCodec) Close() error {
+ return cdc.rwc.Close()
+}
+
+func newHClient(cnf *conf.Config) (*hClient, error) {
+ hcr := hClient{}
+ addr := cnf.Get(conf.HTRACE_HRPC_ADDRESS)
+ conn, err := net.Dial("tcp", addr)
+ if err != nil {
+ return nil, errors.New(fmt.Sprintf("Error contacting the HRPC server "+
+ "at %s: %s", addr, err.Error()))
+ }
+ hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{rwc: conn})
+ return &hcr, nil
+}
+
+func (hcr *hClient) writeSpans(req *common.WriteSpansReq) error {
+ resp := common.WriteSpansResp{}
+ return hcr.rpcClient.Call(common.METHOD_NAME_WRITE_SPANS, req, &resp)
+}
+
+func (hcr *hClient) Close() {
+ hcr.rpcClient.Close()
+}
diff --git a/htrace-core/src/go/src/org/apache/htrace/common/rpc.go b/htrace-core/src/go/src/org/apache/htrace/common/rpc.go
new file mode 100644
index 0000000..cdf7e08
--- /dev/null
+++ b/htrace-core/src/go/src/org/apache/htrace/common/rpc.go
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+// The 4-byte magic number which is sent first in the HRPC header
+const HRPC_MAGIC = 0x48545243
+
+// Method ID codes. Do not reorder these.
+const (
+ METHOD_ID_NONE = 0
+ METHOD_ID_WRITE_SPANS = iota
+)
+
+const METHOD_NAME_WRITE_SPANS = "HrpcHandler.WriteSpans"
+
+// Maximum length of the error message passed in an HRPC response
+const MAX_HRPC_ERROR_LENGTH = 4 * 1024 * 1024
+
+// Maximum length of HRPC message body
+const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024
+
+// A request to write spans to htraced.
+type WriteSpansReq struct {
+ DefaultPid string
+ Spans []*Span
+}
+
+// A response to a WriteSpansReq
+type WriteSpansResp struct {
+}
+
+// The header which is sent over the wire for HRPC
+type HrpcRequestHeader struct {
+ Magic uint32
+ MethodId uint32
+ Seq uint64
+ Length uint32
+}
+
+// The response which is sent over the wire for HRPC
+type HrpcResponseHeader struct {
+ Seq uint64
+ MethodId uint32
+ ErrLength uint32
+ Length uint32
+}
+
+func HrpcMethodIdToMethodName(id uint32) string {
+ switch id {
+ case METHOD_ID_WRITE_SPANS:
+ return METHOD_NAME_WRITE_SPANS
+ default:
+ return ""
+ }
+}
+
+func HrpcMethodNameToId(name string) uint32 {
+ switch name {
+ case METHOD_NAME_WRITE_SPANS:
+ return METHOD_ID_WRITE_SPANS
+ default:
+ return METHOD_ID_NONE
+ }
+}
diff --git a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
index ba63f2d..ccb09e0 100644
--- a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
@@ -47,6 +47,12 @@
// The default port for the Htrace web address.
const HTRACE_WEB_ADDRESS_DEFAULT_PORT = 9095
+// The web address to start the REST server on.
+const HTRACE_HRPC_ADDRESS = "hrpc.address"
+
+// The default port for the Htrace HRPC address.
+const HTRACE_HRPC_ADDRESS_DEFAULT_PORT = 9075
+
// The directories to put the data store into. Separated by PATH_LIST_SEP.
const HTRACE_DATA_STORE_DIRECTORIES = "data.store.directories"
@@ -69,7 +75,8 @@
// Default values for HTrace configuration keys.
var DEFAULTS = map[string]string{
- HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT),
+ HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT),
+ HTRACE_HRPC_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_HRPC_ADDRESS_DEFAULT_PORT),
HTRACE_DATA_STORE_DIRECTORIES: PATH_SEP + "tmp" + PATH_SEP + "htrace1" +
PATH_LIST_SEP + PATH_SEP + "tmp" + PATH_SEP + "htrace2",
HTRACE_DATA_STORE_CLEAR: "false",
diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go b/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go
index 0317237..38cdb58 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go
@@ -21,6 +21,7 @@
import (
"bufio"
+ "bytes"
"encoding/json"
"errors"
"fmt"
@@ -203,8 +204,17 @@
return EXIT_FAILURE
}
defer file.Close()
- in := bufio.NewReader(file)
- dec := json.NewDecoder(in)
+ return doLoadSpans(hcl, bufio.NewReader(file))
+}
+
+func doLoadSpanJson(hcl *htrace.Client, spanJson string) int {
+ return doLoadSpans(hcl, bytes.NewBufferString(spanJson))
+}
+
+func doLoadSpans(hcl *htrace.Client, reader io.Reader) int {
+ dec := json.NewDecoder(reader)
+ spans := make([]*common.Span, 0, 32)
+ var err error
for {
var span common.Span
if err = dec.Decode(&span); err != nil {
@@ -214,26 +224,20 @@
fmt.Printf("Failed to decode JSON: %s\n", err.Error())
return EXIT_FAILURE
}
- if *verbose {
- fmt.Printf("wrote %s\n", span.ToJson())
- }
- if err = hcl.WriteSpan(&span); err != nil {
- fmt.Println(err.Error())
- return EXIT_FAILURE
- }
+ spans = append(spans, &span)
}
- return EXIT_SUCCESS
-}
-
-func doLoadSpanJson(hcl *htrace.Client, spanJson string) int {
- spanBytes := []byte(spanJson)
- var span common.Span
- err := json.Unmarshal(spanBytes, &span)
- if err != nil {
- fmt.Printf("Error parsing provided JSON: %s\n", err.Error())
- return EXIT_FAILURE
+ if *verbose {
+ fmt.Printf("Writing ")
+ prefix := ""
+ for i := range spans {
+ fmt.Printf("%s%s", prefix, spans[i].ToJson())
+ prefix = ", "
+ }
+ fmt.Printf("\n")
}
- err = hcl.WriteSpan(&span)
+ err = hcl.WriteSpans(&common.WriteSpansReq{
+ Spans: spans,
+ })
if err != nil {
fmt.Println(err.Error())
return EXIT_FAILURE
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go
index f9e66ce..218c1c8 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go
@@ -79,10 +79,12 @@
allSpans := createRandomTestSpans(NUM_TEST_SPANS)
// Write half of the spans to htraced via the client.
- for i := 0; i < NUM_TEST_SPANS/2; i++ {
- if err := hcl.WriteSpan(allSpans[i]); err != nil {
- t.Fatalf("WriteSpan(%d) failed: %s\n", i, err.Error())
- }
+ err = hcl.WriteSpans(&common.WriteSpansReq{
+ Spans: allSpans[0 : NUM_TEST_SPANS/2],
+ })
+ if err != nil {
+ t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
+ err.Error())
}
// Look up the first half of the spans. They should be found.
@@ -165,11 +167,11 @@
NUM_TEST_SPANS := 100
allSpans := createRandomTestSpans(NUM_TEST_SPANS)
sort.Sort(allSpans)
- for i := range allSpans {
- err = hcl.WriteSpan(allSpans[i])
- if err != nil {
- t.Fatalf("failed to write span %d: %s", i, err.Error())
- }
+ err = hcl.WriteSpans(&common.WriteSpansReq{
+ Spans: allSpans,
+ })
+ if err != nil {
+ t.Fatalf("WriteSpans failed: %s\n", err.Error())
}
out := make(chan *common.Span, 50)
var dumpErr error
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
index 77497c5..79a7c4f 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -374,10 +374,11 @@
// Create some random trace spans.
NUM_TEST_SPANS := 5
allSpans := createRandomTestSpans(NUM_TEST_SPANS)
- for i := 0; i < NUM_TEST_SPANS; i++ {
- if err := hcl.WriteSpan(allSpans[i]); err != nil {
- t.Fatalf("WriteSpan(%d) failed: %s\n", i, err.Error())
- }
+ err = hcl.WriteSpans(&common.WriteSpansReq{
+ Spans: allSpans,
+ })
+ if err != nil {
+ t.Fatalf("WriteSpans failed: %s\n", err.Error())
}
// Look up the spans we wrote.
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go
new file mode 100644
index 0000000..9696cbc
--- /dev/null
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "bufio"
+ "encoding/binary"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/rpc"
+ "org/apache/htrace/common"
+ "org/apache/htrace/conf"
+)
+
+// Handles HRPC calls
+type HrpcHandler struct {
+ lg *common.Logger
+ store *dataStore
+}
+
+// The HRPC server
+type HrpcServer struct {
+ *rpc.Server
+ hand *HrpcHandler
+ listener net.Listener
+}
+
+// Codec which encodes HRPC data via JSON
+type HrpcServerCodec struct {
+ rwc io.ReadWriteCloser
+ length uint32
+}
+
+func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
+ hdr := common.HrpcRequestHeader{}
+ err := binary.Read(cdc.rwc, binary.BigEndian, &hdr)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error reading header bytes: %s", err.Error()))
+ }
+ if hdr.Magic != common.HRPC_MAGIC {
+ return errors.New(fmt.Sprintf("Invalid request header: expected "+
+ "magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic))
+ }
+ if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
+ return errors.New(fmt.Sprintf("Length prefix was too long. Maximum "+
+ "length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH, hdr.Length))
+ }
+ req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
+ if req.ServiceMethod == "" {
+ return errors.New(fmt.Sprintf("Unknown MethodID code 0x%04x",
+ hdr.MethodId))
+ }
+ req.Seq = hdr.Seq
+ cdc.length = hdr.Length
+ return nil
+}
+
+func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+ dec := json.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length)))
+ err := dec.Decode(body)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to read request body: %s",
+ err.Error()))
+ }
+ return nil
+}
+
+var EMPTY []byte = make([]byte, 0)
+
+func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error {
+ var err error
+ buf := EMPTY
+ if msg != nil {
+ buf, err = json.Marshal(msg)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to marshal response message: %s",
+ err.Error()))
+ }
+ }
+ hdr := common.HrpcResponseHeader{}
+ hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod)
+ hdr.Seq = resp.Seq
+ hdr.ErrLength = uint32(len(resp.Error))
+ hdr.Length = uint32(len(buf))
+ writer := bufio.NewWriterSize(cdc.rwc, 256)
+ err = binary.Write(writer, binary.BigEndian, &hdr)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to write response header: %s",
+ err.Error()))
+ }
+ if hdr.ErrLength > 0 {
+ _, err = io.WriteString(writer, resp.Error)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to write error string: %s",
+ err.Error()))
+ }
+ }
+ if hdr.Length > 0 {
+ var length int
+ length, err = writer.Write(buf)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to write response "+
+ "message: %s", err.Error()))
+ }
+ if uint32(length) != hdr.Length {
+ return errors.New(fmt.Sprintf("Failed to write all of response "+
+ "message: %s", err.Error()))
+ }
+ }
+ err = writer.Flush()
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to write the response bytes: "+
+ "%s", err.Error()))
+ }
+ return nil
+}
+
+func (cdc *HrpcServerCodec) Close() error {
+ return cdc.rwc.Close()
+}
+
+func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
+ resp *common.WriteSpansResp) (err error) {
+ hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s). "+
+ "defaultPid = %s\n", len(req.Spans), req.DefaultPid)
+ for i := range req.Spans {
+ span := req.Spans[i]
+ if span.ProcessId == "" {
+ span.ProcessId = req.DefaultPid
+ }
+ hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
+ hand.store.WriteSpan(span)
+ }
+ return nil
+}
+
+func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) {
+ lg := common.NewLogger("hrpc", cnf)
+ hsv := &HrpcServer{
+ Server: rpc.NewServer(),
+ hand: &HrpcHandler{
+ lg: lg,
+ store: store,
+ },
+ }
+ var err error
+ hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
+ if err != nil {
+ return nil, err
+ }
+ hsv.Server.Register(hsv.hand)
+ go hsv.run()
+ lg.Infof("Started HRPC server on %s...\n", hsv.listener.Addr().String())
+ return hsv, nil
+}
+
+func (hsv *HrpcServer) run() {
+ lg := hsv.hand.lg
+ for {
+ conn, err := hsv.listener.Accept()
+ if err != nil {
+ lg.Errorf("HRPC Accept error: %s\n", err.Error())
+ continue
+ }
+ go hsv.ServeCodec(&HrpcServerCodec{
+ rwc: conn,
+ })
+ }
+}
+
+func (hsv *HrpcServer) Addr() net.Addr {
+ return hsv.listener.Addr()
+}
+
+func (hsv *HrpcServer) Close() {
+ hsv.listener.Close()
+}
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
index c3432b4..64da457 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
@@ -78,12 +78,26 @@
lg.Errorf("Error creating REST server: %s\n", err.Error())
os.Exit(1)
}
+ var hsv *HrpcServer
+ if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
+ hsv, err = CreateHrpcServer(cnf, store)
+ if err != nil {
+ lg.Errorf("Error creating HRPC server: %s\n", err.Error())
+ os.Exit(1)
+ }
+ } else {
+ lg.Infof("Not starting HRPC server because no value was given for %s.\n",
+ conf.HTRACE_HRPC_ADDRESS)
+ }
naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS)
if naddr != "" {
notif := StartupNotification{
HttpAddr: rsv.Addr().String(),
ProcessId: os.Getpid(),
}
+ if hsv != nil {
+ notif.HrpcAddr = hsv.Addr().String()
+ }
err = sendStartupNotification(naddr, ¬if)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to send startup notification: "+
@@ -100,6 +114,7 @@
// Used by unit tests.
type StartupNotification struct {
HttpAddr string
+ HrpcAddr string
ProcessId int
}
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go
index be7e284..a54f2cb 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -62,6 +62,7 @@
DataDirs []string
Store *dataStore
Rsv *RestServer
+ Hsv *HrpcServer
Lg *common.Logger
KeepDataDirsOnClose bool
}
@@ -70,6 +71,7 @@
var err error
var store *dataStore
var rsv *RestServer
+ var hsv *HrpcServer
if bld.Name == "" {
bld.Name = "HTraceTest"
}
@@ -90,7 +92,8 @@
}
bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] =
strings.Join(bld.DataDirs, conf.PATH_LIST_SEP)
- bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0" // use a random port for the REST server
+ bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0" // use a random port for the REST server
+ bld.Cnf[conf.HTRACE_HRPC_ADDRESS] = ":0" // use a random port for the HRPC server
bld.Cnf[conf.HTRACE_LOG_LEVEL] = "TRACE"
cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS}
cnf, err := cnfBld.Build()
@@ -123,6 +126,11 @@
if err != nil {
return nil, err
}
+ hsv, err = CreateHrpcServer(cnf, store)
+ if err != nil {
+ return nil, err
+ }
+
lg.Infof("Created MiniHTraced %s\n", bld.Name)
return &MiniHTraced{
Name: bld.Name,
@@ -130,6 +138,7 @@
DataDirs: bld.DataDirs,
Store: store,
Rsv: rsv,
+ Hsv: hsv,
Lg: lg,
KeepDataDirsOnClose: bld.KeepDataDirsOnClose,
}, nil
@@ -137,7 +146,8 @@
// Return a Config object that clients can use to connect to this MiniHTraceD.
func (ht *MiniHTraced) ClientConf() *conf.Config {
- return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String())
+ return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(),
+ conf.HTRACE_HRPC_ADDRESS, ht.Hsv.Addr().String())
}
func (ht *MiniHTraced) Close() {