blob: 06f88b2c9b99f433a1c5cfd3bf6e8554fbae37a1 [file] [log] [blame]
package notifications
import (
"fmt"
"math/rand"
"reflect"
"sync"
"time"
"github.com/sirupsen/logrus"
"testing"
)
func TestBroadcaster(t *testing.T) {
const nEvents = 1000
var sinks []Sink
for i := 0; i < 10; i++ {
sinks = append(sinks, &testSink{})
}
b := NewBroadcaster(sinks...)
var block []Event
var wg sync.WaitGroup
for i := 1; i <= nEvents; i++ {
block = append(block, createTestEvent("push", "library/test", "blob"))
if i%10 == 0 && i > 0 {
wg.Add(1)
go func(block ...Event) {
if err := b.Write(block...); err != nil {
t.Errorf("error writing block of length %d: %v", len(block), err)
}
wg.Done()
}(block...)
block = nil
}
}
wg.Wait() // Wait until writes complete
if t.Failed() {
t.FailNow()
}
checkClose(t, b)
// Iterate through the sinks and check that they all have the expected length.
for _, sink := range sinks {
ts := sink.(*testSink)
ts.mu.Lock()
defer ts.mu.Unlock()
if len(ts.events) != nEvents {
t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
}
if !ts.closed {
t.Fatalf("sink should have been closed")
}
}
}
func TestEventQueue(t *testing.T) {
const nevents = 1000
var ts testSink
metrics := newSafeMetrics()
eq := newEventQueue(
// delayed sync simulates destination slower than channel comms
&delayedSink{
Sink: &ts,
delay: time.Millisecond * 1,
}, metrics.eventQueueListener())
var wg sync.WaitGroup
var block []Event
for i := 1; i <= nevents; i++ {
block = append(block, createTestEvent("push", "library/test", "blob"))
if i%10 == 0 && i > 0 {
wg.Add(1)
go func(block ...Event) {
if err := eq.Write(block...); err != nil {
t.Errorf("error writing event block: %v", err)
}
wg.Done()
}(block...)
block = nil
}
}
wg.Wait()
if t.Failed() {
t.FailNow()
}
checkClose(t, eq)
ts.mu.Lock()
defer ts.mu.Unlock()
metrics.Lock()
defer metrics.Unlock()
if len(ts.events) != nevents {
t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000)
}
if !ts.closed {
t.Fatalf("sink should have been closed")
}
if metrics.Events != nevents {
t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents)
}
if metrics.Pending != 0 {
t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0)
}
}
func TestIgnoredSink(t *testing.T) {
blob := createTestEvent("push", "library/test", "blob")
manifest := createTestEvent("pull", "library/test", "manifest")
type testcase struct {
ignoreMediaTypes []string
ignoreActions []string
expected []Event
}
cases := []testcase{
{nil, nil, []Event{blob, manifest}},
{[]string{"other"}, []string{"other"}, []Event{blob, manifest}},
{[]string{"blob"}, []string{"other"}, []Event{manifest}},
{[]string{"blob", "manifest"}, []string{"other"}, nil},
{[]string{"other"}, []string{"push"}, []Event{manifest}},
{[]string{"other"}, []string{"pull"}, []Event{blob}},
{[]string{"other"}, []string{"pull", "push"}, nil},
}
for _, c := range cases {
ts := &testSink{}
s := newIgnoredSink(ts, c.ignoreMediaTypes, c.ignoreActions)
if err := s.Write(blob, manifest); err != nil {
t.Fatalf("error writing event: %v", err)
}
ts.mu.Lock()
if !reflect.DeepEqual(ts.events, c.expected) {
t.Fatalf("unexpected events: %#v != %#v", ts.events, c.expected)
}
ts.mu.Unlock()
}
}
func TestRetryingSink(t *testing.T) {
// Make a sync that fails most of the time, ensuring that all the events
// make it through.
var ts testSink
flaky := &flakySink{
rate: 1.0, // start out always failing.
Sink: &ts,
}
s := newRetryingSink(flaky, 3, 10*time.Millisecond)
var wg sync.WaitGroup
var block []Event
for i := 1; i <= 100; i++ {
block = append(block, createTestEvent("push", "library/test", "blob"))
// Above 50, set the failure rate lower
if i > 50 {
s.mu.Lock()
flaky.rate = 0.90
s.mu.Unlock()
}
if i%10 == 0 && i > 0 {
wg.Add(1)
go func(block ...Event) {
defer wg.Done()
if err := s.Write(block...); err != nil {
t.Errorf("error writing event block: %v", err)
}
}(block...)
block = nil
}
}
wg.Wait()
if t.Failed() {
t.FailNow()
}
checkClose(t, s)
ts.mu.Lock()
defer ts.mu.Unlock()
if len(ts.events) != 100 {
t.Fatalf("events not propagated: %d != %d", len(ts.events), 100)
}
}
type testSink struct {
events []Event
mu sync.Mutex
closed bool
}
func (ts *testSink) Write(events ...Event) error {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.events = append(ts.events, events...)
return nil
}
func (ts *testSink) Close() error {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.closed = true
logrus.Infof("closing testSink")
return nil
}
type delayedSink struct {
Sink
delay time.Duration
}
func (ds *delayedSink) Write(events ...Event) error {
time.Sleep(ds.delay)
return ds.Sink.Write(events...)
}
type flakySink struct {
Sink
rate float64
}
func (fs *flakySink) Write(events ...Event) error {
if rand.Float64() < fs.rate {
return fmt.Errorf("error writing %d events", len(events))
}
return fs.Sink.Write(events...)
}
func checkClose(t *testing.T, sink Sink) {
if err := sink.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
// second close should not crash but should return an error.
if err := sink.Close(); err == nil {
t.Fatalf("no error on double close")
}
// Write after closed should be an error
if err := sink.Write([]Event{}...); err == nil {
t.Fatalf("write after closed did not have an error")
} else if err != ErrSinkClosed {
t.Fatalf("error should be ErrSinkClosed")
}
}