blob: 8fe9e5f512e52c4ac672565a2fa2150c85a42232 [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxy
import (
"sync"
)
// watchBroadcasts tracks a set of watchBroadcast instances together with the
// watchers attached to them, and runs a background goroutine that tries to
// coalesce broadcasts reported through updatec.
type watchBroadcasts struct {
// wp is the watch proxy handed to newWatchBroadcast whenever a new
// broadcast has to be created for a watcher.
wp *watchProxy
// mu protects bcasts and watchers from the coalesce loop.
mu sync.Mutex
// bcasts is the set of broadcasts currently managed; stop sets it to nil.
bcasts map[*watchBroadcast]struct{}
// watchers maps each registered watcher to the broadcast serving it.
watchers map[*watcher]*watchBroadcast
// updatec carries broadcasts to the coalesce goroutine. It is created
// with a buffer of one and is closed by stop.
updatec chan *watchBroadcast
// donec is closed when the coalesce goroutine exits.
donec chan struct{}
}
// maxCoalesceReceivers prevents a popular watchBroadcast from being coalesced:
// coalesce leaves alone any broadcast whose size() has reached this limit.
const maxCoalesceReceivers = 5
// newWatchBroadcasts builds an empty broadcast set bound to wp and launches
// the goroutine that services updatec. Each broadcast received on updatec is
// passed to coalesce; once updatec is closed the goroutine returns and closes
// donec so that stop can observe its exit.
func newWatchBroadcasts(wp *watchProxy) *watchBroadcasts {
	wbs := new(watchBroadcasts)
	wbs.wp = wp
	wbs.bcasts = make(map[*watchBroadcast]struct{})
	wbs.watchers = make(map[*watcher]*watchBroadcast)
	// One slot of buffering lets update hand over a broadcast without
	// waiting for the coalesce goroutine to be ready.
	wbs.updatec = make(chan *watchBroadcast, 1)
	wbs.donec = make(chan struct{})

	go func() {
		defer close(wbs.donec)
		for {
			pending, open := <-wbs.updatec
			if !open {
				return
			}
			wbs.coalesce(pending)
		}
	}()

	return wbs
}
// coalesce tries to fold wb's receivers into another broadcast in the set.
//
// Broadcasts whose size() has reached maxCoalesceReceivers are left alone.
// Otherwise every other broadcast is examined in turn; when a suitable target
// is found, all of wb's receivers are moved to it and re-pointed in
// wbs.watchers. As soon as wb reports empty() it is removed from the set and
// stopped, and the scan ends.
func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) {
	if wb.size() >= maxCoalesceReceivers {
		return
	}

	wbs.mu.Lock()
	defer wbs.mu.Unlock()

	for target := range wbs.bcasts {
		if target == wb {
			continue
		}

		// Lock order is always wb first, then the candidate target.
		wb.mu.Lock()
		target.mu.Lock()
		// The target must be behind (or level with) wb so that the moved
		// receivers do not skip any events wb has already delivered.
		notAhead := wb.nextrev >= target.nextrev
		// The target must have started; one without responses may still be
		// waiting for a current watcher and expecting a create event from
		// the server.
		started := target.responses > 0
		if notAhead && started {
			for rcv := range wb.receivers {
				target.receivers[rcv] = struct{}{}
				wbs.watchers[rcv] = target
			}
			wb.receivers = nil
		}
		target.mu.Unlock()
		wb.mu.Unlock()

		if !wb.empty() {
			continue
		}
		// Nothing is left on wb: retire it and stop scanning.
		delete(wbs.bcasts, wb)
		wb.stop()
		return
	}
}
// add registers w with the set. The first existing broadcast whose add(w)
// reports true takes the watcher; if none does, a fresh broadcast is created
// for w via newWatchBroadcast (with wbs.update as its callback) and inserted
// into the set. In both cases wbs.watchers records which broadcast owns w.
func (wbs *watchBroadcasts) add(w *watcher) {
	wbs.mu.Lock()
	defer wbs.mu.Unlock()

	// Look for an existing broadcast willing to take the watcher.
	var owner *watchBroadcast
	for candidate := range wbs.bcasts {
		if candidate.add(w) {
			owner = candidate
			break
		}
	}

	// No fit: the watcher gets a broadcast of its own.
	created := owner == nil
	if created {
		owner = newWatchBroadcast(wbs.wp, w, wbs.update)
	}

	wbs.watchers[w] = owner
	if created {
		wbs.bcasts[owner] = struct{}{}
	}
}
// delete detaches w from the broadcast that owns it and returns the number of
// broadcasts still held in the set. A broadcast that becomes empty as a result
// is removed from the set and stopped. It panics if w was never registered.
func (wbs *watchBroadcasts) delete(w *watcher) int {
	wbs.mu.Lock()
	defer wbs.mu.Unlock()

	owner, registered := wbs.watchers[w]
	if !registered {
		panic("deleting missing watcher from broadcasts")
	}

	delete(wbs.watchers, w)
	if owner.delete(w); owner.empty() {
		// The last receiver is gone, so the broadcast is retired as well.
		delete(wbs.bcasts, owner)
		owner.stop()
	}

	remaining := len(wbs.bcasts)
	return remaining
}
// stop shuts the set down: every broadcast is stopped, the set is cleared,
// updatec is closed so the coalesce goroutine leaves its receive loop, and
// the call then blocks until that goroutine has signalled its exit on donec.
func (wbs *watchBroadcasts) stop() {
	func() {
		wbs.mu.Lock()
		defer wbs.mu.Unlock()

		for bcast := range wbs.bcasts {
			bcast.stop()
		}
		wbs.bcasts = nil
		close(wbs.updatec)
	}()

	// Wait only after releasing mu: the goroutine may be inside coalesce,
	// which needs mu before it can return and observe the closed channel.
	<-wbs.donec
}
// update offers wb to the coalesce goroutine without ever blocking the caller.
// It is the callback handed to newWatchBroadcast by add.
// NOTE(review): stop closes updatec, and sending on a closed channel panics;
// confirm that no broadcast can call update after stop has closed the channel.
func (wbs *watchBroadcasts) update(wb *watchBroadcast) {
select {
case wbs.updatec <- wb:
default:
// updatec's single buffer slot is taken; drop this notification
// instead of waiting for the coalesce goroutine.
}
}