// blob: 739bd98a46fe782a16eef8c35025758a5c7e892f [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dispatcher
import (
"fmt"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"gotest.tools/assert"
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
)
// TestAppEvent is a bare-bones application event for the dispatcher tests.
// It only carries the values it was constructed with.
type TestAppEvent struct {
	appID     string
	eventType string
	// flag is an optional channel a handler can receive from to hold the
	// event in "handling" until the test releases it.
	flag chan bool
}

// GetApplicationID returns the application ID stored in the event.
func (ev TestAppEvent) GetApplicationID() string {
	return ev.appID
}

// GetEvent returns the event type stored in the event.
func (ev TestAppEvent) GetEvent() string {
	return ev.eventType
}

// GetArgs always returns nil: test events carry no arguments.
func (ev TestAppEvent) GetArgs() []interface{} {
	return nil
}
// RunApplication is the event type name given to every TestAppEvent that the
// tests in this file dispatch.
const RunApplication string = "RunApplication"
// TestRegisterEventHandler registers three handlers across two event types and
// checks that exactly two entries end up in dispatcher.handlers, i.e. the
// second EventTypeTask registration does not add a third entry.
func TestRegisterEventHandler(t *testing.T) {
	createDispatcher()
	defer createDispatcher()

	// the handler body is irrelevant here; only the bookkeeping is checked
	noop := func(obj interface{}) {}
	RegisterEventHandler(EventTypeApp, noop)
	RegisterEventHandler(EventTypeTask, noop)
	RegisterEventHandler(EventTypeTask, noop)

	assert.Equal(t, len(dispatcher.handlers), 2)
}
// appEventsRecorder collects application IDs reported by an event handler.
// All access to apps goes through lock; lock is a pointer and must be set
// before any method is called.
type appEventsRecorder struct {
	apps []string
	lock *sync.RWMutex
}

// addApp appends appID to the recorded IDs while holding the write lock.
func (r *appEventsRecorder) addApp(appID string) {
	r.lock.Lock()
	r.apps = append(r.apps, appID)
	r.lock.Unlock()
}

// contains reports whether appID has been recorded at least once.
func (r *appEventsRecorder) contains(appID string) bool {
	r.lock.RLock()
	defer r.lock.RUnlock()

	for i := range r.apps {
		if r.apps[i] == appID {
			return true
		}
	}
	return false
}

