blob: ecd13d43e78d9d2bb1e29b5643a851ac03e7befd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package main
import (
"bufio"
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"github.com/ugorji/go/codec"
"io"
"net"
"net/rpc"
"org/apache/htrace/common"
"org/apache/htrace/conf"
"sync"
"sync/atomic"
"time"
)
const MAX_HRPC_HANDLERS = 32765
// Handles HRPC calls
type HrpcHandler struct {
lg *common.Logger
store *dataStore
}
// The HRPC server
type HrpcServer struct {
*rpc.Server
hand *HrpcHandler
// The listener we are using to accept new connections.
listener net.Listener
// A WaitGroup used to block until the HRPC server has exited.
exited sync.WaitGroup
// A channel containing server codecs to use. This channel is fully
// buffered. The number of entries it initially contains determines how
// many concurrent codecs we will have running at once.
cdcs chan *HrpcServerCodec
// Used to shut down
shutdown chan interface{}
// The I/O timeout to use when reading requests or sending responses. This
// timeout does not apply to the time we spend processing the message.
ioTimeo time.Duration
// A count of all I/O errors that we have encountered since the server
// started. This counts errors like improperly formatted message frames,
// but not errors like properly formatted but invalid messages.
// This count is updated from multiple goroutines via sync/atomic.
ioErrorCount uint64
// The test hooks to use, or nil during normal operation.
testHooks *hrpcTestHooks
}
type hrpcTestHooks struct {
// A callback we make right after calling Accept() but before reading from
// the new connection.
HandleAdmission func()
}
// A codec which encodes HRPC data via JSON. This structure holds the context
// for a particular client connection.
type HrpcServerCodec struct {
lg *common.Logger
// The current connection.
conn net.Conn
// The HrpcServer which this connection is part of.
hsv *HrpcServer
// The message length we read from the header.
length uint32
// The number of messages this connection has handled.
numHandled int
// The buffer for reading requests. These buffers are reused for multiple
// requests to avoid allocating memory.
buf []byte
// Configuration for msgpack decoding
msgpackHandle codec.MsgpackHandle
}
func asJson(val interface{}) string {
js, err := json.Marshal(val)
if err != nil {
return "encoding error: " + err.Error()
}
return string(js)
}
func newIoErrorWarn(cdc *HrpcServerCodec, val string) error {
return newIoError(cdc, val, common.WARN)
}
func newIoError(cdc *HrpcServerCodec, val string, level common.Level) error {
if cdc.lg.LevelEnabled(level) {
cdc.lg.Write(level, cdc.conn.RemoteAddr().String()+": "+val+"\n")
}
if level >= common.INFO {
atomic.AddUint64(&cdc.hsv.ioErrorCount, 1)
}
return errors.New(val)
}
func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
hdr := common.HrpcRequestHeader{}
if cdc.lg.TraceEnabled() {
cdc.lg.Tracef("%s: Reading HRPC request header.\n", cdc.conn.RemoteAddr())
}
cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
err := binary.Read(cdc.conn, binary.LittleEndian, &hdr)
if err != nil {
if err == io.EOF && cdc.numHandled > 0 {
return newIoError(cdc, fmt.Sprintf("Remote closed connection "+
"after writing %d message(s)", cdc.numHandled), common.DEBUG)
}
return newIoError(cdc,
fmt.Sprintf("Error reading request header: %s", err.Error()), common.WARN)
}
if cdc.lg.TraceEnabled() {
cdc.lg.Tracef("%s: Read HRPC request header %s\n",
cdc.conn.RemoteAddr(), asJson(&hdr))
}
if hdr.Magic != common.HRPC_MAGIC {
return newIoErrorWarn(cdc, fmt.Sprintf("Invalid request header: expected "+
"magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic))
}
if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
return newIoErrorWarn(cdc, fmt.Sprintf("Length prefix was too long. "+
"Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH,
hdr.Length))
}
req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
if req.ServiceMethod == "" {
return newIoErrorWarn(cdc, fmt.Sprintf("Unknown MethodID code 0x%04x",
hdr.MethodId))
}
req.Seq = hdr.Seq
cdc.length = hdr.Length
return nil
}
func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
remoteAddr := cdc.conn.RemoteAddr().String()
if cdc.lg.TraceEnabled() {
cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n",
remoteAddr, cdc.length)
}
if cap(cdc.buf) < int(cdc.length) {
var pow uint
for pow=0;(1<<pow) < int(cdc.length);pow++ {
}
cdc.buf = make([]byte, 0, 1<<pow)
}
_, err := io.ReadFull(cdc.conn, cdc.buf[:cdc.length])
if err != nil {
return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte "+
"request body: %s", cdc.length, err.Error()))
}
var zeroTime time.Time
cdc.conn.SetDeadline(zeroTime)
dec := codec.NewDecoderBytes(cdc.buf[:cdc.length], &cdc.msgpackHandle)
err = dec.Decode(body)
if cdc.lg.TraceEnabled() {
cdc.lg.Tracef("%s: read HRPC message: %s\n",
remoteAddr, asJson(&body))
}
req := body.(*common.WriteSpansReq)
if req == nil {
return nil
}
// We decode WriteSpans requests in a streaming fashion, to avoid overloading the garbage
// collector with a ton of trace spans all at once.
startTime := time.Now()
client, _, err := net.SplitHostPort(remoteAddr)
if err != nil {
return newIoErrorWarn(cdc, fmt.Sprintf("Failed to split host and port "+
"for %s: %s\n", remoteAddr, err.Error()))
}
hand := cdc.hsv.hand
ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
for spanIdx := 0; spanIdx < req.NumSpans; spanIdx++ {
var span *common.Span
err := dec.Decode(&span)
if err != nil {
return newIoErrorWarn(cdc, fmt.Sprintf("Failed to decode span %d " +
"out of %d: %s\n", spanIdx, req.NumSpans, err.Error()))
}
ing.IngestSpan(span)
}
ing.Close(startTime)
return nil
}
var EMPTY []byte = make([]byte, 0)
func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error {
cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
var err error
buf := EMPTY
if msg != nil {
w := bytes.NewBuffer(make([]byte, 0, 128))
enc := codec.NewEncoder(w, &cdc.msgpackHandle)
err := enc.Encode(msg)
if err != nil {
return newIoErrorWarn(cdc, fmt.Sprintf("Failed to marshal "+
"response message: %s", err.Error()))
}
buf = w.Bytes()
}
hdr := common.HrpcResponseHeader{}
hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod)
hdr.Seq = resp.Seq
hdr.ErrLength = uint32(len(resp.Error))
hdr.Length = uint32(len(buf))
writer := bufio.NewWriterSize(cdc.conn, 256)
err = binary.Write(writer, binary.LittleEndian, &hdr)
if err != nil {
return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
"header: %s", err.Error()))
}
if hdr.ErrLength > 0 {
_, err = io.WriteString(writer, resp.Error)
if err != nil {
return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write error "+
"string: %s", err.Error()))
}
}
if hdr.Length > 0 {
var length int
length, err = writer.Write(buf)
if err != nil {
return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
"message: %s", err.Error()))
}
if uint32(length) != hdr.Length {
return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write all of "+
"response message: %s", err.Error()))
}
}
err = writer.Flush()
if err != nil {
return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write the response "+
"bytes: %s", err.Error()))
}
cdc.numHandled++
return nil
}
func (cdc *HrpcServerCodec) Close() error {
err := cdc.conn.Close()
cdc.conn = nil
cdc.length = 0
cdc.numHandled = 0
cdc.hsv.cdcs <- cdc
return err
}
func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
resp *common.WriteSpansResp) (err error) {
// Nothing to do here; WriteSpans is handled in ReadRequestBody.
return nil
}
func CreateHrpcServer(cnf *conf.Config, store *dataStore,
testHooks *hrpcTestHooks) (*HrpcServer, error) {
lg := common.NewLogger("hrpc", cnf)
numHandlers := cnf.GetInt(conf.HTRACE_NUM_HRPC_HANDLERS)
if numHandlers < 1 {
lg.Warnf("%s must be positive: using 1 handler.\n", conf.HTRACE_NUM_HRPC_HANDLERS)
numHandlers = 1
}
if numHandlers > MAX_HRPC_HANDLERS {
lg.Warnf("%s cannot be more than %d: using %d handlers\n",
conf.HTRACE_NUM_HRPC_HANDLERS, MAX_HRPC_HANDLERS, MAX_HRPC_HANDLERS)
numHandlers = MAX_HRPC_HANDLERS
}
hsv := &HrpcServer{
Server: rpc.NewServer(),
hand: &HrpcHandler{
lg: lg,
store: store,
},
cdcs: make(chan *HrpcServerCodec, numHandlers),
shutdown: make(chan interface{}),
ioTimeo: time.Millisecond *
time.Duration(cnf.GetInt64(conf.HTRACE_HRPC_IO_TIMEOUT_MS)),
testHooks: testHooks,
}
for i := 0; i < numHandlers; i++ {
hsv.cdcs <- &HrpcServerCodec{
lg: lg,
hsv: hsv,
msgpackHandle: codec.MsgpackHandle {
WriteExt: true,
},
}
}
var err error
hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
if err != nil {
return nil, err
}
hsv.Server.Register(hsv.hand)
hsv.exited.Add(1)
go hsv.run()
lg.Infof("Started HRPC server on %s with %d handler routines. "+
"ioTimeo=%s.\n", hsv.listener.Addr().String(), numHandlers,
hsv.ioTimeo.String())
return hsv, nil
}
func (hsv *HrpcServer) run() {
lg := hsv.hand.lg
srvAddr := hsv.listener.Addr().String()
defer func() {
lg.Infof("HrpcServer on %s exiting\n", srvAddr)
hsv.exited.Done()
}()
for {
select {
case cdc := <-hsv.cdcs:
conn, err := hsv.listener.Accept()
if err != nil {
lg.Errorf("HrpcServer on %s got accept error: %s\n", srvAddr, err.Error())
hsv.cdcs <- cdc // never blocks; there is always sufficient buffer space
continue
}
if lg.TraceEnabled() {
lg.Tracef("%s: Accepted HRPC connection.\n", conn.RemoteAddr())
}
cdc.conn = conn
cdc.numHandled = 0
if hsv.testHooks != nil && hsv.testHooks.HandleAdmission != nil {
hsv.testHooks.HandleAdmission()
}
go hsv.ServeCodec(cdc)
case <-hsv.shutdown:
return
}
}
}
func (hsv *HrpcServer) Addr() net.Addr {
return hsv.listener.Addr()
}
func (hsv *HrpcServer) GetNumIoErrors() uint64 {
return atomic.LoadUint64(&hsv.ioErrorCount)
}
func (hsv *HrpcServer) Close() {
close(hsv.shutdown)
hsv.listener.Close()
hsv.exited.Wait()
}