| // Copyright 2015 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package mvcc |
| |
| import ( |
| "crypto/rand" |
| "encoding/binary" |
| "fmt" |
| "math" |
| mrand "math/rand" |
| "os" |
| "reflect" |
| "sync" |
| "testing" |
| "time" |
| |
| "github.com/coreos/etcd/lease" |
| "github.com/coreos/etcd/mvcc/backend" |
| "github.com/coreos/etcd/mvcc/mvccpb" |
| "github.com/coreos/etcd/pkg/schedule" |
| "github.com/coreos/etcd/pkg/testutil" |
| ) |
| |
| func TestStoreRev(t *testing.T) { |
| b, tmpPath := backend.NewDefaultTmpBackend() |
| s := NewStore(b, &lease.FakeLessor{}, nil) |
| defer s.Close() |
| defer os.Remove(tmpPath) |
| |
| for i := 1; i <= 3; i++ { |
| s.Put([]byte("foo"), []byte("bar"), lease.NoLease) |
| if r := s.Rev(); r != int64(i+1) { |
| t.Errorf("#%d: rev = %d, want %d", i, r, i+1) |
| } |
| } |
| } |
| |
| func TestStorePut(t *testing.T) { |
| kv := mvccpb.KeyValue{ |
| Key: []byte("foo"), |
| Value: []byte("bar"), |
| CreateRevision: 1, |
| ModRevision: 2, |
| Version: 1, |
| } |
| kvb, err := kv.Marshal() |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| tests := []struct { |
| rev revision |
| r indexGetResp |
| rr *rangeResp |
| |
| wrev revision |
| wkey []byte |
| wkv mvccpb.KeyValue |
| wputrev revision |
| }{ |
| { |
| revision{1, 0}, |
| indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound}, |
| nil, |
| |
| revision{2, 0}, |
| newTestKeyBytes(revision{2, 0}, false), |
| mvccpb.KeyValue{ |
| Key: []byte("foo"), |
| Value: []byte("bar"), |
| CreateRevision: 2, |
| ModRevision: 2, |
| Version: 1, |
| Lease: 1, |
| }, |
| revision{2, 0}, |
| }, |
| { |
| revision{1, 1}, |
| indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil}, |
| &rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}}, |
| |
| revision{2, 0}, |
| newTestKeyBytes(revision{2, 0}, false), |
| mvccpb.KeyValue{ |
| Key: []byte("foo"), |
| Value: []byte("bar"), |
| CreateRevision: 2, |
| ModRevision: 2, |
| Version: 2, |
| Lease: 2, |
| }, |
| revision{2, 0}, |
| }, |
| { |
| revision{2, 0}, |
| indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil}, |
| &rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}}, |
| |
| revision{3, 0}, |
| newTestKeyBytes(revision{3, 0}, false), |
| mvccpb.KeyValue{ |
| Key: []byte("foo"), |
| Value: []byte("bar"), |
| CreateRevision: 2, |
| ModRevision: 3, |
| Version: 3, |
| Lease: 3, |
| }, |
| revision{3, 0}, |
| }, |
| } |
| for i, tt := range tests { |
| s := newFakeStore() |
| b := s.b.(*fakeBackend) |
| fi := s.kvindex.(*fakeIndex) |
| |
| s.currentRev = tt.rev.main |
| fi.indexGetRespc <- tt.r |
| if tt.rr != nil { |
| b.tx.rangeRespc <- *tt.rr |
| } |
| |
| s.Put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1)) |
| |
| data, err := tt.wkv.Marshal() |
| if err != nil { |
| t.Errorf("#%d: marshal err = %v, want nil", i, err) |
| } |
| |
| wact := []testutil.Action{ |
| {"seqput", []interface{}{keyBucketName, tt.wkey, data}}, |
| } |
| |
| if tt.rr != nil { |
| wact = []testutil.Action{ |
| {"seqput", []interface{}{keyBucketName, tt.wkey, data}}, |
| } |
| } |
| |
| if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { |
| t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) |
| } |
| wact = []testutil.Action{ |
| {"get", []interface{}{[]byte("foo"), tt.wputrev.main}}, |
| {"put", []interface{}{[]byte("foo"), tt.wputrev}}, |
| } |
| if g := fi.Action(); !reflect.DeepEqual(g, wact) { |
| t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) |
| } |
| if s.currentRev != tt.wrev.main { |
| t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) |
| } |
| |
| s.Close() |
| } |
| } |
| |
| func TestStoreRange(t *testing.T) { |
| key := newTestKeyBytes(revision{2, 0}, false) |
| kv := mvccpb.KeyValue{ |
| Key: []byte("foo"), |
| Value: []byte("bar"), |
| CreateRevision: 1, |
| ModRevision: 2, |
| Version: 1, |
| } |
| kvb, err := kv.Marshal() |
| if err != nil { |
| t.Fatal(err) |
| } |
| wrev := int64(2) |
| |
| tests := []struct { |
| idxr indexRangeResp |
| r rangeResp |
| }{ |
| { |
| indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, |
| rangeResp{[][]byte{key}, [][]byte{kvb}}, |
| }, |
| { |
| indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}}, |
| rangeResp{[][]byte{key}, [][]byte{kvb}}, |
| }, |
| } |
| |
| ro := RangeOptions{Limit: 1, Rev: 0, Count: false} |
| for i, tt := range tests { |
| s := newFakeStore() |
| b := s.b.(*fakeBackend) |
| fi := s.kvindex.(*fakeIndex) |
| |
| s.currentRev = 2 |
| b.tx.rangeRespc <- tt.r |
| fi.indexRangeRespc <- tt.idxr |
| |
| ret, err := s.Range([]byte("foo"), []byte("goo"), ro) |
| if err != nil { |
| t.Errorf("#%d: err = %v, want nil", i, err) |
| } |
| if w := []mvccpb.KeyValue{kv}; !reflect.DeepEqual(ret.KVs, w) { |
| t.Errorf("#%d: kvs = %+v, want %+v", i, ret.KVs, w) |
| } |
| if ret.Rev != wrev { |
| t.Errorf("#%d: rev = %d, want %d", i, ret.Rev, wrev) |
| } |
| |
| wstart := newRevBytes() |
| revToBytes(tt.idxr.revs[0], wstart) |
| wact := []testutil.Action{ |
| {"range", []interface{}{keyBucketName, wstart, []byte(nil), int64(0)}}, |
| } |
| if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { |
| t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) |
| } |
| wact = []testutil.Action{ |
| {"range", []interface{}{[]byte("foo"), []byte("goo"), wrev}}, |
| } |
| if g := fi.Action(); !reflect.DeepEqual(g, wact) { |
| t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) |
| } |
| if s.currentRev != 2 { |
| t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, 2) |
| } |
| |
| s.Close() |
| } |
| } |
| |
| func TestStoreDeleteRange(t *testing.T) { |
| key := newTestKeyBytes(revision{2, 0}, false) |
| kv := mvccpb.KeyValue{ |
| Key: []byte("foo"), |
| Value: []byte("bar"), |
| CreateRevision: 1, |
| ModRevision: 2, |
| Version: 1, |
| } |
| kvb, err := kv.Marshal() |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| tests := []struct { |
| rev revision |
| r indexRangeResp |
| rr rangeResp |
| |
| wkey []byte |
| wrev revision |
| wrrev int64 |
| wdelrev revision |
| }{ |
| { |
| revision{2, 0}, |
| indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, |
| rangeResp{[][]byte{key}, [][]byte{kvb}}, |
| |
| newTestKeyBytes(revision{3, 0}, true), |
| revision{3, 0}, |
| 2, |
| revision{3, 0}, |
| }, |
| } |
| for i, tt := range tests { |
| s := newFakeStore() |
| b := s.b.(*fakeBackend) |
| fi := s.kvindex.(*fakeIndex) |
| |
| s.currentRev = tt.rev.main |
| fi.indexRangeRespc <- tt.r |
| b.tx.rangeRespc <- tt.rr |
| |
| n, _ := s.DeleteRange([]byte("foo"), []byte("goo")) |
| if n != 1 { |
| t.Errorf("#%d: n = %d, want 1", i, n) |
| } |
| |
| data, err := (&mvccpb.KeyValue{ |
| Key: []byte("foo"), |
| }).Marshal() |
| if err != nil { |
| t.Errorf("#%d: marshal err = %v, want nil", i, err) |
| } |
| wact := []testutil.Action{ |
| {"seqput", []interface{}{keyBucketName, tt.wkey, data}}, |
| } |
| if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { |
| t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) |
| } |
| wact = []testutil.Action{ |
| {"range", []interface{}{[]byte("foo"), []byte("goo"), tt.wrrev}}, |
| {"tombstone", []interface{}{[]byte("foo"), tt.wdelrev}}, |
| } |
| if g := fi.Action(); !reflect.DeepEqual(g, wact) { |
| t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) |
| } |
| if s.currentRev != tt.wrev.main { |
| t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) |
| } |
| } |
| } |
| |
| func TestStoreCompact(t *testing.T) { |
| s := newFakeStore() |
| defer s.Close() |
| b := s.b.(*fakeBackend) |
| fi := s.kvindex.(*fakeIndex) |
| |
| s.currentRev = 3 |
| fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}} |
| key1 := newTestKeyBytes(revision{1, 0}, false) |
| key2 := newTestKeyBytes(revision{2, 0}, false) |
| b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil} |
| |
| s.Compact(3) |
| s.fifoSched.WaitFinish(1) |
| |
| if s.compactMainRev != 3 { |
| t.Errorf("compact main rev = %d, want 3", s.compactMainRev) |
| } |
| end := make([]byte, 8) |
| binary.BigEndian.PutUint64(end, uint64(4)) |
| wact := []testutil.Action{ |
| {"put", []interface{}{metaBucketName, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, |
| {"range", []interface{}{keyBucketName, make([]byte, 17), end, int64(10000)}}, |
| {"delete", []interface{}{keyBucketName, key2}}, |
| {"put", []interface{}{metaBucketName, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}}, |
| } |
| if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { |
| t.Errorf("tx actions = %+v, want %+v", g, wact) |
| } |
| wact = []testutil.Action{ |
| {"compact", []interface{}{int64(3)}}, |
| } |
| if g := fi.Action(); !reflect.DeepEqual(g, wact) { |
| t.Errorf("index action = %+v, want %+v", g, wact) |
| } |
| } |
| |
| func TestStoreRestore(t *testing.T) { |
| s := newFakeStore() |
| b := s.b.(*fakeBackend) |
| fi := s.kvindex.(*fakeIndex) |
| |
| putkey := newTestKeyBytes(revision{3, 0}, false) |
| putkv := mvccpb.KeyValue{ |
| Key: []byte("foo"), |
| Value: []byte("bar"), |
| CreateRevision: 4, |
| ModRevision: 4, |
| Version: 1, |
| } |
| putkvb, err := putkv.Marshal() |
| if err != nil { |
| t.Fatal(err) |
| } |
| delkey := newTestKeyBytes(revision{5, 0}, true) |
| delkv := mvccpb.KeyValue{ |
| Key: []byte("foo"), |
| } |
| delkvb, err := delkv.Marshal() |
| if err != nil { |
| t.Fatal(err) |
| } |
| b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} |
| b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} |
| |
| b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}} |
| b.tx.rangeRespc <- rangeResp{nil, nil} |
| |
| s.restore() |
| |
| if s.compactMainRev != 3 { |
| t.Errorf("compact rev = %d, want 5", s.compactMainRev) |
| } |
| if s.currentRev != 5 { |
| t.Errorf("current rev = %v, want 5", s.currentRev) |
| } |
| wact := []testutil.Action{ |
| {"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}}, |
| {"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}}, |
| {"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}}, |
| } |
| if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { |
| t.Errorf("tx actions = %+v, want %+v", g, wact) |
| } |
| |
| gens := []generation{ |
| {created: revision{4, 0}, ver: 2, revs: []revision{{3, 0}, {5, 0}}}, |
| {created: revision{0, 0}, ver: 0, revs: nil}, |
| } |
| ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens} |
| wact = []testutil.Action{ |
| {"keyIndex", []interface{}{ki}}, |
| {"insert", []interface{}{ki}}, |
| } |
| if g := fi.Action(); !reflect.DeepEqual(g, wact) { |
| t.Errorf("index action = %+v, want %+v", g, wact) |
| } |
| } |
| |
| func TestRestoreDelete(t *testing.T) { |
| oldChunk := restoreChunkKeys |
| restoreChunkKeys = mrand.Intn(3) + 2 |
| defer func() { restoreChunkKeys = oldChunk }() |
| |
| b, tmpPath := backend.NewDefaultTmpBackend() |
| s := NewStore(b, &lease.FakeLessor{}, nil) |
| defer os.Remove(tmpPath) |
| |
| keys := make(map[string]struct{}) |
| for i := 0; i < 20; i++ { |
| ks := fmt.Sprintf("foo-%d", i) |
| k := []byte(ks) |
| s.Put(k, []byte("bar"), lease.NoLease) |
| keys[ks] = struct{}{} |
| switch mrand.Intn(3) { |
| case 0: |
| // put random key from past via random range on map |
| ks = fmt.Sprintf("foo-%d", mrand.Intn(i+1)) |
| s.Put([]byte(ks), []byte("baz"), lease.NoLease) |
| keys[ks] = struct{}{} |
| case 1: |
| // delete random key via random range on map |
| for k := range keys { |
| s.DeleteRange([]byte(k), nil) |
| delete(keys, k) |
| break |
| } |
| } |
| } |
| s.Close() |
| |
| s = NewStore(b, &lease.FakeLessor{}, nil) |
| defer s.Close() |
| for i := 0; i < 20; i++ { |
| ks := fmt.Sprintf("foo-%d", i) |
| r, err := s.Range([]byte(ks), nil, RangeOptions{}) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if _, ok := keys[ks]; ok { |
| if len(r.KVs) == 0 { |
| t.Errorf("#%d: expected %q, got deleted", i, ks) |
| } |
| } else if len(r.KVs) != 0 { |
| t.Errorf("#%d: expected deleted, got %q", i, ks) |
| } |
| } |
| } |
| |
| func TestRestoreContinueUnfinishedCompaction(t *testing.T) { |
| b, tmpPath := backend.NewDefaultTmpBackend() |
| s0 := NewStore(b, &lease.FakeLessor{}, nil) |
| defer os.Remove(tmpPath) |
| |
| s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) |
| s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease) |
| s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease) |
| |
| // write scheduled compaction, but not do compaction |
| rbytes := newRevBytes() |
| revToBytes(revision{main: 2}, rbytes) |
| tx := s0.b.BatchTx() |
| tx.Lock() |
| tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes) |
| tx.Unlock() |
| |
| s0.Close() |
| |
| s1 := NewStore(b, &lease.FakeLessor{}, nil) |
| |
| // wait for scheduled compaction to be finished |
| time.Sleep(100 * time.Millisecond) |
| |
| if _, err := s1.Range([]byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted { |
| t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted) |
| } |
| // check the key in backend is deleted |
| revbytes := newRevBytes() |
| revToBytes(revision{main: 1}, revbytes) |
| |
| // The disk compaction is done asynchronously and requires more time on slow disk. |
| // try 5 times for CI with slow IO. |
| for i := 0; i < 5; i++ { |
| tx = s1.b.BatchTx() |
| tx.Lock() |
| ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) |
| tx.Unlock() |
| if len(ks) != 0 { |
| time.Sleep(100 * time.Millisecond) |
| continue |
| } |
| return |
| } |
| |
| t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes)) |
| } |
| |
| type hashKVResult struct { |
| hash uint32 |
| compactRev int64 |
| } |
| |
| // TestHashKVWhenCompacting ensures that HashKV returns correct hash when compacting. |
| func TestHashKVWhenCompacting(t *testing.T) { |
| b, tmpPath := backend.NewDefaultTmpBackend() |
| s := NewStore(b, &lease.FakeLessor{}, nil) |
| defer os.Remove(tmpPath) |
| |
| rev := 10000 |
| for i := 2; i <= rev; i++ { |
| s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease) |
| } |
| |
| hashCompactc := make(chan hashKVResult, 1) |
| |
| donec := make(chan struct{}) |
| var wg sync.WaitGroup |
| for i := 0; i < 10; i++ { |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| for { |
| hash, _, compactRev, err := s.HashByRev(int64(rev)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| select { |
| case <-donec: |
| return |
| case hashCompactc <- hashKVResult{hash, compactRev}: |
| } |
| } |
| }() |
| } |
| |
| go func() { |
| defer close(donec) |
| revHash := make(map[int64]uint32) |
| for round := 0; round < 1000; round++ { |
| r := <-hashCompactc |
| if revHash[r.compactRev] == 0 { |
| revHash[r.compactRev] = r.hash |
| } |
| if r.hash != revHash[r.compactRev] { |
| t.Fatalf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev]) |
| } |
| } |
| }() |
| |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| for i := 100; i >= 0; i-- { |
| _, err := s.Compact(int64(rev - 1 - i)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| time.Sleep(10 * time.Millisecond) |
| } |
| }() |
| |
| select { |
| case <-donec: |
| wg.Wait() |
| case <-time.After(10 * time.Second): |
| testutil.FatalStack(t, "timeout") |
| } |
| } |
| |
| // TestHashKVZeroRevision ensures that "HashByRev(0)" computes |
| // correct hash value with latest revision. |
| func TestHashKVZeroRevision(t *testing.T) { |
| b, tmpPath := backend.NewDefaultTmpBackend() |
| s := NewStore(b, &lease.FakeLessor{}, nil) |
| defer os.Remove(tmpPath) |
| |
| rev := 1000 |
| for i := 2; i <= rev; i++ { |
| s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease) |
| } |
| if _, err := s.Compact(int64(rev / 2)); err != nil { |
| t.Fatal(err) |
| } |
| |
| hash1, _, _, err := s.HashByRev(int64(rev)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| var hash2 uint32 |
| hash2, _, _, err = s.HashByRev(0) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if hash1 != hash2 { |
| t.Errorf("hash %d (rev %d) != hash %d (rev 0)", hash1, rev, hash2) |
| } |
| } |
| |
| func TestTxnPut(t *testing.T) { |
| // assign arbitrary size |
| bytesN := 30 |
| sliceN := 100 |
| keys := createBytesSlice(bytesN, sliceN) |
| vals := createBytesSlice(bytesN, sliceN) |
| |
| b, tmpPath := backend.NewDefaultTmpBackend() |
| s := NewStore(b, &lease.FakeLessor{}, nil) |
| defer cleanup(s, b, tmpPath) |
| |
| for i := 0; i < sliceN; i++ { |
| txn := s.Write() |
| base := int64(i + 2) |
| if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev != base { |
| t.Errorf("#%d: rev = %d, want %d", i, rev, base) |
| } |
| txn.End() |
| } |
| } |
| |
| func TestTxnBlockBackendForceCommit(t *testing.T) { |
| b, tmpPath := backend.NewDefaultTmpBackend() |
| s := NewStore(b, &lease.FakeLessor{}, nil) |
| defer os.Remove(tmpPath) |
| |
| txn := s.Read() |
| |
| done := make(chan struct{}) |
| go func() { |
| s.b.ForceCommit() |
| done <- struct{}{} |
| }() |
| select { |
| case <-done: |
| t.Fatalf("failed to block ForceCommit") |
| case <-time.After(100 * time.Millisecond): |
| } |
| |
| txn.End() |
| select { |
| case <-done: |
| case <-time.After(5 * time.Second): // wait 5 seconds for CI with slow IO |
| testutil.FatalStack(t, "failed to execute ForceCommit") |
| } |
| } |
| |
| // TODO: test attach key to lessor |
| |
| func newTestRevBytes(rev revision) []byte { |
| bytes := newRevBytes() |
| revToBytes(rev, bytes) |
| return bytes |
| } |
| |
| func newTestKeyBytes(rev revision, tombstone bool) []byte { |
| bytes := newRevBytes() |
| revToBytes(rev, bytes) |
| if tombstone { |
| bytes = appendMarkTombstone(bytes) |
| } |
| return bytes |
| } |
| |
| func newFakeStore() *store { |
| b := &fakeBackend{&fakeBatchTx{ |
| Recorder: &testutil.RecorderBuffered{}, |
| rangeRespc: make(chan rangeResp, 5)}} |
| fi := &fakeIndex{ |
| Recorder: &testutil.RecorderBuffered{}, |
| indexGetRespc: make(chan indexGetResp, 1), |
| indexRangeRespc: make(chan indexRangeResp, 1), |
| indexRangeEventsRespc: make(chan indexRangeEventsResp, 1), |
| indexCompactRespc: make(chan map[revision]struct{}, 1), |
| } |
| s := &store{ |
| b: b, |
| le: &lease.FakeLessor{}, |
| kvindex: fi, |
| currentRev: 0, |
| compactMainRev: -1, |
| fifoSched: schedule.NewFIFOScheduler(), |
| stopc: make(chan struct{}), |
| } |
| s.ReadView, s.WriteView = &readView{s}, &writeView{s} |
| return s |
| } |
| |
| type rangeResp struct { |
| keys [][]byte |
| vals [][]byte |
| } |
| |
| type fakeBatchTx struct { |
| testutil.Recorder |
| rangeRespc chan rangeResp |
| } |
| |
| func (b *fakeBatchTx) Lock() {} |
| func (b *fakeBatchTx) Unlock() {} |
| func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {} |
| func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { |
| b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}}) |
| } |
| func (b *fakeBatchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) { |
| b.Recorder.Record(testutil.Action{Name: "seqput", Params: []interface{}{bucketName, key, value}}) |
| } |
| func (b *fakeBatchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) { |
| b.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{bucketName, key, endKey, limit}}) |
| r := <-b.rangeRespc |
| return r.keys, r.vals |
| } |
| func (b *fakeBatchTx) UnsafeDelete(bucketName []byte, key []byte) { |
| b.Recorder.Record(testutil.Action{Name: "delete", Params: []interface{}{bucketName, key}}) |
| } |
| func (b *fakeBatchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { |
| return nil |
| } |
| func (b *fakeBatchTx) Commit() {} |
| func (b *fakeBatchTx) CommitAndStop() {} |
| |
| type fakeBackend struct { |
| tx *fakeBatchTx |
| } |
| |
| func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx } |
| func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx } |
| func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil } |
| func (b *fakeBackend) Size() int64 { return 0 } |
| func (b *fakeBackend) SizeInUse() int64 { return 0 } |
| func (b *fakeBackend) Snapshot() backend.Snapshot { return nil } |
| func (b *fakeBackend) ForceCommit() {} |
| func (b *fakeBackend) Defrag() error { return nil } |
| func (b *fakeBackend) Close() error { return nil } |
| |
| type indexGetResp struct { |
| rev revision |
| created revision |
| ver int64 |
| err error |
| } |
| |
| type indexRangeResp struct { |
| keys [][]byte |
| revs []revision |
| } |
| |
| type indexRangeEventsResp struct { |
| revs []revision |
| } |
| |
| type fakeIndex struct { |
| testutil.Recorder |
| indexGetRespc chan indexGetResp |
| indexRangeRespc chan indexRangeResp |
| indexRangeEventsRespc chan indexRangeEventsResp |
| indexCompactRespc chan map[revision]struct{} |
| } |
| |
| func (i *fakeIndex) Revisions(key, end []byte, atRev int64) []revision { |
| _, rev := i.Range(key, end, atRev) |
| return rev |
| } |
| |
| func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) { |
| i.Recorder.Record(testutil.Action{Name: "get", Params: []interface{}{key, atRev}}) |
| r := <-i.indexGetRespc |
| return r.rev, r.created, r.ver, r.err |
| } |
| func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []revision) { |
| i.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{key, end, atRev}}) |
| r := <-i.indexRangeRespc |
| return r.keys, r.revs |
| } |
| func (i *fakeIndex) Put(key []byte, rev revision) { |
| i.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{key, rev}}) |
| } |
| func (i *fakeIndex) Tombstone(key []byte, rev revision) error { |
| i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}}) |
| return nil |
| } |
| func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []revision { |
| i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []interface{}{key, end, rev}}) |
| r := <-i.indexRangeEventsRespc |
| return r.revs |
| } |
| func (i *fakeIndex) Compact(rev int64) map[revision]struct{} { |
| i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}}) |
| return <-i.indexCompactRespc |
| } |
| func (i *fakeIndex) Keep(rev int64) map[revision]struct{} { |
| i.Recorder.Record(testutil.Action{Name: "keep", Params: []interface{}{rev}}) |
| return <-i.indexCompactRespc |
| } |
| func (i *fakeIndex) Equal(b index) bool { return false } |
| |
| func (i *fakeIndex) Insert(ki *keyIndex) { |
| i.Recorder.Record(testutil.Action{Name: "insert", Params: []interface{}{ki}}) |
| } |
| |
| func (i *fakeIndex) KeyIndex(ki *keyIndex) *keyIndex { |
| i.Recorder.Record(testutil.Action{Name: "keyIndex", Params: []interface{}{ki}}) |
| return nil |
| } |
| |
// createBytesSlice returns sliceN byte slices, each bytesN bytes long and
// filled from crypto/rand.
//
// A non-positive sliceN yields an empty, non-nil slice; the previous loop
// condition (len(rs) != sliceN) never terminated for a negative sliceN.
// It panics if reading from the random source fails.
func createBytesSlice(bytesN, sliceN int) [][]byte {
	if sliceN <= 0 {
		return [][]byte{}
	}
	// The final length is known, so allocate the outer slice once.
	rs := make([][]byte, 0, sliceN)
	for i := 0; i < sliceN; i++ {
		v := make([]byte, bytesN)
		if _, err := rand.Read(v); err != nil {
			panic(err)
		}
		rs = append(rs, v)
	}
	return rs
}