feature: provide more template context objects (#3)

diff --git a/.github/workflows/build-and-test.yaml b/.github/workflows/build-and-test.yaml
index fb60450..7f2913f 100644
--- a/.github/workflows/build-and-test.yaml
+++ b/.github/workflows/build-and-test.yaml
@@ -43,7 +43,7 @@
         run: make test
 
       - name: Build
-        run: make build
+        run: make
 
   gateway:
     name: Gateway
diff --git a/assets/default-config.yaml b/assets/default-config.yaml
index 556ba43..fa8fa32 100644
--- a/assets/default-config.yaml
+++ b/assets/default-config.yaml
@@ -25,8 +25,8 @@
   skywalking:
     template:
       source:
-        service: ""
-        service-instance: ""
+        service: "{{ .Service.Name }}"
+        serviceInstance: "{{ .Pod.Name }}"
         endpoint: ""
-      message: ""
+      message: "{{ .Event.Message }}" # this is default, just to demonstrate the context
     address: "127.0.0.1:11800"
diff --git a/configs/config.go b/configs/config.go
index 4a7c262..13a69f4 100644
--- a/configs/config.go
+++ b/configs/config.go
@@ -22,10 +22,11 @@
 import (
 	"regexp"
 
-	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
 	"gopkg.in/yaml.v3"
 	v1 "k8s.io/api/core/v1"
 
+	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
+
 	evnt "github.com/apache/skywalking-kubernetes-event-exporter/pkg/event"
 )
 
@@ -50,7 +51,7 @@
 }
 
 func (filter *FilterConfig) Init() {
-	logger.Log.Debugf("initalizing filter config")
+	logger.Log.Debugf("initializing filter config")
 
 	filter.reasonRegExp = regexp.MustCompile(filter.Reason)
 	filter.messageRegExp = regexp.MustCompile(filter.Message)
diff --git a/configs/config_test.go b/configs/config_test.go
index 2b9d467..288edf3 100644
--- a/configs/config_test.go
+++ b/configs/config_test.go
@@ -22,7 +22,7 @@
 import (
 	"testing"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 )
 
 func TestFilterConfig_Filter(t *testing.T) {
diff --git a/pkg/event/event.go b/pkg/event/event.go
index 7720ce9..b461205 100644
--- a/pkg/event/event.go
+++ b/pkg/event/event.go
@@ -22,9 +22,9 @@
 import v1 "k8s.io/api/core/v1"
 
 type Source struct {
-	Service         string `json:"service"`
-	ServiceInstance string `json:"serviceInstance"`
-	Endpoint        string `json:"endpoint"`
+	Service         string `mapstructure:"service"`
+	ServiceInstance string `mapstructure:"serviceInstance"`
+	Endpoint        string `mapstructure:"endpoint"`
 }
 
 var Stopper = (*v1.Event)(nil)
diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go
index 7ec53e3..bc3ba15 100644
--- a/pkg/exporter/exporter.go
+++ b/pkg/exporter/exporter.go
@@ -56,8 +56,8 @@
 }
 
 type EventTemplate struct {
-	Source          *event.Source `mapstructure:"source"`
-	sourceTemplate  *SourceTemplate
+	Source          event.Source `mapstructure:"source"`
+	sourceTemplate  SourceTemplate
 	Message         string `mapstructure:"message"`
 	messageTemplate *template.Template
 }
@@ -69,21 +69,19 @@
 		}
 	}
 
