| // Copyright 2015 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package compactor |
| |
| import ( |
| "reflect" |
| "testing" |
| "time" |
| |
| pb "github.com/coreos/etcd/etcdserver/etcdserverpb" |
| "github.com/coreos/etcd/pkg/testutil" |
| |
| "github.com/jonboulle/clockwork" |
| ) |
| |
| func TestPeriodicHourly(t *testing.T) { |
| retentionHours := 2 |
| retentionDuration := time.Duration(retentionHours) * time.Hour |
| |
| fc := clockwork.NewFakeClock() |
| rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} |
| compactable := &fakeCompactable{testutil.NewRecorderStream()} |
| tb := newPeriodic(fc, retentionDuration, rg, compactable) |
| |
| tb.Run() |
| defer tb.Stop() |
| |
| initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 |
| |
| // compaction doesn't happen til 2 hours elapse |
| for i := 0; i < initialIntervals; i++ { |
| rg.Wait(1) |
| fc.Advance(tb.getRetryInterval()) |
| } |
| |
| // very first compaction |
| a, err := compactable.Wait(1) |
| if err != nil { |
| t.Fatal(err) |
| } |
| expectedRevision := int64(1) |
| if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { |
| t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) |
| } |
| |
| // simulate 3 hours |
| // now compactor kicks in, every hour |
| for i := 0; i < 3; i++ { |
| // advance one hour, one revision for each interval |
| for j := 0; j < intervalsPerPeriod; j++ { |
| rg.Wait(1) |
| fc.Advance(tb.getRetryInterval()) |
| } |
| |
| a, err = compactable.Wait(1) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| expectedRevision = int64((i + 1) * 10) |
| if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { |
| t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) |
| } |
| } |
| } |
| |
| func TestPeriodicMinutes(t *testing.T) { |
| retentionMinutes := 5 |
| retentionDuration := time.Duration(retentionMinutes) * time.Minute |
| |
| fc := clockwork.NewFakeClock() |
| rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} |
| compactable := &fakeCompactable{testutil.NewRecorderStream()} |
| tb := newPeriodic(fc, retentionDuration, rg, compactable) |
| |
| tb.Run() |
| defer tb.Stop() |
| |
| initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 |
| |
| // compaction doesn't happen til 5 minutes elapse |
| for i := 0; i < initialIntervals; i++ { |
| rg.Wait(1) |
| fc.Advance(tb.getRetryInterval()) |
| } |
| |
| // very first compaction |
| a, err := compactable.Wait(1) |
| if err != nil { |
| t.Fatal(err) |
| } |
| expectedRevision := int64(1) |
| if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { |
| t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) |
| } |
| |
| // compaction happens at every interval |
| for i := 0; i < 5; i++ { |
| // advance 5-minute, one revision for each interval |
| for j := 0; j < intervalsPerPeriod; j++ { |
| rg.Wait(1) |
| fc.Advance(tb.getRetryInterval()) |
| } |
| |
| a, err := compactable.Wait(1) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| expectedRevision = int64((i + 1) * 10) |
| if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { |
| t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) |
| } |
| } |
| } |
| |
| func TestPeriodicPause(t *testing.T) { |
| fc := clockwork.NewFakeClock() |
| retentionDuration := time.Hour |
| rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} |
| compactable := &fakeCompactable{testutil.NewRecorderStream()} |
| tb := newPeriodic(fc, retentionDuration, rg, compactable) |
| |
| tb.Run() |
| tb.Pause() |
| |
| n := tb.getRetentions() |
| |
| // tb will collect 3 hours of revisions but not compact since paused |
| for i := 0; i < n*3; i++ { |
| rg.Wait(1) |
| fc.Advance(tb.getRetryInterval()) |
| } |
| // t.revs = [21 22 23 24 25 26 27 28 29 30] |
| |
| select { |
| case a := <-compactable.Chan(): |
| t.Fatalf("unexpected action %v", a) |
| case <-time.After(10 * time.Millisecond): |
| } |
| |
| // tb resumes to being blocked on the clock |
| tb.Resume() |
| rg.Wait(1) |
| |
| // unblock clock, will kick off a compaction at T=3h6m by retry |
| fc.Advance(tb.getRetryInterval()) |
| |
| // T=3h6m |
| a, err := compactable.Wait(1) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| // compact the revision from hour 2:06 |
| wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)} |
| if !reflect.DeepEqual(a[0].Params[0], wreq) { |
| t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) |
| } |
| } |