blob: 571b19e6719ebc862df63b71ecbab06558602fea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dubboproxy
import (
"context"
"encoding/json"
"fmt"
"github.com/apache/servicecomb-mesher/proxy/cmd"
"net/http"
"net/url"
mesherCommon "github.com/apache/servicecomb-mesher/proxy/common"
mesherRuntime "github.com/apache/servicecomb-mesher/proxy/pkg/runtime"
"github.com/apache/servicecomb-mesher/proxy/protocol"
"github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/client"
"github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/dubbo"
"github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/schema"
"github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/utils"
"github.com/apache/servicecomb-mesher/proxy/resolver"
"github.com/go-chassis/go-chassis/client/rest"
"github.com/go-chassis/go-chassis/core/common"
chassisCommon "github.com/go-chassis/go-chassis/core/common"
chassisconfig "github.com/go-chassis/go-chassis/core/config"
"github.com/go-chassis/go-chassis/core/handler"
"github.com/go-chassis/go-chassis/core/invocation"
"github.com/go-chassis/go-chassis/core/lager"
"github.com/go-chassis/go-chassis/core/loadbalancer"
"github.com/go-chassis/go-chassis/pkg/runtime"
"github.com/go-chassis/go-chassis/pkg/string"
"github.com/go-chassis/go-chassis/pkg/util/httputil"
"github.com/go-chassis/go-chassis/pkg/util/tags"
"github.com/go-chassis/go-chassis/third_party/forked/afex/hystrix-go/hystrix"
"github.com/go-mesh/openlogging"
)
var (
	// dr is the destination resolver obtained for the "http" protocol; it is
	// used to map a converted REST request to a target micro service name.
	dr = resolver.GetDestinationResolver("http")
	// sr is the source resolver; it is used to look up source information
	// for a caller address.
	sr = resolver.GetSourceResolver()
)

// ProxyTag is the dubbo attachment key that Handle reads to pick the
// protocol of an invocation. Handle sets it to "true" on requests it sends
// out as a consumer and resets it to "" on the provider side.
const ProxyTag = "mesherproxy"

var (
	// IsProvider tags the proxied dubbo service as provider (true) or
	// consumer (false).
	IsProvider bool
	// DubboListenAddr stores the dubbo listen address.
	DubboListenAddr string
)
// ProxyError is a plain error value that carries only a human readable
// message.
type ProxyError struct {
	// Message is the text returned by Error.
	Message string
}

// Compile-time check that ProxyError satisfies the error interface.
var _ error = ProxyError{}