-	if tmplt.Source != nil {
-		if tmplt.Source.Service != "" {
-			if tmplt.sourceTemplate.serviceTemplate, err = template.New("EventSourceServiceTemplate").Parse(tmplt.Source.Service); err != nil {
-				return err
-			}
+	if t := tmplt.Source.Service; t != "" {
+		if tmplt.sourceTemplate.serviceTemplate, err = template.New("EventSourceServiceTemplate").Parse(t); err != nil {
+			return err
 		}
-		if tmplt.Source.ServiceInstance != "" {
-			if tmplt.sourceTemplate.serviceInstanceTemplate, err = template.New("EventServiceInstanceTemplate").Parse(tmplt.Source.ServiceInstance); err != nil {
-				return err
-			}
+	}
+	if t := tmplt.Source.ServiceInstance; t != "" {
+		if tmplt.sourceTemplate.serviceInstanceTemplate, err = template.New("EventServiceInstanceTemplate").Parse(t); err != nil {
+			return err
 		}
-		if tmplt.Source.Endpoint != "" {
-			if tmplt.sourceTemplate.endpointTemplate, err = template.New("EventEndpointTemplate").Parse(tmplt.Source.Endpoint); err != nil {
-				return err
-			}
+	}
+	if t := tmplt.Source.Endpoint; t != "" {
+		if tmplt.sourceTemplate.endpointTemplate, err = template.New("EventEndpointTemplate").Parse(t); err != nil {
+			return err
 		}
 	}
 
diff --git a/pkg/exporter/skywalking.go b/pkg/exporter/skywalking.go
index f713a14..015408b 100644
--- a/pkg/exporter/skywalking.go
+++ b/pkg/exporter/skywalking.go
@@ -24,10 +24,13 @@
 	"context"
 	"encoding/json"
 	"fmt"
+	"html/template"
 	"time"
 
 	sw "skywalking.apache.org/repo/goapi/collect/event/v3"
 
+	"github.com/apache/skywalking-kubernetes-event-exporter/pkg/k8s"
+
 	"google.golang.org/grpc"
 	k8score "k8s.io/api/core/v1"
 
@@ -135,9 +138,8 @@
 					EndTime:   kEvent.LastTimestamp.Unix() / 1000000,
 				}
 				if exporter.config.Template != nil {
-					if err := exporter.config.Template.Render(swEvent, kEvent); err != nil {
-						logger.Log.Warnf("failed to render the template, using the default event content. %+v", err)
-					}
+					exporter.config.Template.Render(swEvent, kEvent)
+					logger.Log.Debugf("rendered event is: %+v", swEvent)
 				}
 				if err := stream.Send(swEvent); err != nil {
 					logger.Log.Errorf("failed to send event to %+v. %+v", exporter.Name(), err)
@@ -147,55 +149,42 @@
 	}()
 }
 
-func (tmplt *EventTemplate) Render(event *sw.Event, data *k8score.Event) error {
-	var buf bytes.Buffer
+func (tmplt *EventTemplate) Render(swEvent *sw.Event, kEvent *k8score.Event) {
+	templateCtx := k8s.Registry.GetContext(kEvent)
 
-	// Render Event Message
-	if t := tmplt.messageTemplate; t != nil {
-		buf.Reset()
-		if err := t.Execute(&buf, data); err != nil {
+	logger.Log.Debugf("template context %+v", templateCtx)
+
+	render := func(t *template.Template, destination *string) error {
+		if t == nil {
+			return nil
+		}
+
+		var buf bytes.Buffer
+
+		if err := t.Execute(&buf, templateCtx); err != nil {
 			return err
 		}
+
 		if buf.Len() > 0 {
-			event.Message = buf.String()
+			*destination = buf.String()
 		}
+
+		return nil
 	}
 
-	// Render Event Source
-	if tmplt.sourceTemplate != nil {
-		// Render Event Source Service
-		if t := tmplt.sourceTemplate.serviceTemplate; t != nil {
-			buf.Reset()
-			if err := t.Execute(&buf, data); err != nil {
-				return err
-			}
-			if buf.Len() > 0 {
-				event.Source.Service = buf.String()
-			}
-		}
-		// Render Event Source Service
-		if t := tmplt.sourceTemplate.serviceInstanceTemplate; t != nil {
-			buf.Reset()
-			if err := t.Execute(&buf, data); err != nil {
-				return err
-			}
-			if buf.Len() > 0 {
-				event.Source.ServiceInstance = buf.String()
-			}
-		}
-		// Render Event Source Endpoint
-		if t := tmplt.sourceTemplate.endpointTemplate; t != nil {
-			buf.Reset()
-			if err := t.Execute(&buf, data); err != nil {
-				return err
-			}
-			if buf.Len() > 0 {
-				event.Source.Endpoint = buf.String()
-			}
-		}
+	if err := render(tmplt.messageTemplate, &swEvent.Message); err != nil {
+		logger.Log.Debugf("failed to render the template, using the default event content. %+v", err)
 	}
 
-	return nil
+	if err := render(tmplt.sourceTemplate.serviceTemplate, &swEvent.Source.Service); err != nil {
+		logger.Log.Debugf("failed to render service template, using the default event content. %+v", err)
+	}
+	if err := render(tmplt.sourceTemplate.serviceInstanceTemplate, &swEvent.Source.ServiceInstance); err != nil {
+		logger.Log.Debugf("failed to render service instance template, using the default event content. %+v", err)
+	}
+	if err := render(tmplt.sourceTemplate.endpointTemplate, &swEvent.Source.Endpoint); err != nil {
+		logger.Log.Debugf("failed to render endpoin template, using the default event content. %+v", err)
+	}
 }
 
 func (exporter *SkyWalking) Stop() {
diff --git a/pkg/exporter/util.go b/pkg/exporter/util.go
index d02859e..b389f2d 100644
--- a/pkg/exporter/util.go
+++ b/pkg/exporter/util.go
@@ -20,9 +20,10 @@
 package exporter
 
 import (
+	k8score "k8s.io/api/core/v1"
+
 	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
 	"github.com/apache/skywalking-kubernetes-event-exporter/pkg/event"
-	k8score "k8s.io/api/core/v1"
 )
 
 func drain(events chan *k8score.Event) {
diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go
deleted file mode 100644
index e23e6fa..0000000
--- a/pkg/filter/filter.go
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to Apache Software Foundation (ASF) under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Apache Software Foundation (ASF) licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package filter
-
-type Filter struct {
-	Reason   string `yaml:"reason"`
-	Message  string `yaml:"message"`
-	MinCount int32  `yaml:"min-count"`
-	Type     string `yaml:"type"`
-	Action   string `yaml:"action"`
-
-	Kind      string `yaml:"kind"`
-	Namespace string `yaml:"namespace"`
-	Name      string `yaml:"name"`
-
-	Exporter string `yaml:"exporter"`
-}
diff --git a/pkg/k8s/event.go b/pkg/k8s/event.go
index 4d54502..32a2b55 100644
--- a/pkg/k8s/event.go
+++ b/pkg/k8s/event.go
@@ -20,11 +20,12 @@
 package k8s
 
 import (
-	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
+
+	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
 )
 
 type EventWatcher struct {
diff --git a/pkg/k8s/registry.go b/pkg/k8s/registry.go
new file mode 100644
index 0000000..a1caea5
--- /dev/null
+++ b/pkg/k8s/registry.go
@@ -0,0 +1,176 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package k8s
+
+import (
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+
+	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
+)
+
+type id struct {
+	namespace string
+	name      string
+}
+
+type registry struct {
+	informers []cache.SharedIndexInformer
+	stopCh    chan struct{}
+
+	podIDIpMap map[id]string
+	idSvcMap   map[id]*corev1.Service
+	idPodMap   map[id]*corev1.Pod
+	ipSvcIDMap map[string]id
+}
+
+func (r registry) OnAdd(obj interface{}) {
+	switch o := obj.(type) {
+	case *corev1.Pod:
+		podID := id{namespace: o.Namespace, name: o.Name}
+		r.podIDIpMap[podID] = o.Status.PodIP
+		r.idPodMap[podID] = o
+	case *corev1.Service:
+		r.idSvcMap[id{namespace: o.Namespace, name: o.Name}] = o
+	case *corev1.Endpoints:
+		for _, subset := range o.Subsets {
+			for _, address := range subset.Addresses {
+				r.ipSvcIDMap[address.IP] = id{
+					namespace: o.ObjectMeta.Namespace,
+					name:      o.ObjectMeta.Name,
+				}
+			}
+		}
+	}
+}
+
+func (r registry) OnUpdate(oldObj, newObj interface{}) {
+	r.OnDelete(oldObj)
+	r.OnAdd(newObj)
+}
+
+func (r registry) OnDelete(obj interface{}) {
+	switch o := obj.(type) {
+	case *corev1.Pod:
+		podID := id{namespace: o.Namespace, name: o.Name}
+		go func() {
+			time.Sleep(3 * time.Second)
+			delete(r.podIDIpMap, podID)
+			delete(r.idPodMap, podID)
+		}()
+	case *corev1.Service:
+		go func() {
+			time.Sleep(3 * time.Second)
+			delete(r.idSvcMap, id{namespace: o.Namespace, name: o.Name})
+		}()
+	case *corev1.Endpoints:
+		go func() {
+			for _, subset := range o.Subsets {
+				for _, address := range subset.Addresses {
+					time.Sleep(3 * time.Second)
+					delete(r.ipSvcIDMap, address.IP)
+				}
+			}
+		}()
+	}
+}
+
+func (r *registry) Start() {
+	logger.Log.Debugf("starting registry")
+
+	for _, informer := range r.informers {
+		go informer.Run(r.stopCh)
+	}
+}
+
+func (r *registry) Stop() {
+	logger.Log.Debugf("stopping registry")
+
+	r.stopCh <- struct{}{}
+	close(r.stopCh)
+}
+
+type TemplateContext struct {
+	Service *corev1.Service
+	Pod     *corev1.Pod
+	Event   *corev1.Event
+}
+
+func (r *registry) GetContext(e *corev1.Event) TemplateContext {
+	result := TemplateContext{Event: e}
+
+	if obj := e.InvolvedObject; obj.Kind == "Pod" {
+		podID := id{
+			namespace: obj.Namespace,
+			name:      obj.Name,
+		}
+		podIP := r.podIDIpMap[podID]
+		svcID := r.ipSvcIDMap[podIP]
+
+		result.Pod = r.idPodMap[podID]
+		result.Service = r.idSvcMap[svcID]
+	}
+
+	if obj := e.InvolvedObject; obj.Kind == "Service" {
+		svcID := id{
+			namespace: obj.Namespace,
+			name:      obj.Name,
+		}
+		result.Service = r.idSvcMap[svcID]
+	}
+
+	return result
+}
+
+var Registry = &registry{
+	stopCh: make(chan struct{}),
+
+	podIDIpMap: make(map[id]string),
+	idSvcMap:   make(map[id]*corev1.Service),
+	idPodMap:   make(map[id]*corev1.Pod),
+	ipSvcIDMap: make(map[string]id),
+}
+
+func (r *registry) Init() error {
+	logger.Log.Debugf("initializing template context registry")
+
+	config, err := GetConfig()
+	if err != nil {
+		return err
+	}
+	client := kubernetes.NewForConfigOrDie(config)
+	factory := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithNamespace(corev1.NamespaceAll))
+
+	r.informers = []cache.SharedIndexInformer{
+		factory.Core().V1().Endpoints().Informer(),
+		factory.Core().V1().Services().Informer(),
+		factory.Core().V1().Pods().Informer(),
+	}
+
+	for _, informer := range Registry.informers {
+		informer.AddEventHandler(Registry)
+	}
+
+	return nil
+}
diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go
index 4b258e0..5f32c00 100644
--- a/pkg/pipe/pipe.go
+++ b/pkg/pipe/pipe.go
@@ -80,6 +80,10 @@
 		}
 	}
 
+	if err := k8s.Registry.Init(); err != nil {
+		return err
+	}
+
 	logger.Log.Debugf("pipe has been initialized")
 
 	return nil
@@ -88,6 +92,8 @@
 func (p *Pipe) Start() error {
 	p.Watcher.Start()
 
+	k8s.Registry.Start()
+
 	for _, wkfl := range p.workflows {
 		go wkfl.exporter.Export(wkfl.events)
 	}
@@ -120,6 +126,8 @@
 		w.exporter.Stop()
 	}
 
+	k8s.Registry.Stop()
+
 	p.stopper <- struct{}{}
 	close(p.stopper)
 }