| // Copyright 2016 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package recipe |
| |
| import ( |
| "context" |
| |
| "github.com/coreos/etcd/clientv3" |
| "github.com/coreos/etcd/clientv3/concurrency" |
| "github.com/coreos/etcd/mvcc/mvccpb" |
| ) |
| |
| // DoubleBarrier blocks processes on Enter until an expected count enters, then |
| // blocks again on Leave until all processes have left. |
| type DoubleBarrier struct { |
| s *concurrency.Session |
| ctx context.Context |
| |
| key string // key for the collective barrier |
| count int |
| myKey *EphemeralKV // current key for this process on the barrier |
| } |
| |
| func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarrier { |
| return &DoubleBarrier{ |
| s: s, |
| ctx: context.TODO(), |
| key: key, |
| count: count, |
| } |
| } |
| |
| // Enter waits for "count" processes to enter the barrier then returns |
| func (b *DoubleBarrier) Enter() error { |
| client := b.s.Client() |
| ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters") |
| if err != nil { |
| return err |
| } |
| b.myKey = ek |
| |
| resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) |
| if err != nil { |
| return err |
| } |
| |
| if len(resp.Kvs) > b.count { |
| return ErrTooManyClients |
| } |
| |
| if len(resp.Kvs) == b.count { |
| // unblock waiters |
| _, err = client.Put(b.ctx, b.key+"/ready", "") |
| return err |
| } |
| |
| _, err = WaitEvents( |
| client, |
| b.key+"/ready", |
| ek.Revision(), |
| []mvccpb.Event_EventType{mvccpb.PUT}) |
| return err |
| } |
| |
| // Leave waits for "count" processes to leave the barrier then returns |
| func (b *DoubleBarrier) Leave() error { |
| client := b.s.Client() |
| resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) |
| if err != nil { |
| return err |
| } |
| if len(resp.Kvs) == 0 { |
| return nil |
| } |
| |
| lowest, highest := resp.Kvs[0], resp.Kvs[0] |
| for _, k := range resp.Kvs { |
| if k.ModRevision < lowest.ModRevision { |
| lowest = k |
| } |
| if k.ModRevision > highest.ModRevision { |
| highest = k |
| } |
| } |
| isLowest := string(lowest.Key) == b.myKey.Key() |
| |
| if len(resp.Kvs) == 1 { |
| // this is the only node in the barrier; finish up |
| if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil { |
| return err |
| } |
| return b.myKey.Delete() |
| } |
| |
| // this ensures that if a process fails, the ephemeral lease will be |
| // revoked, its barrier key is removed, and the barrier can resume |
| |
| // lowest process in node => wait on highest process |
| if isLowest { |
| _, err = WaitEvents( |
| client, |
| string(highest.Key), |
| highest.ModRevision, |
| []mvccpb.Event_EventType{mvccpb.DELETE}) |
| if err != nil { |
| return err |
| } |
| return b.Leave() |
| } |
| |
| // delete self and wait on lowest process |
| if err = b.myKey.Delete(); err != nil { |
| return err |
| } |
| |
| key := string(lowest.Key) |
| _, err = WaitEvents( |
| client, |
| key, |
| lowest.ModRevision, |
| []mvccpb.Event_EventType{mvccpb.DELETE}) |
| if err != nil { |
| return err |
| } |
| return b.Leave() |
| } |