blob: 35bff871bcbfc682fe576aedf279fe5a95947b37 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package http
import (
"context"
"errors"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/apache/servicecomb-mesher/proxy/common"
"github.com/apache/servicecomb-mesher/proxy/pkg/egress"
"github.com/apache/servicecomb-mesher/proxy/pkg/metrics"
"github.com/apache/servicecomb-mesher/proxy/protocol"
"github.com/apache/servicecomb-mesher/proxy/resolver"
"github.com/apache/servicecomb-mesher/proxy/util"
"github.com/go-chassis/go-chassis/client/rest"
chassisCommon "github.com/go-chassis/go-chassis/core/common"
"github.com/go-chassis/go-chassis/core/fault"
"github.com/go-chassis/go-chassis/core/handler"
"github.com/go-chassis/go-chassis/core/invocation"
"github.com/go-chassis/go-chassis/core/lager"
"github.com/go-chassis/go-chassis/core/loadbalancer"
"github.com/go-chassis/go-chassis/pkg/runtime"
"github.com/go-chassis/go-chassis/pkg/string"
"github.com/go-chassis/go-chassis/pkg/util/tags"
"github.com/go-chassis/go-chassis/third_party/forked/afex/hystrix-go/hystrix"
"github.com/go-mesh/openlogging"
)
// dr is the destination resolver obtained for "http"; LocalRequestHandler uses
// it to turn an outgoing request into a destination name and port.
var dr = resolver.GetDestinationResolver("http")
// sr is the source resolver; RemoteRequestHandler uses it to look up the calling
// service from the remote address when the request carries no source-name header.
var sr = resolver.GetSourceResolver()
// Names of the forwarding headers used by the sidecar request handlers.
const (
// XForwardedPort is the header name under which LocalRequestHandler stores the
// port returned by the destination resolver (only when that port is non-empty).
XForwardedPort = "X-Forwarded-Port"
// XForwardedHost is the header name RemoteRequestHandler fills with the request's
// Host when the incoming request does not already carry it.
XForwardedHost = "X-Forwarded-Host"
)
// Sentinel errors exported by this package.
var (
// ErrRestFaultAbort reports an injected abort.
// NOTE(review): not referenced in this part of the file; presumably used by
// fault-injection code elsewhere, confirm before changing.
ErrRestFaultAbort = errors.New("injecting abort")
// ErrRestFault reports an injected abort combined with a delay.
// NOTE(review): not referenced in this part of the file; confirm its users.
ErrRestFault = errors.New("injecting abort and delay")
// ErrNilResponse is written to the client by handleRequest when the handler
// chain finishes without an error but also without a result.
ErrNilResponse = errors.New("http response is nil")
)
// preHandler builds the invocation shared by the consumer and provider paths:
// the request is carried as the invocation arguments, a fresh rest response is
// attached as the reply, the protocol is "rest" and the URL path of the request
// is used as the path format.
func preHandler(req *http.Request) *invocation.Invocation {
	return &invocation.Invocation{
		Args:          req,
		Reply:         rest.NewResponse(),
		Protocol:      "rest",
		URLPathFormat: req.URL.Path,
	}
}
// consumerPreHandler prepares the invocation for a request leaving the local
// service. Besides the common fields it records the local service ID as the
// source, stamps the request with the local service name in the source-name
// header, and gives the invocation a placeholder context.
func consumerPreHandler(req *http.Request) *invocation.Invocation {
	consumerInv := preHandler(req)
	consumerInv.SourceServiceID = runtime.ServiceID

	// Tell the next hop who is calling.
	req.Header.Set(chassisCommon.HeaderSourceName, runtime.ServiceName)

	consumerInv.Ctx = context.TODO()
	return consumerInv
}
// providerPreHandler prepares the invocation for a request arriving for the
// local service. The target is the local service (name, version and app taken
// from the runtime package); the source service is whatever the caller put in
// the source-name header, which may be empty.
func providerPreHandler(req *http.Request) *invocation.Invocation {
	providerInv := preHandler(req)

	// Target: this very service.
	providerInv.MicroServiceName = runtime.ServiceName
	providerInv.RouteTags = utiltags.NewDefaultTag(runtime.Version, runtime.App)

	// Source: as announced by the caller.
	callerName := req.Header.Get(chassisCommon.HeaderSourceName)
	providerInv.SourceMicroService = callerName

	providerInv.Ctx = context.TODO()
	return providerInv
}
//LocalRequestHandler is for request from local
//
// It resolves the destination of the request, selects the egress or the regular
// consumer handler chain, runs the chain and relays the outcome to w.
//
// Error replies produced here:
//   - 400 when the destination cannot be resolved;
//   - 502 when the handler chain cannot be obtained.
//
// Everything after chain selection is delegated to handleRequest.
func LocalRequestHandler(w http.ResponseWriter, r *http.Request) {
	prepareRequest(r)
	inv := consumerPreHandler(r)
	remoteIP := stringutil.SplitFirstSep(r.RemoteAddr, ":")

	// Flatten the headers into a plain map; only the first value of every
	// header is kept.
	h := make(map[string]string, len(r.Header))
	for k := range r.Header {
		h[k] = r.Header.Get(k)
	}

	//Resolve Destination
	destination, port, err := dr.Resolve(remoteIP, r.Host, r.URL.String(), h)
	if err != nil {
		handleErrorResponse(inv, w, http.StatusBadRequest, err)
		return
	}
	inv.MicroServiceName = destination
	if port != "" {
		h[XForwardedPort] = port
	}

	//transfer header into ctx
	inv.Ctx = context.WithValue(inv.Ctx, chassisCommon.ContextHeaderKey{}, h)

	var c *handler.Chain
	if ok, egressRule := egress.Match(inv.MicroServiceName); ok {
		// Egress traffic: the endpoint is the destination itself, on the first
		// port of the rule declared for HTTP, or 80 when there is none.
		var targetPort int32 = 80
		for _, p := range egressRule.Ports {
			if strings.EqualFold(p.Protocol, common.HTTPProtocol) {
				targetPort = p.Port
				break
			}
		}
		inv.Endpoint = inv.MicroServiceName + ":" + strconv.Itoa(int(targetPort))
		c, err = handler.GetChain(common.ConsumerEgress, common.ChainConsumerEgress)
	} else {
		c, err = handler.GetChain(chassisCommon.Consumer, common.ChainConsumerOutgoing)
	}
	if err != nil {
		handleErrorResponse(inv, w, http.StatusBadGateway, err)
		openlogging.Error("Get chain failed: " + err.Error())
		return
	}

	// Latency is measured from here, i.e. it covers the chain and the relay of
	// the response, not destination resolution.
	defer func(begin time.Time) {
		timeTaken := time.Since(begin).Seconds()
		serviceLabelValues := map[string]string{
			metrics.LServiceName: inv.MicroServiceName,
			metrics.LApp:         inv.RouteTags.AppID(),
			metrics.LVersion:     inv.RouteTags.Version(),
		}
		metrics.RecordLatency(serviceLabelValues, timeTaken)
	}(time.Now())

	var invRsp *invocation.Response
	c.Next(inv, func(ir *invocation.Response) {
		//Send the request to the destination
		invRsp = ir
	})

	resp, err := handleRequest(w, inv, invRsp)
	if err != nil {
		// handleRequest has already replied to the client.
		openlogging.Error("handle request failed: " + err.Error())
		return
	}
	RecordStatus(inv, resp.StatusCode)
}
//RemoteRequestHandler is for request from remote
//
// It builds a provider invocation for the local service, runs the provider
// handler chain and relays the outcome to w.
//
// Error replies produced here (both 502):
//   - the handler chain cannot be obtained;
//   - the local service address cannot be set from the X-Forwarded-Port header.
//
// In both cases the handler stops after replying, so exactly one response is
// written per request.
func RemoteRequestHandler(w http.ResponseWriter, r *http.Request) {
	prepareRequest(r)
	inv := providerPreHandler(r)

	// No source-name header: fall back to resolving the caller by its address.
	if inv.SourceMicroService == "" {
		source := stringutil.SplitFirstSep(r.RemoteAddr, ":")
		//Resolve Source
		if si := sr.Resolve(source); si != nil {
			inv.SourceMicroService = si.Name
		}
	}

	// Flatten the headers into a plain map; only the first value of every
	// header is kept.
	h := make(map[string]string, len(r.Header))
	for k := range r.Header {
		h[k] = r.Header.Get(k)
	}
	//transfer header into ctx
	inv.Ctx = context.WithValue(inv.Ctx, chassisCommon.ContextHeaderKey{}, h)

	c, err := handler.GetChain(chassisCommon.Provider, common.ChainProviderIncoming)
	if err != nil {
		handleErrorResponse(inv, w, http.StatusBadGateway, err)
		lager.Logger.Error("Get chain failed: " + err.Error())
		return
	}

	if err = util.SetLocalServiceAddress(inv, r.Header.Get("X-Forwarded-Port")); err != nil {
		handleErrorResponse(inv, w, http.StatusBadGateway, err)
		// The error reply has been sent; running the chain now would invoke
		// the service anyway and write a second response.
		return
	}

	// Set after the header map above was built, so this header reaches the
	// forwarded request but not the map stored in the context.
	if r.Header.Get(XForwardedHost) == "" {
		r.Header.Set(XForwardedHost, r.Host)
	}

	var invRsp *invocation.Response
	c.Next(inv, func(ir *invocation.Response) {
		//Send the request to the destination
		invRsp = ir
	})

	if _, err = handleRequest(w, inv, invRsp); err != nil {
		lager.Logger.Error("Handle request failed: " + err.Error())
	}
}
func copyChassisResp2HttpResp(w http.ResponseWriter, resp *http.Response) {
postProcessResponse(resp)
copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)
if resp == nil {
openlogging.GetLogger().Warn("response is nil because of unknown reason")
return
}
_, err := io.Copy(w, resp.Body)
if err != nil {
openlogging.Error("can not copy: " + err.Error())
}
err = resp.Body.Close()
if err != nil {
openlogging.Error("Http response close error: " + err.Error())
}
}
// handleRequest turns the outcome of a handler chain into the HTTP reply on w.
//
// It always writes exactly one reply. On success it returns the relayed
// response and a nil error. In every other case it returns a nil response and
// the error that was reported:
//   - ir is nil: 502 with protocol.ErrUnExpectedHandlerChainResponse;
//   - ir.Err is set: the status depends on the error type (see the switch),
//     and ir.Err is returned;
//   - inv.Endpoint is empty: 502 with protocol.ErrUnknown;
//   - ir.Result is nil, or holds a nil *http.Response: 502 with ErrNilResponse;
//   - ir.Result is not a *http.Response: 502 with a descriptive error.
func handleRequest(w http.ResponseWriter, inv *invocation.Invocation, ir *invocation.Response) (*http.Response, error) {
	if ir == nil {
		handleErrorResponse(inv, w, http.StatusBadGateway, protocol.ErrUnExpectedHandlerChainResponse)
		return nil, protocol.ErrUnExpectedHandlerChainResponse
	}

	if ir.Err != nil {
		//handler only mesher errors, ignore http response err
		switch ir.Err.(type) {
		case hystrix.FallbackNullError:
			handleErrorResponse(inv, w, http.StatusOK, nil)
		case loadbalancer.LBError:
			handleErrorResponse(inv, w, http.StatusBadGateway, ir.Err)
		case hystrix.CircuitError:
			handleErrorResponse(inv, w, http.StatusServiceUnavailable, ir.Err)
		case fault.Fault:
			handleErrorResponse(inv, w, ir.Status, ir.Err)
		default:
			// Any other error: if the reply carries a body, pass the reply
			// through untouched; otherwise report the error ourselves.
			resp, ok := inv.Reply.(*http.Response)
			switch {
			case !ok:
				// unknown err in handler chain
				handleErrorResponse(inv, w, http.StatusInternalServerError, ir.Err)
			case resp == nil || resp.Body == nil:
				// nothing to relay, e.g. network error or connection refused
				handleErrorResponse(inv, w, http.StatusBadGateway, ir.Err)
			default:
				copyChassisResp2HttpResp(w, resp)
				RecordStatus(inv, resp.StatusCode)
			}
		}
		return nil, ir.Err
	}

	if inv.Endpoint == "" {
		handleErrorResponse(inv, w, http.StatusBadGateway, protocol.ErrUnknown)
		return nil, protocol.ErrUnknown
	}

	if ir.Result == nil {
		// Return the same error the client is told about.
		handleErrorResponse(inv, w, http.StatusBadGateway, ErrNilResponse)
		return nil, ErrNilResponse
	}

	resp, ok := ir.Result.(*http.Response)
	if !ok {
		err := errors.New("invocationResponse result is not type *http.Response")
		handleErrorResponse(inv, w, http.StatusBadGateway, err)
		return nil, err
	}
	if resp == nil {
		// A nil *http.Response stored in the interface passes the nil check
		// above but must not be dereferenced.
		handleErrorResponse(inv, w, http.StatusBadGateway, ErrNilResponse)
		return nil, ErrNilResponse
	}

	//transparent proxy
	copyChassisResp2HttpResp(w, resp)
	return resp, nil
}
// handleErrorResponse replies with an error raised by the proxy itself rather
// than by the real service: it sends statusCode, then the text of err as the
// body when err is non-nil, and finally records the status for inv. A failure
// to write the body is only logged.
func handleErrorResponse(inv *invocation.Invocation, w http.ResponseWriter, statusCode int, err error) {
	w.WriteHeader(statusCode)

	if err != nil {
		body := []byte(err.Error())
		if _, writeErr := w.Write(body); writeErr != nil {
			openlogging.Error("can not write err to client: " + writeErr.Error())
		}
	}

	RecordStatus(inv, statusCode)
}
// RecordStatus reports statusCode to the metrics package, labelled with the
// service name of inv and the app and version from its route tags.
func RecordStatus(inv *invocation.Invocation, statusCode int) {
	labels := make(map[string]string, 3)
	labels[metrics.LServiceName] = inv.MicroServiceName
	labels[metrics.LApp] = inv.RouteTags.AppID()
	labels[metrics.LVersion] = inv.RouteTags.Version()

	metrics.RecordStatus(labels, statusCode)
}
// copyHeader appends every value of every header in src to dst. Values already
// present in dst are kept, and the order of the values of one header is
// preserved. Keys are added through Header.Add, so they end up in canonical form.
func copyHeader(dst, src http.Header) {
	for name := range src {
		values := src[name]
		for i := 0; i < len(values); i++ {
			dst.Add(name, values[i])
		}
	}
}
// prepareRequest adapts an incoming server request so that it can be sent on
// with an HTTP client: the Connection header and the Close flag are cleared,
// RequestURI is emptied, and the body is dropped when the declared content
// length is zero.
func prepareRequest(req *http.Request) {
	if req.ContentLength == 0 {
		req.Body = nil
	}

	// client is forbidden to set RequestURI
	req.RequestURI = ""

	// Connection handling is decided per hop, not by the original caller.
	req.Close = false
	req.Header.Del("Connection")
}
// postProcessResponse removes the Connection header from rsp before it is
// relayed to the client. A nil rsp is ignored, because the caller in this file
// may hand over a missing response.
func postProcessResponse(rsp *http.Response) {
	if rsp == nil {
		return
	}
	rsp.Header.Del("Connection")
}