| // Copyright 2016 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package recipe |
| |
| import ( |
| "context" |
| |
| v3 "github.com/coreos/etcd/clientv3" |
| "github.com/coreos/etcd/clientv3/concurrency" |
| "github.com/coreos/etcd/mvcc/mvccpb" |
| ) |
| |
| type RWMutex struct { |
| s *concurrency.Session |
| ctx context.Context |
| |
| pfx string |
| myKey *EphemeralKV |
| } |
| |
| func NewRWMutex(s *concurrency.Session, prefix string) *RWMutex { |
| return &RWMutex{s, context.TODO(), prefix + "/", nil} |
| } |
| |
| func (rwm *RWMutex) RLock() error { |
| rk, err := newUniqueEphemeralKey(rwm.s, rwm.pfx+"read") |
| if err != nil { |
| return err |
| } |
| rwm.myKey = rk |
| // wait until nodes with "write-" and a lower revision number than myKey are gone |
| for { |
| if done, werr := rwm.waitOnLastRev(rwm.pfx + "write"); done || werr != nil { |
| return werr |
| } |
| } |
| } |
| |
| func (rwm *RWMutex) Lock() error { |
| rk, err := newUniqueEphemeralKey(rwm.s, rwm.pfx+"write") |
| if err != nil { |
| return err |
| } |
| rwm.myKey = rk |
| // wait until all keys of lower revision than myKey are gone |
| for { |
| if done, werr := rwm.waitOnLastRev(rwm.pfx); done || werr != nil { |
| return werr |
| } |
| // get the new lowest key until this is the only one left |
| } |
| } |
| |
| // waitOnLowest will wait on the last key with a revision < rwm.myKey.Revision with a |
| // given prefix. If there are no keys left to wait on, return true. |
| func (rwm *RWMutex) waitOnLastRev(pfx string) (bool, error) { |
| client := rwm.s.Client() |
| // get key that's blocking myKey |
| opts := append(v3.WithLastRev(), v3.WithMaxModRev(rwm.myKey.Revision()-1)) |
| lastKey, err := client.Get(rwm.ctx, pfx, opts...) |
| if err != nil { |
| return false, err |
| } |
| if len(lastKey.Kvs) == 0 { |
| return true, nil |
| } |
| // wait for release on blocking key |
| _, err = WaitEvents( |
| client, |
| string(lastKey.Kvs[0].Key), |
| rwm.myKey.Revision(), |
| []mvccpb.Event_EventType{mvccpb.DELETE}) |
| return false, err |
| } |
| |
| func (rwm *RWMutex) RUnlock() error { return rwm.myKey.Delete() } |
| func (rwm *RWMutex) Unlock() error { return rwm.myKey.Delete() } |