feat: register & run plugin (#3)
diff --git a/README.md b/README.md
index 0b9b041..413bc17 100644
--- a/README.md
+++ b/README.md
@@ -20,3 +20,4 @@
# Go Plugin Runner for Apache APISIX
[![Go Report Card](https://goreportcard.com/badge/github.com/apache/apisix-go-plugin-runner)](https://goreportcard.com/report/github.com/apache/apisix-go-plugin-runner)
+[![Build Status](https://github.com/apache/apisix-go-plugin-runner/workflows/unit-test-ci/badge.svg?branch=master)](https://github.com/apache/apisix-go-plugin-runner/actions)
diff --git a/internal/http/response.go b/internal/http/response.go
index d0aa8a4..4b2f64b 100644
--- a/internal/http/response.go
+++ b/internal/http/response.go
@@ -31,7 +31,9 @@
}
func (r *Response) Header() http.Header {
- r.hdr = http.Header{}
+ if r.hdr == nil {
+ r.hdr = http.Header{}
+ }
return r.hdr
}
@@ -58,8 +60,12 @@
r.hdr = nil
}
+func (r *Response) HasChange() bool {
+ return !(r.body == nil && r.code == 0 && len(r.hdr) == 0)
+}
+
func (r *Response) FetchChanges(id uint32, builder *flatbuffers.Builder) bool {
- if r.body == nil && r.code == 0 && len(r.hdr) == 0 {
+ if !r.HasChange() {
return false
}
diff --git a/internal/log/log.go b/internal/log/log.go
index b68ec2a..90f0026 100644
--- a/internal/log/log.go
+++ b/internal/log/log.go
@@ -33,7 +33,7 @@
zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()),
os.Stdout,
level)
- lg := zap.New(core, zap.AddStacktrace(zap.ErrorLevel), zap.AddCaller())
+ lg := zap.New(core, zap.AddStacktrace(zap.ErrorLevel), zap.AddCaller(), zap.AddCallerSkip(1))
return lg.Sugar()
}
diff --git a/internal/plugin/conf.go b/internal/plugin/conf.go
index 5c4dccf..20485fe 100644
--- a/internal/plugin/conf.go
+++ b/internal/plugin/conf.go
@@ -54,13 +54,33 @@
func PrepareConf(buf []byte) (*flatbuffers.Builder, error) {
req := pc.GetRootAsReq(buf, 0)
- entries := make(RuleConf, req.ConfLength())
+ entries := RuleConf{}
te := A6.TextEntry{}
for i := 0; i < req.ConfLength(); i++ {
if req.Conf(&te, i) {
- entries[i].Name = string(te.Name())
- entries[i].Value = te.Value
+ name := string(te.Name())
+ plugin := findPlugin(name)
+ if plugin == nil {
+ log.Warnf("can't find plugin %s, skip", name)
+ continue
+ }
+
+ log.Infof("prepare conf for plugin %s", name)
+
+ v := te.Value()
+ conf, err := plugin.ParseConf(v)
+ if err != nil {
+ log.Errorf(
+ "failed to parse configuration for plugin %s, configuration: %s",
+ name, string(v))
+ continue
+ }
+
+ entries = append(entries, ConfEntry{
+ Name: name,
+ Value: conf,
+ })
}
}
diff --git a/internal/plugin/conf_test.go b/internal/plugin/conf_test.go
index aa2ded4..a84dcea 100644
--- a/internal/plugin/conf_test.go
+++ b/internal/plugin/conf_test.go
@@ -15,6 +15,7 @@
package plugin
import (
+ "errors"
"sort"
"sync"
"testing"
@@ -47,6 +48,59 @@
assert.Equal(t, uint32(2), resp.ConfToken())
}
+func prepareConfWithData(builder *flatbuffers.Builder, arg ...flatbuffers.UOffsetT) {
+ tes := []flatbuffers.UOffsetT{}
+ for i := 0; i < len(arg); i += 2 {
+ A6.TextEntryStart(builder)
+ name := arg[i]
+ value := arg[i+1]
+ A6.TextEntryAddName(builder, name)
+ A6.TextEntryAddValue(builder, value)
+ te := A6.TextEntryEnd(builder)
+ tes = append(tes, te)
+ }
+
+ pc.ReqStartConfVector(builder, len(tes))
+ for i := len(tes) - 1; i >= 0; i-- {
+ builder.PrependUOffsetT(tes[i])
+ }
+ v := builder.EndVector(len(tes))
+
+ pc.ReqStart(builder)
+ pc.ReqAddConf(builder, v)
+ root := pc.ReqEnd(builder)
+ builder.Finish(root)
+ b := builder.FinishedBytes()
+
+ PrepareConf(b)
+}
+
+func TestPrepareConfUnknownPlugin(t *testing.T) {
+ InitConfCache(1 * time.Millisecond)
+ builder := flatbuffers.NewBuilder(1024)
+
+ name := builder.CreateString("xxx")
+ value := builder.CreateString(`{"body":"yes"}`)
+ prepareConfWithData(builder, name, value)
+ res, _ := GetRuleConf(1)
+ assert.Equal(t, 0, len(res))
+}
+
+func TestPrepareConfBadConf(t *testing.T) {
+ InitConfCache(1 * time.Millisecond)
+ builder := flatbuffers.NewBuilder(1024)
+
+ f := func(in []byte) (conf interface{}, err error) {
+ return nil, errors.New("ouch")
+ }
+ RegisterPlugin("bad_conf", f, emptyFilter)
+ name := builder.CreateString("bad_conf")
+ value := builder.CreateString(`{"body":"yes"}`)
+ prepareConfWithData(builder, name, value)
+ res, _ := GetRuleConf(1)
+ assert.Equal(t, 0, len(res))
+}
+
func TestPrepareConfConcurrently(t *testing.T) {
InitConfCache(10 * time.Millisecond)
@@ -104,6 +158,7 @@
}
func TestGetRuleConfCheckConf(t *testing.T) {
+ RegisterPlugin("echo", emptyParseConf, emptyFilter)
InitConfCache(1 * time.Millisecond)
builder := flatbuffers.NewBuilder(1024)
diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go
index b7400b0..7fa6136 100644
--- a/internal/plugin/plugin.go
+++ b/internal/plugin/plugin.go
@@ -15,18 +15,89 @@
package plugin
import (
- "context"
+ "errors"
+ "fmt"
"net/http"
hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPReqCall"
flatbuffers "github.com/google/flatbuffers/go"
inHTTP "github.com/apache/apisix-go-plugin-runner/internal/http"
+ "github.com/apache/apisix-go-plugin-runner/internal/log"
"github.com/apache/apisix-go-plugin-runner/internal/util"
pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
)
-func handle(conf RuleConf, ctx context.Context, w http.ResponseWriter, r pkgHTTP.Request) error {
+type ParseConfFunc func(in []byte) (conf interface{}, err error)
+type FilterFunc func(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request)
+
+type pluginOpts struct {
+ ParseConf ParseConfFunc
+ Filter FilterFunc
+}
+
+type ErrPluginRegistered struct {
+ name string
+}
+
+func (err ErrPluginRegistered) Error() string {
+ return fmt.Sprintf("plugin %s registered", err.name)
+}
+
+var (
+ pluginRegistry = map[string]*pluginOpts{}
+
+ ErrMissingName = errors.New("missing name")
+ ErrMissingParseConfMethod = errors.New("missing ParseConf method")
+ ErrMissingFilterMethod = errors.New("missing Filter method")
+)
+
+func RegisterPlugin(name string, pc ParseConfFunc, sv FilterFunc) error {
+ if name == "" {
+ return ErrMissingName
+ }
+ if pc == nil {
+ return ErrMissingParseConfMethod
+ }
+ if sv == nil {
+ return ErrMissingFilterMethod
+ }
+
+ opt := &pluginOpts{
+ ParseConf: pc,
+ Filter: sv,
+ }
+ if _, found := pluginRegistry[name]; found {
+ return ErrPluginRegistered{name}
+ }
+ pluginRegistry[name] = opt
+ return nil
+}
+
+func findPlugin(name string) *pluginOpts {
+ if opt, found := pluginRegistry[name]; found {
+ return opt
+ }
+ return nil
+}
+
+func filter(conf RuleConf, w *inHTTP.Response, r pkgHTTP.Request) error {
+ for _, c := range conf {
+ plugin := findPlugin(c.Name)
+ if plugin == nil {
+ log.Warnf("can't find plugin %s, skip", c.Name)
+ continue
+ }
+
+ log.Infof("run plugin %s", c.Name)
+
+ plugin.Filter(c.Value, w, r)
+
+ if w.HasChange() {
+ // response is generated, no need to continue
+ break
+ }
+ }
return nil
}
@@ -61,8 +132,7 @@
return nil, err
}
- ctx := context.Background()
- err = handle(conf, ctx, resp, req)
+ err = filter(conf, resp, req)
if err != nil {
return nil, err
}
diff --git a/internal/plugin/plugin_test.go b/internal/plugin/plugin_test.go
index c433f3f..eb942df 100644
--- a/internal/plugin/plugin_test.go
+++ b/internal/plugin/plugin_test.go
@@ -15,12 +15,26 @@
package plugin
import (
+ "net/http"
"testing"
"time"
hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPReqCall"
flatbuffers "github.com/google/flatbuffers/go"
"github.com/stretchr/testify/assert"
+
+ inHTTP "github.com/apache/apisix-go-plugin-runner/internal/http"
+ pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
+)
+
+var (
+ emptyParseConf = func(in []byte) (conf interface{}, err error) {
+ return string(in), nil
+ }
+
+ emptyFilter = func(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request) {
+ return
+ }
)
func TestHTTPReqCall(t *testing.T) {
@@ -43,3 +57,61 @@
assert.Equal(t, uint32(233), resp.Id())
assert.Equal(t, hrc.ActionNONE, resp.ActionType())
}
+
+func TestRegisterPlugin(t *testing.T) {
+ assert.Equal(t, ErrMissingParseConfMethod,
+ RegisterPlugin("bad_conf", nil, emptyFilter))
+ assert.Equal(t, ErrMissingFilterMethod,
+ RegisterPlugin("bad_conf", emptyParseConf, nil))
+}
+
+func TestFilter(t *testing.T) {
+ fooParseConf := func(in []byte) (conf interface{}, err error) {
+ return "foo", nil
+ }
+ fooFilter := func(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request) {
+ w.Header().Add("foo", "bar")
+ assert.Equal(t, "foo", conf.(string))
+ }
+ barParseConf := func(in []byte) (conf interface{}, err error) {
+ return "bar", nil
+ }
+ barFilter := func(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request) {
+ r.Header().Set("foo", "bar")
+ assert.Equal(t, "bar", conf.(string))
+ }
+
+ RegisterPlugin("foo", fooParseConf, fooFilter)
+ RegisterPlugin("bar", barParseConf, barFilter)
+
+ builder := flatbuffers.NewBuilder(1024)
+ fooName := builder.CreateString("foo")
+ fooConf := builder.CreateString("foo")
+ barName := builder.CreateString("bar")
+ barConf := builder.CreateString("bar")
+ prepareConfWithData(builder, fooName, fooConf, barName, barConf)
+
+ res, _ := GetRuleConf(1)
+ hrc.ReqStart(builder)
+ hrc.ReqAddId(builder, 233)
+ hrc.ReqAddConfToken(builder, 1)
+ r := hrc.ReqEnd(builder)
+ builder.Finish(r)
+ out := builder.FinishedBytes()
+
+ req := inHTTP.CreateRequest(out)
+ resp := inHTTP.CreateResponse()
+ filter(res, resp, req)
+
+ assert.Equal(t, "bar", resp.Header().Get("foo"))
+ assert.Equal(t, "", req.Header().Get("foo"))
+
+ req = inHTTP.CreateRequest(out)
+ resp = inHTTP.CreateResponse()
+ prepareConfWithData(builder, barName, barConf, fooName, fooConf)
+ res, _ = GetRuleConf(2)
+ filter(res, resp, req)
+
+ assert.Equal(t, "bar", resp.Header().Get("foo"))
+ assert.Equal(t, "bar", req.Header().Get("foo"))
+}
diff --git a/internal/server/server.go b/internal/server/server.go
index fe2dcea..25e9718 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -16,7 +16,6 @@
import (
"encoding/binary"
- "errors"
"fmt"
"io"
"net"
@@ -27,10 +26,11 @@
"syscall"
"time"
+ flatbuffers "github.com/google/flatbuffers/go"
+
"github.com/apache/apisix-go-plugin-runner/internal/log"
"github.com/apache/apisix-go-plugin-runner/internal/plugin"
"github.com/apache/apisix-go-plugin-runner/internal/util"
- flatbuffers "github.com/google/flatbuffers/go"
)
const (
@@ -48,11 +48,13 @@
)
func readErr(n int, err error, required int) bool {
- if n < required {
- err = errors.New("truncated")
+ if 0 < n && n < required {
+ err = fmt.Errorf("truncated, only get the first %d bytes", n)
}
- if err != nil && err != io.EOF {
- log.Errorf("read: %s", err)
+ if err != nil {
+ if err != io.EOF {
+ log.Errorf("read: %s", err)
+ }
return true
}
return false
diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go
new file mode 100644
index 0000000..7ea4bd8
--- /dev/null
+++ b/pkg/plugin/plugin.go
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package plugin
+
+import (
+ "net/http"
+
+ "github.com/apache/apisix-go-plugin-runner/internal/plugin"
+ pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
+)
+
+// PluginOpts represents the attributes of the Plugin
+type PluginOpts struct {
+ // Name (required) is the plguin name
+ Name string
+ // ParseConf (required) is the method to parse given plugin configuration. When the
+ // configuration can't be parsed, it will be skipped.
+ ParseConf func(in []byte) (conf interface{}, err error)
+ // Filter (required) is the method to handle request.
+ // It is like the `http.ServeHTTP`, plus the ctx and the configuration created by
+ // ParseConf.
+ //
+ // When the `w` is written, the execution of plugin chain will be stopped.
+ // We don't use onion model like Gin/Caddy because we don't serve the whole request lifecycle
+ // inside the runner. The plugin is only a filter running at one stage.
+ Filter func(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request)
+}
+
+// RegisterPlugin register a plugin. Plugin which has the same name can't be registered twice.
+func RegisterPlugin(opt *PluginOpts) error {
+ return plugin.RegisterPlugin(opt.Name, opt.ParseConf, opt.Filter)
+}