// size returns how many IDs have been recorded, duplicates included.
func (r *appEventsRecorder) size() int {
	r.lock.RLock()
	count := len(r.apps)
	r.lock.RUnlock()
	return count
}
// TestDispatcherStartStop takes the dispatcher through a start / dispatch /
// stop cycle: both events dispatched while it runs must reach the handler,
// and a dispatch attempted after Stop must return an error.
func TestDispatcherStartStop(t *testing.T) {
	createDispatcher()
	defer createDispatcher()

	// thread safe: the recorder guards its slice with a lock
	recorder := &appEventsRecorder{
		apps: make([]string, 0),
		lock: &sync.RWMutex{},
	}
	RegisterEventHandler(EventTypeApp, func(obj interface{}) {
		event, ok := obj.(events.ApplicationEvent)
		if !ok {
			return
		}
		recorder.addApp(event.GetApplicationID())
	})

	// start the dispatcher
	Start()

	// dispatch one event per application
	for _, appID := range []string{"test-app-001", "test-app-002"} {
		Dispatch(TestAppEvent{
			appID:     appID,
			eventType: RunApplication,
		})
	}

	// wait until all events are handled, then stop the dispatcher
	dispatcher.drain()
	Stop()

	assert.Equal(t, recorder.size(), 2)
	assert.Equal(t, recorder.contains("test-app-001"), true)
	assert.Equal(t, recorder.contains("test-app-002"), true)

	// ensure state is stopped
	assert.Equal(t, dispatcher.isRunning(), false)

	// dispatching new events should fail once the dispatcher is stopped
	err := dispatcher.dispatch(TestAppEvent{
		appID:     "test-app-002",
		eventType: RunApplication,
	})
	if err == nil {
		t.Fatalf("dispatch is not running, this should fail")
	}
	t.Logf("seen expected error: %v", err)
}
// TestEventWillNotBeLostWhenEventChannelIsFull dispatches more events than the
// one-slot event channel can hold while the handler is slow, and verifies that
// some of them go through async dispatch and that none of them is lost.
func TestEventWillNotBeLostWhenEventChannelIsFull(t *testing.T) {
	createDispatcher()
	defer createDispatcher()
	// shrink the channel to a single slot so it fills up immediately
	dispatcher.eventChan = make(chan events.SchedulingEvent, 1)

	// thread safe: the recorder guards its slice with a lock
	recorder := &appEventsRecorder{
		apps: make([]string, 0),
		lock: &sync.RWMutex{},
	}

	// pretend to be a time-consuming event handler
	RegisterEventHandler(EventTypeApp, func(obj interface{}) {
		event, ok := obj.(events.ApplicationEvent)
		if !ok {
			return
		}
		recorder.addApp(event.GetApplicationID())
		time.Sleep(1 * time.Millisecond)
	})

	// start the dispatcher
	Start()

	// send the same event numEvents times
	numEvents := 10
	testEvent := TestAppEvent{
		appID:     "test",
		eventType: RunApplication,
	}
	for sent := 0; sent < numEvents; sent++ {
		Dispatch(testEvent)
	}

	// the event channel is full, so some events must be pending in async dispatch
	assert.Assert(t, atomic.LoadInt32(&asyncDispatchCount) > 0)

	// wait until all events are handled, then stop the dispatcher
	dispatcher.drain()
	Stop()

	// every event was handled and no async dispatch is left over
	assert.Equal(t, recorder.size(), numEvents)
	assert.Assert(t, atomic.LoadInt32(&asyncDispatchCount) == 0)

	// ensure state is stopped
	assert.Equal(t, dispatcher.isRunning(), false)
}
// TestDispatchTimeout verifies that Dispatcher#asyncDispatch is used when the
// event channel is full, and that the async routine disappears again once
// DispatchTimeout has elapsed.
func TestDispatchTimeout(t *testing.T) {
	createDispatcher()
	defer createDispatcher()

	// The test shortens two package-level timing settings. Put the previous
	// values back on exit so later tests do not inherit them; initDispatcher
	// is not visible here, so do not rely on it to reset them.
	prevInterval, prevTimeout := AsyncDispatchCheckInterval, DispatchTimeout
	defer func() {
		AsyncDispatchCheckInterval, DispatchTimeout = prevInterval, prevTimeout
	}()

	// reset event channel with small capacity for testing
	dispatcher.eventChan = make(chan events.SchedulingEvent, 1)
	AsyncDispatchCheckInterval = 100 * time.Millisecond
	DispatchTimeout = 500 * time.Millisecond

	// allStacks returns the stack traces of all goroutines, trimmed to the
	// bytes runtime.Stack actually wrote into the buffer.
	allStacks := func() string {
		buf := make([]byte, 1<<16)
		return string(buf[:runtime.Stack(buf, true)])
	}

	// register the handler; it blocks on the event's flag until released.
	// It prints with fmt rather than t.Log because it can still be running
	// after the test function has returned.
	RegisterEventHandler(EventTypeApp, func(obj interface{}) {
		if appEvent, ok := obj.(TestAppEvent); ok {
			fmt.Printf("handling %s\n", appEvent.appID)
			<-appEvent.flag
			fmt.Printf("handling %s DONE\n", appEvent.appID)
		}
	})

	// start the dispatcher; release the blocked handler and stop the
	// dispatcher on every exit path, including a failed assertion
	stop := make(chan bool)
	Start()
	defer func() {
		close(stop)
		Stop()
	}()

	// dispatch 3 events, the third event will be dispatched asynchronously
	for i := 0; i < 3; i++ {
		Dispatch(TestAppEvent{
			appID:     fmt.Sprintf("test-%d", i),
			eventType: RunApplication,
			flag:      stop,
		})
	}

	// give it a small amount of time,
	// 1st event should be picked up and stuck at handling
	// 2nd one should be added to the channel
	// 3rd one should be posted as an async request
	time.Sleep(100 * time.Millisecond)
	assert.Equal(t, atomic.LoadInt32(&asyncDispatchCount), int32(1))

	// verify Dispatcher#asyncDispatch is called
	assert.Assert(t, strings.Contains(allStacks(), "asyncDispatch"))

	// wait until async dispatch routine times out
	err := utils.WaitForCondition(func() bool {
		return atomic.LoadInt32(&asyncDispatchCount) == int32(0)
	}, 100*time.Millisecond, DispatchTimeout+AsyncDispatchCheckInterval)
	assert.NilError(t, err)

	// verify no left-over thread
	assert.Assert(t, !strings.Contains(allStacks(), "asyncDispatch"))
}
// TestExceedAsyncDispatchLimit verifies that dispatching panics immediately
// once the number of async dispatches exceeds AsyncDispatchLimit.
func TestExceedAsyncDispatchLimit(t *testing.T) {
	createDispatcher()
	defer createDispatcher()

	// The test lowers the package-level limit. Put the previous value back on
	// exit so later tests do not inherit it; initDispatcher is not visible
	// here, so do not rely on it to reset the limit.
	prevLimit := AsyncDispatchLimit
	defer func() {
		AsyncDispatchLimit = prevLimit
	}()

	// reset event channel with small capacity for testing
	dispatcher.eventChan = make(chan events.SchedulingEvent, 1)
	AsyncDispatchLimit = 1

	// pretend to be a time-consuming event handler
	RegisterEventHandler(EventTypeApp, func(obj interface{}) {
		if _, ok := obj.(events.ApplicationEvent); ok {
			time.Sleep(2 * time.Second)
		}
	})

	// Handle the expected panic in a deferred func with recover.
	defer func() {
		// stop the dispatcher
		Stop()
		// check the panic value; fmt.Sprint copes with both error and string
		// values, so an unexpected panic type fails the assertion below
		// instead of panicking again on a type assertion
		recovered := recover()
		if recovered == nil {
			t.Error("Panic should be caught here")
			return
		}
		assert.Assert(t, strings.Contains(fmt.Sprint(recovered), "dispatcher exceeds async-dispatch limit"))
	}()

	// start the dispatcher
	Start()

	// dispatch 4 events, the third and fourth events will be dispatched
	// asynchronously, which exceeds the limit of 1
	for i := 0; i < 4; i++ {
		Dispatch(TestAppEvent{
			appID:     "test",
			eventType: RunApplication,
		})
	}
}
// createDispatcher re-runs initDispatcher so the calling test starts from a
// newly initialised package dispatcher. Tests call it once up front and once
// more via defer on the way out.
func createDispatcher() {
	// Use up the package-level once with a nop first, so that functions like
	// RegisterEventHandler() won't run initDispatcher() again.
	nop := func() {}
	once.Do(nop)

	initDispatcher()
}