| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package events |
| |
| import ( |
| "fmt" |
| "sync" |
| "time" |
| |
| "github.com/apache/yunikorn-core/pkg/common" |
| "github.com/apache/yunikorn-core/pkg/common/configs" |
| "github.com/apache/yunikorn-core/pkg/log" |
| "github.com/apache/yunikorn-core/pkg/metrics" |
| "github.com/apache/yunikorn-scheduler-interface/lib/go/si" |
| ) |
| |
| // need to change for testing |
| var defaultEventChannelSize = 100000 |
| var defaultRingBufferSize uint64 = 100000 |
| |
| var once sync.Once |
| var ev EventSystem |
| |
| type EventSystem interface { |
| // AddEvent adds an event record to the event system for processing: |
| // 1. It is added to a slice from where it is periodically read by the shim publisher. |
| // 2. It is added to an internal ring buffer so that clients can retrieve the event history. |
| // 3. Streaming clients are updated. |
| AddEvent(event *si.EventRecord) |
| |
| // StartService starts the event system. |
| // This method does not block. Events are processed on a separate goroutine. |
| StartService() |
| |
| // Stop stops the event system. |
| Stop() |
| |
| // IsEventTrackingEnabled whether history tracking is currently enabled or not. |
| IsEventTrackingEnabled() bool |
| |
| // GetEventsFromID retrieves "count" number of elements from the history buffer from "id". Every |
| // event has a unique ID inside the ring buffer. |
| // If "id" is not in the buffer, then no record is returned, but the currently available range |
| // [low..high] is set. |
| GetEventsFromID(id, count uint64) ([]*si.EventRecord, uint64, uint64) |
| |
| // CreateEventStream creates an event stream (channel) for a consumer. |
| // The "name" argument is an arbitrary string for a consumer, which is used for logging. It does not need to be unique. |
| // The "count" argument defines how many historical elements should be returned on the stream. Zero is a valid value for "count". |
| // The returned type contains a read-only channel which is updated as soon as there is a new event record. |
| // It is also used as a handle to stop the streaming. |
| // Consumers must read the channel and process the event objects as soon as they can to avoid |
| // events piling up inside the channel buffers. |
| CreateEventStream(name string, count uint64) *EventStream |
| |
| // RemoveStream stops streaming for a given consumer. |
| // Consumers that no longer wish to be updated (e.g., a remote client |
| // disconnected) *must* call this method to gracefully stop the streaming. |
| RemoveStream(*EventStream) |
| |
| // GetEventStreams returns the current active event streams. |
| GetEventStreams() []EventStreamData |
| } |
| |
| // EventSystemImpl main implementation of the event system which is used for history tracking. |
| type EventSystemImpl struct { |
| eventSystemId string |
| Store *EventStore // storing eventChannel |
| publisher *EventPublisher |
| eventBuffer *eventRingBuffer |
| streaming *EventStreaming |
| |
| channel chan *si.EventRecord // channelling input eventChannel |
| stop chan bool // whether the service is stopped |
| stopped bool |
| |
| trackingEnabled bool |
| requestCapacity int |
| ringBufferCapacity uint64 |
| |
| sync.RWMutex |
| } |
| |
| // CreateEventStream creates an event stream. See the interface for details. |
| func (ec *EventSystemImpl) CreateEventStream(name string, count uint64) *EventStream { |
| return ec.streaming.CreateEventStream(name, count) |
| } |
| |
| // RemoveStream graceful termination of an event streaming for a consumer. See the interface for details. |
| func (ec *EventSystemImpl) RemoveStream(consumer *EventStream) { |
| ec.streaming.RemoveEventStream(consumer) |
| } |
| |
| // GetEventsFromID retrieves historical elements. See the interface for details. |
| func (ec *EventSystemImpl) GetEventsFromID(id, count uint64) ([]*si.EventRecord, uint64, uint64) { |
| return ec.eventBuffer.GetEventsFromID(id, count) |
| } |
| |
| // GetEventSystem returns the event system instance. Initialization happens during the first call. |
| func GetEventSystem() EventSystem { |
| once.Do(func() { |
| Init() |
| }) |
| return ev |
| } |
| |
| // IsEventTrackingEnabled whether history tracking is currently enabled or not. |
| func (ec *EventSystemImpl) IsEventTrackingEnabled() bool { |
| ec.RLock() |
| defer ec.RUnlock() |
| return ec.trackingEnabled |
| } |
| |
| // GetRequestCapacity returns the capacity of an intermediate storage which is used by the shim publisher. |
| func (ec *EventSystemImpl) GetRequestCapacity() int { |
| ec.RLock() |
| defer ec.RUnlock() |
| return ec.requestCapacity |
| } |
| |
| // GetRingBufferCapacity returns the capacity of the buffer which stores historical elements. |
| func (ec *EventSystemImpl) GetRingBufferCapacity() uint64 { |
| ec.RLock() |
| defer ec.RUnlock() |
| return ec.ringBufferCapacity |
| } |
| |
| // Init Initializes the event system. |
| // Only exported for testing. |
| func Init() { |
| store := newEventStore() |
| buffer := newEventRingBuffer(defaultRingBufferSize) |
| ev = &EventSystemImpl{ |
| Store: store, |
| channel: make(chan *si.EventRecord, defaultEventChannelSize), |
| stop: make(chan bool), |
| stopped: false, |
| publisher: CreateShimPublisher(store), |
| eventBuffer: buffer, |
| eventSystemId: fmt.Sprintf("event-system-%d", time.Now().Unix()), |
| streaming: NewEventStreaming(buffer), |
| } |
| } |
| |
| // StartService starts the event processing in the background. See the interface for details. |
| func (ec *EventSystemImpl) StartService() { |
| ec.StartServiceWithPublisher(true) |
| } |
| |
| // StartServiceWithPublisher starts the event processing background routines. |
| // Only exported for testing. |
| func (ec *EventSystemImpl) StartServiceWithPublisher(withPublisher bool) { |
| ec.Lock() |
| defer ec.Unlock() |
| |
| configs.AddConfigMapCallback(ec.eventSystemId, func() { |
| go ec.reloadConfig() |
| }) |
| |
| ec.trackingEnabled = ec.readIsTrackingEnabled() |
| ec.ringBufferCapacity = ec.readRingBufferCapacity() |
| ec.requestCapacity = ec.readRequestCapacity() |
| |
| go func() { |
| log.Log(log.Events).Info("Starting event system handler") |
| for { |
| select { |
| case <-ec.stop: |
| return |
| case event, ok := <-ec.channel: |
| if !ok { |
| return |
| } |
| if event != nil { |
| ec.Store.Store(event) |
| ec.eventBuffer.Add(event) |
| ec.streaming.PublishEvent(event) |
| metrics.GetEventMetrics().IncEventsProcessed() |
| } |
| } |
| } |
| }() |
| if withPublisher { |
| ec.publisher.StartService() |
| } |
| } |
| |
| // Stop stops the event system. |
| func (ec *EventSystemImpl) Stop() { |
| ec.Lock() |
| defer ec.Unlock() |
| |
| configs.RemoveConfigMapCallback(ec.eventSystemId) |
| |
| if ec.stopped { |
| return |
| } |
| |
| ec.stop <- true |
| if ec.channel != nil { |
| close(ec.channel) |
| ec.channel = nil |
| } |
| ec.publisher.Stop() |
| ec.stopped = true |
| } |
| |
| // AddEvent adds an event record to the event system. See the interface for details. |
| func (ec *EventSystemImpl) AddEvent(event *si.EventRecord) { |
| metrics.GetEventMetrics().IncEventsCreated() |
| select { |
| case ec.channel <- event: |
| metrics.GetEventMetrics().IncEventsChanneled() |
| default: |
| log.Log(log.Events).Debug("could not add Event to channel") |
| metrics.GetEventMetrics().IncEventsNotChanneled() |
| } |
| } |
| |
| func (ec *EventSystemImpl) readIsTrackingEnabled() bool { |
| return common.GetConfigurationBool(configs.GetConfigMap(), configs.CMEventTrackingEnabled, configs.DefaultEventTrackingEnabled) |
| } |
| |
| func (ec *EventSystemImpl) readRequestCapacity() int { |
| return common.GetConfigurationInt(configs.GetConfigMap(), configs.CMEventRequestCapacity, configs.DefaultEventRequestCapacity) |
| } |
| |
| func (ec *EventSystemImpl) readRingBufferCapacity() uint64 { |
| return common.GetConfigurationUint(configs.GetConfigMap(), configs.CMEventRingBufferCapacity, configs.DefaultEventRingBufferCapacity) |
| } |
| |
| func (ec *EventSystemImpl) isRestartNeeded() bool { |
| ec.RLock() |
| defer ec.RUnlock() |
| return ec.readIsTrackingEnabled() != ec.trackingEnabled |
| } |
| |
| // Restart restarts the event system, used during config update. |
| func (ec *EventSystemImpl) Restart() { |
| ec.Stop() |
| ec.StartServiceWithPublisher(true) |
| } |
| |
| // GetEventStreams returns the current active event streams. |
| func (ec *EventSystemImpl) GetEventStreams() []EventStreamData { |
| return ec.streaming.GetEventStreams() |
| } |
| |
| // VisibleForTesting |
| func (ec *EventSystemImpl) CloseAllStreams() { |
| ec.streaming.Lock() |
| defer ec.streaming.Unlock() |
| for consumer := range ec.streaming.eventStreams { |
| ec.streaming.removeEventStream(consumer) |
| } |
| } |
| |
| func (ec *EventSystemImpl) reloadConfig() { |
| ec.updateRequestCapacity() |
| |
| // resize the ring buffer with new capacity |
| ec.eventBuffer.Resize(ec.readRingBufferCapacity()) |
| |
| if ec.isRestartNeeded() { |
| ec.Restart() |
| } |
| } |
| |
| func (ec *EventSystemImpl) updateRequestCapacity() { |
| ec.Lock() |
| defer ec.Unlock() |
| ec.requestCapacity = ec.readRequestCapacity() |
| } |