feat: register & run plugin (#3)

diff --git a/README.md b/README.md
index 0b9b041..413bc17 100644
--- a/README.md
+++ b/README.md
@@ -20,3 +20,4 @@
 # Go Plugin Runner for Apache APISIX
 
 [![Go Report Card](https://goreportcard.com/badge/github.com/apache/apisix-go-plugin-runner)](https://goreportcard.com/report/github.com/apache/apisix-go-plugin-runner)
+[![Build Status](https://github.com/apache/apisix-go-plugin-runner/workflows/unit-test-ci/badge.svg?branch=master)](https://github.com/apache/apisix-go-plugin-runner/actions)
diff --git a/internal/http/response.go b/internal/http/response.go
index d0aa8a4..4b2f64b 100644
--- a/internal/http/response.go
+++ b/internal/http/response.go
@@ -31,7 +31,9 @@
 }
 
 func (r *Response) Header() http.Header {
-	r.hdr = http.Header{}
+	if r.hdr == nil {
+		r.hdr = http.Header{}
+	}
 	return r.hdr
 }
 
@@ -58,8 +60,12 @@
 	r.hdr = nil
 }
 
+func (r *Response) HasChange() bool {
+	return !(r.body == nil && r.code == 0 && len(r.hdr) == 0)
+}
+
 func (r *Response) FetchChanges(id uint32, builder *flatbuffers.Builder) bool {
-	if r.body == nil && r.code == 0 && len(r.hdr) == 0 {
+	if !r.HasChange() {
 		return false
 	}
 
diff --git a/internal/log/log.go b/internal/log/log.go
index b68ec2a..90f0026 100644
--- a/internal/log/log.go
+++ b/internal/log/log.go
@@ -33,7 +33,7 @@
 		zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()),
 		os.Stdout,
 		level)
-	lg := zap.New(core, zap.AddStacktrace(zap.ErrorLevel), zap.AddCaller())
+	lg := zap.New(core, zap.AddStacktrace(zap.ErrorLevel), zap.AddCaller(), zap.AddCallerSkip(1))
 	return lg.Sugar()
 }
 
diff --git a/internal/plugin/conf.go b/internal/plugin/conf.go
index 5c4dccf..20485fe 100644
--- a/internal/plugin/conf.go
+++ b/internal/plugin/conf.go
@@ -54,13 +54,33 @@
 
 func PrepareConf(buf []byte) (*flatbuffers.Builder, error) {
 	req := pc.GetRootAsReq(buf, 0)
-	entries := make(RuleConf, req.ConfLength())
+	entries := RuleConf{}
 
 	te := A6.TextEntry{}
 	for i := 0; i < req.ConfLength(); i++ {
 		if req.Conf(&te, i) {
-			entries[i].Name = string(te.Name())
-			entries[i].Value = te.Value
+			name := string(te.Name())
+			plugin := findPlugin(name)
+			if plugin == nil {
+				log.Warnf("can't find plugin %s, skip", name)
+				continue
+			}
+
+			log.Infof("prepare conf for plugin %s", name)
+
+			v := te.Value()
+			conf, err := plugin.ParseConf(v)
+			if err != nil {
+				log.Errorf(
+					"failed to parse configuration for plugin %s, configuration: %s",
+					name, string(v))
+				continue
+			}
+
+			entries = append(entries, ConfEntry{
+				Name:  name,
+				Value: conf,
+			})
 		}
 	}
 
diff --git a/internal/plugin/conf_test.go b/internal/plugin/conf_test.go
index aa2ded4..a84dcea 100644
--- a/internal/plugin/conf_test.go
+++ b/internal/plugin/conf_test.go
@@ -15,6 +15,7 @@
 package plugin
 
 import (
+	"errors"
 	"sort"
 	"sync"
 	"testing"
@@ -47,6 +48,59 @@
 	assert.Equal(t, uint32(2), resp.ConfToken())
 }
 
+func prepareConfWithData(builder *flatbuffers.Builder, arg ...flatbuffers.UOffsetT) {
+	tes := []flatbuffers.UOffsetT{}
+	for i := 0; i < len(arg); i += 2 {
+		A6.TextEntryStart(builder)
+		name := arg[i]
+		value := arg[i+1]
+		A6.TextEntryAddName(builder, name)
+		A6.TextEntryAddValue(builder, value)
+		te := A6.TextEntryEnd(builder)
+		tes = append(tes, te)
+	}
+
+	pc.ReqStartConfVector(builder, len(tes))
+	for i := len(tes) - 1; i >= 0; i-- {
+		builder.PrependUOffsetT(tes[i])
+	}
+	v := builder.EndVector(len(tes))
+
+	pc.ReqStart(builder)
+	pc.ReqAddConf(builder, v)
+	root := pc.ReqEnd(builder)
+	builder.Finish(root)
+	b := builder.FinishedBytes()
+
+	PrepareConf(b)
+}
+
+func TestPrepareConfUnknownPlugin(t *testing.T) {
+	InitConfCache(1 * time.Millisecond)
+	builder := flatbuffers.NewBuilder(1024)
+
+	name := builder.CreateString("xxx")
+	value := builder.CreateString(`{"body":"yes"}`)
+	prepareConfWithData(builder, name, value)
+	res, _ := GetRuleConf(1)
+	assert.Equal(t, 0, len(res))
+}
+
+func TestPrepareConfBadConf(t *testing.T) {
+	InitConfCache(1 * time.Millisecond)
+	builder := flatbuffers.NewBuilder(1024)
+
+	f := func(in []byte) (conf interface{}, err error) {
+		return nil, errors.New("ouch")
+	}
+	RegisterPlugin("bad_conf", f, emptyFilter)
+	name := builder.CreateString("bad_conf")
+	value := builder.CreateString(`{"body":"yes"}`)
+	prepareConfWithData(builder, name, value)
+	res, _ := GetRuleConf(1)
+	assert.Equal(t, 0, len(res))
+}
+
 func TestPrepareConfConcurrently(t *testing.T) {
 	InitConfCache(10 * time.Millisecond)
 
@@ -104,6 +158,7 @@
 }
 
 func TestGetRuleConfCheckConf(t *testing.T) {
+	RegisterPlugin("echo", emptyParseConf, emptyFilter)
 	InitConfCache(1 * time.Millisecond)
 	builder := flatbuffers.NewBuilder(1024)
 
diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go
index b7400b0..7fa6136 100644
--- a/internal/plugin/plugin.go
+++ b/internal/plugin/plugin.go
@@ -15,18 +15,89 @@
 package plugin
 
 import (
-	"context"
+	"errors"
+	"fmt"
 	"net/http"
 
 	hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPReqCall"
 	flatbuffers "github.com/google/flatbuffers/go"
 
 	inHTTP "github.com/apache/apisix-go-plugin-runner/internal/http"
+	"github.com/apache/apisix-go-plugin-runner/internal/log"
 	"github.com/apache/apisix-go-plugin-runner/internal/util"
 	pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
 )
 
-func handle(conf RuleConf, ctx context.Context, w http.ResponseWriter, r pkgHTTP.Request) error {
+type ParseConfFunc func(in []byte) (conf interface{}, err error)
+type FilterFunc func(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request)
+
+type pluginOpts struct {
+	ParseConf ParseConfFunc
+	Filter    FilterFunc
+}
+
+type ErrPluginRegistered struct {
+	name string
+}
+
+func (err ErrPluginRegistered) Error() string {
+	return fmt.Sprintf("plugin %s registered", err.name)
+}
+
+var (
+	pluginRegistry = map[string]*pluginOpts{}
+
+	ErrMissingName            = errors.New("missing name")
+	ErrMissingParseConfMethod = errors.New("missing ParseConf method")
+	ErrMissingFilterMethod    = errors.New("missing Filter method")
+)
+
+func RegisterPlugin(name string, pc ParseConfFunc, sv FilterFunc) error {
+	if name == "" {
+		return ErrMissingName
+	}
+	if pc == nil {
+		return ErrMissingParseConfMethod
+	}
+	if sv == nil {
+		return ErrMissingFilterMethod
+	}
+
+	opt := &pluginOpts{
+		ParseConf: pc,
+		Filter:    sv,
+	}
+	if _, found := pluginRegistry[name]; found {
+		return ErrPluginRegistered{name}
+	}
+	pluginRegistry[name] = opt
+	return nil
+}
+
+func findPlugin(name string) *pluginOpts {
+	if opt, found := pluginRegistry[name]; found {
+		return opt
+	}
+	return nil
+}
+
+func filter(conf RuleConf, w *inHTTP.Response, r pkgHTTP.Request) error {
+	for _, c := range conf {
+		plugin := findPlugin(c.Name)
+		if plugin == nil {
+			log.Warnf("can't find plugin %s, skip", c.Name)
+			continue
+		}
+
+		log.Infof("run plugin %s", c.Name)
+
+		plugin.Filter(c.Value, w, r)
+
+		if w.HasChange() {
+			// response is generated, no need to continue
+			break
+		}
+	}
 	return nil
 }
 
@@ -61,8 +132,7 @@
 		return nil, err
 	}
 
-	ctx := context.Background()
-	err = handle(conf, ctx, resp, req)
+	err = filter(conf, resp, req)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/plugin/plugin_test.go b/internal/plugin/plugin_test.go
index c433f3f..eb942df 100644
--- a/internal/plugin/plugin_test.go
+++ b/internal/plugin/plugin_test.go
@@ -15,12 +15,26 @@
 package plugin
 
 import (
+	"net/http"
 	"testing"
 	"time"
 
 	hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPReqCall"
 	flatbuffers "github.com/google/flatbuffers/go"
 	"github.com/stretchr/testify/assert"
+
+	inHTTP "github.com/apache/apisix-go-plugin-runner/internal/http"
+	pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
+)
+
+var (
+	emptyParseConf = func(in []byte) (conf interface{}, err error) {
+		return string(in), nil
+	}
+
+	emptyFilter = func(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request) {
+		return
+	}
 )
 
 func TestHTTPReqCall(t *testing.T) {
@@ -43,3 +57,61 @@
 	assert.Equal(t, uint32(233), resp.Id())
 	assert.Equal(t, hrc.ActionNONE, resp.ActionType())
 }
+
+func TestRegisterPlugin(t *testing.T) {
+	assert.Equal(t, ErrMissingParseConfMethod,
+		RegisterPlugin("bad_conf", nil, emptyFilter))
+	assert.Equal(t, ErrMissingFilterMethod,
+		RegisterPlugin("bad_conf", emptyParseConf, nil))
+}
+
+func TestFilter(t *testing.T) {
+	fooParseConf := func(in []byte) (conf interface{}, err error) {
+		return "foo", nil
+	}
+	fooFilter := func(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request) {
+		w.Header().Add("foo", "bar")
+		assert.Equal(t, "foo", conf.(string))
+	}
+	barParseConf := func(in []byte) (conf interface{}, err error) {
+		return "bar", nil
+	}
+	barFilter := func(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request) {
+		r.Header().Set("foo", "bar")
+		assert.Equal(t, "bar", conf.(string))
+	}
+
+	RegisterPlugin("foo", fooParseConf, fooFilter)
+	RegisterPlugin("bar", barParseConf, barFilter)
+
+	builder := flatbuffers.NewBuilder(1024)
+	fooName := builder.CreateString("foo")
+	fooConf := builder.CreateString("foo")
+	barName := builder.CreateString("bar")
+	barConf := builder.CreateString("bar")
+	prepareConfWithData(builder, fooName, fooConf, barName, barConf)
+
+	res, _ := GetRuleConf(1)
+	hrc.ReqStart(builder)
+	hrc.ReqAddId(builder, 233)
+	hrc.ReqAddConfToken(builder, 1)
+	r := hrc.ReqEnd(builder)
+	builder.Finish(r)
+	out := builder.FinishedBytes()
+
+	req := inHTTP.CreateRequest(out)
+	resp := inHTTP.CreateResponse()
+	filter(res, resp, req)
+
+	assert.Equal(t, "bar", resp.Header().Get("foo"))
+	assert.Equal(t, "", req.Header().Get("foo"))
+
+	req = inHTTP.CreateRequest(out)
+	resp = inHTTP.CreateResponse()
+	prepareConfWithData(builder, barName, barConf, fooName, fooConf)
+	res, _ = GetRuleConf(2)
+	filter(res, resp, req)
+
+	assert.Equal(t, "bar", resp.Header().Get("foo"))
+	assert.Equal(t, "bar", req.Header().Get("foo"))
+}
diff --git a/internal/server/server.go b/internal/server/server.go
index fe2dcea..25e9718 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -16,7 +16,6 @@
 
 import (
 	"encoding/binary"
-	"errors"
 	"fmt"
 	"io"
 	"net"
@@ -27,10 +26,11 @@
 	"syscall"
 	"time"
 
+	flatbuffers "github.com/google/flatbuffers/go"
+
 	"github.com/apache/apisix-go-plugin-runner/internal/log"
 	"github.com/apache/apisix-go-plugin-runner/internal/plugin"
 	"github.com/apache/apisix-go-plugin-runner/internal/util"
-	flatbuffers "github.com/google/flatbuffers/go"
 )
 
 const (
@@ -48,11 +48,13 @@
 )
 
 func readErr(n int, err error, required int) bool {
-	if n < required {
-		err = errors.New("truncated")
+	if 0 < n && n < required {
+		err = fmt.Errorf("truncated, only get the first %d bytes", n)
 	}
-	if err != nil && err != io.EOF {
-		log.Errorf("read: %s", err)
+	if err != nil {
+		if err != io.EOF {
+			log.Errorf("read: %s", err)
+		}
 		return true
 	}
 	return false
diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go
new file mode 100644
index 0000000..7ea4bd8
--- /dev/null
+++ b/pkg/plugin/plugin.go
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package plugin
+
+import (
+	"net/http"
+
+	"github.com/apache/apisix-go-plugin-runner/internal/plugin"
+	pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
+)
+
+// PluginOpts represents the attributes of the Plugin
+type PluginOpts struct {
+	// Name (required) is the plguin name
+	Name string
+	// ParseConf (required) is the method to parse given plugin configuration. When the
+	// configuration can't be parsed, it will be skipped.
+	ParseConf func(in []byte) (conf interface{}, err error)
+	// Filter (required) is the method to handle request.
+	// It is like the `http.ServeHTTP`, plus the ctx and the configuration created by
+	// ParseConf.
+	//
+	// When the `w` is written, the execution of plugin chain will be stopped.
+	// We don't use onion model like Gin/Caddy because we don't serve the whole request lifecycle
+	// inside the runner. The plugin is only a filter running at one stage.
+	Filter func(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request)
+}
+
+// RegisterPlugin register a plugin. Plugin which has the same name can't be registered twice.
+func RegisterPlugin(opt *PluginOpts) error {
+	return plugin.RegisterPlugin(opt.Name, opt.ParseConf, opt.Filter)
+}