blob: 38301abef8a839e1aab476d556e61023e022bf39 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal
import (
"context"
"sync"
"sync/atomic"
"unsafe"
)
type chCond struct {
L sync.Locker
// The pointer to the channel, the channel pointed to may change,
// because we will use the channel's close mechanism to implement broadcast notifications.
notifyChPtr unsafe.Pointer
}
func newCond(l sync.Locker) *chCond {
c := &chCond{L: l}
n := make(chan struct{})
c.notifyChPtr = unsafe.Pointer(&n)
return c
}
// wait for broadcast calls. Similar to regular sync.Cond
func (c *chCond) wait() {
n := c.notifyChan()
c.L.Unlock()
<-n
c.L.Lock()
}
// waitWithContext Same as wait() call, but the end condition can also be controlled through the context.
func (c *chCond) waitWithContext(ctx context.Context) bool {
n := c.notifyChan()
c.L.Unlock()
defer c.L.Lock()
select {
case <-n:
return true
case <-ctx.Done():
return false
default:
return true
}
}
// broadcast wakes all goroutines waiting on c.
// It is not required for the caller to hold c.L during the call.
func (c *chCond) broadcast() {
n := make(chan struct{})
ptrOld := atomic.SwapPointer(&c.notifyChPtr, unsafe.Pointer(&n))
// close old channels to trigger broadcast.
close(*(*chan struct{})(ptrOld))
}
func (c *chCond) notifyChan() <-chan struct{} {
ptr := atomic.LoadPointer(&c.notifyChPtr)
return *((*chan struct{})(ptr))
}