blob: 1593fe1593fd244a89df8ab7354689ca7ea3d990 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package http
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
)
import (
dubboConstant "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/dubbo"
"github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
dubbo2 "github.com/apache/dubbo-go-pixiu/pkg/context/dubbo"
"github.com/apache/dubbo-go-pixiu/pkg/server"
)
const (
Kind = constant.DubboHttpFilter
)
func init() {
filter.RegisterDubboFilterPlugin(&Plugin{})
}
type (
// Plugin dubbo to http transform plugin
Plugin struct {
}
// Config config
Config struct {
}
// Filter dubbo to http transform filter
Filter struct {
Config *Config
}
)
// Kind the filter kind
func (p Plugin) Kind() string {
return Kind
}
// CreateFilter return the filter
func (p Plugin) CreateFilter(config interface{}) (filter.DubboFilter, error) {
cfg := config.(*Config)
return Filter{Config: cfg}, nil
}
// Config Expose the config so that Filter Manger can inject it, so it must be a pointer
func (p Plugin) Config() interface{} {
return &Config{}
}
// Handle transform rpcInvocation to http
func (f Filter) Handle(ctx *dubbo2.RpcContext) filter.FilterStatus {
ra := ctx.Route
clusterName := ra.Cluster
clusterManager := server.GetClusterManager()
endpoint := clusterManager.PickEndpoint(clusterName, ctx)
if endpoint == nil {
ctx.SetError(errors.Errorf("Requested dubbo rpc invocation endpoint not found"))
return filter.Stop
}
var (
req *http.Request
err error
)
invoc := ctx.RpcInvocation
// path's format /{service}/{method}
interfaceKey, _ := invoc.GetAttachment(constant.InterfaceKey)
// work when invocation is generic
// when invocation is generic, there are three value in arguments. first is methodName, second is types, third is values
methodName := invoc.Arguments()[0].(string)
path := interfaceKey + "/" + methodName
parsedURL := url.URL{
Host: endpoint.Address.GetAddress(),
Scheme: "http",
Path: path,
}
body := invoc.Arguments()[2]
b, _ := json.Marshal(body)
req, err = http.NewRequest(http.MethodPost, parsedURL.String(), strings.NewReader(string(b)))
if err != nil {
err := errors.New(fmt.Sprintf("create new request failed: %v", err))
ctx.SetError(err)
return filter.Stop
}
versionKey, _ := invoc.GetAttachment(dubboConstant.VersionKey)
groupKey, _ := invoc.GetAttachment(dubboConstant.GroupKey)
req.Header.Set(constant.DubboHttpDubboVersion, "1.0.0")
req.Header.Set(constant.DubboServiceProtocol, dubbo.DUBBO)
req.Header.Set(constant.DubboServiceVersion, versionKey)
req.Header.Set(constant.DubboGroup, groupKey)
resp, err := (&http.Client{}).Do(req)
if err != nil {
ctx.SetError(err)
return filter.Stop
}
if resp.StatusCode != http.StatusOK {
ctx.SetError(errors.New(fmt.Sprintf("upstream http response status code %d", resp.StatusCode)))
return filter.Stop
}
s, _ := io.ReadAll(resp.Body)
result := &protocol.RPCResult{}
result.Rest = string(s)
ctx.SetResult(result)
return filter.Continue
}