| // Copyright 2016 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package runner |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "log" |
| "sync" |
| "time" |
| |
| "github.com/coreos/etcd/clientv3" |
| "github.com/coreos/etcd/pkg/stringutil" |
| |
| "github.com/spf13/cobra" |
| "golang.org/x/time/rate" |
| ) |
| |
| var ( |
| runningTime time.Duration // time for which operation should be performed |
| noOfPrefixes int // total number of prefixes which will be watched upon |
| watchPerPrefix int // number of watchers per prefix |
| watchPrefix string // prefix append to keys in watcher |
| totalKeys int // total number of keys for operation |
| ) |
| |
| // NewWatchCommand returns the cobra command for "watcher runner". |
| func NewWatchCommand() *cobra.Command { |
| cmd := &cobra.Command{ |
| Use: "watcher", |
| Short: "Performs watch operation", |
| Run: runWatcherFunc, |
| } |
| cmd.Flags().DurationVar(&runningTime, "running-time", 60, "number of seconds to run") |
| cmd.Flags().StringVar(&watchPrefix, "prefix", "", "the prefix to append on all keys") |
| cmd.Flags().IntVar(&noOfPrefixes, "total-prefixes", 10, "total no of prefixes to use") |
| cmd.Flags().IntVar(&watchPerPrefix, "watch-per-prefix", 10, "number of watchers per prefix") |
| cmd.Flags().IntVar(&totalKeys, "total-keys", 1000, "total number of keys to watch") |
| |
| return cmd |
| } |
| |
| func runWatcherFunc(cmd *cobra.Command, args []string) { |
| if len(args) > 0 { |
| ExitWithError(ExitBadArgs, errors.New("watcher does not take any argument")) |
| } |
| |
| ctx := context.Background() |
| for round := 0; round < rounds || rounds <= 0; round++ { |
| fmt.Println("round", round) |
| performWatchOnPrefixes(ctx, cmd, round) |
| } |
| } |
| |
| func performWatchOnPrefixes(ctx context.Context, cmd *cobra.Command, round int) { |
| keyPerPrefix := totalKeys / noOfPrefixes |
| prefixes := stringutil.UniqueStrings(5, noOfPrefixes) |
| keys := stringutil.RandomStrings(10, keyPerPrefix) |
| |
| roundPrefix := fmt.Sprintf("%16x", round) |
| |
| eps := endpointsFromFlag(cmd) |
| |
| var ( |
| revision int64 |
| wg sync.WaitGroup |
| gr *clientv3.GetResponse |
| err error |
| ) |
| |
| client := newClient(eps, dialTimeout) |
| defer client.Close() |
| |
| gr, err = getKey(ctx, client, "non-existent") |
| if err != nil { |
| log.Fatalf("failed to get the initial revision: %v", err) |
| } |
| revision = gr.Header.Revision |
| |
| ctxt, cancel := context.WithDeadline(ctx, time.Now().Add(runningTime*time.Second)) |
| defer cancel() |
| |
| // generate and put keys in cluster |
| limiter := rate.NewLimiter(rate.Limit(reqRate), reqRate) |
| |
| go func() { |
| for _, key := range keys { |
| for _, prefix := range prefixes { |
| if err = limiter.Wait(ctxt); err != nil { |
| return |
| } |
| if err = putKeyAtMostOnce(ctxt, client, watchPrefix+"-"+roundPrefix+"-"+prefix+"-"+key); err != nil { |
| log.Fatalf("failed to put key: %v", err) |
| return |
| } |
| } |
| } |
| }() |
| |
| ctxc, cancelc := context.WithCancel(ctx) |
| |
| wcs := make([]clientv3.WatchChan, 0) |
| rcs := make([]*clientv3.Client, 0) |
| |
| for _, prefix := range prefixes { |
| for j := 0; j < watchPerPrefix; j++ { |
| rc := newClient(eps, dialTimeout) |
| rcs = append(rcs, rc) |
| |
| wprefix := watchPrefix + "-" + roundPrefix + "-" + prefix |
| |
| wc := rc.Watch(ctxc, wprefix, clientv3.WithPrefix(), clientv3.WithRev(revision)) |
| wcs = append(wcs, wc) |
| |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| checkWatchResponse(wc, wprefix, keys) |
| }() |
| } |
| } |
| wg.Wait() |
| |
| cancelc() |
| |
| // verify all watch channels are closed |
| for e, wc := range wcs { |
| if _, ok := <-wc; ok { |
| log.Fatalf("expected wc to be closed, but received %v", e) |
| } |
| } |
| |
| for _, rc := range rcs { |
| rc.Close() |
| } |
| |
| if err = deletePrefix(ctx, client, watchPrefix); err != nil { |
| log.Fatalf("failed to clean up keys after test: %v", err) |
| } |
| } |
| |
| func checkWatchResponse(wc clientv3.WatchChan, prefix string, keys []string) { |
| for n := 0; n < len(keys); { |
| wr, more := <-wc |
| if !more { |
| log.Fatalf("expect more keys (received %d/%d) for %s", n, len(keys), prefix) |
| } |
| for _, event := range wr.Events { |
| expectedKey := prefix + "-" + keys[n] |
| receivedKey := string(event.Kv.Key) |
| if expectedKey != receivedKey { |
| log.Fatalf("expected key %q, got %q for prefix : %q\n", expectedKey, receivedKey, prefix) |
| } |
| n++ |
| } |
| } |
| } |
| |
| func putKeyAtMostOnce(ctx context.Context, client *clientv3.Client, key string) error { |
| gr, err := getKey(ctx, client, key) |
| if err != nil { |
| return err |
| } |
| |
| var modrev int64 |
| if len(gr.Kvs) > 0 { |
| modrev = gr.Kvs[0].ModRevision |
| } |
| |
| for ctx.Err() == nil { |
| _, err := client.Txn(ctx).If(clientv3.Compare(clientv3.ModRevision(key), "=", modrev)).Then(clientv3.OpPut(key, key)).Commit() |
| |
| if err == nil { |
| return nil |
| } |
| } |
| |
| return ctx.Err() |
| } |
| |
| func deletePrefix(ctx context.Context, client *clientv3.Client, key string) error { |
| for ctx.Err() == nil { |
| if _, err := client.Delete(ctx, key, clientv3.WithPrefix()); err == nil { |
| return nil |
| } |
| } |
| return ctx.Err() |
| } |
| |
| func getKey(ctx context.Context, client *clientv3.Client, key string) (*clientv3.GetResponse, error) { |
| for ctx.Err() == nil { |
| if gr, err := client.Get(ctx, key); err == nil { |
| return gr, nil |
| } |
| } |
| return nil, ctx.Err() |
| } |