blob: 50bd42889a544787c8c42039a0452164ce474639 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package util
import (
"container/list"
"github.com/stretchr/testify/assert"
"sync"
"testing"
"time"
)
func NewMsgQueueForTest(maxMsgNum int) *MsgQueue {
q := new(MsgQueue)
q.msgList = list.New()
q.mtx = new(sync.Mutex)
q.msgCount = 0
q.maxMsgNum = maxMsgNum
q.state = Actived
q.notEmptyCond = sync.NewCond(q.mtx)
q.notFullCond = sync.NewCond(q.mtx)
return q
}
func TestMsgQueue(t *testing.T) {
t.Run("case empty", func(t *testing.T) {
q := NewMsgQueueForTest(2)
// append msg
eMSG := "msg to send"
err := q.Enqueue(eMSG)
assert.NoError(t, err)
assert.Equal(t, false, q.isEmpty())
assert.Equal(t, false, q.isFull())
// pop msg
dMSG, err := q.Dequeue()
assert.NoError(t, err)
assert.Equal(t, eMSG, dMSG)
// case empty
done := make(chan struct{})
go func(done chan struct{}) {
dMSG, err = q.Dequeue()
assert.NoError(t, err)
// Deavtive
q.Deavtive()
// error
_, err = q.Dequeue()
assert.Error(t, err)
if done != nil {
close(done)
}
}(done)
select {
case <-time.After(time.Second * 10):
case <-done:
}
})
t.Run("case Full", func(t *testing.T) {
eMSG := "msg to send"
// case Full
q1 := NewMsgQueueForTest(2)
err := q1.Enqueue(eMSG)
assert.NoError(t, err)
err = q1.Enqueue(eMSG)
assert.NoError(t, err)
done1 := make(chan struct{})
go func(c chan struct{}) {
err = q1.Enqueue(eMSG)
t.Log(err)
assert.Error(t, err)
if c != nil {
close(c)
}
}(done1)
select {
case <-time.After(time.Second * 10):
case <-done1:
}
// Deavtive
q1.Deavtive()
// error
err = q1.Enqueue(eMSG)
assert.Error(t, err)
})
}