| /* |
| Copyright 2014 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package record |
| |
| import ( |
| "fmt" |
| "math/rand" |
| "time" |
| |
| v1 "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/apimachinery/pkg/util/clock" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apimachinery/pkg/watch" |
| restclient "k8s.io/client-go/rest" |
| "k8s.io/client-go/tools/record/util" |
| ref "k8s.io/client-go/tools/reference" |
| "k8s.io/klog" |
| ) |
| |
const (
	// maxTriesPerEvent bounds how many times recordToSink attempts to write
	// a single event before giving up on it.
	maxTriesPerEvent = 12

	// maxQueuedEvents is the queue length handed to watch.NewBroadcaster by
	// the broadcaster constructors in this file.
	maxQueuedEvents = 1000
)

// defaultSleepDuration is the pause between write retries used by
// broadcasters that are not given an explicit sleep duration.
var defaultSleepDuration = 10 * time.Second
| |
| // EventSink knows how to store events (client.Client implements it.) |
| // EventSink must respect the namespace that will be embedded in 'event'. |
| // It is assumed that EventSink will return the same sorts of errors as |
| // pkg/client's REST client. |
| type EventSink interface { |
| Create(event *v1.Event) (*v1.Event, error) |
| Update(event *v1.Event) (*v1.Event, error) |
| Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error) |
| } |
| |
| // CorrelatorOptions allows you to change the default of the EventSourceObjectSpamFilter |
| // and EventAggregator in EventCorrelator |
| type CorrelatorOptions struct { |
| // The lru cache size used for both EventSourceObjectSpamFilter and the EventAggregator |
| // If not specified (zero value), the default specified in events_cache.go will be picked |
| // This means that the LRUCacheSize has to be greater than 0. |
| LRUCacheSize int |
| // The burst size used by the token bucket rate filtering in EventSourceObjectSpamFilter |
| // If not specified (zero value), the default specified in events_cache.go will be picked |
| // This means that the BurstSize has to be greater than 0. |
| BurstSize int |
| // The fill rate of the token bucket in queries per second in EventSourceObjectSpamFilter |
| // If not specified (zero value), the default specified in events_cache.go will be picked |
| // This means that the QPS has to be greater than 0. |
| QPS float32 |
| // The func used by the EventAggregator to group event keys for aggregation |
| // If not specified (zero value), EventAggregatorByReasonFunc will be used |
| KeyFunc EventAggregatorKeyFunc |
| // The func used by the EventAggregator to produced aggregated message |
| // If not specified (zero value), EventAggregatorByReasonMessageFunc will be used |
| MessageFunc EventAggregatorMessageFunc |
| // The number of events in an interval before aggregation happens by the EventAggregator |
| // If not specified (zero value), the default specified in events_cache.go will be picked |
| // This means that the MaxEvents has to be greater than 0 |
| MaxEvents int |
| // The amount of time in seconds that must transpire since the last occurrence of a similar event before it is considered new by the EventAggregator |
| // If not specified (zero value), the default specified in events_cache.go will be picked |
| // This means that the MaxIntervalInSeconds has to be greater than 0 |
| MaxIntervalInSeconds int |
| // The clock used by the EventAggregator to allow for testing |
| // If not specified (zero value), clock.RealClock{} will be used |
| Clock clock.Clock |
| } |
| |
| // EventRecorder knows how to record events on behalf of an EventSource. |
| type EventRecorder interface { |
| // Event constructs an event from the given information and puts it in the queue for sending. |
| // 'object' is the object this event is about. Event will make a reference-- or you may also |
| // pass a reference to the object directly. |
| // 'type' of this event, and can be one of Normal, Warning. New types could be added in future |
| // 'reason' is the reason this event is generated. 'reason' should be short and unique; it |
| // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used |
| // to automate handling of events, so imagine people writing switch statements to handle them. |
| // You want to make that easy. |
| // 'message' is intended to be human readable. |
| // |
| // The resulting event will be created in the same namespace as the reference object. |
| Event(object runtime.Object, eventtype, reason, message string) |
| |
| // Eventf is just like Event, but with Sprintf for the message field. |
| Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) |
| |
| // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field. |
| PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) |
| |
| // AnnotatedEventf is just like eventf, but with annotations attached |
| AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) |
| } |
| |
| // EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log. |
| type EventBroadcaster interface { |
| // StartEventWatcher starts sending events received from this EventBroadcaster to the given |
| // event handler function. The return value can be ignored or used to stop recording, if |
| // desired. |
| StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface |
| |
| // StartRecordingToSink starts sending events received from this EventBroadcaster to the given |
| // sink. The return value can be ignored or used to stop recording, if desired. |
| StartRecordingToSink(sink EventSink) watch.Interface |
| |
| // StartLogging starts sending events received from this EventBroadcaster to the given logging |
| // function. The return value can be ignored or used to stop recording, if desired. |
| StartLogging(logf func(format string, args ...interface{})) watch.Interface |
| |
| // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster |
| // with the event source set to the given event source. |
| NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder |
| } |
| |
| // Creates a new event broadcaster. |
| func NewBroadcaster() EventBroadcaster { |
| return &eventBroadcasterImpl{ |
| Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), |
| sleepDuration: defaultSleepDuration, |
| } |
| } |
| |
| func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster { |
| return &eventBroadcasterImpl{ |
| Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), |
| sleepDuration: sleepDuration, |
| } |
| } |
| |
| func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster { |
| return &eventBroadcasterImpl{ |
| Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), |
| sleepDuration: defaultSleepDuration, |
| options: options, |
| } |
| } |
| |
| type eventBroadcasterImpl struct { |
| *watch.Broadcaster |
| sleepDuration time.Duration |
| options CorrelatorOptions |
| } |
| |
| // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. |
| // The return value can be ignored or used to stop recording, if desired. |
| // TODO: make me an object with parameterizable queue length and retry interval |
| func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface { |
| // The default math/rand package functions aren't thread safe, so create a |
| // new Rand object for each StartRecording call. |
| randGen := rand.New(rand.NewSource(time.Now().UnixNano())) |
| eventCorrelator := NewEventCorrelatorWithOptions(eventBroadcaster.options) |
| return eventBroadcaster.StartEventWatcher( |
| func(event *v1.Event) { |
| recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration) |
| }) |
| } |
| |
| func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) { |
| // Make a copy before modification, because there could be multiple listeners. |
| // Events are safe to copy like this. |
| eventCopy := *event |
| event = &eventCopy |
| result, err := eventCorrelator.EventCorrelate(event) |
| if err != nil { |
| utilruntime.HandleError(err) |
| } |
| if result.Skip { |
| return |
| } |
| tries := 0 |
| for { |
| if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) { |
| break |
| } |
| tries++ |
| if tries >= maxTriesPerEvent { |
| klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) |
| break |
| } |
| // Randomize the first sleep so that various clients won't all be |
| // synced up if the master goes down. |
| if tries == 1 { |
| time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64())) |
| } else { |
| time.Sleep(sleepDuration) |
| } |
| } |
| } |
| |
| // recordEvent attempts to write event to a sink. It returns true if the event |
| // was successfully recorded or discarded, false if it should be retried. |
| // If updateExistingEvent is false, it creates a new event, otherwise it updates |
| // existing event. |
| func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool { |
| var newEvent *v1.Event |
| var err error |
| if updateExistingEvent { |
| newEvent, err = sink.Patch(event, patch) |
| } |
| // Update can fail because the event may have been removed and it no longer exists. |
| if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) { |
| // Making sure that ResourceVersion is empty on creation |
| event.ResourceVersion = "" |
| newEvent, err = sink.Create(event) |
| } |
| if err == nil { |
| // we need to update our event correlator with the server returned state to handle name/resourceversion |
| eventCorrelator.UpdateState(newEvent) |
| return true |
| } |
| |
| // If we can't contact the server, then hold everything while we keep trying. |
| // Otherwise, something about the event is malformed and we should abandon it. |
| switch err.(type) { |
| case *restclient.RequestConstructionError: |
| // We will construct the request the same next time, so don't keep trying. |
| klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err) |
| return true |
| case *errors.StatusError: |
| if errors.IsAlreadyExists(err) { |
| klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err) |
| } else { |
| klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err) |
| } |
| return true |
| case *errors.UnexpectedObjectError: |
| // We don't expect this; it implies the server's response didn't match a |
| // known pattern. Go ahead and retry. |
| default: |
| // This case includes actual http transport errors. Go ahead and retry. |
| } |
| klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err) |
| return false |
| } |
| |
| // StartLogging starts sending events received from this EventBroadcaster to the given logging function. |
| // The return value can be ignored or used to stop recording, if desired. |
| func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface { |
| return eventBroadcaster.StartEventWatcher( |
| func(e *v1.Event) { |
| logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message) |
| }) |
| } |
| |
| // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function. |
| // The return value can be ignored or used to stop recording, if desired. |
| func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface { |
| watcher := eventBroadcaster.Watch() |
| go func() { |
| defer utilruntime.HandleCrash() |
| for watchEvent := range watcher.ResultChan() { |
| event, ok := watchEvent.Object.(*v1.Event) |
| if !ok { |
| // This is all local, so there's no reason this should |
| // ever happen. |
| continue |
| } |
| eventHandler(event) |
| } |
| }() |
| return watcher |
| } |
| |
| // NewRecorder returns an EventRecorder that records events with the given event source. |
| func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder { |
| return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}} |
| } |
| |
| type recorderImpl struct { |
| scheme *runtime.Scheme |
| source v1.EventSource |
| *watch.Broadcaster |
| clock clock.Clock |
| } |
| |
| func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) { |
| ref, err := ref.GetReference(recorder.scheme, object) |
| if err != nil { |
| klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message) |
| return |
| } |
| |
| if !util.ValidateEventType(eventtype) { |
| klog.Errorf("Unsupported event type: '%v'", eventtype) |
| return |
| } |
| |
| event := recorder.makeEvent(ref, annotations, eventtype, reason, message) |
| event.Source = recorder.source |
| |
| go func() { |
| // NOTE: events should be a non-blocking operation |
| defer utilruntime.HandleCrash() |
| recorder.Action(watch.Added, event) |
| }() |
| } |
| |
| func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) { |
| recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message) |
| } |
| |
| func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { |
| recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...)) |
| } |
| |
| func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) { |
| recorder.generateEvent(object, nil, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...)) |
| } |
| |
| func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { |
| recorder.generateEvent(object, annotations, metav1.Now(), eventtype, reason, fmt.Sprintf(messageFmt, args...)) |
| } |
| |
| func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event { |
| t := metav1.Time{Time: recorder.clock.Now()} |
| namespace := ref.Namespace |
| if namespace == "" { |
| namespace = metav1.NamespaceDefault |
| } |
| return &v1.Event{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()), |
| Namespace: namespace, |
| Annotations: annotations, |
| }, |
| InvolvedObject: *ref, |
| Reason: reason, |
| Message: message, |
| FirstTimestamp: t, |
| LastTimestamp: t, |
| Count: 1, |
| Type: eventtype, |
| } |
| } |