feat: add mqtt-proxy plugin in ApisixRoute (#1056)
diff --git a/.github/workflows/spell-checker.yml b/.github/workflows/spell-checker.yml
index ef1d159..074f3bd 100644
--- a/.github/workflows/spell-checker.yml
+++ b/.github/workflows/spell-checker.yml
@@ -35,5 +35,5 @@
           wget -O - -q https://git.io/misspell | sh -s -- -b .
       - name: Misspell
         run: |
-          find . -name "*.go" -type f | xargs ./misspell -error
+          find . -name "*.go" -type f | xargs ./misspell -i mosquitto -error
           find docs -type f | xargs ./misspell -error
diff --git a/docs/en/latest/references/apisix_route_v2.md b/docs/en/latest/references/apisix_route_v2.md
index 341b9cb..6deb9c2 100644
--- a/docs/en/latest/references/apisix_route_v2.md
+++ b/docs/en/latest/references/apisix_route_v2.md
@@ -73,6 +73,10 @@
 | stream[].backend.servicePort         | integer or string  | The backend service port, can be the port number or the name defined in the service object.                                                                                                                                       |
 | stream[].backend.resolveGranularity  | string             | See [Service Resolve Granularity](#service-resolve-granularity) for the details.                                                                                                                                                  |
 | stream[].backend.subset              | string             | Subset specifies a subset for the target Service. The subset should be pre-definedin ApisixUpstream about this service.                                                                                                           |
+| stream[].plugins                     | array              | A series of APISIX plugins that will be executed once this route rule is matched                                                                                                                                                  |
+| stream[].plugins[].name              | string             | The plugin name, see [docs](http://apisix.apache.org/docs/apisix/getting-started) for learning the available plugins.                                                                                                             |
+| stream[].plugins[].enable            | boolean            | Whether the plugin would be used                                                                                                                                                                                                  |
+| stream[].plugins[].config            | object             | The configuration of the plugin that must have the same fields as in APISIX.                                                                                                                                                      |
 
 ## Expression Operators
 
diff --git a/pkg/api/validation/apisix_route_test.go b/pkg/api/validation/apisix_route_test.go
index 7249aa9..f51ba0a 100644
--- a/pkg/api/validation/apisix_route_test.go
+++ b/pkg/api/validation/apisix_route_test.go
@@ -124,7 +124,7 @@
 	fakeClient := newFakeSchemaClient()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			gotValid, _ := validatePlugin(fakeClient, tt.pluginName, v2.ApisixRouteHTTPPluginConfig(tt.pluginConfig))
+			gotValid, _ := validatePlugin(fakeClient, tt.pluginName, v2.ApisixRoutePluginConfig(tt.pluginConfig))
 			if gotValid != tt.wantValid {
 				t.Errorf("validatePlugin() gotValid = %v, want %v", gotValid, tt.wantValid)
 			}
diff --git a/pkg/kube/apisix/apis/config/v2/types.go b/pkg/kube/apisix/apis/config/v2/types.go
index 6adc5c0..666d7b6 100644
--- a/pkg/kube/apisix/apis/config/v2/types.go
+++ b/pkg/kube/apisix/apis/config/v2/types.go
@@ -70,7 +70,7 @@
 	Backends         []ApisixRouteHTTPBackend  `json:"backends,omitempty" yaml:"backends,omitempty"`
 	Websocket        bool                      `json:"websocket" yaml:"websocket"`
 	PluginConfigName string                    `json:"plugin_config_name,omitempty" yaml:"plugin_config_name,omitempty"`
-	Plugins          []ApisixRouteHTTPPlugin   `json:"plugins,omitempty" yaml:"plugins,omitempty"`
+	Plugins          []ApisixRoutePlugin       `json:"plugins,omitempty" yaml:"plugins,omitempty"`
 	Authentication   ApisixRouteAuthentication `json:"authentication,omitempty" yaml:"authentication,omitempty"`
 }
 
@@ -152,19 +152,19 @@
 	Name string `json:"name" yaml:"name"`
 }
 
-// ApisixRouteHTTPPlugin represents an APISIX plugin.
-type ApisixRouteHTTPPlugin struct {
+// ApisixRoutePlugin represents an APISIX plugin.
+type ApisixRoutePlugin struct {
 	// The plugin name.
 	Name string `json:"name" yaml:"name"`
 	// Whether this plugin is in use, default is true.
 	Enable bool `json:"enable" yaml:"enable"`
 	// Plugin configuration.
-	Config ApisixRouteHTTPPluginConfig `json:"config" yaml:"config"`
+	Config ApisixRoutePluginConfig `json:"config" yaml:"config"`
 }
 
-// ApisixRouteHTTPPluginConfig is the configuration for
+// ApisixRoutePluginConfig is the configuration for
 // any plugins.
-type ApisixRouteHTTPPluginConfig map[string]interface{}
+type ApisixRoutePluginConfig map[string]interface{}
 
 // ApisixRouteAuthentication is the authentication-related
 // configuration in ApisixRoute.
@@ -189,16 +189,16 @@
 	Cookie string `json:"cookie,omitempty" yaml:"cookie,omitempty"`
 }
 
-func (p ApisixRouteHTTPPluginConfig) DeepCopyInto(out *ApisixRouteHTTPPluginConfig) {
+func (p ApisixRoutePluginConfig) DeepCopyInto(out *ApisixRoutePluginConfig) {
 	b, _ := json.Marshal(&p)
 	_ = json.Unmarshal(b, out)
 }
 
-func (p *ApisixRouteHTTPPluginConfig) DeepCopy() *ApisixRouteHTTPPluginConfig {
+func (p *ApisixRoutePluginConfig) DeepCopy() *ApisixRoutePluginConfig {
 	if p == nil {
 		return nil
 	}
-	out := new(ApisixRouteHTTPPluginConfig)
+	out := new(ApisixRoutePluginConfig)
 	p.DeepCopyInto(out)
 	return out
 }
@@ -210,6 +210,7 @@
 	Protocol string                   `json:"protocol" yaml:"protocol"`
 	Match    ApisixRouteStreamMatch   `json:"match" yaml:"match"`
 	Backend  ApisixRouteStreamBackend `json:"backend" yaml:"backend"`
+	Plugins  []ApisixRoutePlugin      `json:"plugins,omitempty" yaml:"plugins,omitempty"`
 }
 
 // ApisixRouteStreamMatch represents the match conditions of stream route.
@@ -691,9 +692,9 @@
 
 // ApisixPluginConfigSpec defines the desired state of ApisixPluginConfigSpec.
 type ApisixPluginConfigSpec struct {
-	// Plugins contains a list of ApisixRouteHTTPPlugin
+	// Plugins contains a list of ApisixRoutePlugin
 	// +required
-	Plugins []ApisixRouteHTTPPlugin `json:"plugins" yaml:"plugins"`
+	Plugins []ApisixRoutePlugin `json:"plugins" yaml:"plugins"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go b/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
index 1b94ae8..63c3ea1 100644
--- a/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
+++ b/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
@@ -669,7 +669,7 @@
 	*out = *in
 	if in.Plugins != nil {
 		in, out := &in.Plugins, &out.Plugins
-		*out = make([]ApisixRouteHTTPPlugin, len(*in))
+		*out = make([]ApisixRoutePlugin, len(*in))
 		for i := range *in {
 			(*in)[i].DeepCopyInto(&(*out)[i])
 		}
@@ -783,7 +783,7 @@
 	}
 	if in.Plugins != nil {
 		in, out := &in.Plugins, &out.Plugins
-		*out = make([]ApisixRouteHTTPPlugin, len(*in))
+		*out = make([]ApisixRoutePlugin, len(*in))
 		for i := range *in {
 			(*in)[i].DeepCopyInto(&(*out)[i])
 		}
@@ -911,23 +911,6 @@
 }
 
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *ApisixRouteHTTPPlugin) DeepCopyInto(out *ApisixRouteHTTPPlugin) {
-	*out = *in
-	in.Config.DeepCopyInto(&out.Config)
-	return
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRouteHTTPPlugin.
-func (in *ApisixRouteHTTPPlugin) DeepCopy() *ApisixRouteHTTPPlugin {
-	if in == nil {
-		return nil
-	}
-	out := new(ApisixRouteHTTPPlugin)
-	in.DeepCopyInto(out)
-	return out
-}
-
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *ApisixRouteList) DeepCopyInto(out *ApisixRouteList) {
 	*out = *in
 	out.TypeMeta = in.TypeMeta
@@ -961,6 +944,23 @@
 }
 
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ApisixRoutePlugin) DeepCopyInto(out *ApisixRoutePlugin) {
+	*out = *in
+	in.Config.DeepCopyInto(&out.Config)
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRoutePlugin.
+func (in *ApisixRoutePlugin) DeepCopy() *ApisixRoutePlugin {
+	if in == nil {
+		return nil
+	}
+	out := new(ApisixRoutePlugin)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *ApisixRouteSpec) DeepCopyInto(out *ApisixRouteSpec) {
 	*out = *in
 	if in.HTTP != nil {
@@ -973,7 +973,9 @@
 	if in.Stream != nil {
 		in, out := &in.Stream, &out.Stream
 		*out = make([]ApisixRouteStream, len(*in))
-		copy(*out, *in)
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
 	}
 	return
 }
@@ -993,6 +995,13 @@
 	*out = *in
 	out.Match = in.Match
 	out.Backend = in.Backend
+	if in.Plugins != nil {
+		in, out := &in.Plugins, &out.Plugins
+		*out = make([]ApisixRoutePlugin, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
 	return
 }
 
diff --git a/pkg/providers/apisix/translation/apisix_route.go b/pkg/providers/apisix/translation/apisix_route.go
index bf32f73..f91ea12 100644
--- a/pkg/providers/apisix/translation/apisix_route.go
+++ b/pkg/providers/apisix/translation/apisix_route.go
@@ -818,6 +818,20 @@
 			)
 			return err
 		}
+
+		// add stream route plugins
+		pluginMap := make(apisixv1.Plugins)
+		for _, plugin := range part.Plugins {
+			if !plugin.Enable {
+				continue
+			}
+			if plugin.Config != nil {
+				pluginMap[plugin.Name] = plugin.Config
+			} else {
+				pluginMap[plugin.Name] = make(map[string]interface{})
+			}
+		}
+
 		sr := apisixv1.NewDefaultStreamRoute()
 		name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name)
 		sr.ID = id.GenID(name)
@@ -827,6 +841,7 @@
 			return err
 		}
 		sr.UpstreamId = ups.ID
+		sr.Plugins = pluginMap
 		ctx.AddStreamRoute(sr)
 		if !ctx.CheckUpstreamExist(ups.Name) {
 			ctx.AddUpstream(ups)
diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go
index 8ffcd0d..ea326ad 100644
--- a/pkg/types/apisix/v1/types.go
+++ b/pkg/types/apisix/v1/types.go
@@ -372,6 +372,7 @@
 	SNI        string            `json:"sni,omitempty" yaml:"sni,omitempty"`
 	UpstreamId string            `json:"upstream_id,omitempty" yaml:"upstream_id,omitempty"`
 	Upstream   *Upstream         `json:"upstream,omitempty" yaml:"upstream,omitempty"`
+	Plugins    Plugins           `json:"plugins,omitempty" yaml:"plugins,omitempty"`
 }
 
 // GlobalRule represents the global_rule object in APISIX.
diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go b/pkg/types/apisix/v1/zz_generated.deepcopy.go
index 8da11e6..a353718 100644
--- a/pkg/types/apisix/v1/zz_generated.deepcopy.go
+++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go
@@ -465,6 +465,7 @@
 		*out = new(Upstream)
 		(*in).DeepCopyInto(*out)
 	}
+	in.Plugins.DeepCopyInto(&out.Plugins)
 	return
 }
 
diff --git a/samples/deploy/crd/v1/ApisixRoute.yaml b/samples/deploy/crd/v1/ApisixRoute.yaml
index bccb647..cf0f8e5 100644
--- a/samples/deploy/crd/v1/ApisixRoute.yaml
+++ b/samples/deploy/crd/v1/ApisixRoute.yaml
@@ -821,6 +821,22 @@
                         required:
                           - serviceName
                           - servicePort
+                      plugins:
+                        type: array
+                        items:
+                          type: object
+                          properties:
+                            name:
+                              type: string
+                              minLength: 1
+                            enable:
+                              type: boolean
+                            config:
+                              type: object
+                              x-kubernetes-preserve-unknown-fields: true # we have to enable it since plugin config
+                        required:
+                          - name
+                          - enable
             status:
               type: object
               properties:
diff --git a/test/e2e/go.mod b/test/e2e/go.mod
index 2c7fef4..846ff78 100644
--- a/test/e2e/go.mod
+++ b/test/e2e/go.mod
@@ -5,6 +5,7 @@
 require (
 	github.com/apache/apisix-ingress-controller v0.0.0-20210105024109-72e53386de5a
 	github.com/apache/apisix-ingress-controller/test/e2e/testbackend v0.0.0
+	github.com/eclipse/paho.mqtt.golang v1.3.5
 	github.com/gavv/httpexpect/v2 v2.3.1
 	github.com/gorilla/websocket v1.5.0
 	github.com/gruntwork-io/terratest v0.40.19
diff --git a/test/e2e/go.sum b/test/e2e/go.sum
index deab836..287a6e5 100644
--- a/test/e2e/go.sum
+++ b/test/e2e/go.sum
@@ -101,6 +101,11 @@
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
+github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
+github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
+github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/elazarl/goproxy v0.0.0-20190911111923-ecfe977594f1 h1:yY9rWGoXv1U5pl4gxqlULARMQD7x0QG85lqEXTWysik=
 github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
@@ -574,6 +579,7 @@
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index a8b97be..b11a33d 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -34,6 +34,7 @@
 	"time"
 
 	"github.com/apache/apisix-ingress-controller/pkg/config"
+	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/gavv/httpexpect/v2"
 	"github.com/gruntwork-io/terratest/modules/k8s"
 	"github.com/gruntwork-io/terratest/modules/testing"
@@ -283,6 +284,13 @@
 	})
 }
 
+func (s *Scaffold) NewMQTTClient() mqtt.Client {
+	opts := mqtt.NewClientOptions()
+	opts.AddBroker(fmt.Sprintf("tcp://%s", s.apisixTCPTunnel.Endpoint()))
+	client := mqtt.NewClient(opts)
+	return client
+}
+
 func (s *Scaffold) DNSResolver() *net.Resolver {
 	return &net.Resolver{
 		PreferGo: false,
diff --git a/test/e2e/suite-plugins/suite-plugins-other/mqtt-proxy.go b/test/e2e/suite-plugins/suite-plugins-other/mqtt-proxy.go
new file mode 100644
index 0000000..b906a28
--- /dev/null
+++ b/test/e2e/suite-plugins/suite-plugins-other/mqtt-proxy.go
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package plugins
+
+import (
+	"time"
+
+	ginkgo "github.com/onsi/ginkgo/v2"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = ginkgo.Describe("suite-plugins-other: mqtt-proxy plugin", func() {
+	opts := &scaffold.Options{
+		Name:                  "mqtt-proxy",
+		IngressAPISIXReplicas: 1,
+		ApisixResourceVersion: scaffold.ApisixResourceVersion().V2,
+	}
+	s := scaffold.NewScaffold(opts)
+	// setup mosquito service
+	ginkgo.It("stream mqtt proxy", func() {
+		assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(`
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: mosquito
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: mosquito
+  template:
+    metadata:
+      labels:
+        app: mosquito
+    spec:
+      containers:
+      - name: mosquito
+        image: eclipse-mosquitto:1.6
+        livenessProbe:
+          tcpSocket:
+            port: 1883
+          initialDelaySeconds: 5
+          periodSeconds: 10
+        readinessProbe:
+          tcpSocket:
+            port: 1883
+          initialDelaySeconds: 5
+          periodSeconds: 10
+        ports:
+        - name: mosquito
+          containerPort: 1883
+          protocol: TCP
+`))
+		assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(`
+apiVersion: v1
+kind: Service
+metadata:
+  name: mosquito
+spec:
+  selector:
+    app: mosquito
+  type: ClusterIP
+  ports:
+  - port: 1883
+    targetPort: 1883
+    protocol: TCP
+`))
+		s.EnsureNumEndpointsReady(ginkgo.GinkgoT(), "mosquito", 1)
+		// setup Apisix Route for mqtt proxy
+		apisixRoute := `
+apiVersion: apisix.apache.org/v2
+kind: ApisixRoute
+metadata:
+  name: mqtt-route
+spec:
+  stream:
+  - name: rule1
+    protocol: TCP
+    match:
+      ingressPort: 9100
+    backend:
+      serviceName: mosquito
+      servicePort: 1883
+    plugins:
+    - name: mqtt-proxy
+      enable: true
+      config:
+        protocol_name: MQTT
+        protocol_level: 4
+`
+
+		assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(apisixRoute))
+
+		err := s.EnsureNumApisixStreamRoutesCreated(1)
+		assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
+
+		sr, err := s.ListApisixStreamRoutes()
+		assert.Nil(ginkgo.GinkgoT(), err)
+		assert.Len(ginkgo.GinkgoT(), sr, 1)
+		assert.Equal(ginkgo.GinkgoT(), sr[0].ServerPort, int32(9100))
+		// test mqtt protocol
+		c := s.NewMQTTClient()
+		token := c.Connect()
+		token.WaitTimeout(3 * time.Second)
+		assert.Nil(ginkgo.GinkgoT(), token.Error(), "Checking mqtt connection")
+	})
+})