/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package jsonrpc

import (
	"bufio"
	"bytes"
	"context"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"runtime"
	"runtime/debug"
	"sync"
	"time"
)

import (
	"github.com/opentracing/opentracing-go"
	perrors "github.com/pkg/errors"
)

import (
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/protocol/invocation"
)

var (
	// A value sent as a placeholder for the server's response value when the server
	// receives an invalid request. It is never decoded by the client since the Response
	// contains an error when it is used.
	invalidRequest = struct{}{}
)

const (
	// DefaultMaxSleepTime: max sleep interval in accept
	DefaultMaxSleepTime = 1 * time.Second
	// DefaultHTTPRspBufferSize ...
	DefaultHTTPRspBufferSize = 1024
	// PathPrefix ...
	PathPrefix = byte('/')
)

// Server ...
type Server struct {
	done chan struct{}
	once sync.Once

	sync.RWMutex
	wg      sync.WaitGroup
	timeout time.Duration
}

// NewServer ...
func NewServer() *Server {
	return &Server{
		done: make(chan struct{}),
	}
}

func (s *Server) handlePkg(conn net.Conn) {
	defer func() {
		if r := recover(); r != nil {
			logger.Warnf("connection{local:%v, remote:%v} panic error:%#v, debug stack:%s",
				conn.LocalAddr(), conn.RemoteAddr(), r, string(debug.Stack()))
		}

		conn.Close()
	}()

	setTimeout := func(conn net.Conn, timeout time.Duration) {
		t := time.Time{}
		if timeout > time.Duration(0) {
			t = time.Now().Add(timeout)
		}

		conn.SetDeadline(t)
	}

	sendErrorResp := func(header http.Header, body []byte) error {
		rsp := &http.Response{
			Header:        header,
			StatusCode:    500,
			ProtoMajor:    1,
			ProtoMinor:    1,
			ContentLength: int64(len(body)),
			Body:          ioutil.NopCloser(bytes.NewReader(body)),
		}
		rsp.Header.Del("Content-Type")
		rsp.Header.Del("Content-Length")
		rsp.Header.Del("Timeout")

		rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize))
		rspBuf.Reset()
		err := rsp.Write(rspBuf)
		if err != nil {
			return perrors.WithStack(err)
		}
		_, err = rspBuf.WriteTo(conn)
		return perrors.WithStack(err)
	}

	for {
		bufReader := bufio.NewReader(conn)
		r, err := http.ReadRequest(bufReader)
		if err != nil {
			logger.Warnf("[ReadRequest] error: %v", err)
			return
		}

		reqBody, err := ioutil.ReadAll(r.Body)
		if err != nil {
			return
		}
		r.Body.Close()

		reqHeader := make(map[string]string)
		for k := range r.Header {
			reqHeader[k] = r.Header.Get(k)
		}
		reqHeader["Path"] = r.URL.Path[1:] // to get service name
		if r.URL.Path[0] != PathPrefix {
			reqHeader["Path"] = r.URL.Path
		}
		reqHeader["HttpMethod"] = r.Method

		httpTimeout := s.timeout
		contentType := reqHeader["Content-Type"]
		if contentType != "application/json" && contentType != "application/json-rpc" {
			setTimeout(conn, httpTimeout)
			r.Header.Set("Content-Type", "text/plain")
			if errRsp := sendErrorResp(r.Header, []byte(perrors.WithStack(err).Error())); errRsp != nil {
				logger.Warnf("sendErrorResp(header:%#v, error:%v) = error:%s",
					r.Header, perrors.WithStack(err), errRsp)
			}
			return
		}

		ctx := context.Background()

		spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders,
			opentracing.HTTPHeadersCarrier(r.Header))
		if err == nil {
			ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
		}

		if len(reqHeader["Timeout"]) > 0 {
			timeout, err := time.ParseDuration(reqHeader["Timeout"])
			if err == nil {
				httpTimeout = timeout
				var cancel context.CancelFunc
				ctx, cancel = context.WithTimeout(ctx, httpTimeout)
				defer cancel()
			}
			delete(reqHeader, "Timeout")
		}
		setTimeout(conn, httpTimeout)

		if err := serveRequest(ctx, reqHeader, reqBody, conn); err != nil {
			if errRsp := sendErrorResp(r.Header, []byte(perrors.WithStack(err).Error())); errRsp != nil {
				logger.Warnf("sendErrorResp(header:%#v, error:%v) = error:%s",
					r.Header, perrors.WithStack(err), errRsp)
			}

			logger.Infof("Unexpected error serving request, closing socket: %v", err)
			return
		}
	}
}

