Add Kafka reporter. (#61)

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 10d189e..832700c 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -91,6 +91,7 @@
           - ""
           - "--features management"
           - "--features vendored"
+          - "--features kafka-reporter"
           - "--all-features"
     runs-on: ubuntu-20.04
     steps:
diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index d1fab52..79609e2 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -40,3 +40,5 @@
           pip3 install setuptools
           pip3 install -r requirements.txt
           python3 e2e/run_e2e.py --expected_file=e2e/data/expected_context.yaml --max_retry_times=3 --target_path=/ping
+          docker-compose -f docker-compose.e2e.yml logs producer consumer
+          docker-compose -f docker-compose.e2e.yml exec -T producer /build/target/release/e2e-kafka
diff --git a/Cargo.toml b/Cargo.toml
index da1df32..909adb7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -36,6 +36,7 @@
 [features]
 management = ["hostname", "systemstat"]
 vendored = ["protobuf-src"]
+kafka-reporter = ["rdkafka"]
 
 mock = []  # For internal integration testing only, do not use.
 
@@ -46,11 +47,13 @@
 futures-core = "0.3.21"
 futures-util = "0.3.21"
 hostname = { version = "0.3.1", optional = true }
+libz-sys = "1.1.9"
 once_cell = "1.14.0"
 parking_lot = "0.12.1"
 portable-atomic = { version = "0.3.13", features = ["float"] }
 prost = "0.11.0"
 prost-derive = "0.11.0"
+rdkafka = { version = "0.32.2", optional = true }
 serde = { version = "1.0.143", features = ["derive"] }
 systemstat = { version = "0.2.0", optional = true }
 thiserror = "1.0.32"
diff --git a/README.md b/README.md
index 715001e..d01ab4a 100644
--- a/README.md
+++ b/README.md
@@ -212,6 +212,42 @@
 }
 ```
 
+# Advanced Reporter
+
+The advanced reporter provides an alternative way to submit the agent-collected data to the backend.
+
+## Kafka Reporter
+
+The Kafka reporter plugin supports reporting traces, metrics, logs, and instance properties to a Kafka cluster.
+
+Make sure the feature `kafka-reporter` is enabled.
+
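+For example, in `Cargo.toml` (the wildcard version is illustrative; pin to the
+release you use):
+
+```toml
+[dependencies]
+skywalking = { version = "*", features = ["kafka-reporter"] }
+```
+
+Then create the reporter and spawn the background reporting task:
+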
+```rust
+#[cfg(feature = "kafka-reporter")]
+mod example {
+    use skywalking::reporter::Report;
+    use skywalking::reporter::kafka::{KafkaReportBuilder, KafkaReporter, RDKafkaClientConfig};
+
+    async fn do_something(reporter: &impl Report) {
+        // ....
+    }
+
+    async fn foo() {
+        let mut client_config = RDKafkaClientConfig::new();
+        client_config
+            .set("bootstrap.servers", "broker:9092")
+            .set("message.timeout.ms", "6000");
+
+        let (reporter, reporting) = KafkaReportBuilder::new(client_config).build().await.unwrap();
+        let handle = reporting.spawn();
+
+        do_something(&reporter).await;
+
+        handle.await.unwrap();
+    }
+}
+```
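+
+The builder also supports a custom error handler, a topic namespace, and
+graceful shutdown. A sketch combining them (the `oneshot` channel below is only
+an illustrative shutdown trigger; use whatever signal fits your application):
+
+```rust
+#[cfg(feature = "kafka-reporter")]
+mod advanced {
+    use skywalking::reporter::Report;
+    use skywalking::reporter::kafka::{KafkaReportBuilder, RDKafkaClientConfig};
+
+    async fn do_something(reporter: &impl Report) {
+        // ....
+    }
+
+    async fn foo() {
+        let mut client_config = RDKafkaClientConfig::new();
+        client_config
+            .set("bootstrap.servers", "broker:9092")
+            .set("message.timeout.ms", "6000");
+
+        let (reporter, reporting) = KafkaReportBuilder::new(client_config)
+            // Prepend `dev-` to every topic name, e.g. `dev-skywalking-segments`.
+            .with_namespace("dev")
+            // Handle reporting errors yourself instead of the default logging.
+            .with_err_handle(|message, err| eprintln!("{}: {:?}", message, err))
+            .build()
+            .await
+            .unwrap();
+
+        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
+        let handle = reporting
+            .with_graceful_shutdown(async move {
+                // Resolves once `shutdown_tx` fires.
+                shutdown_rx.await.ok();
+            })
+            .spawn();
+
+        do_something(&reporter).await;
+
+        // Ask the reporting task to stop, then wait for it to flush and exit.
+        shutdown_tx.send(()).ok();
+        handle.await.unwrap();
+    }
+}
+```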
+
 # How to compile?
 
 If you have `skywalking-(VERSION).crate`, you can unpack it with the way as follows:
diff --git a/dist-material/LICENSE b/dist-material/LICENSE
index 6c5eda5..a7c1296 100644
--- a/dist-material/LICENSE
+++ b/dist-material/LICENSE
@@ -220,9 +220,11 @@
     https://crates.io/crates/autocfg/1.1.0 1.1.0 Apache-2.0 OR MIT
     https://crates.io/crates/base64/0.13.0 0.13.0 Apache-2.0 OR MIT
     https://crates.io/crates/bitflags/1.3.1 1.3.1 Apache-2.0 OR MIT
-    https://crates.io/crates/cc/1.0.0 1.0.0 Apache-2.0 OR MIT
+    https://crates.io/crates/cc/1.0.18 1.0.18 Apache-2.0 OR MIT
     https://crates.io/crates/cfg-if/0.1.2 0.1.2 Apache-2.0 OR MIT
     https://crates.io/crates/cfg-if/1.0.0 1.0.0 Apache-2.0 OR MIT
+    https://crates.io/crates/derivative/2.0.0 2.0.0 Apache-2.0 OR MIT
+    https://crates.io/crates/dtoa/0.4.0 0.4.0 Apache-2.0 OR MIT
     https://crates.io/crates/either/1.0.0 1.0.0 Apache-2.0 OR MIT
     https://crates.io/crates/fixedbitset/0.4.0 0.4.0 Apache-2.0 OR MIT
     https://crates.io/crates/fnv/1.0.5 1.0.5 Apache-2.0 OR MIT
@@ -242,10 +244,12 @@
     https://crates.io/crates/hyper-timeout/0.4.0 0.4.0 Apache-2.0 OR MIT
     https://crates.io/crates/indexmap/1.6.2 1.6.2 Apache-2.0 OR MIT
     https://crates.io/crates/itertools/0.10.0 0.10.0 Apache-2.0 OR MIT
+    https://crates.io/crates/itoa/0.3.0 0.3.0 Apache-2.0 OR MIT
     https://crates.io/crates/itoa/0.4.1 0.4.1 Apache-2.0 OR MIT
     https://crates.io/crates/itoa/1.0.1 1.0.1 Apache-2.0 OR MIT
     https://crates.io/crates/lazy_static/1.4.0 1.4.0 Apache-2.0 OR MIT
     https://crates.io/crates/libc/0.2.114 0.2.114 Apache-2.0 OR MIT
+    https://crates.io/crates/libz-sys/1.1.9 1.1.9 Apache-2.0 OR MIT
     https://crates.io/crates/lock_api/0.4.6 0.4.6 Apache-2.0 OR MIT
     https://crates.io/crates/log/0.4.17 0.4.17 Apache-2.0 OR MIT
     https://crates.io/crates/match_cfg/0.1.0 0.1.0 Apache-2.0 OR MIT
@@ -254,6 +258,7 @@
     https://crates.io/crates/miow/0.3.6 0.3.6 Apache-2.0 OR MIT
     https://crates.io/crates/multimap/0.8.0 0.8.0 Apache-2.0 OR MIT
     https://crates.io/crates/ntapi/0.3.0 0.3.0 Apache-2.0 OR MIT
+    https://crates.io/crates/num-traits/0.1.32 0.1.32 Apache-2.0 OR MIT
     https://crates.io/crates/num_cpus/1.8.0 1.8.0 Apache-2.0 OR MIT
     https://crates.io/crates/num_threads/0.1.2 0.1.2 Apache-2.0 OR MIT
     https://crates.io/crates/once_cell/1.14.0 1.14.0 Apache-2.0 OR MIT
@@ -267,9 +272,11 @@
     https://crates.io/crates/pin-project-internal/1.0.2 1.0.2 Apache-2.0 OR MIT
     https://crates.io/crates/pin-project-lite/0.2.9 0.2.9 Apache-2.0 OR MIT
     https://crates.io/crates/pin-utils/0.1.0 0.1.0 Apache-2.0 OR MIT
+    https://crates.io/crates/pkg-config/0.3.9 0.3.9 Apache-2.0 OR MIT
     https://crates.io/crates/portable-atomic/0.3.13 0.3.13 Apache-2.0 OR MIT
     https://crates.io/crates/ppv-lite86/0.2.8 0.2.8 Apache-2.0 OR MIT
     https://crates.io/crates/prettyplease/0.1.0 0.1.0 Apache-2.0 OR MIT
+    https://crates.io/crates/proc-macro-crate/0.1.4 0.1.4 Apache-2.0 OR MIT
     https://crates.io/crates/proc-macro2/1.0.32 1.0.32 Apache-2.0 OR MIT
     https://crates.io/crates/quote/1.0.0 1.0.0 Apache-2.0 OR MIT
     https://crates.io/crates/rand/0.4.1 0.4.1 Apache-2.0 OR MIT
@@ -283,6 +290,7 @@
     https://crates.io/crates/scopeguard/1.1.0 1.1.0 Apache-2.0 OR MIT
     https://crates.io/crates/serde/1.0.143 1.0.143 Apache-2.0 OR MIT
     https://crates.io/crates/serde_derive/1.0.143 1.0.143 Apache-2.0 OR MIT
+    https://crates.io/crates/serde_json/1.0.0 1.0.0 Apache-2.0 OR MIT
     https://crates.io/crates/signal-hook-registry/1.1.1 1.1.1 Apache-2.0 OR MIT
     https://crates.io/crates/smallvec/1.6.1 1.6.1 Apache-2.0 OR MIT
     https://crates.io/crates/socket2/0.3.17 0.3.17 Apache-2.0 OR MIT
@@ -295,10 +303,12 @@
     https://crates.io/crates/thiserror-impl/1.0.32 1.0.32 Apache-2.0 OR MIT
     https://crates.io/crates/time/0.3.9 0.3.9 Apache-2.0 OR MIT
     https://crates.io/crates/tokio-io-timeout/1.0.1 1.0.1 Apache-2.0 OR MIT
+    https://crates.io/crates/toml/0.5.2 0.5.2 Apache-2.0 OR MIT
     https://crates.io/crates/unicode-segmentation/1.2.0 1.2.0 Apache-2.0 OR MIT
     https://crates.io/crates/unicode-width/0.1.4 0.1.4 Apache-2.0 OR MIT
     https://crates.io/crates/unicode-xid/0.2.0 0.2.0 Apache-2.0 OR MIT
     https://crates.io/crates/uuid/1.1.2 1.1.2 Apache-2.0 OR MIT
+    https://crates.io/crates/vcpkg/0.2.0 0.2.0 Apache-2.0 OR MIT
     https://crates.io/crates/vec_map/0.8.0 0.8.0 Apache-2.0 OR MIT
     https://crates.io/crates/version_check/0.9.0 0.9.0 Apache-2.0 OR MIT
     https://crates.io/crates/winapi/0.3.9 0.3.9 Apache-2.0 OR MIT
@@ -319,6 +329,8 @@
 
     https://crates.io/crates/fuchsia-zircon/0.3.1 0.3.1 BSD-3-Clause
     https://crates.io/crates/fuchsia-zircon-sys/0.3.1 0.3.1 BSD-3-Clause
+    https://crates.io/crates/num_enum/0.5.0 0.5.0 BSD-3-Clause
+    https://crates.io/crates/num_enum_derive/0.5.0 0.5.0 BSD-3-Clause
 
 ========================================================================
 MIT licenses
@@ -345,6 +357,8 @@
     https://crates.io/crates/mio/0.8.1 0.8.1 MIT
     https://crates.io/crates/nom/7.0.0 7.0.0 MIT
     https://crates.io/crates/proc-macro-error/0.2.0 0.2.0 MIT
+    https://crates.io/crates/rdkafka/0.32.2 0.32.2 MIT
+    https://crates.io/crates/rdkafka-sys/4.5.0+1.9.2 4.5.0+1.9.2 MIT
     https://crates.io/crates/redox_syscall/0.1.38 0.1.38 MIT
     https://crates.io/crates/redox_syscall/0.2.8 0.2.8 MIT
     https://crates.io/crates/slab/0.4.2 0.4.2 MIT
diff --git a/dist-material/licenses/LICENSE-rdkafka-sys.txt b/dist-material/licenses/LICENSE-rdkafka-sys.txt
new file mode 100644
index 0000000..eb7158a
--- /dev/null
+++ b/dist-material/licenses/LICENSE-rdkafka-sys.txt
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2016 Federico Giraud
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/dist-material/licenses/LICENSE-rdkafka.txt b/dist-material/licenses/LICENSE-rdkafka.txt
new file mode 100644
index 0000000..eb7158a
--- /dev/null
+++ b/dist-material/licenses/LICENSE-rdkafka.txt
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2016 Federico Giraud
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/docker-compose.e2e.yml b/docker-compose.e2e.yml
index 3462189..84c7d92 100644
--- a/docker-compose.e2e.yml
+++ b/docker-compose.e2e.yml
@@ -37,6 +37,8 @@
     depends_on:
       collector:
         condition: service_healthy
+      broker:
+        condition: service_healthy
     healthcheck:
       test: [ "CMD", "curl", "http://0.0.0.0:8082/healthCheck" ]
       interval: 5s
@@ -54,3 +56,15 @@
         condition: service_healthy
       consumer:
         condition: service_healthy
+      broker:
+        condition: service_healthy
+
+  broker:
+    image: landoop/fast-data-dev:3.3
+    container_name: broker
+    ports:
+      - "9092:9092"
+    healthcheck:
+      test: [ "CMD", "nc", "-zv", "0.0.0.0", "9092"]
+      interval: 5s
+      timeout: 5s
diff --git a/e2e/Cargo.toml b/e2e/Cargo.toml
index 2b156d0..d2e49e1 100644
--- a/e2e/Cargo.toml
+++ b/e2e/Cargo.toml
@@ -22,9 +22,16 @@
 edition = "2021"
 publish = false
 license = "Apache-2.0"
+default-run = "e2e"
+
+[[bin]]
+name = "e2e-kafka"
+path = "src/e2e_kafka.rs"
 
 [dependencies]
-skywalking = { path = ".." }
 hyper = { version = "0.14", features = ["full"] }
-tokio = { version = "1", features = ["full"] }
+prost = "0.11.0"
+rdkafka = "0.32.2"
+skywalking = { path = "..", features = ["kafka-reporter"] }
 structopt = "0.3"
+tokio = { version = "1", features = ["full"] }
diff --git a/e2e/docker/Dockerfile b/e2e/docker/Dockerfile
index 94ab5c3..13fb2d2 100644
--- a/e2e/docker/Dockerfile
+++ b/e2e/docker/Dockerfile
@@ -20,4 +20,5 @@
 WORKDIR /build
 COPY . /build/
 RUN cargo build --release --workspace
+ENV RUST_BACKTRACE=1
 ENTRYPOINT ["/build/target/release/e2e"]
diff --git a/e2e/src/e2e_kafka.rs b/e2e/src/e2e_kafka.rs
new file mode 100644
index 0000000..e785050
--- /dev/null
+++ b/e2e/src/e2e_kafka.rs
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#![allow(clippy::bool_assert_comparison)]
+
+use prost::Message;
+use rdkafka::{
+    consumer::{Consumer, StreamConsumer},
+    ClientConfig, Message as _,
+};
+use skywalking::proto::v3::{
+    log_data_body::Content, meter_data::Metric, JsonLog, KeyStringValuePair, LogData, LogTags,
+    MeterData, RefType, SegmentObject, SpanLayer, SpanType,
+};
+use std::time::Duration;
+use tokio::time::timeout;
+
+async fn segment() {
+    let consumer = create_consumer("skywalking-segments");
+
+    for _ in 0..3 {
+        let segment: SegmentObject = consumer_recv(&consumer).await;
+        check_segment(&segment);
+    }
+}
+
+fn check_segment(segment: &SegmentObject) {
+    dbg!(segment);
+
+    assert_eq!(segment.service_instance, "node_0");
+
+    if segment.service == "consumer" {
+        assert_eq!(segment.spans.len(), 1);
+
+        assert_eq!(segment.spans[0].span_id, 0);
+        assert_eq!(segment.spans[0].parent_span_id, -1);
+        assert_eq!(segment.spans[0].operation_name, "/pong");
+        assert_eq!(segment.spans[0].peer, "");
+        assert_eq!(segment.spans[0].span_type, SpanType::Entry as i32);
+        assert_eq!(segment.spans[0].span_layer, SpanLayer::Http as i32);
+        assert_eq!(segment.spans[0].component_id, 11000);
+        assert_eq!(segment.spans[0].is_error, false);
+        assert_eq!(segment.spans[0].tags.len(), 0);
+        assert_eq!(segment.spans[0].logs.len(), 0);
+        assert_eq!(segment.spans[0].refs.len(), 1);
+
+        assert_eq!(
+            segment.spans[0].refs[0].ref_type,
+            RefType::CrossProcess as i32
+        );
+        assert_eq!(segment.spans[0].refs[0].parent_span_id, 1);
+        assert_eq!(segment.spans[0].refs[0].parent_service, "producer");
+        assert_eq!(segment.spans[0].refs[0].parent_service_instance, "node_0");
+        assert_eq!(segment.spans[0].refs[0].parent_endpoint, "/pong");
+        assert_eq!(
+            segment.spans[0].refs[0].network_address_used_at_peer,
+            "consumer:8082"
+        );
+    } else if segment.service == "producer" {
+        if segment.spans.last().unwrap().operation_name == "/ping" {
+            assert_eq!(segment.spans.len(), 3);
+
+            assert_eq!(segment.spans[0].span_id, 1);
+            assert_eq!(segment.spans[0].parent_span_id, 0);
+            assert_eq!(segment.spans[0].operation_name, "/pong");
+            assert_eq!(segment.spans[0].peer, "consumer:8082");
+            assert_eq!(segment.spans[0].span_type, SpanType::Exit as i32);
+            assert_eq!(segment.spans[0].span_layer, SpanLayer::Unknown as i32);
+            assert_eq!(segment.spans[0].component_id, 11000);
+            assert_eq!(segment.spans[0].is_error, false);
+            assert_eq!(segment.spans[0].tags.len(), 0);
+            assert_eq!(segment.spans[0].logs.len(), 0);
+            assert_eq!(segment.spans[0].refs.len(), 0);
+
+            assert_eq!(segment.spans[1].span_id, 2);
+            assert_eq!(segment.spans[1].parent_span_id, 0);
+            assert_eq!(segment.spans[1].operation_name, "async-job");
+            assert_eq!(segment.spans[1].peer, "");
+            assert_eq!(segment.spans[1].span_type, SpanType::Local as i32);
+            assert_eq!(segment.spans[1].span_layer, SpanLayer::Unknown as i32);
+            assert_eq!(segment.spans[1].component_id, 11000);
+            assert_eq!(segment.spans[1].is_error, false);
+            assert_eq!(segment.spans[1].tags.len(), 0);
+            assert_eq!(segment.spans[1].logs.len(), 0);
+            assert_eq!(segment.spans[1].refs.len(), 0);
+
+            assert_eq!(segment.spans[2].span_id, 0);
+            assert_eq!(segment.spans[2].parent_span_id, -1);
+            assert_eq!(segment.spans[2].operation_name, "/ping");
+            assert_eq!(segment.spans[2].peer, "");
+            assert_eq!(segment.spans[2].span_type, SpanType::Entry as i32);
+            assert_eq!(segment.spans[2].span_layer, SpanLayer::Http as i32);
+            assert_eq!(segment.spans[2].component_id, 11000);
+            assert_eq!(segment.spans[2].is_error, false);
+            assert_eq!(segment.spans[2].tags.len(), 0);
+            assert_eq!(segment.spans[2].logs.len(), 0);
+            assert_eq!(segment.spans[2].refs.len(), 0);
+        } else if segment.spans.last().unwrap().operation_name == "async-callback" {
+            assert_eq!(segment.spans.len(), 1);
+
+            assert_eq!(segment.spans[0].span_id, 0);
+            assert_eq!(segment.spans[0].parent_span_id, -1);
+            assert_eq!(segment.spans[0].peer, "");
+            assert_eq!(segment.spans[0].span_type, SpanType::Entry as i32);
+            assert_eq!(segment.spans[0].span_layer, SpanLayer::Http as i32);
+            assert_eq!(segment.spans[0].component_id, 11000);
+            assert_eq!(segment.spans[0].is_error, false);
+            assert_eq!(segment.spans[0].tags.len(), 0);
+            assert_eq!(segment.spans[0].logs.len(), 0);
+            assert_eq!(segment.spans[0].refs.len(), 1);
+
+            assert_eq!(
+                segment.spans[0].refs[0].ref_type,
+                RefType::CrossThread as i32
+            );
+            assert_eq!(segment.spans[0].refs[0].parent_span_id, 2);
+            assert_eq!(segment.spans[0].refs[0].parent_service, "producer");
+            assert_eq!(segment.spans[0].refs[0].parent_service_instance, "node_0");
+            assert_eq!(segment.spans[0].refs[0].parent_endpoint, "async-job");
+            assert_eq!(segment.spans[0].refs[0].network_address_used_at_peer, "");
+        } else {
+            panic!(
+                "unknown operation_name {}",
+                segment.spans.last().unwrap().operation_name
+            );
+        }
+    } else {
+        panic!("unknown service {}", segment.service);
+    }
+}
+
+async fn meter() {
+    let consumer = create_consumer("skywalking-meters");
+
+    for _ in 0..3 {
+        let meter: MeterData = consumer_recv(&consumer).await;
+        check_meter(&meter);
+    }
+}
+
+fn check_meter(meter: &MeterData) {
+    dbg!(meter);
+
+    assert_eq!(meter.service, "consumer");
+    assert_eq!(meter.service_instance, "node_0");
+
+    match &meter.metric {
+        Some(Metric::SingleValue(value)) => {
+            assert_eq!(value.name, "instance_trace_count");
+
+            if value.labels[0].name == "region" && value.labels[0].value == "us-west" {
+                assert_eq!(value.labels[1].name, "az");
+                assert_eq!(value.labels[1].value, "az-1");
+                assert_eq!(value.value, 30.0);
+            } else if value.labels[0].name == "region" && value.labels[0].value == "us-east" {
+                assert_eq!(value.labels[1].name, "az");
+                assert_eq!(value.labels[1].value, "az-3");
+                assert_eq!(value.value, 20.0);
+            } else {
+                panic!("unknown label {:?}", &value.labels[0]);
+            }
+        }
+        Some(Metric::Histogram(value)) => {
+            assert_eq!(value.name, "instance_trace_count");
+            assert_eq!(value.labels[0].name, "region");
+            assert_eq!(value.labels[0].value, "us-north");
+            assert_eq!(value.labels[1].name, "az");
+            assert_eq!(value.labels[1].value, "az-1");
+            assert_eq!(value.values[0].bucket, 10.0);
+            assert_eq!(value.values[0].count, 1);
+            assert_eq!(value.values[1].bucket, 20.0);
+            assert_eq!(value.values[1].count, 2);
+            assert_eq!(value.values[2].bucket, 30.0);
+            assert_eq!(value.values[2].count, 0);
+        }
+        _ => {
+            panic!("unknown metric");
+        }
+    }
+}
+
+async fn log() {
+    let consumer = create_consumer("skywalking-logs");
+
+    for _ in 0..3 {
+        let log: LogData = consumer_recv(&consumer).await;
+        check_log(&log);
+    }
+}
+
+fn check_log(log: &LogData) {
+    dbg!(log);
+
+    if log.service == "producer" && log.service_instance == "node_0" {
+        assert_eq!(log.endpoint, "/ping");
+
+        match &log.body.as_ref().unwrap().content {
+            Some(Content::Json(json)) => {
+                assert_eq!(json.json, r#"{"message": "handle ping"}"#);
+                assert_eq!(log.trace_context, None);
+                assert_eq!(
+                    log.tags,
+                    Some(LogTags {
+                        data: vec![KeyStringValuePair {
+                            key: "level".to_string(),
+                            value: "DEBUG".to_string()
+                        }]
+                    })
+                );
+            }
+            Some(Content::Text(text)) => {
+                assert_eq!(text.text, "do http request");
+                assert_eq!(log.trace_context.as_ref().unwrap().span_id, 1);
+                assert_eq!(
+                    log.tags,
+                    Some(LogTags {
+                        data: vec![KeyStringValuePair {
+                            key: "level".to_string(),
+                            value: "INFO".to_string()
+                        }]
+                    })
+                );
+            }
+            body => {
+                panic!("unknown log body {:?}", body);
+            }
+        }
+    } else if log.service == "consumer" && log.service_instance == "node_0" {
+        assert_eq!(log.endpoint, "/pong");
+        assert_eq!(
+            log.body.as_ref().unwrap().content,
+            Some(Content::Json(JsonLog {
+                json: r#"{"message": "handle pong"}"#.to_string()
+            }))
+        );
+        assert_eq!(log.trace_context, None);
+        assert_eq!(
+            log.tags,
+            Some(LogTags {
+                data: vec![KeyStringValuePair {
+                    key: "level".to_string(),
+                    value: "DEBUG".to_string()
+                }]
+            })
+        );
+    } else {
+        panic!("unknown log {} {}", log.service, log.service_instance);
+    }
+}
+
+fn create_consumer(topic: &str) -> StreamConsumer {
+    let consumer: StreamConsumer = ClientConfig::new()
+        .set("bootstrap.servers", "broker:9092")
+        .set("broker.address.family", "v4")
+        .set("session.timeout.ms", "30000")
+        .set("enable.auto.commit", "true")
+        .set("auto.offset.reset", "earliest")
+        .set("enable.auto.offset.store", "true")
+        .set("group.id", topic)
+        .create()
+        .unwrap();
+    consumer.subscribe(&[topic]).unwrap();
+    consumer
+}
+
+async fn consumer_recv<T: Message + Default>(consumer: &StreamConsumer) -> T {
+    let message = timeout(Duration::from_secs(12), consumer.recv())
+        .await
+        .unwrap()
+        .unwrap();
+    let value = message.payload_view::<[u8]>().unwrap().unwrap();
+    Message::decode(value).unwrap()
+}
+
+#[tokio::main(flavor = "multi_thread")]
+async fn main() {
+    segment().await;
+    meter().await;
+    log().await;
+}
diff --git a/e2e/src/main.rs b/e2e/src/main.rs
index 545c1c9..bf46cb3 100644
--- a/e2e/src/main.rs
+++ b/e2e/src/main.rs
@@ -29,7 +29,11 @@
         meter::{Counter, Gauge, Histogram},
         metricer::Metricer,
     },
-    reporter::grpc::GrpcReporter,
+    reporter::{
+        grpc::GrpcReporter,
+        kafka::{KafkaReportBuilder, KafkaReporter, RDKafkaClientConfig},
+        CollectItem, Report,
+    },
     trace::{
         propagation::{
             context::SKYWALKING_HTTP_CONTEXT_HEADER_KEY, decoder::decode_propagation,
@@ -40,6 +44,7 @@
 };
 use std::{convert::Infallible, error::Error, net::SocketAddr};
 use structopt::StructOpt;
+use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 
 static NOT_FOUND_MSG: &str = "not found";
 static SUCCESS_MSG: &str = "Success";
@@ -210,11 +215,52 @@
     mode: String,
 }
 
+#[derive(Clone)]
+struct CombineReporter {
+    grpc_reporter: GrpcReporter<UnboundedSender<CollectItem>, UnboundedReceiver<CollectItem>>,
+    kafka_reporter: KafkaReporter<UnboundedSender<CollectItem>>,
+}
+
+impl Report for CombineReporter {
+    fn report(&self, item: CollectItem) {
+        let typ = match &item {
+            CollectItem::Trace(_) => "trace",
+            CollectItem::Log(_) => "log",
+            CollectItem::Meter(_) => "meter",
+            _ => "unknown",
+        };
+        println!("report item type: {:?}", typ);
+        self.grpc_reporter.report(item.clone());
+        self.kafka_reporter.report(item);
+    }
+}
+
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
     let opt = Opt::from_args();
-    let reporter = GrpcReporter::connect("http://collector:19876").await?;
-    let handle = reporter.reporting().await.spawn();
+
+    let reporter1 = GrpcReporter::connect("http://collector:19876").await?;
+    let handle1 = reporter1.reporting().await.spawn();
+
+    let mut client_config = RDKafkaClientConfig::new();
+    client_config
+        .set("bootstrap.servers", "broker:9092")
+        .set("message.timeout.ms", "6000");
+    let (reporter2, reporting) = KafkaReportBuilder::new(client_config)
+        .with_err_handle(|message, err| {
+            eprintln!(
+                "kafka reporter failed, message: {}, err: {:?}",
+                message, err
+            );
+        })
+        .build()
+        .await?;
+    let handle2 = reporting.spawn();
+
+    let reporter = CombineReporter {
+        grpc_reporter: reporter1,
+        kafka_reporter: reporter2,
+    };
 
     if opt.mode == "consumer" {
         tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter.clone()));
@@ -229,7 +275,8 @@
         unreachable!()
     }
 
-    handle.await?;
+    handle1.await?;
+    handle2.await?;
 
     Ok(())
 }
diff --git a/src/error/mod.rs b/src/error/mod.rs
index 2a26da6..b87a5b7 100644
--- a/src/error/mod.rs
+++ b/src/error/mod.rs
@@ -44,6 +44,11 @@
     #[error("tokio join failed: {0}")]
     TokioJoin(#[from] tokio::task::JoinError),
 
+    /// Kafka reporter error.
+    #[cfg(feature = "kafka-reporter")]
+    #[error("kafka reporter failed: {0}")]
+    KafkaReporter(crate::reporter::kafka::Error),
+
     /// Other uncovered errors.
     #[error(transparent)]
     Other(#[from] Box<dyn std::error::Error + Send + 'static>),
diff --git a/src/proto/v3/mod.rs b/src/proto/v3/mod.rs
index 0086949..d470966 100644
--- a/src/proto/v3/mod.rs
+++ b/src/proto/v3/mod.rs
@@ -18,6 +18,7 @@
 
 #![allow(missing_docs)]
 #![allow(rustdoc::invalid_html_tags)]
+#![allow(clippy::derive_partial_eq_without_eq)]
 
 use crate::common::system_time::{fetch_time, TimePeriod};
 
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index 5abb88f..31676e7 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -16,6 +16,7 @@
 
 //! Grpc implementation of [Report].
 
+use super::{CollectItemConsume, CollectItemProduce};
 #[cfg(feature = "management")]
 use crate::proto::v3::management_service_client::ManagementServiceClient;
 use crate::{
@@ -51,7 +52,6 @@
 };
 use tokio_stream::StreamExt;
 use tonic::{
-    async_trait,
     metadata::{Ascii, MetadataValue},
     service::{interceptor::InterceptedService, Interceptor},
     transport::{self, Channel, Endpoint},
@@ -71,66 +71,6 @@
     error!(?status, "{}", message);
 }
 
-/// Special purpose, used for user-defined production operations. Generally, it
-/// does not need to be handled.
-pub trait CollectItemProduce: Send + Sync + 'static {
-    /// Produce the collect item non-blocking.
-    fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>>;
-}
-
-impl CollectItemProduce for () {
-    fn produce(&self, _item: CollectItem) -> Result<(), Box<dyn Error>> {
-        Ok(())
-    }
-}
-
-impl CollectItemProduce for mpsc::UnboundedSender<CollectItem> {
-    fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>> {
-        Ok(self.send(item)?)
-    }
-}
-
-/// Special purpose, used for user-defined consume operations. Generally, it
-/// does not need to be handled.
-#[async_trait]
-pub trait CollectItemConsume: Send + Sync + 'static {
-    /// Consume the collect item blocking.
-    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>>;
-
-    /// Try to consume the collect item non-blocking.
-    async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>>;
-}
-
-#[async_trait]
-impl CollectItemConsume for () {
-    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        Ok(None)
-    }
-
-    async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        Ok(None)
-    }
-}
-
-#[async_trait]
-impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
-    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        Ok(self.recv().await)
-    }
-
-    async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        use mpsc::error::TryRecvError;
-
-        match self.try_recv() {
-            Ok(item) => Ok(Some(item)),
-            Err(e) => match e {
-                TryRecvError::Empty => Ok(None),
-                TryRecvError::Disconnected => Err(Box::new(e)),
-            },
-        }
-    }
-}
-
 #[derive(Default, Clone)]
 struct CustomInterceptor {
     authentication: Option<Arc<String>>,
diff --git a/src/reporter/kafka.rs b/src/reporter/kafka.rs
new file mode 100644
index 0000000..aff55ac
--- /dev/null
+++ b/src/reporter/kafka.rs
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//! Kafka implementation of [Report].
+
+use super::{CollectItemConsume, CollectItemProduce};
+use crate::reporter::{CollectItem, Report};
+pub use rdkafka::config::{ClientConfig as RDKafkaClientConfig, RDKafkaLogLevel};
+use rdkafka::producer::{FutureProducer, FutureRecord};
+use std::{
+    error,
+    future::{pending, Future},
+    pin::Pin,
+    sync::{
+        atomic::{AtomicBool, Ordering::Relaxed},
+        Arc,
+    },
+    time::Duration,
+};
+use tokio::{select, spawn, sync::mpsc, task::JoinHandle, try_join};
+use tracing::error;
+
+/// Kafka reporter error.
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    /// rdkafka error.
+    #[error(transparent)]
+    RdKafka(#[from] rdkafka::error::KafkaError),
+
+    /// Kafka topic not found.
+    #[error("topic not found: {topic}")]
+    TopicNotFound {
+        /// Name of kafka topic.
+        topic: String,
+    },
+}
+
+type DynErrHandler = dyn Fn(&str, &dyn error::Error) + Send + Sync + 'static;
+
+fn default_err_handle(message: &str, err: &dyn error::Error) {
+    error!(?err, "{}", message);
+}
+
+#[derive(Default)]
+struct State {
+    is_closing: AtomicBool,
+}
+
+impl State {
+    fn is_closing(&self) -> bool {
+        self.is_closing.load(Relaxed)
+    }
+}
+
+/// The Kafka reporter plugin supports reporting traces, metrics, logs, and
+/// instance properties to a Kafka cluster.
+pub struct KafkaReportBuilder<P, C> {
+    state: Arc<State>,
+    producer: Arc<P>,
+    consumer: C,
+    client_config: RDKafkaClientConfig,
+    namespace: Option<String>,
+    err_handle: Arc<DynErrHandler>,
+}
+
+impl KafkaReportBuilder<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
+    /// Create the builder with an rdkafka client configuration.
+    pub fn new(client_config: RDKafkaClientConfig) -> Self {
+        let (producer, consumer) = mpsc::unbounded_channel();
+        Self::new_with_pc(client_config, producer, consumer)
+    }
+}
+
+impl<P: CollectItemProduce, C: CollectItemConsume> KafkaReportBuilder<P, C> {
+    /// Special purpose, used for user-defined produce and consume operations;
+    /// usually [KafkaReportBuilder::new] is sufficient.
+    pub fn new_with_pc(client_config: RDKafkaClientConfig, producer: P, consumer: C) -> Self {
+        Self {
+            state: Default::default(),
+            producer: Arc::new(producer),
+            consumer,
+            client_config,
+            namespace: None,
+            err_handle: Arc::new(default_err_handle),
+        }
+    }
+
+    /// Set the error handler. By default, errors are logged.
+    pub fn with_err_handle(
+        mut self,
+        handle: impl Fn(&str, &dyn error::Error) + Send + Sync + 'static,
+    ) -> Self {
+        self.err_handle = Arc::new(handle);
+        self
+    }
+
+    /// Used to isolate multiple OAP servers sharing the same Kafka cluster
+    /// (the namespace is prepended to every Kafka topic name, joined by `-`).
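+    ///
+    /// For example, with namespace `dev`, trace segments are produced to the
+    /// `dev-skywalking-segments` topic.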
+    pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
+        self.namespace = Some(namespace.into());
+        self
+    }
+
+    /// Build the reporter, which implements [Report] and is used in the
+    /// foreground, and the reporting handle, which pushes data to Kafka in
+    /// the background.
+    pub async fn build(self) -> Result<(KafkaReporter<P>, KafkaReporting<C>), Error> {
+        let kafka_producer = KafkaProducer::new(
+            self.client_config.create()?,
+            self.err_handle.clone(),
+            self.namespace,
+        )
+        .await?;
+        Ok((
+            KafkaReporter {
+                state: self.state.clone(),
+                producer: self.producer,
+                err_handle: self.err_handle,
+            },
+            KafkaReporting {
+                state: self.state,
+                consumer: self.consumer,
+                kafka_producer,
+                shutdown_signal: Box::pin(pending()),
+            },
+        ))
+    }
+}
+
+/// The Kafka reporter, which implements [Report].
+pub struct KafkaReporter<P> {
+    state: Arc<State>,
+    producer: Arc<P>,
+    err_handle: Arc<DynErrHandler>,
+}
+
+impl<P> Clone for KafkaReporter<P> {
+    #[inline]
+    fn clone(&self) -> Self {
+        Self {
+            state: self.state.clone(),
+            producer: self.producer.clone(),
+            err_handle: self.err_handle.clone(),
+        }
+    }
+}
+
+impl<P: CollectItemProduce> Report for KafkaReporter<P> {
+    fn report(&self, item: CollectItem) {
+        if !self.state.is_closing() {
+            if let Err(e) = self.producer.produce(item) {
+                (self.err_handle)("report collect item failed", &*e);
+            }
+        }
+    }
+}
+
+/// The handle that pushes data to Kafka in the background.
+pub struct KafkaReporting<C> {
+    state: Arc<State>,
+    consumer: C,
+    kafka_producer: KafkaProducer,
+    shutdown_signal: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
+}
+
+impl<C: CollectItemConsume> KafkaReporting<C> {
+    /// Quit when the shutdown signal is received.
+    ///
+    /// Accepts a `shutdown_signal` future that serves as the graceful
+    /// shutdown trigger.
+    pub fn with_graceful_shutdown(
+        mut self,
+        shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
+    ) -> Self {
+        self.shutdown_signal = Box::pin(shutdown_signal);
+        self
+    }
+
+    /// Spawn the reporting task in the background.
+    pub fn spawn(self) -> ReportingJoinHandle {
+        let handle = spawn(async move {
+            let KafkaReporting {
+                state,
+                mut consumer,
+                mut kafka_producer,
+                shutdown_signal,
+            } = self;
+
+            let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
+
+            let work_fut = async move {
+                loop {
+                    select! {
+                        item = consumer.consume() => {
+                            match item {
+                                Ok(Some(item)) => {
+                                    kafka_producer.produce(item).await;
+                                }
+                                Ok(None) => break,
+                                Err(err) => return Err(crate::Error::Other(err)),
+                            }
+                        }
+                        _ = shutdown_rx.recv() => break,
+                    }
+                }
+
+                state.is_closing.store(true, Relaxed);
+
+                // Flush.
+                loop {
+                    match consumer.try_consume().await {
+                        Ok(Some(item)) => {
+                            kafka_producer.produce(item).await;
+                        }
+                        Ok(None) => break,
+                        Err(err) => return Err(err.into()),
+                    }
+                }
+
+                Ok::<_, crate::Error>(())
+            };
+
+            let shutdown_fut = async move {
+                shutdown_signal.await;
+                shutdown_tx
+                    .send(())
+                    .map_err(|e| crate::Error::Other(Box::new(e)))?;
+                Ok(())
+            };
+
+            try_join!(work_fut, shutdown_fut)?;
+
+            Ok(())
+        });
+        ReportingJoinHandle { handle }
+    }
+}
+
+/// Handle of [KafkaReporting::spawn].
+pub struct ReportingJoinHandle {
+    handle: JoinHandle<crate::Result<()>>,
+}
+
+impl Future for ReportingJoinHandle {
+    type Output = crate::Result<()>;
+
+    fn poll(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        Pin::new(&mut self.handle).poll(cx).map(|rs| rs?)
+    }
+}
+
+struct TopicNames {
+    segment: String,
+    meter: String,
+    log: String,
+    #[cfg(feature = "management")]
+    management: String,
+}
+
+impl TopicNames {
+    const TOPIC_LOG: &str = "skywalking-logs";
+    #[cfg(feature = "management")]
+    const TOPIC_MANAGEMENT: &str = "skywalking-managements";
+    const TOPIC_METER: &str = "skywalking-meters";
+    const TOPIC_SEGMENT: &str = "skywalking-segments";
+
+    fn new(namespace: Option<&str>) -> Self {
+        Self {
+            segment: Self::real_topic_name(namespace, Self::TOPIC_SEGMENT),
+            meter: Self::real_topic_name(namespace, Self::TOPIC_METER),
+            log: Self::real_topic_name(namespace, Self::TOPIC_LOG),
+            #[cfg(feature = "management")]
+            management: Self::real_topic_name(namespace, Self::TOPIC_MANAGEMENT),
+        }
+    }
+
+    fn real_topic_name(namespace: Option<&str>, topic_name: &str) -> String {
+        namespace
+            .map(|namespace| format!("{}-{}", namespace, topic_name))
+            .unwrap_or_else(|| topic_name.to_string())
+    }
+}
+
+struct KafkaProducer {
+    topic_names: TopicNames,
+    client: FutureProducer,
+    err_handle: Arc<DynErrHandler>,
+}
+
+impl KafkaProducer {
+    async fn new(
+        client: FutureProducer,
+        err_handle: Arc<DynErrHandler>,
+        namespace: Option<String>,
+    ) -> Result<Self, Error> {
+        let topic_names = TopicNames::new(namespace.as_deref());
+        Ok(Self {
+            client,
+            err_handle,
+            topic_names,
+        })
+    }
+
+    async fn produce(&mut self, item: CollectItem) {
+        let (topic_name, key) = match &item {
+            CollectItem::Trace(item) => (
+                &self.topic_names.segment,
+                item.trace_segment_id.as_bytes().to_vec(),
+            ),
+            CollectItem::Log(item) => (&self.topic_names.log, item.service.as_bytes().to_vec()),
+            CollectItem::Meter(item) => (
+                &self.topic_names.meter,
+                item.service_instance.as_bytes().to_vec(),
+            ),
+            #[cfg(feature = "management")]
+            CollectItem::Instance(item) => (
+                &self.topic_names.management,
+                format!("register-{}", &item.service_instance).into_bytes(),
+            ),
+            #[cfg(feature = "management")]
+            CollectItem::Ping(item) => (
+                &self.topic_names.log,
+                item.service_instance.as_bytes().to_vec(),
+            ),
+        };
+
+        let payload = item.encode_to_vec();
+        let record = FutureRecord::to(topic_name).payload(&payload).key(&key);
+
+        if let Err((err, _)) = self.client.send(record, Duration::from_secs(0)).await {
+            (self.err_handle)("Collect data to kafka failed", &err);
+        }
+    }
+}
diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs
index 0b46662..783fcfe 100644
--- a/src/reporter/mod.rs
+++ b/src/reporter/mod.rs
@@ -17,17 +17,21 @@
 //! Reporter contains common `Report` trait and the implementations.
 
 pub mod grpc;
+#[cfg(feature = "kafka-reporter")]
+#[cfg_attr(docsrs, doc(cfg(feature = "kafka-reporter")))]
+pub mod kafka;
 pub mod print;
 
 #[cfg(feature = "management")]
 use crate::proto::v3::{InstancePingPkg, InstanceProperties};
 use crate::proto::v3::{LogData, MeterData, SegmentObject};
 use serde::{Deserialize, Serialize};
-use std::{ops::Deref, sync::Arc};
-use tokio::sync::OnceCell;
+use std::{error::Error, ops::Deref, sync::Arc};
+use tokio::sync::{mpsc, OnceCell};
+use tonic::async_trait;
 
 /// Collect item of protobuf object.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 #[non_exhaustive]
 pub enum CollectItem {
     /// Tracing object.
@@ -46,6 +50,23 @@
     Ping(Box<InstancePingPkg>),
 }
 
+impl CollectItem {
+    #[cfg(feature = "kafka-reporter")]
+    pub(crate) fn encode_to_vec(self) -> Vec<u8> {
+        use prost::Message;
+
+        match self {
+            CollectItem::Trace(item) => item.encode_to_vec(),
+            CollectItem::Log(item) => item.encode_to_vec(),
+            CollectItem::Meter(item) => item.encode_to_vec(),
+            #[cfg(feature = "management")]
+            CollectItem::Instance(item) => item.encode_to_vec(),
+            #[cfg(feature = "management")]
+            CollectItem::Ping(item) => item.encode_to_vec(),
+        }
+    }
+}
+
 pub(crate) type DynReport = dyn Report + Send + Sync + 'static;
 
 /// Report provide non-blocking report method for trace, log and metric object.
@@ -76,3 +97,91 @@
         Report::report(self.get().expect("OnceCell is empty"), item)
     }
 }
+
+/// Special purpose, used for user-defined production operations. Generally, it
+/// does not need to be handled.
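+///
+/// A minimal custom producer sketch (illustrative only, not part of the
+/// crate):
+///
+/// ```ignore
+/// use skywalking::reporter::{CollectItem, CollectItemProduce};
+/// use std::{error::Error, sync::Mutex};
+///
+/// struct VecProduce(Mutex<Vec<CollectItem>>);
+///
+/// impl CollectItemProduce for VecProduce {
+///     fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>> {
+///         // Buffer the item in memory instead of sending it over a channel.
+///         self.0.lock().unwrap().push(item);
+///         Ok(())
+///     }
+/// }
+/// ```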
+pub trait CollectItemProduce: Send + Sync + 'static {
+    /// Produce the collect item non-blocking.
+    fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>>;
+}
+
+impl CollectItemProduce for () {
+    fn produce(&self, _item: CollectItem) -> Result<(), Box<dyn Error>> {
+        Ok(())
+    }
+}
+
+impl CollectItemProduce for mpsc::Sender<CollectItem> {
+    fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>> {
+        Ok(self.blocking_send(item)?)
+    }
+}
+
+impl CollectItemProduce for mpsc::UnboundedSender<CollectItem> {
+    fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>> {
+        Ok(self.send(item)?)
+    }
+}
+
+/// Alias for the result type of [CollectItemConsume] methods.
+pub type ConsumeResult = Result<Option<CollectItem>, Box<dyn Error + Send>>;
+
+/// Special purpose, used for user-defined consume operations. Generally, it
+/// does not need to be handled.
+#[async_trait]
+pub trait CollectItemConsume: Send + Sync + 'static {
+    /// Consume the collect item blocking.
+    async fn consume(&mut self) -> ConsumeResult;
+
+    /// Try to consume the collect item non-blocking.
+    async fn try_consume(&mut self) -> ConsumeResult;
+}
+
+#[async_trait]
+impl CollectItemConsume for () {
+    async fn consume(&mut self) -> ConsumeResult {
+        Ok(None)
+    }
+
+    async fn try_consume(&mut self) -> ConsumeResult {
+        Ok(None)
+    }
+}
+
+#[async_trait]
+impl CollectItemConsume for mpsc::Receiver<CollectItem> {
+    async fn consume(&mut self) -> ConsumeResult {
+        Ok(self.recv().await)
+    }
+
+    async fn try_consume(&mut self) -> ConsumeResult {
+        use mpsc::error::TryRecvError;
+
+        match self.try_recv() {
+            Ok(item) => Ok(Some(item)),
+            Err(e) => match e {
+                TryRecvError::Empty => Ok(None),
+                TryRecvError::Disconnected => Err(Box::new(e)),
+            },
+        }
+    }
+}
+
+#[async_trait]
+impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
+    async fn consume(&mut self) -> ConsumeResult {
+        Ok(self.recv().await)
+    }
+
+    async fn try_consume(&mut self) -> ConsumeResult {
+        use mpsc::error::TryRecvError;
+
+        match self.try_recv() {
+            Ok(item) => Ok(Some(item)),
+            Err(e) => match e {
+                TryRecvError::Empty => Ok(None),
+                TryRecvError::Disconnected => Err(Box::new(e)),
+            },
+        }
+    }
+}