| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package server |
| |
| import ( |
| "time" |
| ) |
| |
| func (s *MdsServer) debounce(ch chan *RegisterRequest, stopCh <-chan struct{}, pushFn func(m *RegisterRequest)) { |
| var timeChan <-chan time.Time |
| var startDebounce time.Time |
| var lastConfigUpdateTime time.Time |
| |
| pushCounter := 0 |
| debouncedEvents := 0 |
| |
| var req *RegisterRequest |
| |
| free := true |
| freeCh := make(chan struct{}, 1) |
| |
| push := func(req *RegisterRequest) { |
| pushFn(req) |
| freeCh <- struct{}{} |
| } |
| |
| pushWorker := func() { |
| eventDelay := time.Since(startDebounce) |
| quietTime := time.Since(lastConfigUpdateTime) |
| if eventDelay >= s.config.Debounce.Max || quietTime >= s.config.Debounce.After { |
| if req != nil { |
| pushCounter++ |
| free = false |
| go push(req) |
| req = nil |
| debouncedEvents = 0 |
| } |
| } else { |
| timeChan = time.After(s.config.Debounce.After - quietTime) |
| } |
| } |
| |
| for { |
| select { |
| case <-freeCh: |
| free = true |
| pushWorker() |
| case r := <-ch: |
| if !s.config.Debounce.Enable { |
| go push(r) |
| req = nil |
| continue |
| } |
| |
| lastConfigUpdateTime = time.Now() |
| if debouncedEvents == 0 { |
| timeChan = time.After(200 * time.Millisecond) |
| startDebounce = lastConfigUpdateTime |
| } |
| debouncedEvents++ |
| |
| req = req.Merge(r) |
| case <-timeChan: |
| if free { |
| pushWorker() |
| } |
| case <-stopCh: |
| return |
| } |
| } |
| } |