| // Copyright 2015 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package testutil |
| |
| import ( |
| "errors" |
| "fmt" |
| "sync" |
| "time" |
| ) |
| |
| type Action struct { |
| Name string |
| Params []interface{} |
| } |
| |
| type Recorder interface { |
| // Record publishes an Action (e.g., function call) which will |
| // be reflected by Wait() or Chan() |
| Record(a Action) |
| // Wait waits until at least n Actions are available or returns with error |
| Wait(n int) ([]Action, error) |
| // Action returns immediately available Actions |
| Action() []Action |
| // Chan returns the channel for actions published by Record |
| Chan() <-chan Action |
| } |
| |
| // RecorderBuffered appends all Actions to a slice |
| type RecorderBuffered struct { |
| sync.Mutex |
| actions []Action |
| } |
| |
| func (r *RecorderBuffered) Record(a Action) { |
| r.Lock() |
| r.actions = append(r.actions, a) |
| r.Unlock() |
| } |
| func (r *RecorderBuffered) Action() []Action { |
| r.Lock() |
| cpy := make([]Action, len(r.actions)) |
| copy(cpy, r.actions) |
| r.Unlock() |
| return cpy |
| } |
| func (r *RecorderBuffered) Wait(n int) (acts []Action, err error) { |
| // legacy racey behavior |
| WaitSchedule() |
| acts = r.Action() |
| if len(acts) < n { |
| err = newLenErr(n, len(acts)) |
| } |
| return acts, err |
| } |
| |
| func (r *RecorderBuffered) Chan() <-chan Action { |
| ch := make(chan Action) |
| go func() { |
| acts := r.Action() |
| for i := range acts { |
| ch <- acts[i] |
| } |
| close(ch) |
| }() |
| return ch |
| } |
| |
| // RecorderStream writes all Actions to an unbuffered channel |
| type recorderStream struct { |
| ch chan Action |
| } |
| |
| func NewRecorderStream() Recorder { |
| return &recorderStream{ch: make(chan Action)} |
| } |
| |
| func (r *recorderStream) Record(a Action) { |
| r.ch <- a |
| } |
| |
| func (r *recorderStream) Action() (acts []Action) { |
| for { |
| select { |
| case act := <-r.ch: |
| acts = append(acts, act) |
| default: |
| return acts |
| } |
| } |
| } |
| |
| func (r *recorderStream) Chan() <-chan Action { |
| return r.ch |
| } |
| |
| func (r *recorderStream) Wait(n int) ([]Action, error) { |
| acts := make([]Action, n) |
| timeoutC := time.After(5 * time.Second) |
| for i := 0; i < n; i++ { |
| select { |
| case acts[i] = <-r.ch: |
| case <-timeoutC: |
| acts = acts[:i] |
| return acts, newLenErr(n, i) |
| } |
| } |
| // extra wait to catch any Action spew |
| select { |
| case act := <-r.ch: |
| acts = append(acts, act) |
| case <-time.After(10 * time.Millisecond): |
| } |
| return acts, nil |
| } |
| |
| func newLenErr(expected int, actual int) error { |
| s := fmt.Sprintf("len(actions) = %d, expected >= %d", actual, expected) |
| return errors.New(s) |
| } |