blob: 4b3a3ada5029966d0b25a712897d082d49a7e688 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package http
import (
chassisCommon ""
// Package-level resolvers shared by the request handlers in this file.
var (
	// dr is the destination resolver obtained for the "http" protocol;
	// LocalRequestHandler calls it to find the target of an outgoing request.
	dr = resolver.GetDestinationResolver("http")
	// sr is the source resolver; RemoteRequestHandler calls it with the
	// remote address when the request carries no source-name header.
	sr = resolver.GetSourceResolver()
)
// Names of the forwarding headers read and written by the handlers in this
// file.
const (
	// XForwardedPort is the header LocalRequestHandler fills with the port
	// returned by the destination resolver.
	XForwardedPort = "X-Forwarded-Port"
	// XForwardedHost is the header RemoteRequestHandler fills with the request
	// host when the caller did not supply one.
	XForwardedHost = "X-Forwarded-Host"
)
// Sentinel errors exposed by this package.
var (
	// ErrRestFaultAbort is the error whose message reports an injected abort.
	ErrRestFaultAbort = errors.New("injecting abort")
	// ErrRestFault is the error whose message reports an injected abort
	// combined with a delay.
	ErrRestFault = errors.New("injecting abort and delay")
	// ErrNilResponse is written to the client by handleRequest when the
	// handler chain produced a response that carries no result.
	ErrNilResponse = errors.New("http response is nil")
)
func preHandler(req *http.Request) *invocation.Invocation {
inv := &invocation.Invocation{}
inv.Args = req
inv.Reply = rest.NewResponse()
inv.Protocol = "rest"
inv.URLPathFormat = req.URL.Path
return inv
func consumerPreHandler(req *http.Request) *invocation.Invocation {
inv := preHandler(req)
inv.SourceServiceID = runtime.ServiceID
req.Header.Set(chassisCommon.HeaderSourceName, runtime.ServiceName)
inv.Ctx = context.TODO()
return inv
func providerPreHandler(req *http.Request) *invocation.Invocation {
inv := preHandler(req)
inv.MicroServiceName = runtime.ServiceName
inv.RouteTags = utiltags.NewDefaultTag(runtime.Version, runtime.App)
inv.SourceMicroService = req.Header.Get(chassisCommon.HeaderSourceName)
inv.Ctx = context.TODO()
return inv
//LocalRequestHandler is for request from local
func LocalRequestHandler(w http.ResponseWriter, r *http.Request) {
inv := consumerPreHandler(r)
remoteIP := stringutil.SplitFirstSep(r.RemoteAddr, ":")
var err error
h := make(map[string]string)
for k := range r.Header {
h[k] = r.Header.Get(k)
//Resolve Destination
destination, port, err := dr.Resolve(remoteIP, r.Host, r.URL.String(), h)
if err != nil {
handleErrorResponse(inv, w, http.StatusBadRequest, err)
inv.MicroServiceName = destination
if port != "" {
h[XForwardedPort] = port
//transfer header into ctx
inv.Ctx = context.WithValue(inv.Ctx, chassisCommon.ContextHeaderKey{}, h)
var c *handler.Chain
ok, egressRule := egress.Match(inv.MicroServiceName)
if ok {
var targetPort int32 = 80
for _, port := range egressRule.Ports {
if strings.EqualFold(port.Protocol, common.HTTPProtocol) {
targetPort = port.Port
inv.Endpoint = inv.MicroServiceName + ":" + strconv.Itoa(int(targetPort))
c, err = handler.GetChain(common.ConsumerEgress, common.ChainConsumerEgress)
if err != nil {
handleErrorResponse(inv, w, http.StatusBadGateway, err)
openlogging.Error("Get chain failed" + err.Error())
} else {
c, err = handler.GetChain(chassisCommon.Consumer, common.ChainConsumerOutgoing)
if err != nil {
handleErrorResponse(inv, w, http.StatusBadGateway, err)
openlogging.Error("Get chain failed: " + err.Error())
defer func(begin time.Time) {
timeTaken := time.Since(begin).Seconds()
serviceLabelValues := map[string]string{metrics.LServiceName: inv.MicroServiceName, metrics.LApp: inv.RouteTags.AppID(), metrics.LVersion: inv.RouteTags.Version()}
metrics.RecordLatency(serviceLabelValues, timeTaken)
var invRsp *invocation.Response
c.Next(inv, func(ir *invocation.Response) error {
//Send the request to the destination
invRsp = ir
if invRsp != nil {
return invRsp.Err
return nil
resp, err := handleRequest(w, inv, invRsp)
if err != nil {
openlogging.Error("handle request failed: " + err.Error())
RecordStatus(inv, resp.StatusCode)
//RemoteRequestHandler is for request from remote
func RemoteRequestHandler(w http.ResponseWriter, r *http.Request) {
inv := providerPreHandler(r)
if inv.SourceMicroService == "" {
source := stringutil.SplitFirstSep(r.RemoteAddr, ":")
//Resolve Source
si := sr.Resolve(source)
if si != nil {
inv.SourceMicroService = si.Name
h := make(map[string]string)
for k := range r.Header {
h[k] = r.Header.Get(k)
//transfer header into ctx
inv.Ctx = context.WithValue(inv.Ctx, chassisCommon.ContextHeaderKey{}, h)
c, err := handler.GetChain(chassisCommon.Provider, common.ChainProviderIncoming)
if err != nil {
handleErrorResponse(inv, w, http.StatusBadGateway, err)
lager.Logger.Error("Get chain failed: " + err.Error())
if err = util.SetLocalServiceAddress(inv, r.Header.Get("X-Forwarded-Port")); err != nil {
handleErrorResponse(inv, w, http.StatusBadGateway,
if r.Header.Get(XForwardedHost) == "" {
r.Header.Set(XForwardedHost, r.Host)
var invRsp *invocation.Response
c.Next(inv, func(ir *invocation.Response) error {
//Send the request to the destination
invRsp = ir
if invRsp != nil {
return invRsp.Err
return nil
if _, err = handleRequest(w, inv, invRsp); err != nil {
lager.Logger.Error("Handle request failed: " + err.Error())
func copyChassisResp2HttpResp(w http.ResponseWriter, resp *http.Response) {
copyHeader(w.Header(), resp.Header)
if resp == nil {
openlogging.GetLogger().Warn("response is nil because of unknown reason")
_, err := io.Copy(w, resp.Body)
if err != nil {
openlogging.Error("can not copy: " + err.Error())
err = resp.Body.Close()
if err != nil {
openlogging.Error("Http response close error: " + err.Error())
func handleRequest(w http.ResponseWriter, inv *invocation.Invocation, ir *invocation.Response) (*http.Response, error) {
if ir != nil {
if ir.Err != nil {
//handler only mesher errors, ignore http response err
switch ir.Err.(type) {
case hystrix.FallbackNullError:
handleErrorResponse(inv, w, http.StatusOK, nil)
case loadbalancer.LBError:
handleErrorResponse(inv, w, http.StatusBadGateway, ir.Err)
case hystrix.CircuitError:
handleErrorResponse(inv, w, http.StatusServiceUnavailable, ir.Err)
case fault.Fault:
handleErrorResponse(inv, w, ir.Status, ir.Err)
default: //for other error, check response and response body, if there is body, just transparent response
resp, ok := inv.Reply.(*http.Response)
if ok { // return raw transport error
if resp != nil {
if resp.Body == nil {
//resp.Resp can be nil, for example network error, must handle it
handleErrorResponse(inv, w, http.StatusBadGateway, ir.Err)
return nil, ir.Err
copyChassisResp2HttpResp(w, resp)
RecordStatus(inv, resp.StatusCode)
} else {
// unknown error, resp is nil, e.g. connection refused
handleErrorResponse(inv, w, http.StatusBadGateway, ir.Err)
} else { // unknown err in handler chain
handleErrorResponse(inv, w, http.StatusInternalServerError, ir.Err)
return nil, ir.Err
if inv.Endpoint == "" {
handleErrorResponse(inv, w, http.StatusBadGateway, protocol.ErrUnknown)
return nil, protocol.ErrUnknown
if ir.Result == nil {
if ir.Err != nil {
handleErrorResponse(inv, w, http.StatusBadGateway, ir.Err)
return nil, ir.Err
handleErrorResponse(inv, w, http.StatusBadGateway, ErrNilResponse)
return nil, protocol.ErrUnknown
resp, ok := ir.Result.(*http.Response)
if !ok {
err := errors.New("invocationResponse result is not type *rest.Response")
handleErrorResponse(inv, w, http.StatusBadGateway, err)
return nil, err
//transparent proxy
copyChassisResp2HttpResp(w, resp)
return resp, nil
} else {
handleErrorResponse(inv, w, http.StatusBadGateway, protocol.ErrUnExpectedHandlerChainResponse)
return nil, protocol.ErrUnExpectedHandlerChainResponse
//handleErrorResponse return proxy errors, not err from real service
func handleErrorResponse(inv *invocation.Invocation, w http.ResponseWriter, statusCode int, err error) {
if err != nil {
_, err := w.Write([]byte(err.Error()))
if err != nil {
openlogging.Error("can not write err to client: " + err.Error())
RecordStatus(inv, statusCode)
//RecordStatus record an operation status
func RecordStatus(inv *invocation.Invocation, statusCode int) {
LabelValues := map[string]string{metrics.LServiceName: inv.MicroServiceName, metrics.LApp: inv.RouteTags.AppID(), metrics.LVersion: inv.RouteTags.Version()}
metrics.RecordStatus(LabelValues, statusCode)
// copyHeader appends every value of every header in src to dst. Values already
// present in dst are kept, and the order of the values of a key is preserved.
func copyHeader(dst, src http.Header) {
	for k, vs := range src {
		for _, v := range vs {
			dst.Add(k, v)
		}
	}
}
// prepareRequest adjusts a received request in place so that it can be sent
// again by a client: the body is dropped when the declared content length is
// zero, RequestURI is cleared and Close is reset to false.
func prepareRequest(req *http.Request) {
	if req.ContentLength == 0 {
		req.Body = nil
	}
	req.RequestURI = "" // client is forbidden to set RequestURI
	req.Close = false
}
func postProcessResponse(rsp *http.Response) {