/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"github.com/ugorji/go/codec"
	"io"
	"net"
	"net/rpc"
	"org/apache/htrace/common"
	"org/apache/htrace/conf"
	"sync"
	"sync/atomic"
	"time"
)

const MAX_HRPC_HANDLERS = 32765

// Handles HRPC calls
type HrpcHandler struct {
	lg    *common.Logger
	store *dataStore
}

// The HRPC server
type HrpcServer struct {
	*rpc.Server
	hand *HrpcHandler

	// The listener we are using to accept new connections.
	listener net.Listener

	// A WaitGroup used to block until the HRPC server has exited.
	exited sync.WaitGroup

	// A channel containing server codecs to use.  This channel is fully
	// buffered.  The number of entries it initially contains determines how
	// many concurrent codecs we will have running at once.
	cdcs chan *HrpcServerCodec

	// Used to shut down
	shutdown chan interface{}

	// The I/O timeout to use when reading requests or sending responses.  This
	// timeout does not apply to the time we spend processing the message.
	ioTimeo time.Duration

	// A count of all I/O errors that we have encountered since the server
	// started.  This counts errors like improperly formatted message frames,
	// but not errors like properly formatted but invalid messages.
	// This count is updated from multiple goroutines via sync/atomic.
	ioErrorCount uint64

	// The test hooks to use, or nil during normal operation.
	testHooks *hrpcTestHooks
}

type hrpcTestHooks struct {
	// A callback we make right after calling Accept() but before reading from
	// the new connection.
	HandleAdmission func()
}

// A codec which encodes HRPC data via JSON.  This structure holds the context
// for a particular client connection.
type HrpcServerCodec struct {
	lg *common.Logger

	// The current connection.
	conn net.Conn

	// The HrpcServer which this connection is part of.
	hsv *HrpcServer

	// The message length we read from the header.
	length uint32

	// The number of messages this connection has handled.
	numHandled int

	// The buffer for reading requests.  These buffers are reused for multiple
	// requests to avoid allocating memory.
	buf []byte

	// Configuration for msgpack decoding
	msgpackHandle codec.MsgpackHandle
}

func asJson(val interface{}) string {
	js, err := json.Marshal(val)
	if err != nil {
		return "encoding error: " + err.Error()
	}
	return string(js)
}

func newIoErrorWarn(cdc *HrpcServerCodec, val string) error {
	return newIoError(cdc, val, common.WARN)
}

func newIoError(cdc *HrpcServerCodec, val string, level common.Level) error {
	if cdc.lg.LevelEnabled(level) {
		cdc.lg.Write(level, cdc.conn.RemoteAddr().String()+": "+val+"\n")
	}
	if level >= common.INFO {
		atomic.AddUint64(&cdc.hsv.ioErrorCount, 1)
	}
	return errors.New(val)
}

func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
	hdr := common.HrpcRequestHeader{}
	if cdc.lg.TraceEnabled() {
		cdc.lg.Tracef("%s: Reading HRPC request header.\n", cdc.conn.RemoteAddr())
	}
	cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
	err := binary.Read(cdc.conn, binary.LittleEndian, &hdr)
	if err != nil {
		if err == io.EOF && cdc.numHandled > 0 {
			return newIoError(cdc, fmt.Sprintf("Remote closed connection "+
				"after writing %d message(s)", cdc.numHandled), common.DEBUG)
		}
		return newIoError(cdc,
			fmt.Sprintf("Error reading request header: %s", err.Error()), common.WARN)
	}
	if cdc.lg.TraceEnabled() {
		cdc.lg.Tracef("%s: Read HRPC request header %s\n",
			cdc.conn.RemoteAddr(), asJson(&hdr))
	}
	if hdr.Magic != common.HRPC_MAGIC {
		return newIoErrorWarn(cdc, fmt.Sprintf("Invalid request header: expected "+
			"magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic))
	}
	if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
		return newIoErrorWarn(cdc, fmt.Sprintf("Length prefix was too long.  "+
			"Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH,
			hdr.Length))
	}
	req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
	if req.ServiceMethod == "" {
		return newIoErrorWarn(cdc, fmt.Sprintf("Unknown MethodID code 0x%04x",
			hdr.MethodId))
	}
	req.Seq = hdr.Seq
	cdc.length = hdr.Length
	return nil
}

func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
	remoteAddr := cdc.conn.RemoteAddr().String()
	if cdc.lg.TraceEnabled() {
		cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n",
			remoteAddr, cdc.length)
	}
	if cap(cdc.buf) < int(cdc.length) {
		var pow uint
		for pow=0;(1<<pow) < int(cdc.length);pow++ {
		}
		cdc.buf = make([]byte, 0, 1<<pow)
	}
	_, err := io.ReadFull(cdc.conn, cdc.buf[:cdc.length])
	if err != nil {
		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte "+
			"request body: %s", cdc.length, err.Error()))
	}
	var zeroTime time.Time
	cdc.conn.SetDeadline(zeroTime)

	dec := codec.NewDecoderBytes(cdc.buf[:cdc.length], &cdc.msgpackHandle)
	err = dec.Decode(body)
	if cdc.lg.TraceEnabled() {
		cdc.lg.Tracef("%s: read HRPC message: %s\n",
			remoteAddr, asJson(&body))
	}
	req := body.(*common.WriteSpansReq)
	if req == nil {
		return nil
	}
	// We decode WriteSpans requests in a streaming fashion, to avoid overloading the garbage
	// collector with a ton of trace spans all at once.
	startTime := time.Now()
	client, _, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to split host and port "+
			"for %s: %s\n", remoteAddr, err.Error()))
	}
	hand := cdc.hsv.hand
	ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
	for spanIdx := 0; spanIdx < req.NumSpans; spanIdx++ {
		var span *common.Span
		err := dec.Decode(&span)
		if err != nil {
			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to decode span %d " +
				"out of %d: %s\n", spanIdx, req.NumSpans, err.Error()))
		}
		ing.IngestSpan(span)
	}
	ing.Close(startTime)
	return nil
}

