Add kafka reporter. (#61)
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 10d189e..832700c 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -91,6 +91,7 @@
- ""
- "--features management"
- "--features vendored"
+ - "--features kafka-reporter"
- "--all-features"
runs-on: ubuntu-20.04
steps:
diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index d1fab52..79609e2 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -40,3 +40,5 @@
pip3 install setuptools
pip3 install -r requirements.txt
python3 e2e/run_e2e.py --expected_file=e2e/data/expected_context.yaml --max_retry_times=3 --target_path=/ping
+ docker-compose -f docker-compose.e2e.yml logs producer consumer
+ docker-compose -f docker-compose.e2e.yml exec -T producer /build/target/release/e2e-kafka
diff --git a/Cargo.toml b/Cargo.toml
index da1df32..909adb7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -36,6 +36,7 @@
[features]
management = ["hostname", "systemstat"]
vendored = ["protobuf-src"]
+kafka-reporter = ["rdkafka"]
mock = [] # For internal integration testing only, do not use.
@@ -46,11 +47,13 @@
futures-core = "0.3.21"
futures-util = "0.3.21"
hostname = { version = "0.3.1", optional = true }
+libz-sys = "1.1.9"
once_cell = "1.14.0"
parking_lot = "0.12.1"
portable-atomic = { version = "0.3.13", features = ["float"] }
prost = "0.11.0"
prost-derive = "0.11.0"
+rdkafka = { version = "0.32.2", optional = true }
serde = { version = "1.0.143", features = ["derive"] }
systemstat = { version = "0.2.0", optional = true }
thiserror = "1.0.32"
diff --git a/README.md b/README.md
index 715001e..d01ab4a 100644
--- a/README.md
+++ b/README.md
@@ -212,6 +212,42 @@
}
```
+# Advanced Reporter
+
+The advanced report provides an alternative way to submit the agent collected data to the backend.
+
+## kafka reporter
+
+The Kafka reporter plugin support report traces, metrics, logs, instance properties to Kafka cluster.
+
+Make sure the feature `kafka-reporter` is enabled.
+
+```rust
+#[cfg(feature = "kafka-reporter")]
+mod example {
+ use skywalking::reporter::Report;
+ use skywalking::reporter::kafka::{KafkaReportBuilder, KafkaReporter, RDKafkaClientConfig};
+
+ async fn do_something(reporter: &impl Report) {
+ // ....
+ }
+
+ async fn foo() {
+ let mut client_config = RDKafkaClientConfig::new();
+ client_config
+ .set("bootstrap.servers", "broker:9092")
+ .set("message.timeout.ms", "6000");
+
+ let (reporter, reporting) = KafkaReportBuilder::new(client_config).build().await.unwrap();
+ let handle = reporting.spawn();
+
+ do_something(&reporter);
+
+ handle.await.unwrap();
+ }
+}
+```
+
# How to compile?
If you have `skywalking-(VERSION).crate`, you can unpack it with the way as follows:
diff --git a/dist-material/LICENSE b/dist-material/LICENSE
index 6c5eda5..a7c1296 100644
--- a/dist-material/LICENSE
+++ b/dist-material/LICENSE
@@ -220,9 +220,11 @@
https://crates.io/crates/autocfg/1.1.0 1.1.0 Apache-2.0 OR MIT
https://crates.io/crates/base64/0.13.0 0.13.0 Apache-2.0 OR MIT
https://crates.io/crates/bitflags/1.3.1 1.3.1 Apache-2.0 OR MIT
- https://crates.io/crates/cc/1.0.0 1.0.0 Apache-2.0 OR MIT
+ https://crates.io/crates/cc/1.0.18 1.0.18 Apache-2.0 OR MIT
https://crates.io/crates/cfg-if/0.1.2 0.1.2 Apache-2.0 OR MIT
https://crates.io/crates/cfg-if/1.0.0 1.0.0 Apache-2.0 OR MIT
+ https://crates.io/crates/derivative/2.0.0 2.0.0 Apache-2.0 OR MIT
+ https://crates.io/crates/dtoa/0.4.0 0.4.0 Apache-2.0 OR MIT
https://crates.io/crates/either/1.0.0 1.0.0 Apache-2.0 OR MIT
https://crates.io/crates/fixedbitset/0.4.0 0.4.0 Apache-2.0 OR MIT
https://crates.io/crates/fnv/1.0.5 1.0.5 Apache-2.0 OR MIT
@@ -242,10 +244,12 @@
https://crates.io/crates/hyper-timeout/0.4.0 0.4.0 Apache-2.0 OR MIT
https://crates.io/crates/indexmap/1.6.2 1.6.2 Apache-2.0 OR MIT
https://crates.io/crates/itertools/0.10.0 0.10.0 Apache-2.0 OR MIT
+ https://crates.io/crates/itoa/0.3.0 0.3.0 Apache-2.0 OR MIT
https://crates.io/crates/itoa/0.4.1 0.4.1 Apache-2.0 OR MIT
https://crates.io/crates/itoa/1.0.1 1.0.1 Apache-2.0 OR MIT
https://crates.io/crates/lazy_static/1.4.0 1.4.0 Apache-2.0 OR MIT
https://crates.io/crates/libc/0.2.114 0.2.114 Apache-2.0 OR MIT
+ https://crates.io/crates/libz-sys/1.1.9 1.1.9 Apache-2.0 OR MIT
https://crates.io/crates/lock_api/0.4.6 0.4.6 Apache-2.0 OR MIT
https://crates.io/crates/log/0.4.17 0.4.17 Apache-2.0 OR MIT
https://crates.io/crates/match_cfg/0.1.0 0.1.0 Apache-2.0 OR MIT
@@ -254,6 +258,7 @@
https://crates.io/crates/miow/0.3.6 0.3.6 Apache-2.0 OR MIT
https://crates.io/crates/multimap/0.8.0 0.8.0 Apache-2.0 OR MIT
https://crates.io/crates/ntapi/0.3.0 0.3.0 Apache-2.0 OR MIT
+ https://crates.io/crates/num-traits/0.1.32 0.1.32 Apache-2.0 OR MIT
https://crates.io/crates/num_cpus/1.8.0 1.8.0 Apache-2.0 OR MIT
https://crates.io/crates/num_threads/0.1.2 0.1.2 Apache-2.0 OR MIT
https://crates.io/crates/once_cell/1.14.0 1.14.0 Apache-2.0 OR MIT
@@ -267,9 +272,11 @@
https://crates.io/crates/pin-project-internal/1.0.2 1.0.2 Apache-2.0 OR MIT
https://crates.io/crates/pin-project-lite/0.2.9 0.2.9 Apache-2.0 OR MIT
https://crates.io/crates/pin-utils/0.1.0 0.1.0 Apache-2.0 OR MIT
+ https://crates.io/crates/pkg-config/0.3.9 0.3.9 Apache-2.0 OR MIT
https://crates.io/crates/portable-atomic/0.3.13 0.3.13 Apache-2.0 OR MIT
https://crates.io/crates/ppv-lite86/0.2.8 0.2.8 Apache-2.0 OR MIT
https://crates.io/crates/prettyplease/0.1.0 0.1.0 Apache-2.0 OR MIT
+ https://crates.io/crates/proc-macro-crate/0.1.4 0.1.4 Apache-2.0 OR MIT
https://crates.io/crates/proc-macro2/1.0.32 1.0.32 Apache-2.0 OR MIT
https://crates.io/crates/quote/1.0.0 1.0.0 Apache-2.0 OR MIT
https://crates.io/crates/rand/0.4.1 0.4.1 Apache-2.0 OR MIT
@@ -283,6 +290,7 @@
https://crates.io/crates/scopeguard/1.1.0 1.1.0 Apache-2.0 OR MIT
https://crates.io/crates/serde/1.0.143 1.0.143 Apache-2.0 OR MIT
https://crates.io/crates/serde_derive/1.0.143 1.0.143 Apache-2.0 OR MIT
+ https://crates.io/crates/serde_json/1.0.0 1.0.0 Apache-2.0 OR MIT
https://crates.io/crates/signal-hook-registry/1.1.1 1.1.1 Apache-2.0 OR MIT
https://crates.io/crates/smallvec/1.6.1 1.6.1 Apache-2.0 OR MIT
https://crates.io/crates/socket2/0.3.17 0.3.17 Apache-2.0 OR MIT
@@ -295,10 +303,12 @@
https://crates.io/crates/thiserror-impl/1.0.32 1.0.32 Apache-2.0 OR MIT
https://crates.io/crates/time/0.3.9 0.3.9 Apache-2.0 OR MIT
https://crates.io/crates/tokio-io-timeout/1.0.1 1.0.1 Apache-2.0 OR MIT
+ https://crates.io/crates/toml/0.5.2 0.5.2 Apache-2.0 OR MIT
https://crates.io/crates/unicode-segmentation/1.2.0 1.2.0 Apache-2.0 OR MIT
https://crates.io/crates/unicode-width/0.1.4 0.1.4 Apache-2.0 OR MIT
https://crates.io/crates/unicode-xid/0.2.0 0.2.0 Apache-2.0 OR MIT
https://crates.io/crates/uuid/1.1.2 1.1.2 Apache-2.0 OR MIT
+ https://crates.io/crates/vcpkg/0.2.0 0.2.0 Apache-2.0 OR MIT
https://crates.io/crates/vec_map/0.8.0 0.8.0 Apache-2.0 OR MIT
https://crates.io/crates/version_check/0.9.0 0.9.0 Apache-2.0 OR MIT
https://crates.io/crates/winapi/0.3.9 0.3.9 Apache-2.0 OR MIT
@@ -319,6 +329,8 @@
https://crates.io/crates/fuchsia-zircon/0.3.1 0.3.1 BSD-3-Clause
https://crates.io/crates/fuchsia-zircon-sys/0.3.1 0.3.1 BSD-3-Clause
+ https://crates.io/crates/num_enum/0.5.0 0.5.0 BSD-3-Clause
+ https://crates.io/crates/num_enum_derive/0.5.0 0.5.0 BSD-3-Clause
========================================================================
MIT licenses
@@ -345,6 +357,8 @@
https://crates.io/crates/mio/0.8.1 0.8.1 MIT
https://crates.io/crates/nom/7.0.0 7.0.0 MIT
https://crates.io/crates/proc-macro-error/0.2.0 0.2.0 MIT
+ https://crates.io/crates/rdkafka/0.32.2 0.32.2 MIT
+ https://crates.io/crates/rdkafka-sys/4.5.0+1.9.2 4.5.0+1.9.2 MIT
https://crates.io/crates/redox_syscall/0.1.38 0.1.38 MIT
https://crates.io/crates/redox_syscall/0.2.8 0.2.8 MIT
https://crates.io/crates/slab/0.4.2 0.4.2 MIT
diff --git a/dist-material/licenses/LICENSE-rdkafka-sys.txt b/dist-material/licenses/LICENSE-rdkafka-sys.txt
new file mode 100644
index 0000000..eb7158a
--- /dev/null
+++ b/dist-material/licenses/LICENSE-rdkafka-sys.txt
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2016 Federico Giraud
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/dist-material/licenses/LICENSE-rdkafka.txt b/dist-material/licenses/LICENSE-rdkafka.txt
new file mode 100644
index 0000000..eb7158a
--- /dev/null
+++ b/dist-material/licenses/LICENSE-rdkafka.txt
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2016 Federico Giraud
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/docker-compose.e2e.yml b/docker-compose.e2e.yml
index 3462189..84c7d92 100644
--- a/docker-compose.e2e.yml
+++ b/docker-compose.e2e.yml
@@ -37,6 +37,8 @@
depends_on:
collector:
condition: service_healthy
+ broker:
+ condition: service_healthy
healthcheck:
test: [ "CMD", "curl", "http://0.0.0.0:8082/healthCheck" ]
interval: 5s
@@ -54,3 +56,15 @@
condition: service_healthy
consumer:
condition: service_healthy
+ broker:
+ condition: service_healthy
+
+ broker:
+ image: landoop/fast-data-dev:3.3
+ container_name: broker
+ ports:
+ - "9092:9092"
+ healthcheck:
+ test: [ "CMD", "nc", "-zv", "0.0.0.0", "9092"]
+ interval: 5s
+ timeout: 5s
diff --git a/e2e/Cargo.toml b/e2e/Cargo.toml
index 2b156d0..d2e49e1 100644
--- a/e2e/Cargo.toml
+++ b/e2e/Cargo.toml
@@ -22,9 +22,16 @@
edition = "2021"
publish = false
license = "Apache-2.0"
+default-run = "e2e"
+
+[[bin]]
+name = "e2e-kafka"
+path = "src/e2e_kafka.rs"
[dependencies]
-skywalking = { path = ".." }
hyper = { version = "0.14", features = ["full"] }
-tokio = { version = "1", features = ["full"] }
+prost = "0.11.0"
+rdkafka = "0.32.2"
+skywalking = { path = "..", features = ["kafka-reporter"] }
structopt = "0.3"
+tokio = { version = "1", features = ["full"] }
diff --git a/e2e/docker/Dockerfile b/e2e/docker/Dockerfile
index 94ab5c3..13fb2d2 100644
--- a/e2e/docker/Dockerfile
+++ b/e2e/docker/Dockerfile
@@ -20,4 +20,5 @@
WORKDIR /build
COPY . /build/
RUN cargo build --release --workspace
+ENV RUST_BACKTRACE=1
ENTRYPOINT ["/build/target/release/e2e"]
diff --git a/e2e/src/e2e_kafka.rs b/e2e/src/e2e_kafka.rs
new file mode 100644
index 0000000..e785050
--- /dev/null
+++ b/e2e/src/e2e_kafka.rs
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#![allow(clippy::bool_assert_comparison)]
+
+use prost::Message;
+use rdkafka::{
+ consumer::{Consumer, StreamConsumer},
+ ClientConfig, Message as _,
+};
+use skywalking::proto::v3::{
+ log_data_body::Content, meter_data::Metric, JsonLog, KeyStringValuePair, LogData, LogTags,
+ MeterData, RefType, SegmentObject, SpanLayer, SpanType,
+};
+use std::time::Duration;
+use tokio::time::timeout;
+
+async fn segment() {
+ let consumer = create_consumer("skywalking-segments");
+
+ for _ in 0..3 {
+ let segment: SegmentObject = consumer_recv(&consumer).await;
+ check_segment(&segment);
+ }
+}
+
+fn check_segment(segment: &SegmentObject) {
+ dbg!(segment);
+
+ assert_eq!(segment.service_instance, "node_0");
+
+ if segment.service == "consumer" {
+ assert_eq!(segment.spans.len(), 1);
+
+ assert_eq!(segment.spans[0].span_id, 0);
+ assert_eq!(segment.spans[0].parent_span_id, -1);
+ assert_eq!(segment.spans[0].operation_name, "/pong");
+ assert_eq!(segment.spans[0].peer, "");
+ assert_eq!(segment.spans[0].span_type, SpanType::Entry as i32);
+ assert_eq!(segment.spans[0].span_layer, SpanLayer::Http as i32);
+ assert_eq!(segment.spans[0].component_id, 11000);
+ assert_eq!(segment.spans[0].is_error, false);
+ assert_eq!(segment.spans[0].tags.len(), 0);
+ assert_eq!(segment.spans[0].logs.len(), 0);
+ assert_eq!(segment.spans[0].refs.len(), 1);
+
+ assert_eq!(
+ segment.spans[0].refs[0].ref_type,
+ RefType::CrossProcess as i32
+ );
+ assert_eq!(segment.spans[0].refs[0].parent_span_id, 1);
+ assert_eq!(segment.spans[0].refs[0].parent_service, "producer");
+ assert_eq!(segment.spans[0].refs[0].parent_service_instance, "node_0");
+ assert_eq!(segment.spans[0].refs[0].parent_endpoint, "/pong");
+ assert_eq!(
+ segment.spans[0].refs[0].network_address_used_at_peer,
+ "consumer:8082"
+ );
+ } else if segment.service == "producer" {
+ if segment.spans.last().unwrap().operation_name == "/ping" {
+ assert_eq!(segment.spans.len(), 3);
+
+ assert_eq!(segment.spans[0].span_id, 1);
+ assert_eq!(segment.spans[0].parent_span_id, 0);
+ assert_eq!(segment.spans[0].operation_name, "/pong");
+ assert_eq!(segment.spans[0].peer, "consumer:8082");
+ assert_eq!(segment.spans[0].span_type, SpanType::Exit as i32);
+ assert_eq!(segment.spans[0].span_layer, SpanLayer::Unknown as i32);
+ assert_eq!(segment.spans[0].component_id, 11000);
+ assert_eq!(segment.spans[0].is_error, false);
+ assert_eq!(segment.spans[0].tags.len(), 0);
+ assert_eq!(segment.spans[0].logs.len(), 0);
+ assert_eq!(segment.spans[0].refs.len(), 0);
+
+ assert_eq!(segment.spans[1].span_id, 2);
+ assert_eq!(segment.spans[1].parent_span_id, 0);
+ assert_eq!(segment.spans[1].operation_name, "async-job");
+ assert_eq!(segment.spans[1].peer, "");
+ assert_eq!(segment.spans[1].span_type, SpanType::Local as i32);
+ assert_eq!(segment.spans[1].span_layer, SpanLayer::Unknown as i32);
+ assert_eq!(segment.spans[1].component_id, 11000);
+ assert_eq!(segment.spans[1].is_error, false);
+ assert_eq!(segment.spans[1].tags.len(), 0);
+ assert_eq!(segment.spans[1].logs.len(), 0);
+ assert_eq!(segment.spans[1].refs.len(), 0);
+
+ assert_eq!(segment.spans[2].span_id, 0);
+ assert_eq!(segment.spans[2].parent_span_id, -1);
+ assert_eq!(segment.spans[2].operation_name, "/ping");
+ assert_eq!(segment.spans[2].peer, "");
+ assert_eq!(segment.spans[2].span_type, SpanType::Entry as i32);
+ assert_eq!(segment.spans[2].span_layer, SpanLayer::Http as i32);
+ assert_eq!(segment.spans[2].component_id, 11000);
+ assert_eq!(segment.spans[2].is_error, false);
+ assert_eq!(segment.spans[2].tags.len(), 0);
+ assert_eq!(segment.spans[2].logs.len(), 0);
+ assert_eq!(segment.spans[2].refs.len(), 0);
+ } else if segment.spans.last().unwrap().operation_name == "async-callback" {
+ assert_eq!(segment.spans.len(), 1);
+
+ assert_eq!(segment.spans[0].span_id, 0);
+ assert_eq!(segment.spans[0].parent_span_id, -1);
+ assert_eq!(segment.spans[0].peer, "");
+ assert_eq!(segment.spans[0].span_type, SpanType::Entry as i32);
+ assert_eq!(segment.spans[0].span_layer, SpanLayer::Http as i32);
+ assert_eq!(segment.spans[0].component_id, 11000);
+ assert_eq!(segment.spans[0].is_error, false);
+ assert_eq!(segment.spans[0].tags.len(), 0);
+ assert_eq!(segment.spans[0].logs.len(), 0);
+ assert_eq!(segment.spans[0].refs.len(), 1);
+
+ assert_eq!(
+ segment.spans[0].refs[0].ref_type,
+ RefType::CrossThread as i32
+ );
+ assert_eq!(segment.spans[0].refs[0].parent_span_id, 2);
+ assert_eq!(segment.spans[0].refs[0].parent_service, "producer");
+ assert_eq!(segment.spans[0].refs[0].parent_service_instance, "node_0");
+ assert_eq!(segment.spans[0].refs[0].parent_endpoint, "async-job");
+ assert_eq!(segment.spans[0].refs[0].network_address_used_at_peer, "");
+ } else {
+ panic!(
+ "unknown operation_name {}",
+ segment.spans.last().unwrap().operation_name
+ );
+ }
+ } else {
+ panic!("unknown service {}", segment.service);
+ }
+}
+
+async fn meter() {
+ let consumer = create_consumer("skywalking-meters");
+
+ for _ in 0..3 {
+ let meter: MeterData = consumer_recv(&consumer).await;
+ check_meter(&meter);
+ }
+}
+
+fn check_meter(meter: &MeterData) {
+ dbg!(meter);
+
+ assert_eq!(meter.service, "consumer");
+ assert_eq!(meter.service_instance, "node_0");
+
+ match &meter.metric {
+ Some(Metric::SingleValue(value)) => {
+ assert_eq!(value.name, "instance_trace_count");
+
+ if value.labels[0].name == "region" && value.labels[0].value == "us-west" {
+ assert_eq!(value.labels[1].name, "az");
+ assert_eq!(value.labels[1].value, "az-1");
+ assert_eq!(value.value, 30.0);
+ } else if value.labels[0].name == "region" && value.labels[0].value == "us-east" {
+ assert_eq!(value.labels[1].name, "az");
+ assert_eq!(value.labels[1].value, "az-3");
+ assert_eq!(value.value, 20.0);
+ } else {
+ panic!("unknown label {:?}", &value.labels[0]);
+ }
+ }
+ Some(Metric::Histogram(value)) => {
+ assert_eq!(value.name, "instance_trace_count");
+ assert_eq!(value.labels[0].name, "region");
+ assert_eq!(value.labels[0].value, "us-north");
+ assert_eq!(value.labels[1].name, "az");
+ assert_eq!(value.labels[1].value, "az-1");
+ assert_eq!(value.values[0].bucket, 10.0);
+ assert_eq!(value.values[0].count, 1);
+ assert_eq!(value.values[1].bucket, 20.0);
+ assert_eq!(value.values[1].count, 2);
+ assert_eq!(value.values[2].bucket, 30.0);
+ assert_eq!(value.values[2].count, 0);
+ }
+ _ => {
+ panic!("unknown metric");
+ }
+ }
+}
+
+async fn log() {
+ let consumer = create_consumer("skywalking-logs");
+
+ for _ in 0..3 {
+ let log: LogData = consumer_recv(&consumer).await;
+ check_log(&log);
+ }
+}
+
+fn check_log(log: &LogData) {
+ dbg!(log);
+
+ if log.service == "producer" && log.service_instance == "node_0" {
+ assert_eq!(log.endpoint, "/ping");
+
+ match &log.body.as_ref().unwrap().content {
+ Some(Content::Json(json)) => {
+ assert_eq!(json.json, r#"{"message": "handle ping"}"#);
+ assert_eq!(log.trace_context, None);
+ assert_eq!(
+ log.tags,
+ Some(LogTags {
+ data: vec![KeyStringValuePair {
+ key: "level".to_string(),
+ value: "DEBUG".to_string()
+ }]
+ })
+ );
+ }
+ Some(Content::Text(text)) => {
+ assert_eq!(text.text, "do http request");
+ assert_eq!(log.trace_context.as_ref().unwrap().span_id, 1);
+ assert_eq!(
+ log.tags,
+ Some(LogTags {
+ data: vec![KeyStringValuePair {
+ key: "level".to_string(),
+ value: "INFO".to_string()
+ }]
+ })
+ );
+ }
+ body => {
+ panic!("unknown log body {:?}", body);
+ }
+ }
+ } else if log.service == "consumer" && log.service_instance == "node_0" {
+ assert_eq!(log.endpoint, "/pong");
+ assert_eq!(
+ log.body.as_ref().unwrap().content,
+ Some(Content::Json(JsonLog {
+ json: r#"{"message": "handle pong"}"#.to_string()
+ }))
+ );
+ assert_eq!(log.trace_context, None);
+ assert_eq!(
+ log.tags,
+ Some(LogTags {
+ data: vec![KeyStringValuePair {
+ key: "level".to_string(),
+ value: "DEBUG".to_string()
+ }]
+ })
+ );
+ } else {
+ panic!("unknown log {} {}", log.service, log.service_instance);
+ }
+}
+
+fn create_consumer(topic: &str) -> StreamConsumer {
+ let consumer: StreamConsumer = ClientConfig::new()
+ .set("bootstrap.servers", "broker:9092")
+ .set("broker.address.family", "v4")
+ .set("session.timeout.ms", "30000")
+ .set("enable.auto.commit", "true")
+ .set("auto.offset.reset", "earliest")
+ .set("enable.auto.offset.store", "true")
+ .set("group.id", topic)
+ .create()
+ .unwrap();
+ consumer.subscribe(&[topic]).unwrap();
+ consumer
+}
+
+async fn consumer_recv<T: Message + Default>(consumer: &StreamConsumer) -> T {
+ let message = timeout(Duration::from_secs(12), consumer.recv())
+ .await
+ .unwrap()
+ .unwrap();
+ let value = message.payload_view::<[u8]>().unwrap().unwrap();
+ Message::decode(value).unwrap()
+}
+
+#[tokio::main(flavor = "multi_thread")]
+async fn main() {
+ segment().await;
+ meter().await;
+ log().await;
+}
diff --git a/e2e/src/main.rs b/e2e/src/main.rs
index 545c1c9..bf46cb3 100644
--- a/e2e/src/main.rs
+++ b/e2e/src/main.rs
@@ -29,7 +29,11 @@
meter::{Counter, Gauge, Histogram},
metricer::Metricer,
},
- reporter::grpc::GrpcReporter,
+ reporter::{
+ grpc::GrpcReporter,
+ kafka::{KafkaReportBuilder, KafkaReporter, RDKafkaClientConfig},
+ CollectItem, Report,
+ },
trace::{
propagation::{
context::SKYWALKING_HTTP_CONTEXT_HEADER_KEY, decoder::decode_propagation,
@@ -40,6 +44,7 @@
};
use std::{convert::Infallible, error::Error, net::SocketAddr};
use structopt::StructOpt;
+use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
static NOT_FOUND_MSG: &str = "not found";
static SUCCESS_MSG: &str = "Success";
@@ -210,11 +215,52 @@
mode: String,
}
+#[derive(Clone)]
+struct CombineReporter {
+ grpc_reporter: GrpcReporter<UnboundedSender<CollectItem>, UnboundedReceiver<CollectItem>>,
+ kafka_reporter: KafkaReporter<UnboundedSender<CollectItem>>,
+}
+
+impl Report for CombineReporter {
+ fn report(&self, item: CollectItem) {
+ let typ = match &item {
+ CollectItem::Trace(_) => "trace",
+ CollectItem::Log(_) => "log",
+ CollectItem::Meter(_) => "meter",
+ _ => "unknown",
+ };
+ println!("report item type: {:?}", typ);
+ self.grpc_reporter.report(item.clone());
+ self.kafka_reporter.report(item);
+ }
+}
+
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let opt = Opt::from_args();
- let reporter = GrpcReporter::connect("http://collector:19876").await?;
- let handle = reporter.reporting().await.spawn();
+
+ let reporter1 = GrpcReporter::connect("http://collector:19876").await?;
+ let handle1 = reporter1.reporting().await.spawn();
+
+ let mut client_config = RDKafkaClientConfig::new();
+ client_config
+ .set("bootstrap.servers", "broker:9092")
+ .set("message.timeout.ms", "6000");
+ let (reporter2, reporting) = KafkaReportBuilder::new(client_config)
+ .with_err_handle(|message, err| {
+ eprintln!(
+ "kafka reporter failed, message: {}, err: {:?}",
+ message, err
+ );
+ })
+ .build()
+ .await?;
+ let handle2 = reporting.spawn();
+
+ let reporter = CombineReporter {
+ grpc_reporter: reporter1,
+ kafka_reporter: reporter2,
+ };
if opt.mode == "consumer" {
tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter.clone()));
@@ -229,7 +275,8 @@
unreachable!()
}
- handle.await?;
+ handle1.await?;
+ handle2.await?;
Ok(())
}
diff --git a/src/error/mod.rs b/src/error/mod.rs
index 2a26da6..b87a5b7 100644
--- a/src/error/mod.rs
+++ b/src/error/mod.rs
@@ -44,6 +44,11 @@
#[error("tokio join failed: {0}")]
TokioJoin(#[from] tokio::task::JoinError),
+ /// Kafka reporter error.
+ #[cfg(feature = "kafka-reporter")]
+ #[error("kafka reporter failed: {0}")]
+ KafkaReporter(crate::reporter::kafka::Error),
+
/// Other uncovered errors.
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + 'static>),
diff --git a/src/proto/v3/mod.rs b/src/proto/v3/mod.rs
index 0086949..d470966 100644
--- a/src/proto/v3/mod.rs
+++ b/src/proto/v3/mod.rs
@@ -18,6 +18,7 @@
#![allow(missing_docs)]
#![allow(rustdoc::invalid_html_tags)]
+#![allow(clippy::derive_partial_eq_without_eq)]
use crate::common::system_time::{fetch_time, TimePeriod};
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index 5abb88f..31676e7 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -16,6 +16,7 @@
//! Grpc implementation of [Report].
+use super::{CollectItemConsume, CollectItemProduce};
#[cfg(feature = "management")]
use crate::proto::v3::management_service_client::ManagementServiceClient;
use crate::{
@@ -51,7 +52,6 @@
};
use tokio_stream::StreamExt;
use tonic::{
- async_trait,
metadata::{Ascii, MetadataValue},
service::{interceptor::InterceptedService, Interceptor},
transport::{self, Channel, Endpoint},
@@ -71,66 +71,6 @@
error!(?status, "{}", message);
}
-/// Special purpose, used for user-defined production operations. Generally, it
-/// does not need to be handled.
-pub trait CollectItemProduce: Send + Sync + 'static {
- /// Produce the collect item non-blocking.
- fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>>;
-}
-
-impl CollectItemProduce for () {
- fn produce(&self, _item: CollectItem) -> Result<(), Box<dyn Error>> {
- Ok(())
- }
-}
-
-impl CollectItemProduce for mpsc::UnboundedSender<CollectItem> {
- fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>> {
- Ok(self.send(item)?)
- }
-}
-
-/// Special purpose, used for user-defined consume operations. Generally, it
-/// does not need to be handled.
-#[async_trait]
-pub trait CollectItemConsume: Send + Sync + 'static {
- /// Consume the collect item blocking.
- async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>>;
-
- /// Try to consume the collect item non-blocking.
- async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>>;
-}
-
-#[async_trait]
-impl CollectItemConsume for () {
- async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
- Ok(None)
- }
-
- async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
- Ok(None)
- }
-}
-
-#[async_trait]
-impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
- async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
- Ok(self.recv().await)
- }
-
- async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
- use mpsc::error::TryRecvError;
-
- match self.try_recv() {
- Ok(item) => Ok(Some(item)),
- Err(e) => match e {
- TryRecvError::Empty => Ok(None),
- TryRecvError::Disconnected => Err(Box::new(e)),
- },
- }
- }
-}
-
#[derive(Default, Clone)]
struct CustomInterceptor {
authentication: Option<Arc<String>>,
diff --git a/src/reporter/kafka.rs b/src/reporter/kafka.rs
new file mode 100644
index 0000000..aff55ac
--- /dev/null
+++ b/src/reporter/kafka.rs
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//! Kafka implementation of [Report].
+
+use super::{CollectItemConsume, CollectItemProduce};
+use crate::reporter::{CollectItem, Report};
+pub use rdkafka::config::{ClientConfig as RDKafkaClientConfig, RDKafkaLogLevel};
+use rdkafka::producer::{FutureProducer, FutureRecord};
+use std::{
+ error,
+ future::{pending, Future},
+ pin::Pin,
+ sync::{
+ atomic::{AtomicBool, Ordering::Relaxed},
+ Arc,
+ },
+ time::Duration,
+};
+use tokio::{select, spawn, sync::mpsc, task::JoinHandle, try_join};
+use tracing::error;
+
+/// Kafka reporter error.
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+ /// ksKafka error.
+ #[error(transparent)]
+ RdKafka(#[from] rdkafka::error::KafkaError),
+
+ /// kafka topic not found
+ #[error("topic not found: {topic}")]
+ TopicNotFound {
+ /// Name of kafka topic.
+ topic: String,
+ },
+}
+
+type DynErrHandler = dyn Fn(&str, &dyn error::Error) + Send + Sync + 'static;
+
+fn default_err_handle(message: &str, err: &dyn error::Error) {
+ error!(?err, "{}", message);
+}
+
+#[derive(Default)]
+struct State {
+ is_closing: AtomicBool,
+}
+
+impl State {
+ fn is_closing(&self) -> bool {
+ self.is_closing.load(Relaxed)
+ }
+}
+
+/// The Kafka reporter plugin support report traces, metrics, logs, instance
+/// properties to Kafka cluster.
+pub struct KafkaReportBuilder<P, C> {
+ state: Arc<State>,
+ producer: Arc<P>,
+ consumer: C,
+ client_config: RDKafkaClientConfig,
+ namespace: Option<String>,
+ err_handle: Arc<DynErrHandler>,
+}
+
+impl KafkaReportBuilder<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
+ /// Create builder, with rdkafka client configuration.
+ pub fn new(client_config: RDKafkaClientConfig) -> Self {
+ let (producer, consumer) = mpsc::unbounded_channel();
+ Self::new_with_pc(client_config, producer, consumer)
+ }
+}
+
+impl<P: CollectItemProduce, C: CollectItemConsume> KafkaReportBuilder<P, C> {
+ /// Special purpose, used for user-defined produce and consume operations,
+ /// usually you can use [KafkaReportBuilder::new].
+ pub fn new_with_pc(client_config: RDKafkaClientConfig, producer: P, consumer: C) -> Self {
+ Self {
+ state: Default::default(),
+ producer: Arc::new(producer),
+ consumer,
+ client_config,
+ namespace: None,
+ err_handle: Arc::new(default_err_handle),
+ }
+ }
+
+ /// Set error handle. By default, the error will be logged.
+ pub fn with_err_handle(
+ mut self,
+ handle: impl Fn(&str, &dyn error::Error) + Send + Sync + 'static,
+ ) -> Self {
+ self.err_handle = Arc::new(handle);
+ self
+ }
+
+ /// Use to isolate multi OAP server when using same Kafka cluster (final
+ /// topic name will append namespace before Kafka topics with - ).
+ pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
+ self.namespace = Some(namespace.into());
+ self
+ }
+
+ /// Build the Reporter implemented [Report] in the foreground, and the
+ /// handle to push data to kafka in the background.
+ pub async fn build(self) -> Result<(KafkaReporter<P>, KafkaReporting<C>), Error> {
+ let kafka_producer = KafkaProducer::new(
+ self.client_config.create()?,
+ self.err_handle.clone(),
+ self.namespace,
+ )
+ .await?;
+ Ok((
+ KafkaReporter {
+ state: self.state.clone(),
+ producer: self.producer,
+ err_handle: self.err_handle,
+ },
+ KafkaReporting {
+ state: self.state,
+ consumer: self.consumer,
+ kafka_producer,
+ shutdown_signal: Box::pin(pending()),
+ },
+ ))
+ }
+}
+
+/// The kafka reporter implemented [Report].
+pub struct KafkaReporter<P> {
+ state: Arc<State>,
+ producer: Arc<P>,
+ err_handle: Arc<DynErrHandler>,
+}
+
+impl<P> Clone for KafkaReporter<P> {
+ #[inline]
+ fn clone(&self) -> Self {
+ Self {
+ state: self.state.clone(),
+ producer: self.producer.clone(),
+ err_handle: self.err_handle.clone(),
+ }
+ }
+}
+
+impl<P: CollectItemProduce> Report for KafkaReporter<P> {
+ fn report(&self, item: CollectItem) {
+ if !self.state.is_closing() {
+ if let Err(e) = self.producer.produce(item) {
+ (self.err_handle)("report collect item failed", &*e);
+ }
+ }
+ }
+}
+
+/// The handle to push data to kafka.
+pub struct KafkaReporting<C> {
+ state: Arc<State>,
+ consumer: C,
+ kafka_producer: KafkaProducer,
+ shutdown_signal: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
+}
+
+impl<C: CollectItemConsume> KafkaReporting<C> {
+ /// Quit when shutdown_signal received.
+ ///
+ /// Accept a `shutdown_signal` argument as a graceful shutdown signal.
+ pub fn with_graceful_shutdown(
+ mut self,
+ shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
+ ) -> Self {
+ self.shutdown_signal = Box::pin(shutdown_signal);
+ self
+ }
+
+ /// Spawn the reporting in background.
+ pub fn spawn(self) -> ReportingJoinHandle {
+ let handle = spawn(async move {
+ let KafkaReporting {
+ state,
+ mut consumer,
+ mut kafka_producer,
+ shutdown_signal,
+ } = self;
+
+ let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
+
+ let work_fut = async move {
+ loop {
+ select! {
+ item = consumer.consume() => {
+ match item {
+ Ok(Some(item)) => {
+ kafka_producer.produce(item).await;
+ }
+ Ok(None) => break,
+ Err(err) => return Err(crate::Error::Other(err)),
+ }
+ }
+ _ = shutdown_rx.recv() => break,
+ }
+ }
+
+ state.is_closing.store(true, Relaxed);
+
+ // Flush.
+ loop {
+ match consumer.try_consume().await {
+ Ok(Some(item)) => {
+ kafka_producer.produce(item).await;
+ }
+ Ok(None) => break,
+ Err(err) => return Err(err.into()),
+ }
+ }
+
+ Ok::<_, crate::Error>(())
+ };
+
+ let shutdown_fut = async move {
+ shutdown_signal.await;
+ shutdown_tx
+ .send(())
+ .map_err(|e| crate::Error::Other(Box::new(e)))?;
+ Ok(())
+ };
+
+ try_join!(work_fut, shutdown_fut)?;
+
+ Ok(())
+ });
+ ReportingJoinHandle { handle }
+ }
+}
+
+/// Handle of [KafkaReporting::spawn].
+pub struct ReportingJoinHandle {
+ handle: JoinHandle<crate::Result<()>>,
+}
+
+impl Future for ReportingJoinHandle {
+ type Output = crate::Result<()>;
+
+ fn poll(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ Pin::new(&mut self.handle).poll(cx).map(|rs| rs?)
+ }
+}
+
+struct TopicNames {
+ segment: String,
+ meter: String,
+ log: String,
+ #[cfg(feature = "management")]
+ management: String,
+}
+
+impl TopicNames {
+ const TOPIC_LOG: &str = "skywalking-logs";
+ #[cfg(feature = "management")]
+ const TOPIC_MANAGEMENT: &str = "skywalking-managements";
+ const TOPIC_METER: &str = "skywalking-meters";
+ const TOPIC_SEGMENT: &str = "skywalking-segments";
+
+ fn new(namespace: Option<&str>) -> Self {
+ Self {
+ segment: Self::real_topic_name(namespace, Self::TOPIC_SEGMENT),
+ meter: Self::real_topic_name(namespace, Self::TOPIC_METER),
+ log: Self::real_topic_name(namespace, Self::TOPIC_LOG),
+ #[cfg(feature = "management")]
+ management: Self::real_topic_name(namespace, Self::TOPIC_MANAGEMENT),
+ }
+ }
+
+ fn real_topic_name(namespace: Option<&str>, topic_name: &str) -> String {
+ namespace
+ .map(|namespace| format!("{}-{}", namespace, topic_name))
+ .unwrap_or_else(|| topic_name.to_string())
+ }
+}
+
+struct KafkaProducer {
+ topic_names: TopicNames,
+ client: FutureProducer,
+ err_handle: Arc<DynErrHandler>,
+}
+
+impl KafkaProducer {
+ async fn new(
+ client: FutureProducer,
+ err_handle: Arc<DynErrHandler>,
+ namespace: Option<String>,
+ ) -> Result<Self, Error> {
+ let topic_names = TopicNames::new(namespace.as_deref());
+ Ok(Self {
+ client,
+ err_handle,
+ topic_names,
+ })
+ }
+
+ async fn produce(&mut self, item: CollectItem) {
+ let (topic_name, key) = match &item {
+ CollectItem::Trace(item) => (
+ &self.topic_names.segment,
+ item.trace_segment_id.as_bytes().to_vec(),
+ ),
+ CollectItem::Log(item) => (&self.topic_names.log, item.service.as_bytes().to_vec()),
+ CollectItem::Meter(item) => (
+ &self.topic_names.meter,
+ item.service_instance.as_bytes().to_vec(),
+ ),
+ #[cfg(feature = "management")]
+ CollectItem::Instance(item) => (
+ &self.topic_names.management,
+ format!("register-{}", &item.service_instance).into_bytes(),
+ ),
+ #[cfg(feature = "management")]
+ CollectItem::Ping(item) => (
+ &self.topic_names.log,
+ item.service_instance.as_bytes().to_vec(),
+ ),
+ };
+
+ let payload = item.encode_to_vec();
+ let record = FutureRecord::to(topic_name).payload(&payload).key(&key);
+
+ if let Err((err, _)) = self.client.send(record, Duration::from_secs(0)).await {
+ (self.err_handle)("Collect data to kafka failed", &err);
+ }
+ }
+}
diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs
index 0b46662..783fcfe 100644
--- a/src/reporter/mod.rs
+++ b/src/reporter/mod.rs
@@ -17,17 +17,21 @@
//! Reporter contains common `Report` trait and the implementations.
pub mod grpc;
+#[cfg(feature = "kafka-reporter")]
+#[cfg_attr(docsrs, doc(cfg(feature = "kafka-reporter")))]
+pub mod kafka;
pub mod print;
#[cfg(feature = "management")]
use crate::proto::v3::{InstancePingPkg, InstanceProperties};
use crate::proto::v3::{LogData, MeterData, SegmentObject};
use serde::{Deserialize, Serialize};
-use std::{ops::Deref, sync::Arc};
-use tokio::sync::OnceCell;
+use std::{error::Error, ops::Deref, sync::Arc};
+use tokio::sync::{mpsc, OnceCell};
+use tonic::async_trait;
/// Collect item of protobuf object.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub enum CollectItem {
/// Tracing object.
@@ -46,6 +50,23 @@
Ping(Box<InstancePingPkg>),
}
+impl CollectItem {
+ #[cfg(feature = "kafka-reporter")]
+ pub(crate) fn encode_to_vec(self) -> Vec<u8> {
+ use prost::Message;
+
+ match self {
+ CollectItem::Trace(item) => item.encode_to_vec(),
+ CollectItem::Log(item) => item.encode_to_vec(),
+ CollectItem::Meter(item) => item.encode_to_vec(),
+ #[cfg(feature = "management")]
+ CollectItem::Instance(item) => item.encode_to_vec(),
+ #[cfg(feature = "management")]
+ CollectItem::Ping(item) => item.encode_to_vec(),
+ }
+ }
+}
+
pub(crate) type DynReport = dyn Report + Send + Sync + 'static;
/// Report provide non-blocking report method for trace, log and metric object.
@@ -76,3 +97,91 @@
Report::report(self.get().expect("OnceCell is empty"), item)
}
}
+
+/// Special purpose, used for user-defined production operations. Generally, it
+/// does not need to be handled.
+pub trait CollectItemProduce: Send + Sync + 'static {
+ /// Produce the collect item non-blocking.
+ fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>>;
+}
+
+impl CollectItemProduce for () {
+ fn produce(&self, _item: CollectItem) -> Result<(), Box<dyn Error>> {
+ Ok(())
+ }
+}
+
+impl CollectItemProduce for mpsc::Sender<CollectItem> {
+ fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>> {
+ Ok(self.blocking_send(item)?)
+ }
+}
+
+impl CollectItemProduce for mpsc::UnboundedSender<CollectItem> {
+ fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>> {
+ Ok(self.send(item)?)
+ }
+}
+
+/// Alias of method result of [CollectItemConsume].
+pub type ConsumeResult = Result<Option<CollectItem>, Box<dyn Error + Send>>;
+
+/// Special purpose, used for user-defined consume operations. Generally, it
+/// does not need to be handled.
+#[async_trait]
+pub trait CollectItemConsume: Send + Sync + 'static {
+ /// Consume the collect item blocking.
+ async fn consume(&mut self) -> ConsumeResult;
+
+ /// Try to consume the collect item non-blocking.
+ async fn try_consume(&mut self) -> ConsumeResult;
+}
+
+#[async_trait]
+impl CollectItemConsume for () {
+ async fn consume(&mut self) -> ConsumeResult {
+ Ok(None)
+ }
+
+ async fn try_consume(&mut self) -> ConsumeResult {
+ Ok(None)
+ }
+}
+
+#[async_trait]
+impl CollectItemConsume for mpsc::Receiver<CollectItem> {
+ async fn consume(&mut self) -> ConsumeResult {
+ Ok(self.recv().await)
+ }
+
+ async fn try_consume(&mut self) -> ConsumeResult {
+ use mpsc::error::TryRecvError;
+
+ match self.try_recv() {
+ Ok(item) => Ok(Some(item)),
+ Err(e) => match e {
+ TryRecvError::Empty => Ok(None),
+ TryRecvError::Disconnected => Err(Box::new(e)),
+ },
+ }
+ }
+}
+
+#[async_trait]
+impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
+ async fn consume(&mut self) -> ConsumeResult {
+ Ok(self.recv().await)
+ }
+
+ async fn try_consume(&mut self) -> ConsumeResult {
+ use mpsc::error::TryRecvError;
+
+ match self.try_recv() {
+ Ok(item) => Ok(Some(item)),
+ Err(e) => match e {
+ TryRecvError::Empty => Ok(None),
+ TryRecvError::Disconnected => Err(Box::new(e)),
+ },
+ }
+ }
+}