feature: implement the real "filter" logics, and cache template and regexps (#2)

- Implement the real "filter" logics.
- Cache RegExp in the filter configs.
- Cache Template in the exporters.
diff --git a/assets/default-config.yaml b/assets/default-config.yaml
index 0038981..556ba43 100644
--- a/assets/default-config.yaml
+++ b/assets/default-config.yaml
@@ -17,8 +17,9 @@
 #
 
 filters:
-  - namespace: default
-    exporter: skywalking
+  - namespace: istio-system
+    exporters:
+      - skywalking
 
 exporters:
   skywalking:
diff --git a/cmd/start.go b/cmd/start.go
index b9a54c6..cf7e624 100644
--- a/cmd/start.go
+++ b/cmd/start.go
@@ -25,6 +25,7 @@
 	"syscall"
 
 	"github.com/spf13/cobra"
+	v1 "k8s.io/api/core/v1"
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
 
 	"github.com/apache/skywalking-kubernetes-event-exporter/pkg/k8s"
@@ -39,7 +40,7 @@
 	Use:   "start",
 	Short: "Start skywalking-kubernetes-event-exporter",
 	RunE: func(cmd *cobra.Command, args []string) error {
-		watcher, err := k8s.WatchEvents("default")
+		watcher, err := k8s.WatchEvents(v1.NamespaceAll)
 		if err != nil {
 			return err
 		}
diff --git a/configs/config.go b/configs/config.go
index 1a93b2a..4a7c262 100644
--- a/configs/config.go
+++ b/configs/config.go
@@ -20,6 +20,9 @@
 package configs
 
 import (
+	"regexp"
+
+	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
 	"gopkg.in/yaml.v3"
 	v1 "k8s.io/api/core/v1"
 
@@ -27,23 +30,64 @@
 )
 
 type FilterConfig struct {
-	Reason   string `yaml:"reason"`
-	Message  string `yaml:"message"`
-	MinCount int32  `yaml:"min-count"`
-	Type     string `yaml:"type"`
-	Action   string `yaml:"action"`
+	Reason          string `yaml:"reason"`
+	reasonRegExp    *regexp.Regexp
+	Message         string `yaml:"message"`
+	messageRegExp   *regexp.Regexp
+	MinCount        int32  `yaml:"min-count"`
+	Type            string `yaml:"type"`
+	typeRegExp      *regexp.Regexp
+	Action          string `yaml:"action"`
+	actionRegExp    *regexp.Regexp
+	Kind            string `yaml:"kind"`
+	kindRegExp      *regexp.Regexp
+	Namespace       string `yaml:"namespace"`
+	namespaceRegExp *regexp.Regexp
+	Name            string `yaml:"name"`
+	nameRegExp      *regexp.Regexp
 
-	Kind      string `yaml:"kind"`
-	Namespace string `yaml:"namespace"`
-	Name      string `yaml:"name"`
-
-	Exporter string `yaml:"exporter"`
+	Exporters []string `yaml:"exporters"`
 }
 
+func (filter *FilterConfig) Init() {
+	logger.Log.Debugf("initalizing filter config")
+
+	filter.reasonRegExp = regexp.MustCompile(filter.Reason)
+	filter.messageRegExp = regexp.MustCompile(filter.Message)
+	filter.typeRegExp = regexp.MustCompile(filter.Type)
+	filter.actionRegExp = regexp.MustCompile(filter.Action)
+	filter.kindRegExp = regexp.MustCompile(filter.Kind)
+	filter.namespaceRegExp = regexp.MustCompile(filter.Namespace)
+	filter.nameRegExp = regexp.MustCompile(filter.Name)
+}
+
+// Filter the given event with this filter instance.
+// Return true if the event is filtered, return false otherwise.
 func (filter *FilterConfig) Filter(event *v1.Event) bool {
 	if event == evnt.Stopper {
 		return false
 	}
+	if filter.Reason != "" && !filter.reasonRegExp.MatchString(event.Reason) {
+		return true
+	}
+	if filter.Message != "" && !filter.messageRegExp.MatchString(event.Message) {
+		return true
+	}
+	if filter.Type != "" && !filter.typeRegExp.MatchString(event.Type) {
+		return true
+	}
+	if filter.Action != "" && !filter.actionRegExp.MatchString(event.Action) {
+		return true
+	}
+	if filter.Kind != "" && !filter.kindRegExp.MatchString(event.Kind) {
+		return true
+	}
+	if filter.Namespace != "" && !filter.namespaceRegExp.MatchString(event.Namespace) {
+		return true
+	}
+	if filter.Name != "" && !filter.nameRegExp.MatchString(event.Name) {
+		return true
+	}
 	return false
 }
 
diff --git a/configs/config_test.go b/configs/config_test.go
new file mode 100644
index 0000000..2b9d467
--- /dev/null
+++ b/configs/config_test.go
@@ -0,0 +1,106 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package configs
+
+import (
+	"testing"
+
+	"k8s.io/api/core/v1"
+)
+
+func TestFilterConfig_Filter(t *testing.T) {
+	type fields struct {
+		Reason    string
+		Message   string
+		MinCount  int32
+		Type      string
+		Action    string
+		Kind      string
+		Namespace string
+		Name      string
+		Exporters []string
+	}
+	type args struct {
+		event *v1.Event
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		args   args
+		want   bool
+	}{
+		{
+			name:   "filter reason exactly",
+			fields: fields{Reason: "Killed"},
+			args:   args{event: &v1.Event{Reason: "Killed"}},
+			want:   false,
+		},
+		{
+			name:   "filter reason by regexp",
+			fields: fields{Reason: "Killed|Killing"},
+			args:   args{event: &v1.Event{Reason: "Killing"}},
+			want:   false,
+		},
+		{
+			name:   "filter reason by regexp",
+			fields: fields{Reason: "Killed|Killing"},
+			args:   args{event: &v1.Event{Reason: "Started"}},
+			want:   true,
+		},
+
+		{
+			name:   "filter message by regexp",
+			fields: fields{Message: "Killing|Killed .*"},
+			args:   args{event: &v1.Event{Message: "Killing reviews"}},
+			want:   false,
+		},
+		{
+			name:   "filter message by regexp",
+			fields: fields{Message: "Killing|Killed .*"},
+			args:   args{event: &v1.Event{Message: "Killed reviews"}},
+			want:   false,
+		},
+		{
+			name:   "filter message by regexp",
+			fields: fields{Message: "Killing|Killed .*"},
+			args:   args{event: &v1.Event{Message: "Started reviews"}},
+			want:   true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			filter := &FilterConfig{
+				Reason:    tt.fields.Reason,
+				Message:   tt.fields.Message,
+				MinCount:  tt.fields.MinCount,
+				Type:      tt.fields.Type,
+				Action:    tt.fields.Action,
+				Kind:      tt.fields.Kind,
+				Namespace: tt.fields.Namespace,
+				Name:      tt.fields.Name,
+				Exporters: tt.fields.Exporters,
+			}
+			filter.Init()
+			if got := filter.Filter(tt.args.event); got != tt.want {
+				t.Errorf("Filter() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go
index a3f2284..7ec53e3 100644
--- a/pkg/exporter/exporter.go
+++ b/pkg/exporter/exporter.go
@@ -20,6 +20,8 @@
 package exporter
 
 import (
+	"html/template"
+
 	v1 "k8s.io/api/core/v1"
 
 	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
@@ -33,11 +35,6 @@
 	Stop()
 }
 
-type MessageTemplate struct {
-	Source  *event.Source `mapstructure:"source"`
-	Message string        `mapstructure:"message"`
-}
-
 var exporters = map[string]Exporter{}
 
 func RegisterExporter(name string, exporter Exporter) {
@@ -51,3 +48,44 @@
 func GetExporter(name string) Exporter {
 	return exporters[name]
 }
+
+type SourceTemplate struct {
+	serviceTemplate         *template.Template
+	serviceInstanceTemplate *template.Template
+	endpointTemplate        *template.Template
+}
+
+type EventTemplate struct {
+	Source          *event.Source `mapstructure:"source"`
+	sourceTemplate  *SourceTemplate
+	Message         string `mapstructure:"message"`
+	messageTemplate *template.Template
+}
+
+func (tmplt *EventTemplate) Init() (err error) {
+	if tmplt.Message != "" {
+		if tmplt.messageTemplate, err = template.New("EventMessageTemplate").Parse(tmplt.Message); err != nil {
+			return err
+		}
+	}
+
+	if tmplt.Source != nil {
+		if tmplt.Source.Service != "" {
+			if tmplt.sourceTemplate.serviceTemplate, err = template.New("EventSourceServiceTemplate").Parse(tmplt.Source.Service); err != nil {
+				return err
+			}
+		}
+		if tmplt.Source.ServiceInstance != "" {
+			if tmplt.sourceTemplate.serviceInstanceTemplate, err = template.New("EventServiceInstanceTemplate").Parse(tmplt.Source.ServiceInstance); err != nil {
+				return err
+			}
+		}
+		if tmplt.Source.Endpoint != "" {
+			if tmplt.sourceTemplate.endpointTemplate, err = template.New("EventEndpointTemplate").Parse(tmplt.Source.Endpoint); err != nil {
+				return err
+			}
+		}
+	}
+
+	return err
+}
diff --git a/pkg/exporter/skywalking.go b/pkg/exporter/skywalking.go
index 75cabab..f713a14 100644
--- a/pkg/exporter/skywalking.go
+++ b/pkg/exporter/skywalking.go
@@ -24,7 +24,6 @@
 	"context"
 	"encoding/json"
 	"fmt"
-	"html/template"
 	"time"
 
 	sw "skywalking.apache.org/repo/goapi/collect/event/v3"
@@ -45,8 +44,8 @@
 }
 
 type SkyWalkingConfig struct {
-	Address  string           `mapstructure:"address"`
-	Template *MessageTemplate `mapstructure:"template"`
+	Address  string         `mapstructure:"address"`
+	Template *EventTemplate `mapstructure:"template"`
 }
 
 func init() {
@@ -67,6 +66,10 @@
 		return err
 	}
 
+	if err := config.Template.Init(); err != nil {
+		return err
+	}
+
 	conn, err := grpc.Dial(config.Address, grpc.WithInsecure())
 	if err != nil {
 		return err
@@ -85,16 +88,13 @@
 
 // TODO error handling
 func (exporter *SkyWalking) Export(events chan *k8score.Event) {
+	logger.Log.Debugf("exporting events into %+v", exporter.Name())
+
 	stream, err := exporter.client.Collect(context.Background())
 	for err != nil {
 		select {
 		case <-exporter.stopper:
-			logger.Log.Debugf("draining event channel")
-			for e := range events {
-				if e == event.Stopper {
-					break
-				}
-			}
+			drain(events)
 			return
 		default:
 			logger.Log.Errorf("failed to connect to SkyWalking server. %+v", err)
@@ -113,12 +113,7 @@
 		for {
 			select {
 			case <-exporter.stopper:
-				logger.Log.Debugf("draining event channel")
-				for e := range events {
-					if e == event.Stopper {
-						break
-					}
-				}
+				drain(events)
 				return
 			case kEvent := <-events:
 				if kEvent == event.Stopper {
@@ -152,16 +147,12 @@
 	}()
 }
 
-func (tmplt *MessageTemplate) Render(event *sw.Event, data *k8score.Event) error {
+func (tmplt *EventTemplate) Render(event *sw.Event, data *k8score.Event) error {
 	var buf bytes.Buffer
 
 	// Render Event Message
-	if tmplt.Message != "" {
+	if t := tmplt.messageTemplate; t != nil {
 		buf.Reset()
-		t, err := template.New("EventMsg").Parse(tmplt.Message)
-		if err != nil {
-			return err
-		}
 		if err := t.Execute(&buf, data); err != nil {
 			return err
 		}
@@ -171,14 +162,10 @@
 	}
 
 	// Render Event Source
-	if tmplt.Source != nil {
+	if tmplt.sourceTemplate != nil {
 		// Render Event Source Service
-		if tmplt.Source.Service != "" {
+		if t := tmplt.sourceTemplate.serviceTemplate; t != nil {
 			buf.Reset()
-			t, err := template.New("EventSourceService").Parse(tmplt.Source.Service)
-			if err != nil {
-				return err
-			}
 			if err := t.Execute(&buf, data); err != nil {
 				return err
 			}
@@ -187,12 +174,8 @@
 			}
 		}
 		// Render Event Source Service
-		if tmplt.Source.ServiceInstance != "" {
+		if t := tmplt.sourceTemplate.serviceInstanceTemplate; t != nil {
 			buf.Reset()
-			t, err := template.New("EventSourceServiceInstance").Parse(tmplt.Source.ServiceInstance)
-			if err != nil {
-				return err
-			}
 			if err := t.Execute(&buf, data); err != nil {
 				return err
 			}
@@ -201,12 +184,8 @@
 			}
 		}
 		// Render Event Source Endpoint
-		if tmplt.Source.Endpoint != "" {
+		if t := tmplt.sourceTemplate.endpointTemplate; t != nil {
 			buf.Reset()
-			t, err := template.New("EventSourceEndpoint").Parse(tmplt.Source.Endpoint)
-			if err != nil {
-				return err
-			}
 			if err := t.Execute(&buf, data); err != nil {
 				return err
 			}
diff --git a/pkg/exporter/util.go b/pkg/exporter/util.go
new file mode 100644
index 0000000..d02859e
--- /dev/null
+++ b/pkg/exporter/util.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package exporter
+
+import (
+	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
+	"github.com/apache/skywalking-kubernetes-event-exporter/pkg/event"
+	k8score "k8s.io/api/core/v1"
+)
+
+func drain(events chan *k8score.Event) {
+	logger.Log.Debugf("draining event channel")
+	for e := range events {
+		if e == event.Stopper {
+			break
+		}
+	}
+}
diff --git a/pkg/k8s/event.go b/pkg/k8s/event.go
index 47b7f2c..4d54502 100644
--- a/pkg/k8s/event.go
+++ b/pkg/k8s/event.go
@@ -20,6 +20,7 @@
 package k8s
 
 import (
+	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
@@ -44,10 +45,14 @@
 }
 
 func (w EventWatcher) Start() {
+	logger.Log.Debugf("starting event watcher")
+
 	go w.informer.Run(w.stopCh)
 }
 
 func (w EventWatcher) Stop() {
+	logger.Log.Debugf("stopping event watcher")
+
 	w.stopCh <- struct{}{}
 	close(w.stopCh)
 }
diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go
index 8ff7d02..4b258e0 100644
--- a/pkg/pipe/pipe.go
+++ b/pkg/pipe/pipe.go
@@ -44,35 +44,44 @@
 }
 
 func (p *Pipe) Init() error {
+	logger.Log.Debugf("initializing pipe")
+
 	p.stopper = make(chan struct{})
 	p.workflows = []workflow{}
 
 	initialized := map[string]bool{}
 	for _, filter := range configs.GlobalConfig.Filters {
-		if _, ok := configs.GlobalConfig.Exporters[filter.Exporter]; !ok {
-			return fmt.Errorf("exporter %v is not defined", filter.Exporter)
-		}
-		exporter := exp.GetExporter(filter.Exporter)
-		if exporter == nil {
-			return fmt.Errorf("exporter %v is not defined", filter.Exporter)
-		}
-		if initialized[filter.Exporter] {
-			continue
-		}
-		if err := exporter.Init(); err != nil {
-			return err
-		}
-		initialized[filter.Exporter] = true
+		filter.Init()
 
-		events := make(chan *v1.Event)
+		for _, name := range filter.Exporters {
+			if _, ok := configs.GlobalConfig.Exporters[name]; !ok {
+				return fmt.Errorf("exporter %v is not defined", filter.Exporters)
+			}
+			exporter := exp.GetExporter(name)
+			if exporter == nil {
+				return fmt.Errorf("exporter %v is not defined", filter.Exporters)
+			}
+			if initialized[name] {
+				logger.Log.Debugf("exporter %+v has been initialized, skip", name)
+				continue
+			}
+			if err := exporter.Init(); err != nil {
+				return err
+			}
+			initialized[name] = true
 
-		p.workflows = append(p.workflows, workflow{
-			filter:   filter,
-			exporter: exporter,
-			events:   events,
-		})
+			events := make(chan *v1.Event)
+
+			p.workflows = append(p.workflows, workflow{
+				filter:   filter,
+				exporter: exporter,
+				events:   events,
+			})
+		}
 	}
 
+	logger.Log.Debugf("pipe has been initialized")
+
 	return nil
 }