var EMPTY []byte = make([]byte, 0)

func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error {
	cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
	var err error
	buf := EMPTY
	if msg != nil {
		w := bytes.NewBuffer(make([]byte, 0, 128))
		enc := codec.NewEncoder(w, &cdc.msgpackHandle)
		err := enc.Encode(msg)
		if err != nil {
			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to marshal "+
				"response message: %s", err.Error()))
		}
		buf = w.Bytes()
	}
	hdr := common.HrpcResponseHeader{}
	hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod)
	hdr.Seq = resp.Seq
	hdr.ErrLength = uint32(len(resp.Error))
	hdr.Length = uint32(len(buf))
	writer := bufio.NewWriterSize(cdc.conn, 256)
	err = binary.Write(writer, binary.LittleEndian, &hdr)
	if err != nil {
		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
			"header: %s", err.Error()))
	}
	if hdr.ErrLength > 0 {
		_, err = io.WriteString(writer, resp.Error)
		if err != nil {
			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write error "+
				"string: %s", err.Error()))
		}
	}
	if hdr.Length > 0 {
		var length int
		length, err = writer.Write(buf)
		if err != nil {
			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
				"message: %s", err.Error()))
		}
		if uint32(length) != hdr.Length {
			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write all of "+
				"response message: %s", err.Error()))
		}
	}
	err = writer.Flush()
	if err != nil {
		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write the response "+
			"bytes: %s", err.Error()))
	}
	cdc.numHandled++
	return nil
}

func (cdc *HrpcServerCodec) Close() error {
	err := cdc.conn.Close()
	cdc.conn = nil
	cdc.length = 0
	cdc.numHandled = 0
	cdc.hsv.cdcs <- cdc
	return err
}

func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
		resp *common.WriteSpansResp) (err error) {
	// Nothing to do here; WriteSpans is handled in ReadRequestBody.
	return nil
}

func CreateHrpcServer(cnf *conf.Config, store *dataStore,
	testHooks *hrpcTestHooks) (*HrpcServer, error) {
	lg := common.NewLogger("hrpc", cnf)
	numHandlers := cnf.GetInt(conf.HTRACE_NUM_HRPC_HANDLERS)
	if numHandlers < 1 {
		lg.Warnf("%s must be positive: using 1 handler.\n", conf.HTRACE_NUM_HRPC_HANDLERS)
		numHandlers = 1
	}
	if numHandlers > MAX_HRPC_HANDLERS {
		lg.Warnf("%s cannot be more than %d: using %d handlers\n",
			conf.HTRACE_NUM_HRPC_HANDLERS, MAX_HRPC_HANDLERS, MAX_HRPC_HANDLERS)
		numHandlers = MAX_HRPC_HANDLERS
	}
	hsv := &HrpcServer{
		Server: rpc.NewServer(),
		hand: &HrpcHandler{
			lg:    lg,
			store: store,
		},
		cdcs:     make(chan *HrpcServerCodec, numHandlers),
		shutdown: make(chan interface{}),
		ioTimeo: time.Millisecond *
			time.Duration(cnf.GetInt64(conf.HTRACE_HRPC_IO_TIMEOUT_MS)),
		testHooks: testHooks,
	}
	for i := 0; i < numHandlers; i++ {
		hsv.cdcs <- &HrpcServerCodec{
			lg:  lg,
			hsv: hsv,
			msgpackHandle: codec.MsgpackHandle {
				WriteExt: true,
			},
		}
	}
	var err error
	hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
	if err != nil {
		return nil, err
	}
	hsv.Server.Register(hsv.hand)
	hsv.exited.Add(1)
	go hsv.run()
	lg.Infof("Started HRPC server on %s with %d handler routines. "+
		"ioTimeo=%s.\n", hsv.listener.Addr().String(), numHandlers,
		hsv.ioTimeo.String())
	return hsv, nil
}

func (hsv *HrpcServer) run() {
	lg := hsv.hand.lg
	srvAddr := hsv.listener.Addr().String()
	defer func() {
		lg.Infof("HrpcServer on %s exiting\n", srvAddr)
		hsv.exited.Done()
	}()
	for {
		select {
		case cdc := <-hsv.cdcs:
			conn, err := hsv.listener.Accept()
			if err != nil {
				lg.Errorf("HrpcServer on %s got accept error: %s\n", srvAddr, err.Error())
				hsv.cdcs <- cdc // never blocks; there is always sufficient buffer space
				continue
			}
			if lg.TraceEnabled() {
				lg.Tracef("%s: Accepted HRPC connection.\n", conn.RemoteAddr())
			}
			cdc.conn = conn
			cdc.numHandled = 0
			if hsv.testHooks != nil && hsv.testHooks.HandleAdmission != nil {
				hsv.testHooks.HandleAdmission()
			}
			go hsv.ServeCodec(cdc)
		case <-hsv.shutdown:
			return
		}
	}
}

func (hsv *HrpcServer) Addr() net.Addr {
	return hsv.listener.Addr()
}

func (hsv *HrpcServer) GetNumIoErrors() uint64 {
	return atomic.LoadUint64(&hsv.ioErrorCount)
}

func (hsv *HrpcServer) Close() {
	close(hsv.shutdown)
	hsv.listener.Close()
	hsv.exited.Wait()
}
