blob: 8162301147ecd856be69c812b1bc52bfb52dc9d0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal
import (
"sync"
)
// BlockingQueue is a interface of block queue
type BlockingQueue interface {
// Put enqueue one item, block if the queue is full
Put(item interface{})
// Take dequeue one item, block until it's available
Take() interface{}
// Poll dequeue one item, return nil if queue is empty
Poll() interface{}
// Peek return the first item without dequeing, return nil if queue is empty
Peek() interface{}
// PeekLast return last item in queue without dequeing, return nil if queue is empty
PeekLast() interface{}
// Size return the current size of the queue
Size() int
// ReadableSlice returns a new view of the readable items in the queue
ReadableSlice() []interface{}
}
type blockingQueue struct {
items []interface{}
headIdx int
tailIdx int
size int
maxSize int
mutex sync.Mutex
isNotEmpty *sync.Cond
isNotFull *sync.Cond
}
// NewBlockingQueue init block queue and returns a BlockingQueue
func NewBlockingQueue(maxSize int) BlockingQueue {
bq := &blockingQueue{
items: make([]interface{}, maxSize),
headIdx: 0,
tailIdx: 0,
size: 0,
maxSize: maxSize,
}
bq.isNotEmpty = sync.NewCond(&bq.mutex)
bq.isNotFull = sync.NewCond(&bq.mutex)
return bq
}
func (bq *blockingQueue) Put(item interface{}) {
bq.mutex.Lock()
defer bq.mutex.Unlock()
for bq.size == bq.maxSize {
bq.isNotFull.Wait()
}
wasEmpty := bq.size == 0
bq.items[bq.tailIdx] = item
bq.size++
bq.tailIdx++
if bq.tailIdx >= bq.maxSize {
bq.tailIdx = 0
}
if wasEmpty {
// Wake up eventual reader waiting for next item
bq.isNotEmpty.Signal()
}
}
func (bq *blockingQueue) Take() interface{} {
bq.mutex.Lock()
defer bq.mutex.Unlock()
for bq.size == 0 {
bq.isNotEmpty.Wait()
}
return bq.dequeue()
}
func (bq *blockingQueue) Poll() interface{} {
bq.mutex.Lock()
defer bq.mutex.Unlock()
if bq.size == 0 {
return nil
}
return bq.dequeue()
}
func (bq *blockingQueue) Peek() interface{} {
bq.mutex.Lock()
defer bq.mutex.Unlock()
if bq.size == 0 {
return nil
}
return bq.items[bq.headIdx]
}
func (bq *blockingQueue) PeekLast() interface{} {
bq.mutex.Lock()
defer bq.mutex.Unlock()
if bq.size == 0 {
return nil
}
idx := (bq.headIdx + bq.size - 1) % bq.maxSize
return bq.items[idx]
}
func (bq *blockingQueue) dequeue() interface{} {
item := bq.items[bq.headIdx]
bq.items[bq.headIdx] = nil
bq.headIdx++
if bq.headIdx == len(bq.items) {
bq.headIdx = 0
}
bq.size--
bq.isNotFull.Signal()
return item
}
func (bq *blockingQueue) Size() int {
bq.mutex.Lock()
defer bq.mutex.Unlock()
return bq.size
}
func (bq *blockingQueue) ReadableSlice() []interface{} {
bq.mutex.Lock()
defer bq.mutex.Unlock()
res := make([]interface{}, bq.size)
readIdx := bq.headIdx
for i := 0; i < bq.size; i++ {
res[i] = bq.items[readIdx]
readIdx++
if readIdx == bq.maxSize {
readIdx = 0
}
}
return res
}