func accept(listener net.Listener, fn func(net.Conn)) error {
	var (
		ok       bool
		ne       net.Error
		tmpDelay time.Duration
	)

	for {
		c, err := listener.Accept()
		if err != nil {
			if ne, ok = err.(net.Error); ok && ne.Temporary() {
				if tmpDelay != 0 {
					tmpDelay <<= 1
				} else {
					tmpDelay = 5 * time.Millisecond
				}
				if tmpDelay > DefaultMaxSleepTime {
					tmpDelay = DefaultMaxSleepTime
				}
				logger.Infof("http: Accept error: %v; retrying in %v\n", err, tmpDelay)
				time.Sleep(tmpDelay)
				continue
			}
			return perrors.WithStack(err)
		}

		go func() {
			defer func() {
				if r := recover(); r != nil {
					const size = 64 << 10
					buf := make([]byte, size)
					buf = buf[:runtime.Stack(buf, false)]
					logger.Errorf("http: panic serving %v: %v\n%s", c.RemoteAddr(), r, buf)
					c.Close()
				}
			}()

			fn(c)
		}()
	}
}

// Start ...
func (s *Server) Start(url common.URL) {
	listener, err := net.Listen("tcp", url.Location)
	if err != nil {
		logger.Errorf("jsonrpc server [%s] start failed: %v", url.Path, err)
		return
	}
	logger.Infof("rpc server start to listen on %s", listener.Addr())

	s.wg.Add(1)
	go func() {
		accept(listener, func(conn net.Conn) { s.handlePkg(conn) })
		s.wg.Done()
	}()

	s.wg.Add(1)
	go func() { // Server done goroutine
		var err error
		<-s.done               // step1: block to wait for done channel(wait Server.Stop step2)
		err = listener.Close() // step2: and then close listener
		if err != nil {
			logger.Warnf("listener{addr:%s}.Close() = error{%#v}", listener.Addr(), err)
		}
		s.wg.Done()
	}()
}

// Stop ...
func (s *Server) Stop() {
	s.once.Do(func() {
		close(s.done)
		s.wg.Wait()
	})
}

func serveRequest(ctx context.Context,
	header map[string]string, body []byte, conn net.Conn) error {
	sendErrorResp := func(header map[string]string, body []byte) error {
		rsp := &http.Response{
			Header:        make(http.Header),
			StatusCode:    500,
			ProtoMajor:    1,
			ProtoMinor:    1,
			ContentLength: int64(len(body)),
			Body:          ioutil.NopCloser(bytes.NewReader(body)),
		}
		rsp.Header.Del("Content-Type")
		rsp.Header.Del("Content-Length")
		rsp.Header.Del("Timeout")
		for k, v := range header {
			rsp.Header.Set(k, v)
		}

		rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize))
		rspBuf.Reset()
		err := rsp.Write(rspBuf)
		if err != nil {
			return perrors.WithStack(err)
		}
		_, err = rspBuf.WriteTo(conn)
		return perrors.WithStack(err)
	}

	sendResp := func(header map[string]string, body []byte) error {
		rsp := &http.Response{
			Header:        make(http.Header),
			StatusCode:    200,
			ProtoMajor:    1,
			ProtoMinor:    1,
			ContentLength: int64(len(body)),
			Body:          ioutil.NopCloser(bytes.NewReader(body)),
		}
		rsp.Header.Del("Content-Type")
		rsp.Header.Del("Content-Length")
		rsp.Header.Del("Timeout")
		for k, v := range header {
			rsp.Header.Set(k, v)
		}

		rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize))
		rspBuf.Reset()
		err := rsp.Write(rspBuf)
		if err != nil {
			return perrors.WithStack(err)
		}
		_, err = rspBuf.WriteTo(conn)
		return perrors.WithStack(err)
	}

	// read request header
	codec := newServerCodec()
	err := codec.ReadHeader(header, body)
	if err != nil {
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return perrors.WithStack(err)
		}

		return perrors.New("server cannot decode request: " + err.Error())
	}
	path := header["Path"]
	methodName := codec.req.Method
	if len(path) == 0 || len(methodName) == 0 {
		codec.ReadBody(nil)
		return perrors.New("service/method request ill-formed: " + path + "/" + methodName)
	}

	// read body
	var args []interface{}
	if err = codec.ReadBody(&args); err != nil {
		return perrors.WithStack(err)
	}
	logger.Debugf("args: %v", args)

	// exporter invoke
	exporter, _ := jsonrpcProtocol.ExporterMap().Load(path)
	invoker := exporter.(*JsonrpcExporter).GetInvoker()
	if invoker != nil {
		result := invoker.Invoke(ctx, invocation.NewRPCInvocation(methodName, args, map[string]string{
			constant.PATH_KEY:    path,
			constant.VERSION_KEY: codec.req.Version}))
		if err := result.Error(); err != nil {
			rspStream, err := codec.Write(err.Error(), invalidRequest)
			if err != nil {
				return perrors.WithStack(err)
			}
			if errRsp := sendErrorResp(header, rspStream); errRsp != nil {
				logger.Warnf("Exporter: sendErrorResp(header:%#v, error:%v) = error:%s",
					header, err, errRsp)
			}
		} else {
			res := result.Result()
			rspStream, err := codec.Write("", res)
			if err != nil {
				return perrors.WithStack(err)
			}
			if errRsp := sendResp(header, rspStream); errRsp != nil {
				logger.Warnf("Exporter: sendResp(header:%#v, error:%v) = error:%s",
					header, err, errRsp)
			}
		}
	}

	return nil
}
