| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package http |
| |
| import ( |
| "context" |
| "encoding/binary" |
| "net" |
| "net/http" |
| "net/url" |
| "reflect" |
| "sync" |
| "time" |
| |
| "github.com/api7/ext-plugin-proto/go/A6" |
| ei "github.com/api7/ext-plugin-proto/go/A6/ExtraInfo" |
| hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPReqCall" |
| flatbuffers "github.com/google/flatbuffers/go" |
| |
| "github.com/apache/apisix-go-plugin-runner/internal/util" |
| "github.com/apache/apisix-go-plugin-runner/pkg/common" |
| pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http" |
| "github.com/apache/apisix-go-plugin-runner/pkg/log" |
| ) |
| |
| type Request struct { |
| // the root of the flatbuffers HTTPReqCall Request msg |
| r *hrc.Req |
| |
| conn net.Conn |
| extraInfoHeader []byte |
| |
| path []byte |
| |
| hdr *Header |
| rawHdr http.Header |
| |
| args url.Values |
| rawArgs url.Values |
| |
| vars map[string][]byte |
| body []byte |
| |
| ctx context.Context |
| cancel context.CancelFunc |
| |
| respHdr http.Header |
| } |
| |
| func (r *Request) ConfToken() uint32 { |
| return r.r.ConfToken() |
| } |
| |
| func (r *Request) ID() uint32 { |
| return r.r.Id() |
| } |
| |
| func (r *Request) SrcIP() net.IP { |
| return r.r.SrcIpBytes() |
| } |
| |
| func (r *Request) Method() string { |
| return r.r.Method().String() |
| } |
| |
| func (r *Request) Path() []byte { |
| if r.path == nil { |
| return r.r.Path() |
| } |
| return r.path |
| } |
| |
| func (r *Request) SetPath(path []byte) { |
| r.path = path |
| } |
| |
| func (r *Request) Header() pkgHTTP.Header { |
| if r.hdr == nil { |
| hdr := newHeader() |
| hh := hdr.View() |
| size := r.r.HeadersLength() |
| obj := A6.TextEntry{} |
| for i := 0; i < size; i++ { |
| if r.r.Headers(&obj, i) { |
| hh.Add(string(obj.Name()), string(obj.Value())) |
| } |
| } |
| r.hdr = hdr |
| r.rawHdr = hdr.Clone() |
| } |
| return r.hdr |
| } |
| |
| func (r *Request) RespHeader() http.Header { |
| if r.respHdr == nil { |
| r.respHdr = http.Header{} |
| } |
| return r.respHdr |
| } |
| |
// cloneUrlValues returns a deep copy of oldV.
//
// All value slices of the copy are carved out of one shared backing array.
// Each is capped to its own length, so appending to one value slice of the
// clone cannot overwrite a neighbour. A key whose value slice is nil stays nil
// in the clone, so the clone compares equal to the source under
// reflect.DeepEqual. The result is always a non-nil map, even for a nil oldV.
func cloneUrlValues(oldV url.Values) url.Values {
	// Count all values first so a single allocation can back every slice.
	total := 0
	for _, vv := range oldV {
		total += len(vv)
	}

	backing := make([]string, total)
	newV := make(url.Values, len(oldV))
	for k, vv := range oldV {
		if vv == nil {
			// Preserve nil-ness instead of turning it into an empty slice.
			newV[k] = nil
			continue
		}
		n := copy(backing, vv)
		newV[k] = backing[:n:n]
		backing = backing[n:]
	}
	return newV
}
| |
| func (r *Request) Args() url.Values { |
| if r.args == nil { |
| args := url.Values{} |
| size := r.r.ArgsLength() |
| obj := A6.TextEntry{} |
| for i := 0; i < size; i++ { |
| if r.r.Args(&obj, i) { |
| args.Add(string(obj.Name()), string(obj.Value())) |
| } |
| } |
| r.args = args |
| r.rawArgs = cloneUrlValues(args) |
| } |
| return r.args |
| } |
| |
| func (r *Request) Var(name string) ([]byte, error) { |
| if r.vars == nil { |
| r.vars = map[string][]byte{} |
| } |
| |
| var v []byte |
| var found bool |
| |
| if v, found = r.vars[name]; !found { |
| var err error |
| |
| builder := util.GetBuilder() |
| varName := builder.CreateString(name) |
| ei.VarStart(builder) |
| ei.VarAddName(builder, varName) |
| varInfo := ei.VarEnd(builder) |
| v, err = r.askExtraInfo(builder, ei.InfoVar, varInfo) |
| util.PutBuilder(builder) |
| |
| if err != nil { |
| return nil, err |
| } |
| |
| r.vars[name] = v |
| } |
| return v, nil |
| } |
| |
| func (r *Request) Body() ([]byte, error) { |
| if len(r.body) > 0 { |
| return r.body, nil |
| } |
| |
| builder := util.GetBuilder() |
| ei.ReqBodyStart(builder) |
| bodyInfo := ei.ReqBodyEnd(builder) |
| v, err := r.askExtraInfo(builder, ei.InfoReqBody, bodyInfo) |
| if err != nil { |
| return nil, err |
| } |
| |
| r.body = v |
| return v, nil |
| } |
| |
| func (r *Request) Reset() { |
| defer r.cancel() |
| r.path = nil |
| r.hdr = nil |
| r.args = nil |
| |
| r.vars = nil |
| r.body = nil |
| r.conn = nil |
| r.ctx = nil |
| r.respHdr = nil |
| // Keep the fields below |
| // r.extraInfoHeader = nil |
| } |
| |
| func (r *Request) FetchChanges(id uint32, builder *flatbuffers.Builder) bool { |
| if r.path == nil && r.hdr == nil && r.args == nil && r.respHdr == nil { |
| return false |
| } |
| |
| var path flatbuffers.UOffsetT |
| if r.path != nil { |
| path = builder.CreateByteString(r.path) |
| } |
| |
| var hdrVec, respHdrVec flatbuffers.UOffsetT |
| if r.hdr != nil { |
| hdrs := []flatbuffers.UOffsetT{} |
| oldHdr := r.rawHdr |
| newHdr := r.hdr.View() |
| for n := range oldHdr { |
| if _, ok := newHdr[n]; !ok { |
| // deleted |
| name := builder.CreateString(n) |
| A6.TextEntryStart(builder) |
| A6.TextEntryAddName(builder, name) |
| te := A6.TextEntryEnd(builder) |
| hdrs = append(hdrs, te) |
| } |
| } |
| for n, v := range newHdr { |
| if raw, ok := oldHdr[n]; !ok || raw[0] != v[0] { |
| // set |
| name := builder.CreateString(n) |
| value := builder.CreateString(v[0]) |
| A6.TextEntryStart(builder) |
| A6.TextEntryAddName(builder, name) |
| A6.TextEntryAddValue(builder, value) |
| te := A6.TextEntryEnd(builder) |
| hdrs = append(hdrs, te) |
| } |
| } |
| size := len(hdrs) |
| hrc.RewriteStartHeadersVector(builder, size) |
| for i := size - 1; i >= 0; i-- { |
| te := hdrs[i] |
| builder.PrependUOffsetT(te) |
| } |
| hdrVec = builder.EndVector(size) |
| } |
| |
| if r.respHdr != nil { |
| respHdrs := []flatbuffers.UOffsetT{} |
| for n, arr := range r.respHdr { |
| for _, v := range arr { |
| name := builder.CreateString(n) |
| value := builder.CreateString(v) |
| A6.TextEntryStart(builder) |
| A6.TextEntryAddName(builder, name) |
| A6.TextEntryAddValue(builder, value) |
| te := A6.TextEntryEnd(builder) |
| respHdrs = append(respHdrs, te) |
| } |
| } |
| size := len(respHdrs) |
| hrc.RewriteStartRespHeadersVector(builder, size) |
| for i := size - 1; i >= 0; i-- { |
| te := respHdrs[i] |
| builder.PrependUOffsetT(te) |
| } |
| respHdrVec = builder.EndVector(size) |
| } |
| |
| var argsVec flatbuffers.UOffsetT |
| if r.args != nil { |
| args := []flatbuffers.UOffsetT{} |
| oldArgs := r.rawArgs |
| newArgs := r.args |
| for n := range oldArgs { |
| if _, ok := newArgs[n]; !ok { |
| // deleted |
| name := builder.CreateString(n) |
| A6.TextEntryStart(builder) |
| A6.TextEntryAddName(builder, name) |
| te := A6.TextEntryEnd(builder) |
| args = append(args, te) |
| } |
| } |
| for n, v := range newArgs { |
| if raw, ok := oldArgs[n]; !ok || !reflect.DeepEqual(raw, v) { |
| // set / add |
| for _, vv := range v { |
| name := builder.CreateString(n) |
| value := builder.CreateString(vv) |
| A6.TextEntryStart(builder) |
| A6.TextEntryAddName(builder, name) |
| A6.TextEntryAddValue(builder, value) |
| te := A6.TextEntryEnd(builder) |
| args = append(args, te) |
| } |
| } |
| } |
| size := len(args) |
| hrc.RewriteStartArgsVector(builder, size) |
| for i := size - 1; i >= 0; i-- { |
| te := args[i] |
| builder.PrependUOffsetT(te) |
| } |
| argsVec = builder.EndVector(size) |
| } |
| |
| hrc.RewriteStart(builder) |
| if path > 0 { |
| hrc.RewriteAddPath(builder, path) |
| } |
| if hdrVec > 0 { |
| hrc.RewriteAddHeaders(builder, hdrVec) |
| } |
| if respHdrVec > 0 { |
| hrc.RewriteAddRespHeaders(builder, respHdrVec) |
| } |
| if argsVec > 0 { |
| hrc.RewriteAddArgs(builder, argsVec) |
| } |
| rewrite := hrc.RewriteEnd(builder) |
| |
| hrc.RespStart(builder) |
| hrc.RespAddId(builder, id) |
| hrc.RespAddActionType(builder, hrc.ActionRewrite) |
| hrc.RespAddAction(builder, rewrite) |
| res := hrc.RespEnd(builder) |
| builder.Finish(res) |
| |
| return true |
| } |
| |
| func (r *Request) BindConn(c net.Conn) { |
| r.conn = c |
| } |
| |
| func (r *Request) Context() context.Context { |
| if r.ctx != nil { |
| return r.ctx |
| } |
| return context.Background() |
| } |
| |
| func (r *Request) askExtraInfo(builder *flatbuffers.Builder, |
| infoType ei.Info, info flatbuffers.UOffsetT) ([]byte, error) { |
| |
| ei.ReqStart(builder) |
| ei.ReqAddInfoType(builder, infoType) |
| ei.ReqAddInfo(builder, info) |
| eiRes := ei.ReqEnd(builder) |
| builder.Finish(eiRes) |
| |
| c := r.conn |
| if len(r.extraInfoHeader) == 0 { |
| r.extraInfoHeader = make([]byte, util.HeaderLen) |
| } |
| header := r.extraInfoHeader |
| |
| out := builder.FinishedBytes() |
| size := len(out) |
| binary.BigEndian.PutUint32(header, uint32(size)) |
| header[0] = util.RPCExtraInfo |
| |
| n, err := util.WriteBytes(c, header, len(header)) |
| if err != nil { |
| util.WriteErr(n, err) |
| return nil, common.ErrConnClosed |
| } |
| |
| n, err = util.WriteBytes(c, out, size) |
| if err != nil { |
| util.WriteErr(n, err) |
| return nil, common.ErrConnClosed |
| } |
| |
| n, err = util.ReadBytes(c, header, util.HeaderLen) |
| if util.ReadErr(n, err, util.HeaderLen) { |
| return nil, common.ErrConnClosed |
| } |
| |
| ty := header[0] |
| header[0] = 0 |
| length := binary.BigEndian.Uint32(header) |
| |
| log.Infof("receive rpc type: %d data length: %d", ty, length) |
| |
| buf := make([]byte, length) |
| n, err = util.ReadBytes(c, buf, int(length)) |
| if util.ReadErr(n, err, int(length)) { |
| return nil, common.ErrConnClosed |
| } |
| |
| resp := ei.GetRootAsResp(buf, 0) |
| res := resp.ResultBytes() |
| return res, nil |
| } |
| |
| var reqPool = sync.Pool{ |
| New: func() interface{} { |
| return &Request{} |
| }, |
| } |
| |
| func CreateRequest(buf []byte) *Request { |
| req := reqPool.Get().(*Request) |
| req.r = hrc.GetRootAsReq(buf, 0) |
| // because apisix has an implicit 60s timeout, so set the timeout to 56 seconds(smaller than 60s) |
| // so plugin writer can still break the execution with a custom response before the apisix implicit timeout. |
| ctx, cancel := context.WithTimeout(context.Background(), 56*time.Second) |
| req.ctx = ctx |
| req.cancel = cancel |
| return req |
| } |
| |
| func ReuseRequest(r *Request) { |
| r.Reset() |
| reqPool.Put(r) |
| } |