feat: remote extra code for internal/utils (#370)
Closes #369
diff --git a/internal/utils/ring_buffer.go b/internal/utils/ring_buffer.go
deleted file mode 100644
index cab385a..0000000
--- a/internal/utils/ring_buffer.go
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package utils
-
-import (
- "runtime"
- "sync/atomic"
- "time"
-)
-
-// 1.需要能够动态扩容
-// 2.缩容看情况
-// 3.read的时候需要block
-// 4.线程安全
-type RingNodesBuffer struct {
- writePos uint64
- readPos uint64
- mask uint64
-
- nodes nodes
-}
-
-type node struct {
- position uint64
- buf []byte
-}
-
-type nodes []*node
-
-// roundUp takes a uint64 greater than 0 and rounds it up to the next
-// power of 2.
-func roundUp(v uint64) uint64 {
- v--
- v |= v >> 1
- v |= v >> 2
- v |= v >> 4
- v |= v >> 8
- v |= v >> 16
- v |= v >> 32
- v++
- return v
-}
-
-func (rb *RingNodesBuffer) init(size uint64) {
- size = roundUp(size)
- rb.nodes = make(nodes, size)
- for i := uint64(0); i < size; i++ {
- rb.nodes[i] = &node{position: i}
- }
- rb.mask = size - 1 // so we don't have to do this with every put/get operation
-}
-
-func NewRingNodesBuffer(cap uint64) *RingNodesBuffer {
- rb := &RingNodesBuffer{}
- rb.init(cap)
- //go rb.resize()
- return rb
-}
-
-func (r *RingNodesBuffer) Write(b []byte) error {
- var n *node
- var dif uint64
- pos := atomic.LoadUint64(&r.writePos)
- i := 0
-L:
- for {
- // pos 16 seq 1. 0001 0000 00001111 0001 1111
- n = r.nodes[pos&r.mask]
- seq := atomic.LoadUint64(&n.position)
- switch dif = seq - pos; {
- case dif == 0:
- if atomic.CompareAndSwapUint64(&r.writePos, pos, pos+1) {
- break L
- }
- default:
- pos = atomic.LoadUint64(&r.writePos)
- }
- if i == 10000 {
- runtime.Gosched() // free up the cpu before the next iteration
- i = 0
- } else {
- i++
- }
- }
-
- n.buf = b
- atomic.StoreUint64(&n.position, pos+1)
- return nil
-}
-
-// 直接返回数据
-func (r *RingNodesBuffer) Read(timeout time.Duration) (data []byte, err error) {
- var (
- node *node
- pos = atomic.LoadUint64(&r.readPos)
- start time.Time
- dif uint64
- )
- if timeout > 0 {
- start = time.Now()
- }
- i := 0
-L:
- for {
- node = r.nodes[pos&r.mask]
- seq := atomic.LoadUint64(&node.position)
- switch dif = seq - (pos + 1); {
- case dif == 0:
- if atomic.CompareAndSwapUint64(&r.readPos, pos, pos+1) {
- break L
- }
- default:
- pos = atomic.LoadUint64(&r.readPos)
- }
- if timeout > 0 && time.Since(start) >= timeout {
- return
- }
- if i == 10000 {
- runtime.Gosched() // free up the cpu before the next iteration
- i = 0
- } else {
- i++
- }
- }
- data = node.buf
- atomic.StoreUint64(&node.position, pos+r.mask+1)
- return
-}
-
-// 知道大小,传进去解析
-func (r *RingNodesBuffer) ReadBySize(data []byte, timeout time.Duration) (n int, err error) {
- var (
- node *node
- pos = atomic.LoadUint64(&r.readPos)
- start time.Time
- dif uint64
- )
- i := 0
- if timeout > 0 {
- start = time.Now()
- }
-L:
- for {
- node = r.nodes[pos&r.mask]
- seq := atomic.LoadUint64(&node.position)
- switch dif = seq - (pos + 1); {
- case dif == 0:
- if atomic.CompareAndSwapUint64(&r.readPos, pos, pos+1) {
- break L
- }
- default:
- pos = atomic.LoadUint64(&r.readPos)
- }
- if timeout > 0 && time.Since(start) >= timeout {
- return
- }
- if i == 10000 {
- runtime.Gosched() // free up the cpu before the next iteration
- i = 0
- } else {
- i++
- }
- }
- n = copy(data, node.buf)
- atomic.StoreUint64(&node.position, pos+r.mask+1)
- return
-}
-
-func (r *RingNodesBuffer) Size() uint64 {
- return atomic.LoadUint64(&r.writePos) - atomic.LoadUint64(&r.readPos)
-
-}
-
-// Cap returns the capacity of this ring buffer.
-func (rb *RingNodesBuffer) Cap() uint64 {
- return uint64(len(rb.nodes))
-}
-
-func (r *RingNodesBuffer) Destroy() {
-
-}
-
-func (r *RingNodesBuffer) resize() {
- // TODO
-}
diff --git a/internal/utils/ring_buffer_test.go b/internal/utils/ring_buffer_test.go
deleted file mode 100644
index 02a8cc0..0000000
--- a/internal/utils/ring_buffer_test.go
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package utils
-
-import (
- "fmt"
- "strconv"
- "sync"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestRingRead(t *testing.T) {
- rb := NewRingNodesBuffer(5)
- assert.Equal(t, uint64(8), rb.Cap())
-
- err := rb.Write([]byte("hello"))
- if !assert.Nil(t, err) {
- return
- }
- data, err := rb.Read(1 * time.Second)
- if !assert.Nil(t, err) {
- return
- }
-
- assert.Equal(t, "hello", string(data))
-}
-
-func TestRingReadBySize(t *testing.T) {
- rb := NewRingNodesBuffer(5)
- assert.Equal(t, uint64(8), rb.Cap())
-
- err := rb.Write([]byte("hello"))
- if !assert.Nil(t, err) {
- return
- }
- sink := make([]byte, 5)
- n, err := rb.ReadBySize(sink, 1*time.Second)
- if !assert.Nil(t, err) {
- return
- }
-
- assert.Equal(t, 5, n)
- assert.Equal(t, "hello", string(sink))
-}
-
-func BenchmarkRingReadBufferMPMC(b *testing.B) {
- q := NewRingNodesBuffer(uint64(b.N * 100))
- var wg sync.WaitGroup
- wg.Add(100)
- b.ResetTimer()
- b.ReportAllocs()
-
- for i := 0; i < 100; i++ {
- go func() {
- for i := 0; i < b.N; i++ {
- q.Write([]byte(strconv.Itoa(i)))
- }
- }()
- }
-
- for i := 0; i < 100; i++ {
- go func() {
- for i := 0; i < b.N; i++ {
- _ = len(strconv.Itoa(i))
- var p []byte
- p, _ = q.Read(1 * time.Second)
- fmt.Sprintf("%v", p)
-
- }
- wg.Done()
- }()
- }
-
- wg.Wait()
-}
-
-func BenchmarkRingBySizeBufferMPMC(b *testing.B) {
- q := NewRingNodesBuffer(uint64(b.N * 100))
- var wg sync.WaitGroup
- wg.Add(100)
- b.ResetTimer()
- b.ReportAllocs()
-
- for i := 0; i < 100; i++ {
- go func() {
- for i := 0; i < b.N; i++ {
- q.Write([]byte(strconv.Itoa(i)))
- }
- }()
- }
-
- for i := 0; i < 100; i++ {
- go func() {
- for i := 0; i < b.N; i++ {
- p := make([]byte, len(strconv.Itoa(i)))
- q.ReadBySize(p, 1*time.Second)
- }
- wg.Done()
- }()
- }
-
- wg.Wait()
-}
diff --git a/internal/utils/string.go b/internal/utils/string.go
index 964a92c..f347397 100644
--- a/internal/utils/string.go
+++ b/internal/utils/string.go
@@ -17,10 +17,6 @@
package utils
-import (
- "fmt"
-)
-
// HashString hashes a string to a unique hashcode.
func HashString(s string) int {
val := []byte(s)
@@ -32,11 +28,3 @@
return int(h)
}
-
-func StrJoin(str, key string, value interface{}) string {
- if key == "" || value == "" {
- return str
- }
-
- return str + key + ": " + fmt.Sprint(value) + ", "
-}
diff --git a/internal/utils/string_test.go b/internal/utils/string_test.go
deleted file mode 100644
index 4e9e4ea..0000000
--- a/internal/utils/string_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package utils