blob: dc96d53366c4a4a5f9c33ccbdf85db5222061e06 [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"bytes"
"fmt"
"os"
"reflect"
"sync"
"testing"
"time"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
)
// TestWatch checks that a watcher created with start revision 0 on an
// existing key is registered in the store's synced watcher group.
func TestWatch(t *testing.T) {
	be, path := backend.NewDefaultTmpBackend()
	ws := newWatchableStore(be, &lease.FakeLessor{}, nil)
	defer func() {
		ws.store.Close()
		os.Remove(path)
	}()

	key, val := []byte("foo"), []byte("bar")
	ws.Put(key, val, lease.NoLease)

	stream := ws.NewWatchStream()
	stream.Watch(key, nil, 0)

	// The watched key must now have an entry in the synced group.
	if found := ws.synced.contains(string(key)); !found {
		t.Errorf("existence = false, want true")
	}
}
// TestNewWatcherCancel checks that canceling a watcher removes its key from
// the store's synced watcher group.
func TestNewWatcherCancel(t *testing.T) {
	be, path := backend.NewDefaultTmpBackend()
	ws := newWatchableStore(be, &lease.FakeLessor{}, nil)
	defer func() {
		ws.store.Close()
		os.Remove(path)
	}()

	key, val := []byte("foo"), []byte("bar")
	ws.Put(key, val, lease.NoLease)

	stream := ws.NewWatchStream()
	id := stream.Watch(key, nil, 0)

	err := stream.Cancel(id)
	if err != nil {
		t.Error(err)
	}

	// After cancellation the key should no longer be present in synced.
	if stillThere := ws.synced.contains(string(key)); stillThere {
		t.Errorf("existence = true, want false")
	}
}
// TestCancelUnsynced tests that canceling watchers removes them from the
// unsynced watcher group.
func TestCancelUnsynced(t *testing.T) {
	be, path := backend.NewDefaultTmpBackend()

	// Build the watchableStore by hand instead of using newWatchableStore:
	// newWatchableStore automatically calls syncWatchers, which would sync
	// the watchers in unsynced. This test needs them to stay in unsynced.
	ws := &watchableStore{
		store:    NewStore(be, &lease.FakeLessor{}, nil),
		unsynced: newWatcherGroup(),

		// synced is never populated in this test; it is initialized only
		// so that nothing ends up assigning to a nil map.
		synced: newWatcherGroup(),
	}
	defer func() {
		ws.store.Close()
		os.Remove(path)
	}()

	// Put the key that the watchers are created on. The put advances the
	// store revision, so watchers started at revision 1 are behind the
	// store and are forced into unsynced.
	key, val := []byte("foo"), []byte("bar")
	ws.Put(key, val, lease.NoLease)

	stream := ws.NewWatchStream()

	// The number of watchers is arbitrary.
	const numWatchers = 100

	// Collect the IDs of all created watchers so they can be canceled.
	ids := make([]WatchID, 0, numWatchers)
	for len(ids) < numWatchers {
		// start revision 1 keeps the watcher in unsynced
		ids = append(ids, stream.Watch(key, nil, 1))
	}

	for i := range ids {
		if err := stream.Cancel(ids[i]); err != nil {
			t.Error(err)
		}
	}

	// Cancel removes a watcher from unsynced, so once every watcher has
	// been canceled the group must be empty.
	if remaining := ws.unsynced.size(); remaining != 0 {
		t.Errorf("unsynced size = %d, want 0", remaining)
	}
}
// TestSyncWatchers populates the unsynced watcher group and tests that
// syncWatchers sends the pending events to the channel of the unsynced
// watchers and moves those watchers to the synced group.
func TestSyncWatchers(t *testing.T) {
	b, tmpPath := backend.NewDefaultTmpBackend()

	// The store is assembled by hand so that watchers stay in unsynced
	// until syncWatchers is invoked explicitly below.
	s := &watchableStore{
		store:    NewStore(b, &lease.FakeLessor{}, nil),
		unsynced: newWatcherGroup(),
		synced:   newWatcherGroup(),
	}
	defer func() {
		s.store.Close()
		os.Remove(tmpPath)
	}()

	testKey := []byte("foo")
	testValue := []byte("bar")
	s.Put(testKey, testValue, lease.NoLease)

	w := s.NewWatchStream()

	// arbitrary number for watchers
	watcherN := 100
	for i := 0; i < watcherN; i++ {
		// specify rev as 1 to keep watchers in unsynced
		w.Watch(testKey, nil, 1)
	}

	// Before running s.syncWatchers() synced should be empty because only
	// unsynced has been populated.
	sws := s.synced.watcherSetByKey(string(testKey))
	uws := s.unsynced.watcherSetByKey(string(testKey))
	if len(sws) != 0 {
		t.Fatalf("synced[string(testKey)] size = %d, want 0", len(sws))
	}
	// unsynced should hold every watcher created above
	if len(uws) != watcherN {
		t.Errorf("unsynced size = %d, want %d", len(uws), watcherN)
	}

	// this should move all unsynced watchers to synced ones
	s.syncWatchers()

	sws = s.synced.watcherSetByKey(string(testKey))
	uws = s.unsynced.watcherSetByKey(string(testKey))

	// After running s.syncWatchers(), synced should hold all the watchers
	// and unsynced should have been drained.
	if len(sws) != watcherN {
		t.Errorf("synced[string(testKey)] size = %d, want %d", len(sws), watcherN)
	}
	if len(uws) != 0 {
		t.Errorf("unsynced size = %d, want 0", len(uws))
	}

	// Every synced watcher is expected to resume right after the current
	// store revision.
	wantMinRev := s.Rev() + 1
	for wa := range sws {
		if wa.minRev != wantMinRev {
			t.Errorf("w.minRev = %d, want %d", wa.minRev, wantMinRev)
		}
	}

	// Fail fast if the expected responses were not queued: receiving from
	// an empty channel below would block the test forever.
	ch := w.(*watchStream).ch
	if len(ch) != watcherN {
		t.Fatalf("watched event size = %d, want %d", len(ch), watcherN)
	}

	// Fail fast on an unexpected event count: the checks below index
	// evs[0] and would panic on an empty slice.
	evs := (<-ch).Events
	if len(evs) != 1 {
		t.Fatalf("len(evs) got = %d, want = 1", len(evs))
	}
	if evs[0].Type != mvccpb.PUT {
		t.Errorf("got = %v, want = %v", evs[0].Type, mvccpb.PUT)
	}
	if !bytes.Equal(evs[0].Kv.Key, testKey) {
		t.Errorf("got = %s, want = %s", evs[0].Kv.Key, testKey)
	}
	if !bytes.Equal(evs[0].Kv.Value, testValue) {
		t.Errorf("got = %s, want = %s", evs[0].Kv.Value, testValue)
	}
}
// TestWatchCompacted tests that a watcher started on an already compacted
// revision receives a response that carries a compact revision.
func TestWatchCompacted(t *testing.T) {
	be, path := backend.NewDefaultTmpBackend()
	ws := newWatchableStore(be, &lease.FakeLessor{}, nil)
	defer func() {
		ws.store.Close()
		os.Remove(path)
	}()

	key, val := []byte("foo"), []byte("bar")

	const numPuts = 10
	compactRev := int64(5)

	for n := 0; n < numPuts; n++ {
		ws.Put(key, val, lease.NoLease)
	}

	if _, err := ws.Compact(compactRev); err != nil {
		t.Fatalf("failed to compact kv (%v)", err)
	}

	// Start the watcher one revision below the compaction point.
	stream := ws.NewWatchStream()
	id := stream.Watch(key, nil, compactRev-1)

	timeout := time.After(1 * time.Second)
	select {
	case <-timeout:
		t.Fatalf("failed to receive response (timeout)")
	case resp := <-stream.Chan():
		if got := resp.WatchID; got != id {
			t.Errorf("resp.WatchID = %x, want %x", got, id)
		}
		if got := resp.CompactRevision; got == 0 {
			t.Errorf("resp.Compacted = %v, want %v", got, compactRev)
		}
	}
}
// TestWatchFutureRev tests that a watcher registered at a revision the store
// has not reached yet receives exactly the event written at that revision.
func TestWatchFutureRev(t *testing.T) {
	be, path := backend.NewDefaultTmpBackend()
	ws := newWatchableStore(be, &lease.FakeLessor{}, nil)
	defer func() {
		ws.store.Close()
		os.Remove(path)
	}()

	key, val := []byte("foo"), []byte("bar")

	stream := ws.NewWatchStream()
	futureRev := int64(10)
	stream.Watch(key, nil, futureRev)

	// Keep writing until the store reaches the watched revision,
	// giving up after ten puts.
	for puts := 0; puts < 10; puts++ {
		if ws.Put(key, val, lease.NoLease) >= futureRev {
			break
		}
	}

	select {
	case <-time.After(time.Second):
		t.Fatal("failed to receive event in 1 second.")
	case resp := <-stream.Chan():
		switch {
		case resp.Revision != futureRev:
			t.Fatalf("rev = %d, want %d", resp.Revision, futureRev)
		case len(resp.Events) != 1:
			t.Fatalf("failed to get events from the response")
		case resp.Events[0].Kv.ModRevision != futureRev:
			t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, futureRev)
		}
	}
}
// TestWatchRestore tests that a watcher created on a store before Restore
// receives the event for a key that only exists in the restored backend.
// The scenario runs twice: restoring immediately, and restoring after a
// pause.
func TestWatchRestore(t *testing.T) {
	scenario := func(delay time.Duration) func(t *testing.T) {
		return func(t *testing.T) {
			// Source store: holds the key that will be restored.
			srcBackend, srcPath := backend.NewDefaultTmpBackend()
			src := newWatchableStore(srcBackend, &lease.FakeLessor{}, nil)
			defer cleanup(src, srcBackend, srcPath)

			key, val := []byte("foo"), []byte("bar")
			rev := src.Put(key, val, lease.NoLease)

			// Destination store: the watcher lives here.
			dstBackend, dstPath := backend.NewDefaultTmpBackend()
			dst := newWatchableStore(dstBackend, &lease.FakeLessor{}, nil)
			defer cleanup(dst, dstBackend, dstPath)

			stream := dst.NewWatchStream()
			stream.Watch(key, nil, rev-1)

			time.Sleep(delay)

			dst.Restore(srcBackend)

			select {
			case <-time.After(time.Second):
				t.Fatal("failed to receive event in 1 second.")
			case resp := <-stream.Chan():
				switch {
				case resp.Revision != rev:
					t.Fatalf("rev = %d, want %d", resp.Revision, rev)
				case len(resp.Events) != 1:
					t.Fatalf("failed to get events from the response")
				case resp.Events[0].Kv.ModRevision != rev:
					t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
				}
			}
		}
	}

	cases := []struct {
		name  string
		delay time.Duration
	}{
		{name: "Normal", delay: 0},
		// longer than default waitDuration
		{name: "RunSyncWatchLoopBeforeRestore", delay: time.Millisecond * 120},
	}
	for _, tc := range cases {
		t.Run(tc.name, scenario(tc.delay))
	}
}
// TestWatchRestoreSyncedWatcher tests the following sequence:
//  1. a watcher is created with a future revision (two revisions past
//     the store's latest put)
//  2. that watcher is added to the "synced" watcher group
//  3. the storage is restored/overwritten with a snapshot that has a
//     higher last revision
//  4. the restore operation moves "synced" to the "unsynced" watcher group
//  5. the watcher from step 1 is chosen without panic and still receives
//     the event written at its start revision
func TestWatchRestoreSyncedWatcher(t *testing.T) {
	be1, path1 := backend.NewDefaultTmpBackend()
	s1 := newWatchableStore(be1, &lease.FakeLessor{}, nil)
	defer cleanup(s1, be1, path1)

	be2, path2 := backend.NewDefaultTmpBackend()
	s2 := newWatchableStore(be2, &lease.FakeLessor{}, nil)
	defer cleanup(s2, be2, path2)

	key, val := []byte("foo"), []byte("bar")
	startRev := s1.Put(key, val, lease.NoLease) + 2

	// Create a watcher with a future revision; it is added to the
	// "synced" watcher group (startRev > s.store.currentRev).
	stream := s1.NewWatchStream()
	stream.Watch(key, nil, startRev)

	// Make "s2" end up with a higher last revision than "s1".
	for n := 0; n < 2; n++ {
		s2.Put(key, val, lease.NoLease)
	}

	// Overwrite the storage of "s1" with the higher revisions.
	err := s1.Restore(be2)
	if err != nil {
		t.Fatal(err)
	}

	// Wait for the next "syncWatchersLoop" iteration, during which the
	// unsynced watcher should be chosen.
	time.Sleep(2 * time.Second)

	// Trigger the event for "startRev".
	s1.Put(key, val, lease.NoLease)

	select {
	case <-time.After(time.Second):
		t.Fatal("failed to receive event in 1 second")
	case resp := <-stream.Chan():
		switch {
		case resp.Revision != startRev:
			t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision)
		case len(resp.Events) != 1:
			t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events))
		case resp.Events[0].Kv.ModRevision != startRev:
			t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision)
		}
	}
}
// TestWatchBatchUnsynced tests that events for an unsynced watcher are
// delivered in batches of watchBatchMaxRevs events, and that the watcher
// ends up in the synced group once all batches have been received.
func TestWatchBatchUnsynced(t *testing.T) {
	be, path := backend.NewDefaultTmpBackend()
	ws := newWatchableStore(be, &lease.FakeLessor{}, nil)

	// Restore the package-level batch limit when the test finishes.
	savedMaxRevs := watchBatchMaxRevs
	defer func() {
		watchBatchMaxRevs = savedMaxRevs
		ws.store.Close()
		os.Remove(path)
	}()

	batches := 3
	watchBatchMaxRevs = 4

	// The same bytes serve as both key and value.
	kv := []byte("foo")
	totalPuts := watchBatchMaxRevs * batches
	for n := 0; n < totalPuts; n++ {
		ws.Put(kv, kv, lease.NoLease)
	}

	stream := ws.NewWatchStream()
	stream.Watch(kv, nil, 1)

	for remaining := batches; remaining > 0; remaining-- {
		resp := <-stream.Chan()
		if got := len(resp.Events); got != watchBatchMaxRevs {
			t.Fatalf("len(events) = %d, want %d", got, watchBatchMaxRevs)
		}
	}

	ws.store.revMu.Lock()
	defer ws.store.revMu.Unlock()

	if n := ws.synced.size(); n != 1 {
		t.Errorf("synced size = %d, want 1", n)
	}
}
// TestNewMapwatcherToEventMap tests that newWatcherBatch maps every watcher
// of a watcher group to exactly the events whose key matches the key of that
// watcher.
func TestNewMapwatcherToEventMap(t *testing.T) {
	keys := [][]byte{[]byte("foo0"), []byte("foo1"), []byte("foo2")}
	vals := [][]byte{[]byte("bar0"), []byte("bar1"), []byte("bar2")}

	// One watcher and one PUT event per key, index-aligned with keys.
	watchers := make([]*watcher, len(keys))
	events := make([]mvccpb.Event, len(keys))
	for i := range keys {
		watchers[i] = &watcher{key: keys[i]}
		events[i] = mvccpb.Event{
			Type: mvccpb.PUT,
			Kv:   &mvccpb.KeyValue{Key: keys[i], Value: vals[i]},
		}
	}

	cases := []struct {
		synced []*watcher
		events []mvccpb.Event
		want   map[*watcher][]mvccpb.Event
	}{
		// no watcher, some events: the result is empty
		{
			synced: nil,
			events: events,
			want:   map[*watcher][]mvccpb.Event{},
		},
		// one watcher, one event that does not match the watcher's key:
		// the result is empty
		{
			synced: []*watcher{watchers[2]},
			events: events[:1],
			want:   map[*watcher][]mvccpb.Event{},
		},
		// one watcher, one event that matches the watcher's key:
		// the result holds that watcher
		{
			synced: []*watcher{watchers[1]},
			events: events[1:2],
			want: map[*watcher][]mvccpb.Event{
				watchers[1]: events[1:2],
			},
		},
		// two watchers on different keys, one event that matches only one
		// of them: the result holds just the matching watcher
		{
			synced: []*watcher{watchers[0], watchers[2]},
			events: events[2:],
			want: map[*watcher][]mvccpb.Event{
				watchers[2]: events[2:],
			},
		},
		// two watchers on different keys, two events that each match one
		// of them: the result holds both watchers with their own event
		{
			synced: []*watcher{watchers[0], watchers[1]},
			events: events[:2],
			want: map[*watcher][]mvccpb.Event{
				watchers[0]: events[:1],
				watchers[1]: events[1:2],
			},
		},
	}

	for i, tc := range cases {
		group := newWatcherGroup()
		for _, wa := range tc.synced {
			group.add(wa)
		}

		got := newWatcherBatch(&group, tc.events)
		if len(got) != len(tc.want) {
			t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(got), len(tc.want))
		}

		// Compare the computed batch of every watcher with the expectation.
		for wa, batch := range got {
			wantEvs := tc.want[wa]
			if len(batch.evs) != len(wantEvs) {
				t.Errorf("#%d: len(eb.evs) got = %d, want = %d", i, len(batch.evs), len(wantEvs))
			}
			if equal := reflect.DeepEqual(batch.evs, wantEvs); !equal {
				t.Errorf("#%d: reflect.DeepEqual events got = %v, want = true", i, false)
			}
		}
	}
}
// TestWatchVictims tests that the watchable store still delivers every watch
// event, in revision order, when the watch channels are temporarily clogged
// with too many events.
func TestWatchVictims(t *testing.T) {
	// Remember the package-level limits so they can be put back afterwards.
	savedChanBufLen, savedMaxWatchersPerSync := chanBufLen, maxWatchersPerSync

	be, path := backend.NewDefaultTmpBackend()
	ws := newWatchableStore(be, &lease.FakeLessor{}, nil)

	defer func() {
		ws.store.Close()
		os.Remove(path)
		chanBufLen, maxWatchersPerSync = savedChanBufLen, savedMaxWatchersPerSync
	}()

	// Shrink the limits so that channels clog quickly.
	chanBufLen, maxWatchersPerSync = 1, 2
	numPuts := chanBufLen * 64
	key, val := []byte("foo"), []byte("bar")

	var watchersDone sync.WaitGroup
	numWatches := maxWatchersPerSync * 128
	errc := make(chan error, numWatches)

	// consume opens one watcher and drains it slowly until it has seen
	// numPuts events, reporting any problem on errc.
	consume := func() {
		stream := ws.NewWatchStream()
		stream.Watch(key, nil, 1)
		defer func() {
			stream.Close()
			watchersDone.Done()
		}()

		deadline := time.After(10 * time.Second)
		received, wantRev := 0, int64(2)
		for received < numPuts {
			select {
			case <-deadline:
				errc <- fmt.Errorf("time out")
				return
			case resp := <-stream.Chan():
				received += len(resp.Events)
				for i := range resp.Events {
					gotRev := resp.Events[i].Kv.ModRevision
					if gotRev != wantRev {
						errc <- fmt.Errorf("expected rev=%d, got %d", wantRev, gotRev)
						return
					}
					wantRev++
				}
				// Read slowly so the channel backs up.
				time.Sleep(time.Millisecond)
			}
		}
		if received != numPuts {
			errc <- fmt.Errorf("expected %d events, got %d", numPuts, received)
			return
		}

		// No further response may be pending once all events were seen.
		select {
		case <-stream.Chan():
			errc <- fmt.Errorf("unexpected response")
		default:
		}
	}

	watchersDone.Add(numWatches)
	for started := 0; started < numWatches; started++ {
		go consume()
		time.Sleep(time.Millisecond)
	}

	// Issue all puts concurrently.
	var putsDone sync.WaitGroup
	putsDone.Add(numPuts)
	put := func() {
		defer putsDone.Done()
		ws.Put(key, val, lease.NoLease)
	}
	for issued := 0; issued < numPuts; issued++ {
		go put()
	}
	putsDone.Wait()

	watchersDone.Wait()

	// Report the first error recorded by any watcher, if there is one.
	select {
	case err := <-errc:
		t.Fatal(err)
	default:
	}
}
// TestStressWatchCancelClose tests closing a watch stream while its watches
// are being canceled concurrently and puts are in flight.
func TestStressWatchCancelClose(t *testing.T) {
	be, path := backend.NewDefaultTmpBackend()
	ws := newWatchableStore(be, &lease.FakeLessor{}, nil)
	defer func() {
		ws.store.Close()
		os.Remove(path)
	}()

	key, val := []byte("foo"), []byte("bar")

	const (
		numStreams       = 100
		watchesPerStream = 10
		numPuts          = 100
	)

	var wg sync.WaitGroup
	startc := make(chan struct{})

	wg.Add(numStreams)
	for n := 0; n < numStreams; n++ {
		go func() {
			defer wg.Done()

			stream := ws.NewWatchStream()
			ids := make([]WatchID, watchesPerStream)
			for j := range ids {
				ids[j] = stream.Watch(key, nil, 0)
			}

			// Hold every stream until all goroutines have been launched.
			<-startc

			// Cancel the first half of the watches while closing the
			// stream, each from its own goroutine.
			toCancel := ids[:len(ids)/2]
			wg.Add(1 + len(toCancel))
			for _, id := range toCancel {
				go func(id WatchID) {
					defer wg.Done()
					stream.Cancel(id)
				}(id)
			}
			go func() {
				defer wg.Done()
				stream.Close()
			}()
		}()
	}

	close(startc)
	for n := 0; n < numPuts; n++ {
		ws.Put(key, val, lease.NoLease)
	}

	wg.Wait()
}