add memory queue (#22)
* add memory queue
* polish codes
* polish codes
* polish doc
* polish codes
* convert queue to goconcurrentqueue
* polish doc
Co-authored-by: Evan <evanljp@outlook.com>
diff --git a/dist/LICENSE b/dist/LICENSE
index 084dfee..864ce75 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -240,5 +240,6 @@
grandecola (mmap) v0.6.0: https://github.com/grandecola/mmap MIT
grandecola (bigqueue) v0.4.0: https://github.com/grandecola/bigqueue MIT
Shopify (sarama) v1.27.2: https://github.com/Shopify/sarama MIT
+ enriquebris (goconcurrentqueue) v0.6.0: https://github.com/enriquebris/goconcurrentqueue MIT
diff --git a/dist/licenses/LICENSE-goconcurrentqueue.txt b/dist/licenses/LICENSE-goconcurrentqueue.txt
new file mode 100644
index 0000000..923df7b
--- /dev/null
+++ b/dist/licenses/LICENSE-goconcurrentqueue.txt
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2019 Enrique Bris
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/docs/plugin-description.md b/docs/plugin-description.md
index 0b7506a..8b4d973 100755
--- a/docs/plugin-description.md
+++ b/docs/plugin-description.md
@@ -77,6 +77,15 @@
topic: "log-topic"
```
# api.Queue
+## memory-queue
+### description
+this is a memory queue to buffer the input event.
+### defaultConfig
+```yaml
+# The maximum buffer event size.
+event_buffer_size: 5000
+```
+# api.Queue
## mmap-queue
### description
this is a memory mapped queue to provide the persistent storage.
@@ -120,8 +129,8 @@
this is a grpc server
### defaultConfig
```yaml
-# The address of grpc server. Default value is :8000
-address: :8000
+# The address of grpc server. Default value is :11800
+address: :11800
# The network of grpc. Default value is :tcp
network: tcp
# The max size of receiving log. Default value is 2M. The unit is Byte.
@@ -140,7 +149,7 @@
### defaultConfig
```yaml
# The http server address.
-address: ":8080"
+address: ":12800"
```
# api.Server
## prometheus-server
diff --git a/docs/plugins/queue/mmap/README.md b/docs/plugins/queue/mmap/README.md
index 7d6845f..1b76412 100644
--- a/docs/plugins/queue/mmap/README.md
+++ b/docs/plugins/queue/mmap/README.md
@@ -47,40 +47,40 @@
System Firmware Version: 1554.60.15.0.0 (iBridge: 18.16.13030.0.0,0
```
-### push operation
+### enqueue operation
```
goos: darwin
goarch: amd64
pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
-BenchmarkPush
-BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
-BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:10000-12 45946 28185 ns/op 9884 B/op 9 allocs/op
-BenchmarkPush/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
-BenchmarkPush/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000-12 68137 19142 ns/op 9838 B/op 9 allocs/op
-BenchmarkPush/segmentSize:_128KB_maxInMemSegments:20_message:8KB_queueCapacity:10000
-BenchmarkPush/segmentSize:_128KB_maxInMemSegments:20_message:8KB_queueCapacity:10000-12 47361 22318 ns/op 9884 B/op 9 allocs/op
-BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000
-BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000-12 24478 45501 ns/op 18934 B/op 10 allocs/op
-BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000
-BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000-12 45691 29413 ns/op 9884 B/op 9 allocs/op
+BenchmarkEnqueue
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:10000-12 45946 28185 ns/op 9884 B/op 9 allocs/op
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000-12 68137 19142 ns/op 9838 B/op 9 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:20_message:8KB_queueCapacity:10000
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:20_message:8KB_queueCapacity:10000-12 47361 22318 ns/op 9884 B/op 9 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000-12 24478 45501 ns/op 18934 B/op 10 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000-12 45691 29413 ns/op 9884 B/op 9 allocs/op
PASS
```
-### push and pop operation
+### enqueue and dequeue operation
```
goos: darwin
goarch: amd64
pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
-BenchmarkPushAndPop
-BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
-BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:10000-12 30657 34182 ns/op 28725 B/op 41 allocs/op
-BenchmarkPushAndPop/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
-BenchmarkPushAndPop/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000-12 34617 31619 ns/op 28677 B/op 41 allocs/op
-BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:20_message:8KB_queueCapacity:10000
-BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:20_message:8KB_queueCapacity:10000-12 32440 38439 ns/op 28726 B/op 41 allocs/op
-BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000
-BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000-12 18554 56840 ns/op 54931 B/op 42 allocs/op
-BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000
-BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000-12 27303 40016 ns/op 28725 B/op 41 allocs/op
+BenchmarkEnqueueAndDequeue
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:10000-12 30657 34182 ns/op 28725 B/op 41 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000-12 34617 31619 ns/op 28677 B/op 41 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:20_message:8KB_queueCapacity:10000
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:20_message:8KB_queueCapacity:10000-12 32440 38439 ns/op 28726 B/op 41 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000-12 18554 56840 ns/op 54931 B/op 42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000-12 27303 40016 ns/op 28725 B/op 41 allocs/op
PASS
```
diff --git a/go.mod b/go.mod
index 1008ef1..0c7e00c 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@
require (
github.com/Shopify/sarama v1.27.2
+ github.com/enriquebris/goconcurrentqueue v0.6.0
github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.4
github.com/grandecola/mmap v0.6.0
diff --git a/go.sum b/go.sum
index 9826328..13be86f 100644
--- a/go.sum
+++ b/go.sum
@@ -83,6 +83,8 @@
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
+github.com/enriquebris/goconcurrentqueue v0.6.0 h1:DJ97cgoPVoqlC4tTGBokn/omaB3o16yIs5QdAm6YEjc=
+github.com/enriquebris/goconcurrentqueue v0.6.0/go.mod h1:wGJhQNFI4wLNHleZLo5ehk1puj8M6OIl0tOjs3kwJus=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
diff --git a/internal/satellite/module/gatherer/fetcher_gatherer.go b/internal/satellite/module/gatherer/fetcher_gatherer.go
index 5a66b7b..2695e3d 100644
--- a/internal/satellite/module/gatherer/fetcher_gatherer.go
+++ b/internal/satellite/module/gatherer/fetcher_gatherer.go
@@ -75,7 +75,7 @@
case <-timeTicker.C:
events := f.runningFetcher.Fetch()
for _, e := range events {
- err := f.runningQueue.Push(e)
+ err := f.runningQueue.Enqueue(e)
f.fetchCounter.WithLabelValues(f.config.PipeName, "all").Inc()
if err != nil {
f.fetchCounter.WithLabelValues(f.config.PipeName, "abandoned").Inc()
@@ -100,7 +100,7 @@
f.Shutdown()
return
default:
- if e, err := f.runningQueue.Pop(); err == nil {
+ if e, err := f.runningQueue.Dequeue(); err == nil {
f.outputChannel <- e
f.queueOutputCounter.WithLabelValues(f.config.PipeName, "success").Inc()
} else if err == queue.ErrEmpty {
diff --git a/internal/satellite/module/gatherer/receiver_gatherer.go b/internal/satellite/module/gatherer/receiver_gatherer.go
index 59fc02d..aadcbdd 100644
--- a/internal/satellite/module/gatherer/receiver_gatherer.go
+++ b/internal/satellite/module/gatherer/receiver_gatherer.go
@@ -82,7 +82,7 @@
select {
case e := <-r.runningReceiver.Channel():
r.receiveCounter.WithLabelValues(r.config.PipeName, "all").Inc()
- err := r.runningQueue.Push(e)
+ err := r.runningQueue.Enqueue(e)
if err != nil {
r.receiveCounter.WithLabelValues(r.config.PipeName, "abandoned").Inc()
log.Logger.Errorf("cannot put event into queue in %s namespace, error is: %v", r.config.PipeName, err)
@@ -105,7 +105,7 @@
r.Shutdown()
return
default:
- if e, err := r.runningQueue.Pop(); err == nil {
+ if e, err := r.runningQueue.Dequeue(); err == nil {
r.outputChannel <- e
r.queueOutputCounter.WithLabelValues(r.config.PipeName, "success").Inc()
} else if err == queue.ErrEmpty {
diff --git a/plugins/queue/api/error.go b/plugins/queue/api/error.go
index ff58e95..05c7cec 100644
--- a/plugins/queue/api/error.go
+++ b/plugins/queue/api/error.go
@@ -21,5 +21,5 @@
var (
ErrEmpty = errors.New("cannot read data when the queue is empty")
- ErrFull = errors.New("cannot push data when the queue is full")
+ ErrFull = errors.New("cannot write data when the queue is full")
)
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
index 0788b27..1b67301 100644
--- a/plugins/queue/api/queue.go
+++ b/plugins/queue/api/queue.go
@@ -32,11 +32,11 @@
// Initialize creates the queue.
Initialize() error
- // Push a inputEvent into the queue.
- Push(event *protocol.Event) error
+ // Enqueue a inputEvent into the queue.
+ Enqueue(event *protocol.Event) error
- // Pop returns a SequenceEvent when Queue is not empty,
- Pop() (*SequenceEvent, error)
+ // Dequeue returns a SequenceEvent when Queue is not empty,
+ Dequeue() (*SequenceEvent, error)
// Close would close the queue.
Close() error
diff --git a/plugins/queue/memory/queue.go b/plugins/queue/memory/queue.go
new file mode 100644
index 0000000..3a34e00
--- /dev/null
+++ b/plugins/queue/memory/queue.go
@@ -0,0 +1,87 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package memory
+
+import (
+ "github.com/enriquebris/goconcurrentqueue"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/config"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/internal/satellite/event"
+ "github.com/apache/skywalking-satellite/plugins/queue/api"
+ "github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+const (
+ Name = "memory-queue"
+)
+
+type Queue struct {
+ config.CommonFields
+ // config
+ EventBufferSize int `mapstructure:"event_buffer_size"` // The maximum buffer event size.
+
+ // components
+ buffer *goconcurrentqueue.FixedFIFO
+}
+
+func (q *Queue) Name() string {
+ return Name
+}
+
+func (q *Queue) Description() string {
+ return "this is a memory queue to buffer the input event."
+}
+
+func (q *Queue) DefaultConfig() string {
+ return `
+# The maximum buffer event size.
+event_buffer_size: 5000
+`
+}
+
+func (q *Queue) Initialize() error {
+ q.buffer = goconcurrentqueue.NewFixedFIFO(q.EventBufferSize)
+ return nil
+}
+
+func (q *Queue) Enqueue(e *protocol.Event) error {
+ if err := q.buffer.Enqueue(e); err != nil {
+ log.Logger.Errorf("error in enqueue: %v", err)
+ return api.ErrFull
+ }
+ return nil
+}
+
+func (q *Queue) Dequeue() (*api.SequenceEvent, error) {
+ element, err := q.buffer.Dequeue()
+ if err != nil {
+ log.Logger.Errorf("error in dequeue: %v", err)
+ return nil, api.ErrEmpty
+ }
+ return &api.SequenceEvent{
+ Event: element.(*protocol.Event),
+ }, nil
+}
+
+func (q *Queue) Close() error {
+ return nil
+}
+
+func (q *Queue) Ack(_ event.Offset) {
+}
diff --git a/plugins/queue/memory/queue_test.go b/plugins/queue/memory/queue_test.go
new file mode 100644
index 0000000..0a7cbad
--- /dev/null
+++ b/plugins/queue/memory/queue_test.go
@@ -0,0 +1,91 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package memory
+
+import (
+ "fmt"
+ "reflect"
+ "strconv"
+ "testing"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ _ "github.com/apache/skywalking-satellite/internal/satellite/test"
+ "github.com/apache/skywalking-satellite/plugins/queue/api"
+ "github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+func initMemoryQueue(cfg plugin.Config) (*Queue, error) {
+ plugin.RegisterPluginCategory(reflect.TypeOf((*api.Queue)(nil)).Elem())
+ plugin.RegisterPlugin(&Queue{})
+ var config plugin.Config = map[string]interface{}{
+ plugin.NameField: Name,
+ }
+ for k, v := range cfg {
+ config[k] = v
+ }
+ q := api.GetQueue(config)
+ if q == nil {
+ return nil, fmt.Errorf("cannot get a memoory queue from the registry")
+ }
+ if err := q.Initialize(); err != nil {
+ return nil, fmt.Errorf("queue cannot initialize: %v", err)
+ }
+ return q.(*Queue), nil
+}
+
+func TestQueue_Enqueue(t *testing.T) {
+ const num = 100000
+ q, err := initMemoryQueue(map[string]interface{}{
+ "event_buffer_size": num,
+ })
+ if err != nil {
+ t.Fatalf("cannot init the memory queue: %v", err)
+ }
+
+ if _, err := q.Dequeue(); err == nil {
+ t.Fatal("the dequeue want failure but success")
+ }
+
+ // enqueue
+ for i := 0; i <= num; i++ {
+ e := &protocol.Event{
+ Name: strconv.Itoa(i),
+ }
+ if i < num {
+ if err := q.Enqueue(e); err != nil {
+ t.Fatalf("the enqueue want seuccess but failure: %v", err)
+ }
+ } else {
+ if err := q.Enqueue(e); err == nil {
+ t.Fatal("the enqueue want failure but success when facing full")
+ }
+ }
+ }
+
+ // dequeue
+ for i := 0; i < num; i++ {
+ if e, err := q.Dequeue(); err != nil {
+ t.Fatalf("the dequeue want seuccess but failure: %v", err)
+ } else if e.Event.Name != strconv.Itoa(i) {
+ t.Fatalf("want got %s, but got %s", strconv.Itoa(i), e.Event.Name)
+ }
+ }
+ if _, err := q.Dequeue(); err == nil {
+ t.Fatal("the dequeue want failure but success")
+ }
+}
diff --git a/plugins/queue/mmap/branchmark_test.go b/plugins/queue/mmap/branchmark_test.go
index d4eb030..8b8e42d 100644
--- a/plugins/queue/mmap/branchmark_test.go
+++ b/plugins/queue/mmap/branchmark_test.go
@@ -50,13 +50,13 @@
}
}
-func BenchmarkPush(b *testing.B) {
+func BenchmarkEnqueue(b *testing.B) {
for _, param := range params {
name := fmt.Sprintf("segmentSize: %dKB maxInMemSegments:%d message:%dKB queueCapacity:%d",
param.segmentSize/1024, param.maxInMemSegments, param.message, param.queueCapacity)
b.Run(name, func(b *testing.B) {
q, err := initMmapQueue(plugin.Config{
- "queue_dir": "BenchmarkPush",
+ "queue_dir": "BenchmarkEnqueue",
"segment_size": param.segmentSize,
"max_in_mem_segments": param.maxInMemSegments,
"queue_capacity_segments": param.queueCapacity,
@@ -69,7 +69,7 @@
b.ResetTimer()
println()
for i := 0; i < b.N; i++ {
- if err := q.Push(event); err != nil {
+ if err := q.Enqueue(event); err != nil {
b.Fatalf("error in pushing: %v", err)
}
}
@@ -80,13 +80,13 @@
}
}
-func BenchmarkPushAndPop(b *testing.B) {
+func BenchmarkEnqueueAndDequeue(b *testing.B) {
for _, param := range params {
name := fmt.Sprintf("segmentSize: %dKB maxInMemSegments:%d message:%dKB queueCapacity:%d",
param.segmentSize/1024, param.maxInMemSegments, param.message, param.queueCapacity)
b.Run(name, func(b *testing.B) {
q, err := initMmapQueue(plugin.Config{
- "queue_dir": "BenchmarkPushAndPop",
+ "queue_dir": "BenchmarkEnqueueAndDequeue",
"segment_size": param.segmentSize,
"max_in_mem_segments": param.maxInMemSegments,
"queue_capacity_segments": param.queueCapacity,
@@ -99,11 +99,11 @@
b.ResetTimer()
println()
for i := 0; i < b.N; i++ {
- if err := q.Push(event); err != nil {
- b.Fatalf("error in pushing: %v", err)
+ if err := q.Enqueue(event); err != nil {
+ b.Fatalf("error in enqueue: %v", err)
}
- if _, err := q.Pop(); err != nil {
- b.Fatalf("error in pushing: %v", err)
+ if _, err := q.Dequeue(); err != nil {
+ b.Fatalf("error in enqueue: %v", err)
}
}
b.StopTimer()
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
index 46aecd6..5a5edda 100644
--- a/plugins/queue/mmap/queue.go
+++ b/plugins/queue/mmap/queue.go
@@ -144,19 +144,19 @@
return nil
}
-func (q *Queue) Push(e *protocol.Event) error {
+func (q *Queue) Enqueue(e *protocol.Event) error {
data, err := proto.Marshal(e)
if err != nil {
return err
}
if len(data) > q.MaxEventSize {
- return fmt.Errorf("cannot push the event to the queue because the size %dB is over ceiling", len(data))
+ return fmt.Errorf("cannot enqueue the event to the queue because the size %dB is over ceiling", len(data))
}
- return q.push(data)
+ return q.enqueue(data)
}
-func (q *Queue) Pop() (*api.SequenceEvent, error) {
- data, id, offset, err := q.pop()
+func (q *Queue) Dequeue() (*api.SequenceEvent, error) {
+ data, id, offset, err := q.dequeue()
if err != nil {
return nil, err
}
diff --git a/plugins/queue/mmap/queue_opreation.go b/plugins/queue/mmap/queue_opreation.go
index 9516487..056069c 100644
--- a/plugins/queue/mmap/queue_opreation.go
+++ b/plugins/queue/mmap/queue_opreation.go
@@ -38,9 +38,9 @@
const uInt64Size = 8
-// push writes the data into the file system. It first writes the length of the data,
+// enqueue writes the data into the file system. It first writes the length of the data,
// then the data itself. It means the whole data may not exist in the one segments.
-func (q *Queue) push(bytes []byte) error {
+func (q *Queue) enqueue(bytes []byte) error {
if q.isFull() {
return api.ErrFull
}
@@ -62,9 +62,9 @@
return nil
}
-// pop reads the data from the file system. It first reads the length of the data,
+// dequeue reads the data from the file system. It first reads the length of the data,
// then the data itself. It means the whole data may not exist in the one segments.
-func (q *Queue) pop() (data []byte, rid, roffset int64, err error) {
+func (q *Queue) dequeue() (data []byte, rid, roffset int64, err error) {
if q.isEmpty() {
return nil, 0, 0, api.ErrEmpty
}
diff --git a/plugins/queue/mmap/queue_test.go b/plugins/queue/mmap/queue_test.go
index 9386af7..8d29562 100644
--- a/plugins/queue/mmap/queue_test.go
+++ b/plugins/queue/mmap/queue_test.go
@@ -155,16 +155,16 @@
}
events := getBatchEvents(10)
for _, e := range events {
- if err = q.Push(e); err != nil {
- t.Errorf("queue cannot push one event: %+v", err)
+ if err = q.Enqueue(e); err != nil {
+ t.Errorf("queue cannot enqueue one event: %+v", err)
}
}
for i := 0; i < 10; i++ {
- sequenceEvent, err := q.Pop()
+ sequenceEvent, err := q.Dequeue()
if err != nil {
t.Errorf("error in fetching data from queue: %v", err)
} else if !cmp.Equal(events[i].String(), sequenceEvent.Event.String()) {
- t.Errorf("history data and fetching data is not equal\n,history:%+v\n. pop data:%+v\n", events[i], sequenceEvent.Event)
+ t.Errorf("history data and fetching data is not equal\n,history:%+v\n. dequeue data:%+v\n", events[i], sequenceEvent.Event)
}
}
}
@@ -199,8 +199,8 @@
}
for j := 0; j < batchNum; j++ {
index := i*batchSize + j
- if err = q.Push(events[index]); err != nil {
- t.Errorf("queue cannot push one event: %+v", err)
+ if err = q.Enqueue(events[index]); err != nil {
+ t.Errorf("queue cannot enqueue one event: %+v", err)
}
}
if err := q.Close(); err != nil {
@@ -217,13 +217,13 @@
}
for j := 0; j < batchNum; j++ {
index := i*batchSize + j
- sequenceEvent, err := q.Pop()
+ sequenceEvent, err := q.Dequeue()
if err != nil {
t.Errorf("error in fetching data from queue: %v", err)
} else if cmp.Equal(events[index].String(), sequenceEvent.Event.String()) {
q.Ack(sequenceEvent.Offset)
} else {
- t.Errorf("history data and fetching data is not equal\n,history:%+v\n. pop data:%+v\n", events[index], sequenceEvent.Event)
+ t.Errorf("history data and fetching data is not equal\n,history:%+v\n. dequeue data:%+v\n", events[index], sequenceEvent.Event)
}
}
if err := q.Close(); err != nil {
@@ -244,7 +244,7 @@
t.Fatalf("cannot get a mmap queue: %v", err)
}
defer cleanTestQueue(t, q)
- err = q.Push(largeEvent)
+ err = q.Enqueue(largeEvent)
if err == nil {
t.Fatalf("The insertion of the over ceiling event is not as expected")
} else {
@@ -267,9 +267,9 @@
events := getBatchEvents(5)
for _, e := range events {
- err = q.Push(e)
+ err = q.Enqueue(e)
if err != nil {
- t.Errorf("queue cannot push one event: %+v", err)
+ t.Errorf("queue cannot enqueue one event: %+v", err)
}
}
time.Sleep(time.Second)
@@ -295,9 +295,9 @@
events := getBatchEvents(5)
for _, e := range events {
- err = q.Push(e)
+ err = q.Enqueue(e)
if err != nil {
- t.Errorf("queue cannot push one event: %+v", err)
+ t.Errorf("queue cannot enqueue one event: %+v", err)
}
}
time.Sleep(time.Second * 2)
@@ -322,10 +322,10 @@
events := getBatchEvents(20)
var memcost []int32
for _, e := range events {
- err = q.Push(e)
+ err = q.Enqueue(e)
memcost = append(memcost, q.mmapCount)
if err != nil {
- t.Errorf("queue cannot push one event: %+v", err)
+ t.Errorf("queue cannot enqueue one event: %+v", err)
}
}
want := []int32{
@@ -349,9 +349,9 @@
size := 10
wantPos := size * 1024 / q.SegmentSize
largeEvent := getLargeEvent(size)
- err = q.Push(largeEvent)
+ err = q.Enqueue(largeEvent)
if err != nil {
- t.Errorf("queue cannot push one event: %+v", err)
+ t.Errorf("queue cannot enqueue one event: %+v", err)
}
id, _ := q.meta.GetWritingOffset()
if int(id) != wantPos {
@@ -373,11 +373,11 @@
defer cleanTestQueue(t, q)
for i := 0; i < 100; i++ {
- err = q.Push(getLargeEvent(2))
+ err = q.Enqueue(getLargeEvent(2))
if err != nil {
- t.Errorf("queue cannot push one event: %+v", err)
+ t.Errorf("queue cannot enqueue one event: %+v", err)
}
- _, err := q.Pop()
+ _, err := q.Dequeue()
if err != nil {
t.Errorf("error in fetching data from queue: %v", err)
}
@@ -402,17 +402,17 @@
}
defer cleanTestQueue(t, q)
for _, e := range getBatchEvents(3) {
- err = q.Push(e)
+ err = q.Enqueue(e)
if err != nil {
- t.Errorf("queue cannot push one event: %+v", err)
+ t.Errorf("queue cannot enqueue one event: %+v", err)
}
}
for i := 0; i < 3; i++ {
- if _, err = q.Pop(); err != nil {
+ if _, err = q.Dequeue(); err != nil {
t.Errorf("error in fetching data from queue: %v", err)
}
}
- _, err = q.Pop()
+ _, err = q.Dequeue()
if err != nil && err.Error() != "cannot read data when the queue is empty" {
t.Fatalf("not except err: %v", err)
}
@@ -430,13 +430,13 @@
}
defer cleanTestQueue(t, q)
for _, e := range getBatchEvents(8) {
- err = q.Push(e)
+ err = q.Enqueue(e)
if err != nil {
- t.Errorf("queue cannot push one event: %+v", err)
+ t.Errorf("queue cannot enqueue one event: %+v", err)
}
}
- err = q.Push(getLargeEvent(2))
- if err != nil && err.Error() != "cannot push data when the queue is full" {
+ err = q.Enqueue(getLargeEvent(2))
+ if err != nil && err.Error() != "cannot write data when the queue is full" {
t.Fatalf("not except err: %v", err)
}
}
diff --git a/plugins/queue/queue_repository.go b/plugins/queue/queue_repository.go
index 1c49822..9ca29cc 100644
--- a/plugins/queue/queue_repository.go
+++ b/plugins/queue/queue_repository.go
@@ -22,6 +22,7 @@
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
"github.com/apache/skywalking-satellite/plugins/queue/api"
+ "github.com/apache/skywalking-satellite/plugins/queue/memory"
"github.com/apache/skywalking-satellite/plugins/queue/mmap"
)
@@ -29,8 +30,9 @@
func RegisterQueuePlugins() {
plugin.RegisterPluginCategory(reflect.TypeOf((*api.Queue)(nil)).Elem())
queues := []api.Queue{
- &mmap.Queue{},
// Please register the queue plugins at here.
+ new(memory.Queue),
+ new(mmap.Queue),
}
for _, q := range queues {
plugin.RegisterPlugin(q)
diff --git a/plugins/receiver/grpc/nativelog/receiver_test.go b/plugins/receiver/grpc/nativelog/receiver_test.go
index ada630a..3e6ff6a 100644
--- a/plugins/receiver/grpc/nativelog/receiver_test.go
+++ b/plugins/receiver/grpc/nativelog/receiver_test.go
@@ -106,7 +106,7 @@
}
func initClient(t *testing.T) logging.LogReportServiceClient {
- conn, err := grpc.Dial("localhost:8000", grpc.WithInsecure(), grpc.WithBlock())
+ conn, err := grpc.Dial("localhost:11800", grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
t.Fatalf("cannot init the grpc client: %v", err)
}
diff --git a/plugins/receiver/http/nativcelog/receiver_test.go b/plugins/receiver/http/nativcelog/receiver_test.go
index a20e333..a1344d4 100644
--- a/plugins/receiver/http/nativcelog/receiver_test.go
+++ b/plugins/receiver/http/nativcelog/receiver_test.go
@@ -66,7 +66,7 @@
t.Fatalf("cannot marshal the data: %v", err)
}
go func() {
- resp, err := client.Post("http://localhost:8080/logging", "application/json", bytes.NewBuffer(dataBytes))
+ resp, err := client.Post("http://localhost:12800/logging", "application/json", bytes.NewBuffer(dataBytes))
if err != nil {
fmt.Printf("cannot request the http-server , error: %v", err)
}
@@ -111,7 +111,7 @@
if err != nil {
t.Fatalf("cannot marshal the data: %v", err)
}
- resp, err := client.Post("http://localhost:8080/logging", "application/json", bytes.NewBuffer(dataBytes))
+ resp, err := client.Post("http://localhost:12800/logging", "application/json", bytes.NewBuffer(dataBytes))
if err != nil {
fmt.Printf("cannot request the http-server , error: %v", err)
}
diff --git a/plugins/server/grpc/server.go b/plugins/server/grpc/server.go
index 9ddd28d..993472f 100644
--- a/plugins/server/grpc/server.go
+++ b/plugins/server/grpc/server.go
@@ -52,8 +52,8 @@
func (s *Server) DefaultConfig() string {
return `
-# The address of grpc server. Default value is :8000
-address: :8000
+# The address of grpc server. Default value is :11800
+address: :11800
# The network of grpc. Default value is :tcp
network: tcp
# The max size of receiving log. Default value is 2M. The unit is Byte.
diff --git a/plugins/server/http/server.go b/plugins/server/http/server.go
index 627a023..e63d16b 100644
--- a/plugins/server/http/server.go
+++ b/plugins/server/http/server.go
@@ -43,7 +43,7 @@
func (s *Server) DefaultConfig() string {
return `
# The http server address.
-address: ":8080"
+address: ":12800"
`
}