feature: implement the real "filter" logics, and cache template and regexps (#2)
- Implement the real "filter" logics.
- Cache RegExp in the filter configs.
- Cache Template in the exporters.
diff --git a/assets/default-config.yaml b/assets/default-config.yaml
index 0038981..556ba43 100644
--- a/assets/default-config.yaml
+++ b/assets/default-config.yaml
@@ -17,8 +17,9 @@
#
filters:
- - namespace: default
- exporter: skywalking
+ - namespace: istio-system
+ exporters:
+ - skywalking
exporters:
skywalking:
diff --git a/cmd/start.go b/cmd/start.go
index b9a54c6..cf7e624 100644
--- a/cmd/start.go
+++ b/cmd/start.go
@@ -25,6 +25,7 @@
"syscall"
"github.com/spf13/cobra"
+ v1 "k8s.io/api/core/v1"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"github.com/apache/skywalking-kubernetes-event-exporter/pkg/k8s"
@@ -39,7 +40,7 @@
Use: "start",
Short: "Start skywalking-kubernetes-event-exporter",
RunE: func(cmd *cobra.Command, args []string) error {
- watcher, err := k8s.WatchEvents("default")
+ watcher, err := k8s.WatchEvents(v1.NamespaceAll)
if err != nil {
return err
}
diff --git a/configs/config.go b/configs/config.go
index 1a93b2a..4a7c262 100644
--- a/configs/config.go
+++ b/configs/config.go
@@ -20,6 +20,9 @@
package configs
import (
+ "regexp"
+
+ "github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
"gopkg.in/yaml.v3"
v1 "k8s.io/api/core/v1"
@@ -27,23 +30,64 @@
)
type FilterConfig struct {
- Reason string `yaml:"reason"`
- Message string `yaml:"message"`
- MinCount int32 `yaml:"min-count"`
- Type string `yaml:"type"`
- Action string `yaml:"action"`
+ Reason string `yaml:"reason"`
+ reasonRegExp *regexp.Regexp
+ Message string `yaml:"message"`
+ messageRegExp *regexp.Regexp
+ MinCount int32 `yaml:"min-count"`
+ Type string `yaml:"type"`
+ typeRegExp *regexp.Regexp
+ Action string `yaml:"action"`
+ actionRegExp *regexp.Regexp
+ Kind string `yaml:"kind"`
+ kindRegExp *regexp.Regexp
+ Namespace string `yaml:"namespace"`
+ namespaceRegExp *regexp.Regexp
+ Name string `yaml:"name"`
+ nameRegExp *regexp.Regexp
- Kind string `yaml:"kind"`
- Namespace string `yaml:"namespace"`
- Name string `yaml:"name"`
-
- Exporter string `yaml:"exporter"`
+ Exporters []string `yaml:"exporters"`
}
+func (filter *FilterConfig) Init() {
+ logger.Log.Debugf("initalizing filter config")
+
+ filter.reasonRegExp = regexp.MustCompile(filter.Reason)
+ filter.messageRegExp = regexp.MustCompile(filter.Message)
+ filter.typeRegExp = regexp.MustCompile(filter.Type)
+ filter.actionRegExp = regexp.MustCompile(filter.Action)
+ filter.kindRegExp = regexp.MustCompile(filter.Kind)
+ filter.namespaceRegExp = regexp.MustCompile(filter.Namespace)
+ filter.nameRegExp = regexp.MustCompile(filter.Name)
+}
+
+// Filter the given event with this filter instance.
+// Return true if the event is filtered, return false otherwise.
func (filter *FilterConfig) Filter(event *v1.Event) bool {
if event == evnt.Stopper {
return false
}
+ if filter.Reason != "" && !filter.reasonRegExp.MatchString(event.Reason) {
+ return true
+ }
+ if filter.Message != "" && !filter.messageRegExp.MatchString(event.Message) {
+ return true
+ }
+ if filter.Type != "" && !filter.typeRegExp.MatchString(event.Type) {
+ return true
+ }
+ if filter.Action != "" && !filter.actionRegExp.MatchString(event.Action) {
+ return true
+ }
+ if filter.Kind != "" && !filter.kindRegExp.MatchString(event.Kind) {
+ return true
+ }
+ if filter.Namespace != "" && !filter.namespaceRegExp.MatchString(event.Namespace) {
+ return true
+ }
+ if filter.Name != "" && !filter.nameRegExp.MatchString(event.Name) {
+ return true
+ }
return false
}
diff --git a/configs/config_test.go b/configs/config_test.go
new file mode 100644
index 0000000..2b9d467
--- /dev/null
+++ b/configs/config_test.go
@@ -0,0 +1,106 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package configs
+
+import (
+ "testing"
+
+ "k8s.io/api/core/v1"
+)
+
+func TestFilterConfig_Filter(t *testing.T) {
+ type fields struct {
+ Reason string
+ Message string
+ MinCount int32
+ Type string
+ Action string
+ Kind string
+ Namespace string
+ Name string
+ Exporters []string
+ }
+ type args struct {
+ event *v1.Event
+ }
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ want bool
+ }{
+ {
+ name: "filter reason exactly",
+ fields: fields{Reason: "Killed"},
+ args: args{event: &v1.Event{Reason: "Killed"}},
+ want: false,
+ },
+ {
+ name: "filter reason by regexp",
+ fields: fields{Reason: "Killed|Killing"},
+ args: args{event: &v1.Event{Reason: "Killing"}},
+ want: false,
+ },
+ {
+ name: "filter reason by regexp",
+ fields: fields{Reason: "Killed|Killing"},
+ args: args{event: &v1.Event{Reason: "Started"}},
+ want: true,
+ },
+
+ {
+ name: "filter message by regexp",
+ fields: fields{Message: "Killing|Killed .*"},
+ args: args{event: &v1.Event{Message: "Killing reviews"}},
+ want: false,
+ },
+ {
+ name: "filter message by regexp",
+ fields: fields{Message: "Killing|Killed .*"},
+ args: args{event: &v1.Event{Message: "Killed reviews"}},
+ want: false,
+ },
+ {
+ name: "filter message by regexp",
+ fields: fields{Message: "Killing|Killed .*"},
+ args: args{event: &v1.Event{Message: "Started reviews"}},
+ want: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ filter := &FilterConfig{
+ Reason: tt.fields.Reason,
+ Message: tt.fields.Message,
+ MinCount: tt.fields.MinCount,
+ Type: tt.fields.Type,
+ Action: tt.fields.Action,
+ Kind: tt.fields.Kind,
+ Namespace: tt.fields.Namespace,
+ Name: tt.fields.Name,
+ Exporters: tt.fields.Exporters,
+ }
+ filter.Init()
+ if got := filter.Filter(tt.args.event); got != tt.want {
+ t.Errorf("Filter() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go
index a3f2284..7ec53e3 100644
--- a/pkg/exporter/exporter.go
+++ b/pkg/exporter/exporter.go
@@ -20,6 +20,8 @@
package exporter
import (
+ "html/template"
+
v1 "k8s.io/api/core/v1"
"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
@@ -33,11 +35,6 @@
Stop()
}
-type MessageTemplate struct {
- Source *event.Source `mapstructure:"source"`
- Message string `mapstructure:"message"`
-}
-
var exporters = map[string]Exporter{}
func RegisterExporter(name string, exporter Exporter) {
@@ -51,3 +48,44 @@
func GetExporter(name string) Exporter {
return exporters[name]
}
+
+type SourceTemplate struct {
+ serviceTemplate *template.Template
+ serviceInstanceTemplate *template.Template
+ endpointTemplate *template.Template
+}
+
+type EventTemplate struct {
+ Source *event.Source `mapstructure:"source"`
+ sourceTemplate *SourceTemplate
+ Message string `mapstructure:"message"`
+ messageTemplate *template.Template
+}
+
+func (tmplt *EventTemplate) Init() (err error) {
+ if tmplt.Message != "" {
+ if tmplt.messageTemplate, err = template.New("EventMessageTemplate").Parse(tmplt.Message); err != nil {
+ return err
+ }
+ }
+
+ if tmplt.Source != nil {
+ if tmplt.Source.Service != "" {
+ if tmplt.sourceTemplate.serviceTemplate, err = template.New("EventSourceServiceTemplate").Parse(tmplt.Source.Service); err != nil {
+ return err
+ }
+ }
+ if tmplt.Source.ServiceInstance != "" {
+ if tmplt.sourceTemplate.serviceInstanceTemplate, err = template.New("EventServiceInstanceTemplate").Parse(tmplt.Source.ServiceInstance); err != nil {
+ return err
+ }
+ }
+ if tmplt.Source.Endpoint != "" {
+ if tmplt.sourceTemplate.endpointTemplate, err = template.New("EventEndpointTemplate").Parse(tmplt.Source.Endpoint); err != nil {
+ return err
+ }
+ }
+ }
+
+ return err
+}
diff --git a/pkg/exporter/skywalking.go b/pkg/exporter/skywalking.go
index 75cabab..f713a14 100644
--- a/pkg/exporter/skywalking.go
+++ b/pkg/exporter/skywalking.go
@@ -24,7 +24,6 @@
"context"
"encoding/json"
"fmt"
- "html/template"
"time"
sw "skywalking.apache.org/repo/goapi/collect/event/v3"
@@ -45,8 +44,8 @@
}
type SkyWalkingConfig struct {
- Address string `mapstructure:"address"`
- Template *MessageTemplate `mapstructure:"template"`
+ Address string `mapstructure:"address"`
+ Template *EventTemplate `mapstructure:"template"`
}
func init() {
@@ -67,6 +66,10 @@
return err
}
+ if err := config.Template.Init(); err != nil {
+ return err
+ }
+
conn, err := grpc.Dial(config.Address, grpc.WithInsecure())
if err != nil {
return err
@@ -85,16 +88,13 @@
// TODO error handling
func (exporter *SkyWalking) Export(events chan *k8score.Event) {
+ logger.Log.Debugf("exporting events into %+v", exporter.Name())
+
stream, err := exporter.client.Collect(context.Background())
for err != nil {
select {
case <-exporter.stopper:
- logger.Log.Debugf("draining event channel")
- for e := range events {
- if e == event.Stopper {
- break
- }
- }
+ drain(events)
return
default:
logger.Log.Errorf("failed to connect to SkyWalking server. %+v", err)
@@ -113,12 +113,7 @@
for {
select {
case <-exporter.stopper:
- logger.Log.Debugf("draining event channel")
- for e := range events {
- if e == event.Stopper {
- break
- }
- }
+ drain(events)
return
case kEvent := <-events:
if kEvent == event.Stopper {
@@ -152,16 +147,12 @@
}()
}
-func (tmplt *MessageTemplate) Render(event *sw.Event, data *k8score.Event) error {
+func (tmplt *EventTemplate) Render(event *sw.Event, data *k8score.Event) error {
var buf bytes.Buffer
// Render Event Message
- if tmplt.Message != "" {
+ if t := tmplt.messageTemplate; t != nil {
buf.Reset()
- t, err := template.New("EventMsg").Parse(tmplt.Message)
- if err != nil {
- return err
- }
if err := t.Execute(&buf, data); err != nil {
return err
}
@@ -171,14 +162,10 @@
}
// Render Event Source
- if tmplt.Source != nil {
+ if tmplt.sourceTemplate != nil {
// Render Event Source Service
- if tmplt.Source.Service != "" {
+ if t := tmplt.sourceTemplate.serviceTemplate; t != nil {
buf.Reset()
- t, err := template.New("EventSourceService").Parse(tmplt.Source.Service)
- if err != nil {
- return err
- }
if err := t.Execute(&buf, data); err != nil {
return err
}
@@ -187,12 +174,8 @@
}
}
// Render Event Source Service
- if tmplt.Source.ServiceInstance != "" {
+ if t := tmplt.sourceTemplate.serviceInstanceTemplate; t != nil {
buf.Reset()
- t, err := template.New("EventSourceServiceInstance").Parse(tmplt.Source.ServiceInstance)
- if err != nil {
- return err
- }
if err := t.Execute(&buf, data); err != nil {
return err
}
@@ -201,12 +184,8 @@
}
}
// Render Event Source Endpoint
- if tmplt.Source.Endpoint != "" {
+ if t := tmplt.sourceTemplate.endpointTemplate; t != nil {
buf.Reset()
- t, err := template.New("EventSourceEndpoint").Parse(tmplt.Source.Endpoint)
- if err != nil {
- return err
- }
if err := t.Execute(&buf, data); err != nil {
return err
}
diff --git a/pkg/exporter/util.go b/pkg/exporter/util.go
new file mode 100644
index 0000000..d02859e
--- /dev/null
+++ b/pkg/exporter/util.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package exporter
+
+import (
+ "github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
+ "github.com/apache/skywalking-kubernetes-event-exporter/pkg/event"
+ k8score "k8s.io/api/core/v1"
+)
+
+func drain(events chan *k8score.Event) {
+ logger.Log.Debugf("draining event channel")
+ for e := range events {
+ if e == event.Stopper {
+ break
+ }
+ }
+}
diff --git a/pkg/k8s/event.go b/pkg/k8s/event.go
index 47b7f2c..4d54502 100644
--- a/pkg/k8s/event.go
+++ b/pkg/k8s/event.go
@@ -20,6 +20,7 @@
package k8s
import (
+ "github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
@@ -44,10 +45,14 @@
}
func (w EventWatcher) Start() {
+ logger.Log.Debugf("starting event watcher")
+
go w.informer.Run(w.stopCh)
}
func (w EventWatcher) Stop() {
+ logger.Log.Debugf("stopping event watcher")
+
w.stopCh <- struct{}{}
close(w.stopCh)
}
diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go
index 8ff7d02..4b258e0 100644
--- a/pkg/pipe/pipe.go
+++ b/pkg/pipe/pipe.go
@@ -44,35 +44,44 @@
}
func (p *Pipe) Init() error {
+ logger.Log.Debugf("initializing pipe")
+
p.stopper = make(chan struct{})
p.workflows = []workflow{}
initialized := map[string]bool{}
for _, filter := range configs.GlobalConfig.Filters {
- if _, ok := configs.GlobalConfig.Exporters[filter.Exporter]; !ok {
- return fmt.Errorf("exporter %v is not defined", filter.Exporter)
- }
- exporter := exp.GetExporter(filter.Exporter)
- if exporter == nil {
- return fmt.Errorf("exporter %v is not defined", filter.Exporter)
- }
- if initialized[filter.Exporter] {
- continue
- }
- if err := exporter.Init(); err != nil {
- return err
- }
- initialized[filter.Exporter] = true
+ filter.Init()
- events := make(chan *v1.Event)
+ for _, name := range filter.Exporters {
+ if _, ok := configs.GlobalConfig.Exporters[name]; !ok {
+ return fmt.Errorf("exporter %v is not defined", filter.Exporters)
+ }
+ exporter := exp.GetExporter(name)
+ if exporter == nil {
+ return fmt.Errorf("exporter %v is not defined", filter.Exporters)
+ }
+ if initialized[name] {
+ logger.Log.Debugf("exporter %+v has been initialized, skip", name)
+ continue
+ }
+ if err := exporter.Init(); err != nil {
+ return err
+ }
+ initialized[name] = true
- p.workflows = append(p.workflows, workflow{
- filter: filter,
- exporter: exporter,
- events: events,
- })
+ events := make(chan *v1.Event)
+
+ p.workflows = append(p.workflows, workflow{
+ filter: filter,
+ exporter: exporter,
+ events: events,
+ })
+ }
}
+ logger.Log.Debugf("pipe has been initialized")
+
return nil
}