// Error implements the error interface by returning the stored Message.
func (pe ProxyError) Error() string {
	return pe.Message
}
// ConvertDubboReqToHTTPReq converts a dubbo request into an HTTP request.
//
// The REST method description is looked up from the schema by the interface
// name (the dubbo.PathKey attachment) and the dubbo method name. The method
// found is stored in ctx.Method so the response can be converted back later.
// Arguments declared "in body" are JSON-encoded and appended to the request
// body; all other arguments are appended to the query string.
//
// It returns nil when the method or the service name cannot be found in the
// schema, or when a body argument cannot be JSON-encoded.
func ConvertDubboReqToHTTPReq(ctx *dubbo.InvokeContext, dubboReq *dubbo.Request) *http.Request {
	restReq := &http.Request{
		URL:    &url.URL{},
		Header: make(http.Header),
	}
	args := dubboReq.GetArguments()
	operateID := dubboReq.GetMethodName()
	iName := dubboReq.GetAttachment(dubbo.PathKey, "")
	methd := schema.GetMethodByInterface(iName, operateID)
	if methd == nil {
		lager.Logger.Error("GetMethodByInterface failed: Cannot find the method")
		return nil
	}
	ctx.Method = methd
	restReq.Method = methd.Verb

	var (
		queryNum  = 0   // becomes non-zero once the first scalar query parameter is written
		paramsStr = "?" // query string under construction, including the leading '?'
		body      = []byte{}
	)
	for i := 0; i < len(args); i++ {
		_, in := methd.GetParamNameAndWhere(i)
		paraSchema := methd.GetParamSchema(i)
		v := args[i]

		if in == schema.InBody {
			b, err := json.Marshal(v.GetValue())
			if err != nil {
				// Sending an empty or truncated body would silently corrupt
				// the call, so treat it like the other conversion failures.
				lager.Logger.Error("Marshal body argument failed: " + err.Error())
				return nil
			}
			body = append(body, b...)
			continue
		}

		var fmtStr string
		if paraSchema.Dtype == util.SchemaArray {
			// NOTE(review): this branch adds no '&' separator and does not
			// mark the first parameter as written; whether that is correct
			// depends on the exact format ArrayToQueryString returns - confirm.
			fmtStr = util.ArrayToQueryString(paraSchema.Name, v.GetValue())
		} else {
			value, err := util.ObjectToString(paraSchema.Dtype, v.GetValue())
			if err != nil {
				// Keep going with whatever value was returned, as before,
				// but no longer hide the failure.
				lager.Logger.Error(fmt.Sprintf("ObjectToString failed for param %s: %v", paraSchema.Name, err))
			}
			if queryNum == 0 {
				fmtStr = fmt.Sprintf("%s=%s", paraSchema.Name, url.QueryEscape(value))
				queryNum++
			} else {
				fmtStr = fmt.Sprintf("&%s=%s", paraSchema.Name, url.QueryEscape(value))
			}
		}
		paramsStr += fmtStr
	}
	httputil.SetBody(restReq, body)

	uri := methd.Path
	if paramsStr != "?" {
		uri += paramsStr
	}
	httputil.SetURI(restReq, uri)

	tmpName := schema.GetSvcNameByInterface(iName)
	if tmpName == "" {
		lager.Logger.Error("GetSvcNameByInterface failed: Cannot find the svc")
		return nil
	}
	restReq.URL.Host = tmpName // must after setURI
	return restReq
}
// ConvertRestRspToDubboRsp fills dubboRsp from an HTTP response.
//
// A status code of 400 or above yields dubbo.ServerError; the response body
// is used as the error message only if dubboRsp has none yet. Otherwise the
// status is dubbo.Ok and, when a body is present, it is decoded according to
// the response schema that ctx.Method declares for the status code. A decode
// failure is reported as dubbo.BadResponse and a missing schema as
// dubbo.ServerError with the raw body as message.
func ConvertRestRspToDubboRsp(ctx *dubbo.InvokeContext, resp *http.Response, dubboRsp *dubbo.DubboRsp) {
	code := resp.StatusCode
	payload := httputil.ReadBody(resp)

	if code >= http.StatusBadRequest {
		dubboRsp.SetStatus(dubbo.ServerError)
		// Do not overwrite an error message that was already recorded.
		if dubboRsp.GetErrorMsg() == "" && payload != nil {
			dubboRsp.SetErrorMsg(string(payload))
		}
		return
	}

	dubboRsp.SetStatus(dubbo.Ok)
	if payload == nil {
		return
	}

	rspSchema := (*ctx.Method).GetRspSchema(code)
	if rspSchema == nil {
		dubboRsp.SetErrorMsg(string(payload))
		dubboRsp.SetStatus(dubbo.ServerError)
		return
	}

	decoded, err := util.RestByteToValue(rspSchema.DType, payload)
	if err != nil {
		dubboRsp.SetStatus(dubbo.BadResponse)
		dubboRsp.SetErrorMsg(err.Error())
		return
	}
	dubboRsp.SetValue(decoded)
}
// SetLocalServiceAddress assigns the local service address to inv.Endpoint.
//
// The address is taken from cmd.Configs.PortsMap for the invocation's
// protocol. If no address is configured and inv.Port is set, the endpoint
// falls back to 127.0.0.1 with that port and the result is stored in
// PortsMap for the protocol. If neither is available an error is returned.
func SetLocalServiceAddress(inv *invocation.Invocation) error {
	inv.Endpoint = cmd.Configs.PortsMap[inv.Protocol]
	if inv.Endpoint != "" {
		return nil
	}
	if inv.Port == "" {
		return fmt.Errorf("[%s] is not supported, [%s] didn't set env [%s] or cmd parameter --service-ports before mesher start",
			inv.Protocol, inv.MicroServiceName, mesherCommon.EnvServicePorts)
	}
	// Fall back to the port carried by the invocation and remember it.
	inv.Endpoint = "127.0.0.1:" + inv.Port
	cmd.Configs.PortsMap[inv.Protocol] = inv.Endpoint
	return nil
}
// Handle processes one dubbo request held in ctx.
//
// The target service is looked up from the interface name carried in the
// dubbo.PathKey attachment. If the ProxyTag attachment is empty the protocol
// comes from schema.GetSupportProto, otherwise it is "dubbo". Non-dubbo
// invocations are delegated to ProxyRestHandler. Dubbo invocations run
// through the consumer chain (when IsProvider is false) or the provider
// chain (when IsProvider is true), and handleDubboRequest stores the outcome
// in ctx.Rsp.
//
// An error is returned when the service is unknown, the local service
// address cannot be determined, the chain cannot be obtained, or no chain is
// available. The error returned by the chain callback is not propagated;
// Handle returns nil once the chain has been started.
func Handle(ctx *dubbo.InvokeContext) error {
	interfaceName := ctx.Req.GetAttachment(dubbo.PathKey, "")
	svc := schema.GetSvcByInterface(interfaceName)
	if svc == nil {
		return &util.BaseError{ErrMsg: "can't find the svc by " + interfaceName}
	}

	inv := new(invocation.Invocation)
	inv.SourceServiceID = runtime.ServiceID
	inv.SourceMicroService = ctx.Req.GetAttachment(common.HeaderSourceName, "")
	inv.Args = ctx.Req
	inv.Ctx = context.WithValue(context.Background(), chassisCommon.ContextHeaderKey{}, ctx.Req.GetAttachments())
	inv.MicroServiceName = svc.ServiceName
	inv.RouteTags = utiltags.NewDefaultTag(svc.Version, svc.AppID)
	if ctx.Req.GetAttachment(ProxyTag, "") == "" { //come from proxyedDubboSvc
		inv.Protocol = schema.GetSupportProto(svc)
	} else {
		inv.Protocol = "dubbo"
	}
	inv.URLPathFormat = ""
	inv.Reply = &dubboclient.WrapResponse{}

	if err := SetLocalServiceAddress(inv); err != nil { //select local service
		openlogging.Error(err.Error())
		return err
	}

	if inv.Protocol != "dubbo" {
		return ProxyRestHandler(ctx)
	}

	// Send the request through the chain matching this mesher's side.
	var (
		c   *handler.Chain
		err error
	)
	if !IsProvider { //come from proxyedDubboSvc
		ctx.Req.SetAttachment(common.HeaderSourceName, chassisconfig.SelfServiceName)
		ctx.Req.SetAttachment(ProxyTag, "true")
		if mesherRuntime.Role == mesherCommon.RoleSidecar {
			c, err = handler.GetChain(common.Consumer, mesherCommon.ChainConsumerOutgoing)
			if err != nil {
				openlogging.Error("Get Consumer chain failed: " + err.Error())
				return err
			}
		}
	} else { //come from other mesher
		ctx.Req.SetAttachment(ProxyTag, "")
		c, err = handler.GetChain(common.Provider, mesherCommon.ChainProviderIncoming)
		if err != nil {
			openlogging.Error("Get Provider Chain failed: " + err.Error())
			return err
		}
	}

	// The consumer chain is only fetched for the sidecar role, so c can
	// still be nil here; calling Next on it would dereference a nil chain.
	if c == nil {
		chainErr := &util.BaseError{ErrMsg: "handler chain is nil, can not process dubbo request"}
		openlogging.Error(chainErr.Error())
		return chainErr
	}

	c.Next(inv, func(ir *invocation.Response) error {
		return handleDubboRequest(inv, ctx, ir)
	})
	return nil
}
// handleDubboRequest is the handler-chain callback for dubbo invocations.
//
// It maps the chain outcome onto ctx.Rsp: a nil chain response, a chain
// error, or an empty endpoint each set an error status and message and
// return the matching error. On success ctx.Rsp is replaced by the response
// wrapped in ir.Result; a nil result is logged and returned as
// protocol.ErrNilResult.
func handleDubboRequest(inv *invocation.Invocation, ctx *dubbo.InvokeContext, ir *invocation.Response) error {
	if ir == nil {
		ctx.Rsp.SetStatus(dubbo.ServerError)
		ctx.Rsp.SetErrorMsg(protocol.ErrUnExpectedHandlerChainResponse.Error())
		return protocol.ErrUnExpectedHandlerChainResponse
	}

	if ir.Err != nil {
		// Translate well-known chain errors into dubbo status codes.
		switch ir.Err.(type) {
		case hystrix.FallbackNullError:
			ctx.Rsp.SetStatus(dubbo.Ok)
		case hystrix.CircuitError:
			ctx.Rsp.SetStatus(dubbo.ServiceError)
		case loadbalancer.LBError:
			ctx.Rsp.SetStatus(dubbo.ServiceNotFound)
		default:
			ctx.Rsp.SetStatus(dubbo.ServerError)
		}
		ctx.Rsp.SetErrorMsg(ir.Err.Error())
		return ir.Err
	}

	if inv.Endpoint == "" {
		ctx.Rsp.SetStatus(dubbo.ServerError)
		ctx.Rsp.SetErrorMsg(protocol.ErrUnknown.Error())
		return protocol.ErrUnknown
	}

	if ir.Result == nil {
		nilResultErr := protocol.ErrNilResult
		lager.Logger.Error("CAll Chain failed: " + nilResultErr.Error())
		return nilResultErr
	}

	ctx.Rsp = ir.Result.(*dubboclient.WrapResponse).Resp
	return nil
}
// preHandleToRest prepares a REST invocation for the dubbo request in ctx.
//
// It returns the converted HTTP request, an invocation that carries it, and
// the part of ctx.RemoteAddr before the first ":". All three results are
// zero values when the dubbo request cannot be converted. A failure to
// determine the local service address is only logged; the invocation is
// still returned.
func preHandleToRest(ctx *dubbo.InvokeContext) (*http.Request, *invocation.Invocation, string) {
	httpReq := ConvertDubboReqToHTTPReq(ctx, ctx.Req)
	if httpReq == nil {
		return nil, nil, ""
	}

	inv := &invocation.Invocation{
		SourceServiceID: runtime.ServiceID,
		Args:            httpReq,
		Protocol:        "rest",
		Reply:           rest.NewResponse(),
		URLPathFormat:   httpReq.URL.String(),
		SchemaID:        "",
		OperationID:     "",
		Ctx:             context.Background(),
	}

	//select local service
	if addrErr := SetLocalServiceAddress(inv); addrErr != nil {
		openlogging.Error(addrErr.Error())
	}

	return httpReq, inv, stringutil.SplitFirstSep(ctx.RemoteAddr, ":")
}
// ProxyRestHandler forwards the dubbo request in ctx as a REST call.
//
// The request is converted by preHandleToRest, the destination service is
// resolved from the request URL and headers, and the invocation runs through
// the consumer outgoing chain. The HTTP reply is then converted back into
// ctx.Rsp by ConvertRestRspToDubboRsp.
//
// An error is returned when the request cannot be converted, the destination
// cannot be resolved, the chain cannot be obtained, or no chain is
// available. The error returned by the chain callback is not propagated;
// such failures reach the caller through the status and message in ctx.Rsp.
func ProxyRestHandler(ctx *dubbo.InvokeContext) error {
	// The source address derived from ctx.RemoteAddr is not used: the source
	// is fixed to the loopback address below.
	req, inv, _ := preHandleToRest(ctx)
	if req == nil {
		return &util.BaseError{ErrMsg: "request is invalid "}
	}
	source := "127.0.0.1" //"10.57.75.87"

	//Resolve Source
	si := sr.Resolve(source)

	// Flatten the headers to one value per key for the resolver.
	h := make(map[string]string, len(req.Header))
	for k := range req.Header {
		h[k] = req.Header.Get(k)
	}

	//Resolve Destination
	destination, _, err := dr.Resolve(source, "", inv.URLPathFormat, h)
	if err != nil {
		return err
	}
	inv.MicroServiceName = destination

	var c *handler.Chain
	if mesherRuntime.Role == mesherCommon.RoleSidecar {
		c, err = handler.GetChain(common.Consumer, mesherCommon.ChainConsumerOutgoing)
		if err != nil {
			lager.Logger.Error("Get chain failed: " + err.Error())
			return err
		}
		if si == nil {
			lager.Logger.Info("Can not resolve " + source + " to Source info")
		}
	}

	// The chain is only fetched for the sidecar role, so c can still be nil
	// here; calling Next on it would dereference a nil chain.
	if c == nil {
		chainErr := &util.BaseError{ErrMsg: "handler chain is nil, can not process rest request"}
		lager.Logger.Error(chainErr.Error())
		return chainErr
	}

	c.Next(inv, func(ir *invocation.Response) error {
		//Send the request to the destination
		return handleRequest(ctx, req, inv.Reply.(*http.Response), ctx.Rsp, inv, ir)
	})
	ConvertRestRspToDubboRsp(ctx, inv.Reply.(*http.Response), ctx.Rsp)
	return nil
}
// handleRequest is the handler-chain callback for REST invocations.
//
// It records the chain outcome on resp, ir and dubboRsp:
//   - a nil chain response sets the error message on dubboRsp and returns
//     protocol.ErrUnExpectedHandlerChainResponse;
//   - a chain error is mapped to an HTTP status code on resp (and on ir,
//     except for hystrix.FallbackNullError, which only sets resp to 200), its
//     text is stored in dubboRsp, and the error is returned;
//   - an empty inv.Endpoint is reported as 500 with protocol.ErrUnknown;
//   - otherwise ir.Status is set from resp.StatusCode and nil is returned.
//
// ctx and req are not used by this function.
func handleRequest(ctx *dubbo.InvokeContext, req *http.Request, resp *http.Response,
	dubboRsp *dubbo.DubboRsp, inv *invocation.Invocation, ir *invocation.Response) error {
	if ir == nil {
		dubboRsp.SetErrorMsg(protocol.ErrUnExpectedHandlerChainResponse.Error())
		return protocol.ErrUnExpectedHandlerChainResponse
	}

	if ir.Err != nil {
		switch ir.Err.(type) {
		case hystrix.FallbackNullError:
			resp.StatusCode = http.StatusOK
		case hystrix.CircuitError:
			ir.Status = http.StatusServiceUnavailable
			resp.StatusCode = http.StatusServiceUnavailable
		case loadbalancer.LBError:
			ir.Status = http.StatusBadGateway
			resp.StatusCode = http.StatusBadGateway
		default:
			ir.Status = http.StatusInternalServerError
			resp.StatusCode = http.StatusInternalServerError
		}
		dubboRsp.SetErrorMsg(ir.Err.Error())
		return ir.Err
	}

	if inv.Endpoint == "" {
		ir.Status = http.StatusInternalServerError
		resp.StatusCode = http.StatusInternalServerError
		// ir.Err is nil on this path, so the message must come from the
		// error that is actually returned.
		dubboRsp.SetErrorMsg(protocol.ErrUnknown.Error())
		return protocol.ErrUnknown
	}

	ir.Status = resp.StatusCode
	return nil
}