feature: provide more template context objects (#3)
diff --git a/.github/workflows/build-and-test.yaml b/.github/workflows/build-and-test.yaml
index fb60450..7f2913f 100644
--- a/.github/workflows/build-and-test.yaml
+++ b/.github/workflows/build-and-test.yaml
@@ -43,7 +43,7 @@
run: make test
- name: Build
- run: make build
+ run: make
gateway:
name: Gateway
diff --git a/assets/default-config.yaml b/assets/default-config.yaml
index 556ba43..fa8fa32 100644
--- a/assets/default-config.yaml
+++ b/assets/default-config.yaml
@@ -25,8 +25,8 @@
skywalking:
template:
source:
- service: ""
- service-instance: ""
+ service: "{{ .Service.Name }}"
+ serviceInstance: "{{ .Pod.Name }}"
endpoint: ""
- message: ""
+ message: "{{ .Event.Message }}" # this is default, just to demonstrate the context
address: "127.0.0.1:11800"
diff --git a/configs/config.go b/configs/config.go
index 4a7c262..13a69f4 100644
--- a/configs/config.go
+++ b/configs/config.go
@@ -22,10 +22,11 @@
import (
"regexp"
- "github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
"gopkg.in/yaml.v3"
v1 "k8s.io/api/core/v1"
+ "github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
+
evnt "github.com/apache/skywalking-kubernetes-event-exporter/pkg/event"
)
@@ -50,7 +51,7 @@
}
func (filter *FilterConfig) Init() {
- logger.Log.Debugf("initalizing filter config")
+ logger.Log.Debugf("initializing filter config")
filter.reasonRegExp = regexp.MustCompile(filter.Reason)
filter.messageRegExp = regexp.MustCompile(filter.Message)
diff --git a/configs/config_test.go b/configs/config_test.go
index 2b9d467..288edf3 100644
--- a/configs/config_test.go
+++ b/configs/config_test.go
@@ -22,7 +22,7 @@
import (
"testing"
- "k8s.io/api/core/v1"
+ v1 "k8s.io/api/core/v1"
)
func TestFilterConfig_Filter(t *testing.T) {
diff --git a/pkg/event/event.go b/pkg/event/event.go
index 7720ce9..b461205 100644
--- a/pkg/event/event.go
+++ b/pkg/event/event.go
@@ -22,9 +22,9 @@
import v1 "k8s.io/api/core/v1"
type Source struct {
- Service string `json:"service"`
- ServiceInstance string `json:"serviceInstance"`
- Endpoint string `json:"endpoint"`
+ Service string `mapstructure:"service"`
+ ServiceInstance string `mapstructure:"serviceInstance"`
+ Endpoint string `mapstructure:"endpoint"`
}
var Stopper = (*v1.Event)(nil)
diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go
index 7ec53e3..bc3ba15 100644
--- a/pkg/exporter/exporter.go
+++ b/pkg/exporter/exporter.go
@@ -56,8 +56,8 @@
}
type EventTemplate struct {
- Source *event.Source `mapstructure:"source"`
- sourceTemplate *SourceTemplate
+ Source event.Source `mapstructure:"source"`
+ sourceTemplate SourceTemplate
Message string `mapstructure:"message"`
messageTemplate *template.Template
}
@@ -69,21 +69,19 @@
}
}
- if tmplt.Source != nil {
- if tmplt.Source.Service != "" {
- if tmplt.sourceTemplate.serviceTemplate, err = template.New("EventSourceServiceTemplate").Parse(tmplt.Source.Service); err != nil {
- return err
- }
+ if t := tmplt.Source.Service; t != "" {
+ if tmplt.sourceTemplate.serviceTemplate, err = template.New("EventSourceServiceTemplate").Parse(t); err != nil {
+ return err
}
- if tmplt.Source.ServiceInstance != "" {
- if tmplt.sourceTemplate.serviceInstanceTemplate, err = template.New("EventServiceInstanceTemplate").Parse(tmplt.Source.ServiceInstance); err != nil {
- return err
- }
+ }
+ if t := tmplt.Source.ServiceInstance; t != "" {
+ if tmplt.sourceTemplate.serviceInstanceTemplate, err = template.New("EventServiceInstanceTemplate").Parse(t); err != nil {
+ return err
}
- if tmplt.Source.Endpoint != "" {
- if tmplt.sourceTemplate.endpointTemplate, err = template.New("EventEndpointTemplate").Parse(tmplt.Source.Endpoint); err != nil {
- return err
- }
+ }
+ if t := tmplt.Source.Endpoint; t != "" {
+ if tmplt.sourceTemplate.endpointTemplate, err = template.New("EventEndpointTemplate").Parse(t); err != nil {
+ return err
}
}
diff --git a/pkg/exporter/skywalking.go b/pkg/exporter/skywalking.go
index f713a14..015408b 100644
--- a/pkg/exporter/skywalking.go
+++ b/pkg/exporter/skywalking.go
@@ -24,10 +24,13 @@
"context"
"encoding/json"
"fmt"
+ "html/template"
"time"
sw "skywalking.apache.org/repo/goapi/collect/event/v3"
+ "github.com/apache/skywalking-kubernetes-event-exporter/pkg/k8s"
+
"google.golang.org/grpc"
k8score "k8s.io/api/core/v1"
@@ -135,9 +138,8 @@
EndTime: kEvent.LastTimestamp.Unix() / 1000000,
}
if exporter.config.Template != nil {
- if err := exporter.config.Template.Render(swEvent, kEvent); err != nil {
- logger.Log.Warnf("failed to render the template, using the default event content. %+v", err)
- }
+ exporter.config.Template.Render(swEvent, kEvent)
+ logger.Log.Debugf("rendered event is: %+v", swEvent)
}
if err := stream.Send(swEvent); err != nil {
logger.Log.Errorf("failed to send event to %+v. %+v", exporter.Name(), err)
@@ -147,55 +149,42 @@
}()
}
-func (tmplt *EventTemplate) Render(event *sw.Event, data *k8score.Event) error {
- var buf bytes.Buffer
+func (tmplt *EventTemplate) Render(swEvent *sw.Event, kEvent *k8score.Event) {
+ templateCtx := k8s.Registry.GetContext(kEvent)
- // Render Event Message
- if t := tmplt.messageTemplate; t != nil {
- buf.Reset()
- if err := t.Execute(&buf, data); err != nil {
+ logger.Log.Debugf("template context %+v", templateCtx)
+
+ render := func(t *template.Template, destination *string) error {
+ if t == nil {
+ return nil
+ }
+
+ var buf bytes.Buffer
+
+ if err := t.Execute(&buf, templateCtx); err != nil {
return err
}
+
if buf.Len() > 0 {
- event.Message = buf.String()
+ *destination = buf.String()
}
+
+ return nil
}
- // Render Event Source
- if tmplt.sourceTemplate != nil {
- // Render Event Source Service
- if t := tmplt.sourceTemplate.serviceTemplate; t != nil {
- buf.Reset()
- if err := t.Execute(&buf, data); err != nil {
- return err
- }
- if buf.Len() > 0 {
- event.Source.Service = buf.String()
- }
- }
- // Render Event Source Service
- if t := tmplt.sourceTemplate.serviceInstanceTemplate; t != nil {
- buf.Reset()
- if err := t.Execute(&buf, data); err != nil {
- return err
- }
- if buf.Len() > 0 {
- event.Source.ServiceInstance = buf.String()
- }
- }
- // Render Event Source Endpoint
- if t := tmplt.sourceTemplate.endpointTemplate; t != nil {
- buf.Reset()
- if err := t.Execute(&buf, data); err != nil {
- return err
- }
- if buf.Len() > 0 {
- event.Source.Endpoint = buf.String()
- }
- }
+ if err := render(tmplt.messageTemplate, &swEvent.Message); err != nil {
+ logger.Log.Debugf("failed to render the template, using the default event content. %+v", err)
}
- return nil
+ if err := render(tmplt.sourceTemplate.serviceTemplate, &swEvent.Source.Service); err != nil {
+ logger.Log.Debugf("failed to render service template, using the default event content. %+v", err)
+ }
+ if err := render(tmplt.sourceTemplate.serviceInstanceTemplate, &swEvent.Source.ServiceInstance); err != nil {
+ logger.Log.Debugf("failed to render service instance template, using the default event content. %+v", err)
+ }
+ if err := render(tmplt.sourceTemplate.endpointTemplate, &swEvent.Source.Endpoint); err != nil {
+ logger.Log.Debugf("failed to render endpoin template, using the default event content. %+v", err)
+ }
}
func (exporter *SkyWalking) Stop() {
diff --git a/pkg/exporter/util.go b/pkg/exporter/util.go
index d02859e..b389f2d 100644
--- a/pkg/exporter/util.go
+++ b/pkg/exporter/util.go
@@ -20,9 +20,10 @@
package exporter
import (
+ k8score "k8s.io/api/core/v1"
+
"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
"github.com/apache/skywalking-kubernetes-event-exporter/pkg/event"
- k8score "k8s.io/api/core/v1"
)
func drain(events chan *k8score.Event) {
diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go
deleted file mode 100644
index e23e6fa..0000000
--- a/pkg/filter/filter.go
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to Apache Software Foundation (ASF) under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Apache Software Foundation (ASF) licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package filter
-
-type Filter struct {
- Reason string `yaml:"reason"`
- Message string `yaml:"message"`
- MinCount int32 `yaml:"min-count"`
- Type string `yaml:"type"`
- Action string `yaml:"action"`
-
- Kind string `yaml:"kind"`
- Namespace string `yaml:"namespace"`
- Name string `yaml:"name"`
-
- Exporter string `yaml:"exporter"`
-}
diff --git a/pkg/k8s/event.go b/pkg/k8s/event.go
index 4d54502..32a2b55 100644
--- a/pkg/k8s/event.go
+++ b/pkg/k8s/event.go
@@ -20,11 +20,12 @@
package k8s
import (
- "github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
+
+ "github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
)
type EventWatcher struct {
diff --git a/pkg/k8s/registry.go b/pkg/k8s/registry.go
new file mode 100644
index 0000000..a1caea5
--- /dev/null
+++ b/pkg/k8s/registry.go
@@ -0,0 +1,176 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package k8s
+
+import (
+ "time"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+
+ "github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
+)
+
+type id struct {
+ namespace string
+ name string
+}
+
+type registry struct {
+ informers []cache.SharedIndexInformer
+ stopCh chan struct{}
+
+ podIDIpMap map[id]string
+ idSvcMap map[id]*corev1.Service
+ idPodMap map[id]*corev1.Pod
+ ipSvcIDMap map[string]id
+}
+
+func (r registry) OnAdd(obj interface{}) {
+ switch o := obj.(type) {
+ case *corev1.Pod:
+ podID := id{namespace: o.Namespace, name: o.Name}
+ r.podIDIpMap[podID] = o.Status.PodIP
+ r.idPodMap[podID] = o
+ case *corev1.Service:
+ r.idSvcMap[id{namespace: o.Namespace, name: o.Name}] = o
+ case *corev1.Endpoints:
+ for _, subset := range o.Subsets {
+ for _, address := range subset.Addresses {
+ r.ipSvcIDMap[address.IP] = id{
+ namespace: o.ObjectMeta.Namespace,
+ name: o.ObjectMeta.Name,
+ }
+ }
+ }
+ }
+}
+
+func (r registry) OnUpdate(oldObj, newObj interface{}) {
+ r.OnDelete(oldObj)
+ r.OnAdd(newObj)
+}
+
+func (r registry) OnDelete(obj interface{}) {
+ switch o := obj.(type) {
+ case *corev1.Pod:
+ podID := id{namespace: o.Namespace, name: o.Name}
+ go func() {
+ time.Sleep(3 * time.Second)
+ delete(r.podIDIpMap, podID)
+ delete(r.idPodMap, podID)
+ }()
+ case *corev1.Service:
+ go func() {
+ time.Sleep(3 * time.Second)
+ delete(r.idSvcMap, id{namespace: o.Namespace, name: o.Name})
+ }()
+ case *corev1.Endpoints:
+ go func() {
+ for _, subset := range o.Subsets {
+ for _, address := range subset.Addresses {
+ time.Sleep(3 * time.Second)
+ delete(r.ipSvcIDMap, address.IP)
+ }
+ }
+ }()
+ }
+}
+
+func (r *registry) Start() {
+ logger.Log.Debugf("starting registry")
+
+ for _, informer := range r.informers {
+ go informer.Run(r.stopCh)
+ }
+}
+
+func (r *registry) Stop() {
+ logger.Log.Debugf("stopping registry")
+
+ r.stopCh <- struct{}{}
+ close(r.stopCh)
+}
+
+type TemplateContext struct {
+ Service *corev1.Service
+ Pod *corev1.Pod
+ Event *corev1.Event
+}
+
+func (r *registry) GetContext(e *corev1.Event) TemplateContext {
+ result := TemplateContext{Event: e}
+
+ if obj := e.InvolvedObject; obj.Kind == "Pod" {
+ podID := id{
+ namespace: obj.Namespace,
+ name: obj.Name,
+ }
+ podIP := r.podIDIpMap[podID]
+ svcID := r.ipSvcIDMap[podIP]
+
+ result.Pod = r.idPodMap[podID]
+ result.Service = r.idSvcMap[svcID]
+ }
+
+ if obj := e.InvolvedObject; obj.Kind == "Service" {
+ svcID := id{
+ namespace: obj.Namespace,
+ name: obj.Name,
+ }
+ result.Service = r.idSvcMap[svcID]
+ }
+
+ return result
+}
+
+var Registry = ®istry{
+ stopCh: make(chan struct{}),
+
+ podIDIpMap: make(map[id]string),
+ idSvcMap: make(map[id]*corev1.Service),
+ idPodMap: make(map[id]*corev1.Pod),
+ ipSvcIDMap: make(map[string]id),
+}
+
+func (r *registry) Init() error {
+ logger.Log.Debugf("initializing template context registry")
+
+ config, err := GetConfig()
+ if err != nil {
+ return err
+ }
+ client := kubernetes.NewForConfigOrDie(config)
+ factory := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithNamespace(corev1.NamespaceAll))
+
+ r.informers = []cache.SharedIndexInformer{
+ factory.Core().V1().Endpoints().Informer(),
+ factory.Core().V1().Services().Informer(),
+ factory.Core().V1().Pods().Informer(),
+ }
+
+ for _, informer := range Registry.informers {
+ informer.AddEventHandler(Registry)
+ }
+
+ return nil
+}
diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go
index 4b258e0..5f32c00 100644
--- a/pkg/pipe/pipe.go
+++ b/pkg/pipe/pipe.go
@@ -80,6 +80,10 @@
}
}
+ if err := k8s.Registry.Init(); err != nil {
+ return err
+ }
+
logger.Log.Debugf("pipe has been initialized")
return nil
@@ -88,6 +92,8 @@
func (p *Pipe) Start() error {
p.Watcher.Start()
+ k8s.Registry.Start()
+
for _, wkfl := range p.workflows {
go wkfl.exporter.Export(wkfl.events)
}
@@ -120,6 +126,8 @@
w.exporter.Stop()
}
+ k8s.Registry.Stop()
+
p.stopper <- struct{}{}
close(p.stopper)
}