Add metrics APIs. (#44)

diff --git a/Cargo.toml b/Cargo.toml
index aef34c9..b4de204 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -43,6 +43,7 @@
 cfg-if = "1.0.0"
 futures-core = "0.3.21"
 futures-util = "0.3.21"
+portable-atomic = { version = "0.3.13", features = ["float"] }
 prost = "0.11.0"
 prost-derive = "0.11.0"
 serde = { version = "1.0.143", features = ["derive"] }
@@ -68,6 +69,10 @@
 name = "logging"
 required-features = ["mock"]
 
+[[test]]
+name = "metrics"
+required-features = ["mock"]
+
 [[example]]
 name = "simple_trace_report"
 path = "examples/simple_trace_report.rs"
diff --git a/README.md b/README.md
index 7cf924a..97211d4 100644
--- a/README.md
+++ b/README.md
@@ -47,6 +47,14 @@
 
 LogRecord is the simple builder for the LogData, which is the Log format of Skywalking.
 
+## Metrics
+
+### Meter
+
+- **Counter** API represents a single monotonically increasing counter which automatically collects data and reports to the backend.
+- **Gauge** API represents a single numerical value.
+- **Histogram** API represents a summary sample observations with customized buckets.
+
 # Example
 
 ```rust, no_run
@@ -54,6 +62,7 @@
     logging::{logger::Logger, record::{LogRecord, RecordType}},
     reporter::grpc::GrpcReporter,
     trace::tracer::Tracer,
+    metrics::{meter::Counter, metricer::Metricer},
 };
 use std::error::Error;
 use tokio::signal;
@@ -94,6 +103,18 @@
     // Auto report ctx when dropped.
 }
 
+async fn handle_metric(mut metricer: Metricer) {
+    let counter = metricer.register(
+        Counter::new("instance_trace_count")
+            .add_label("region", "us-west")
+            .add_label("az", "az-1"),
+    );
+
+    metricer.boot().await;
+
+    counter.increment(10.);
+}
+
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
     // Connect to skywalking oap server.
@@ -109,7 +130,10 @@
         .spawn();
 
     let tracer = Tracer::new("service", "instance", reporter.clone());
-    let logger = Logger::new("service", "instance", reporter);
+    let logger = Logger::new("service", "instance", reporter.clone());
+    let metricer = Metricer::new("service", "instance", reporter);
+
+    handle_metric(metricer).await;
 
     handle_request(tracer, logger).await;
 
@@ -167,7 +191,7 @@
 
 # Release
 
-The SkyWalking committer(PMC included) could follow [this doc](Release-guide.md) to release an official version.
+The SkyWalking committer(PMC included) could follow [this doc](https://github.com/apache/skywalking-rust/blob/master/Release-guide.md) to release an official version.
 
 # License
 
diff --git a/build.rs b/build.rs
index fa64567..7ef9a20 100644
--- a/build.rs
+++ b/build.rs
@@ -24,6 +24,7 @@
         .compile(
             &[
                 "./skywalking-data-collect-protocol/language-agent/Tracing.proto",
+                "./skywalking-data-collect-protocol/language-agent/Meter.proto",
                 "./skywalking-data-collect-protocol/logging/Logging.proto",
             ],
             &["./skywalking-data-collect-protocol"],
diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml
index 9778a2a..6551e9e 100644
--- a/e2e/data/expected_context.yaml
+++ b/e2e/data/expected_context.yaml
@@ -146,3 +146,33 @@
         value: DEBUG
     timestamp: gt 0
   serviceName: consumer
+
+meterItems:
+- serviceName: consumer
+  meterSize: 3
+  meters:
+  - meterId:
+      name: instance_trace_count
+      tags:
+      - name: region
+        value: us-west
+      - name: az
+        value: az-1
+    singleValue: 30.0
+  - meterId:
+      name: instance_trace_count
+      tags:
+      - name: region
+        value: us-east
+      - name: az
+        value: az-3
+    singleValue: 20.0
+  - meterId:
+      name: instance_trace_count
+      tags:
+      - name: region
+        value: us-north
+      - name: az
+        value: az-1
+    histogramBuckets: [10.0, 20.0, 30.0]
+    histogramValues: [1, 2, 0]
diff --git a/e2e/src/main.rs b/e2e/src/main.rs
index f3ffd57..dc88364 100644
--- a/e2e/src/main.rs
+++ b/e2e/src/main.rs
@@ -25,6 +25,10 @@
         logger::{self, Logger},
         record::{LogRecord, RecordType},
     },
+    metrics::{
+        meter::{Counter, Gauge, Histogram},
+        metricer::Metricer,
+    },
     reporter::grpc::GrpcReporter,
     trace::{
         propagation::{
@@ -172,6 +176,33 @@
     }
 }
 
+fn run_consumer_metric(mut metricer: Metricer) {
+    let counter = metricer.register(
+        Counter::new("instance_trace_count")
+            .add_label("region", "us-west")
+            .add_label("az", "az-1"),
+    );
+    metricer.register(
+        Gauge::new("instance_trace_count", || 20.)
+            .add_label("region", "us-east")
+            .add_label("az", "az-3"),
+    );
+    let histogram = metricer.register(
+        Histogram::new("instance_trace_count", vec![10., 20., 30.])
+            .add_label("region", "us-north")
+            .add_label("az", "az-1"),
+    );
+
+    counter.increment(10.);
+    counter.increment(20.);
+
+    histogram.add_value(10.);
+    histogram.add_value(29.);
+    histogram.add_value(20.);
+
+    metricer.boot();
+}
+
 #[derive(StructOpt)]
 #[structopt(name = "basic")]
 struct Opt {
@@ -187,7 +218,8 @@
 
     if opt.mode == "consumer" {
         tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter.clone()));
-        logger::set_global_logger(Logger::new("consumer", "node_0", reporter));
+        logger::set_global_logger(Logger::new("consumer", "node_0", reporter.clone()));
+        run_consumer_metric(Metricer::new("consumer", "node_0", reporter));
         run_consumer_service([0, 0, 0, 0]).await;
     } else if opt.mode == "producer" {
         tracer::set_global_tracer(Tracer::new("producer", "node_0", reporter.clone()));
diff --git a/examples/simple_metric_report.rs b/examples/simple_metric_report.rs
new file mode 100644
index 0000000..ef4f9c8
--- /dev/null
+++ b/examples/simple_metric_report.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+use skywalking::{
+    metrics::{meter::Counter, metricer::Metricer},
+    reporter::grpc::GrpcReporter,
+};
+use std::error::Error;
+use tokio::signal;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    // Connect to skywalking oap server.
+    let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
+
+    // Spawn the reporting in background, with listening the graceful shutdown
+    // signal.
+    let handle = reporter
+        .reporting()
+        .await
+        .with_graceful_shutdown(async move {
+            signal::ctrl_c().await.expect("failed to listen for event");
+        })
+        .spawn();
+
+    // Do metrics.
+    let mut metricer = Metricer::new("service", "instance", reporter.clone());
+    let counter = metricer.register(
+        Counter::new("instance_trace_count")
+            .add_label("region", "us-west")
+            .add_label("az", "az-1"),
+    );
+
+    counter.increment(1.);
+
+    metricer.boot().await.unwrap();
+    handle.await.unwrap();
+
+    Ok(())
+}
diff --git a/src/common/system_time.rs b/src/common/system_time.rs
index ee97b4e..6513e81 100644
--- a/src/common/system_time.rs
+++ b/src/common/system_time.rs
@@ -19,6 +19,7 @@
 pub(crate) enum TimePeriod {
     Start,
     Log,
+    Metric,
     End,
 }
 
@@ -28,6 +29,7 @@
             match period {
                 TimePeriod::Start => 1,
                 TimePeriod::Log => 10,
+                TimePeriod::Metric => 10,
                 TimePeriod::End => 100,
             }
         }
diff --git a/src/lib.rs b/src/lib.rs
index 9cfd58b..7d4cc0c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -20,6 +20,7 @@
 pub mod common;
 pub(crate) mod error;
 pub mod logging;
+pub mod metrics;
 pub mod reporter;
 pub mod skywalking_proto;
 pub mod trace;
diff --git a/src/logging/logger.rs b/src/logging/logger.rs
index 2571ea7..f1bb2dc 100644
--- a/src/logging/logger.rs
+++ b/src/logging/logger.rs
@@ -22,8 +22,8 @@
 static GLOBAL_LOGGER: OnceCell<Logger> = OnceCell::const_new();
 
 /// Set the global logger.
-pub fn set_global_logger(tracer: Logger) {
-    if GLOBAL_LOGGER.set(tracer).is_err() {
+pub fn set_global_logger(logger: Logger) {
+    if GLOBAL_LOGGER.set(logger).is_err() {
         panic!("global logger has setted")
     }
 }
diff --git a/src/logging/record.rs b/src/logging/record.rs
index 87f22cf..02f5a83 100644
--- a/src/logging/record.rs
+++ b/src/logging/record.rs
@@ -55,16 +55,19 @@
         Default::default()
     }
 
+    #[inline]
     pub fn custome_time(mut self, time: SystemTime) -> Self {
         self.time = Some(time);
         self
     }
 
+    #[inline]
     pub fn ignore_time(mut self) -> Self {
         self.is_ignore_time = true;
         self
     }
 
+    #[inline]
     pub fn endpoint(mut self, endpoint: impl ToString) -> Self {
         self.endpoint = endpoint.to_string();
         self
diff --git a/src/metrics/meter.rs b/src/metrics/meter.rs
new file mode 100644
index 0000000..ea97852
--- /dev/null
+++ b/src/metrics/meter.rs
@@ -0,0 +1,344 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use crate::{
+    common::system_time::{fetch_time, TimePeriod},
+    metrics::metricer::Metricer,
+    skywalking_proto::v3::{
+        meter_data::Metric, Label, MeterBucketValue, MeterData, MeterHistogram, MeterSingleValue,
+    },
+};
+use portable_atomic::AtomicF64;
+use std::{
+    cmp::Ordering::Equal,
+    sync::atomic::{AtomicI64, Ordering},
+};
+
+pub trait Transform: Send + Sync {
+    fn meter_id(&self) -> MeterId;
+
+    fn transform(&self, metricer: &Metricer) -> MeterData;
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub(crate) enum MeterType {
+    Counter,
+    Gauge,
+    Histogram,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct MeterId {
+    name: String,
+    typ: MeterType,
+    labels: Vec<(String, String)>,
+}
+
+impl MeterId {
+    fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self {
+        self.labels.push((key.to_string(), value.to_string()));
+        self
+    }
+
+    fn add_labels<K, V, I>(mut self, tags: I) -> Self
+    where
+        K: ToString,
+        V: ToString,
+        I: IntoIterator<Item = (K, V)>,
+    {
+        self.labels.extend(
+            tags.into_iter()
+                .map(|(k, v)| (k.to_string(), v.to_string())),
+        );
+        self
+    }
+}
+
+/// Counter mode.
+pub enum CounterMode {
+    /// INCREMENT mode represents reporting the latest value.
+    INCREMENT,
+
+    /// RATE mode represents reporting the increment rate. Value = latest value
+    /// - last reported value.
+    RATE,
+}
+
+pub struct Counter {
+    id: MeterId,
+    mode: CounterMode,
+    count: AtomicF64,
+    previous_count: AtomicF64,
+}
+
+impl Counter {
+    #[inline]
+    pub fn new(name: impl ToString) -> Self {
+        Self {
+            id: MeterId {
+                name: name.to_string(),
+                typ: MeterType::Counter,
+                labels: vec![],
+            },
+            mode: CounterMode::INCREMENT,
+            count: AtomicF64::new(0.),
+            previous_count: AtomicF64::new(0.),
+        }
+    }
+
+    #[inline]
+    pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self {
+        self.id = self.id.add_label(key, value);
+        self
+    }
+
+    #[inline]
+    pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
+    where
+        K: ToString,
+        V: ToString,
+        I: IntoIterator<Item = (K, V)>,
+    {
+        self.id = self.id.add_labels(tags);
+        self
+    }
+
+    #[inline]
+    pub fn mode(mut self, mode: CounterMode) -> Self {
+        self.mode = mode;
+        self
+    }
+
+    pub fn increment(&self, count: f64) {
+        self.count.fetch_add(count, Ordering::Acquire);
+    }
+
+    pub fn get(&self) -> f64 {
+        self.count.load(Ordering::Acquire)
+    }
+}
+
+impl Transform for Counter {
+    fn meter_id(&self) -> MeterId {
+        self.id.clone()
+    }
+
+    fn transform(&self, metricer: &Metricer) -> MeterData {
+        MeterData {
+            service: metricer.service_name().to_owned(),
+            service_instance: metricer.instance_name().to_owned(),
+            timestamp: fetch_time(TimePeriod::Metric),
+            metric: Some(Metric::SingleValue(MeterSingleValue {
+                name: self.id.name.to_owned(),
+                labels: self
+                    .id
+                    .labels
+                    .iter()
+                    .map(|(name, value)| Label {
+                        name: name.clone(),
+                        value: value.clone(),
+                    })
+                    .collect(),
+                value: match self.mode {
+                    CounterMode::INCREMENT => self.get(),
+                    CounterMode::RATE => {
+                        let current_count = self.get();
+                        let previous_count =
+                            self.previous_count.swap(current_count, Ordering::Acquire);
+                        current_count - previous_count
+                    }
+                },
+            })),
+        }
+    }
+}
+
+pub struct Gauge<G> {
+    id: MeterId,
+    getter: G,
+}
+
+impl<G: Fn() -> f64> Gauge<G> {
+    #[inline]
+    pub fn new(name: impl ToString, getter: G) -> Self {
+        Self {
+            id: MeterId {
+                name: name.to_string(),
+                typ: MeterType::Gauge,
+                labels: vec![],
+            },
+            getter,
+        }
+    }
+
+    #[inline]
+    pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self {
+        self.id = self.id.add_label(key, value);
+        self
+    }
+
+    #[inline]
+    pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
+    where
+        K: ToString,
+        V: ToString,
+        I: IntoIterator<Item = (K, V)>,
+    {
+        self.id = self.id.add_labels(tags);
+        self
+    }
+
+    pub fn get(&self) -> f64 {
+        (self.getter)()
+    }
+}
+
+impl<G: Fn() -> f64 + Send + Sync> Transform for Gauge<G> {
+    fn meter_id(&self) -> MeterId {
+        self.id.clone()
+    }
+
+    fn transform(&self, metricer: &Metricer) -> MeterData {
+        MeterData {
+            service: metricer.service_name().to_owned(),
+            service_instance: metricer.instance_name().to_owned(),
+            timestamp: fetch_time(TimePeriod::Metric),
+            metric: Some(Metric::SingleValue(MeterSingleValue {
+                name: self.id.name.to_owned(),
+                labels: self
+                    .id
+                    .labels
+                    .iter()
+                    .map(|(name, value)| Label {
+                        name: name.clone(),
+                        value: value.clone(),
+                    })
+                    .collect(),
+                value: self.get(),
+            })),
+        }
+    }
+}
+
+struct Bucket {
+    bucket: f64,
+    count: AtomicI64,
+}
+
+impl Bucket {
+    fn new(bucker: f64) -> Self {
+        Self {
+            bucket: bucker,
+            count: Default::default(),
+        }
+    }
+}
+
+pub struct Histogram {
+    id: MeterId,
+    buckets: Vec<Bucket>,
+}
+
+impl Histogram {
+    pub fn new(name: impl ToString, mut steps: Vec<f64>) -> Self {
+        Self {
+            id: MeterId {
+                name: name.to_string(),
+                typ: MeterType::Histogram,
+                labels: vec![],
+            },
+            buckets: {
+                steps.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Equal));
+                steps.dedup();
+                steps.into_iter().map(Bucket::new).collect()
+            },
+        }
+    }
+
+    #[inline]
+    pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self {
+        self.id = self.id.add_label(key, value);
+        self
+    }
+
+    #[inline]
+    pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
+    where
+        K: ToString,
+        V: ToString,
+        I: IntoIterator<Item = (K, V)>,
+    {
+        self.id = self.id.add_labels(tags);
+        self
+    }
+
+    pub fn add_value(&self, value: f64) {
+        if let Some(index) = self.find_bucket(value) {
+            self.buckets[index].count.fetch_add(1, Ordering::Acquire);
+        }
+    }
+
+    fn find_bucket(&self, value: f64) -> Option<usize> {
+        match self
+            .buckets
+            .binary_search_by(|bucket| bucket.bucket.partial_cmp(&value).unwrap_or(Equal))
+        {
+            Ok(i) => Some(i),
+            Err(i) => {
+                if i >= 1 {
+                    Some(i - 1)
+                } else {
+                    None
+                }
+            }
+        }
+    }
+}
+
+impl Transform for Histogram {
+    fn meter_id(&self) -> MeterId {
+        self.id.clone()
+    }
+
+    fn transform(&self, metricer: &Metricer) -> MeterData {
+        MeterData {
+            service: metricer.service_name().to_owned(),
+            service_instance: metricer.instance_name().to_owned(),
+            timestamp: fetch_time(TimePeriod::Metric),
+            metric: Some(Metric::Histogram(MeterHistogram {
+                name: self.id.name.to_owned(),
+                labels: self
+                    .id
+                    .labels
+                    .iter()
+                    .map(|(name, value)| Label {
+                        name: name.clone(),
+                        value: value.clone(),
+                    })
+                    .collect(),
+                values: self
+                    .buckets
+                    .iter()
+                    .map(|bucket| MeterBucketValue {
+                        bucket: bucket.bucket,
+                        count: bucket.count.load(Ordering::Acquire),
+                        is_negative_infinity: false,
+                    })
+                    .collect(),
+            })),
+        }
+    }
+}
diff --git a/src/metrics/metricer.rs b/src/metrics/metricer.rs
new file mode 100644
index 0000000..81b8351
--- /dev/null
+++ b/src/metrics/metricer.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use super::meter::{MeterId, Transform};
+use crate::reporter::{CollectItem, DynReport, Report};
+use std::{
+    collections::HashMap,
+    future::Future,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+    time::Duration,
+};
+use tokio::{
+    select, spawn,
+    sync::mpsc,
+    task::{spawn_blocking, JoinError, JoinHandle},
+    time::interval,
+};
+
+pub struct Metricer {
+    service_name: String,
+    instance_name: String,
+    reporter: Box<DynReport>,
+    meter_map: HashMap<MeterId, Arc<dyn Transform>>,
+    report_interval: Duration,
+}
+
+impl Metricer {
+    /// New with service info and reporter.
+    pub fn new(
+        service_name: impl ToString,
+        instance_name: impl ToString,
+        reporter: impl Report + Send + Sync + 'static,
+    ) -> Self {
+        Self {
+            service_name: service_name.to_string(),
+            instance_name: instance_name.to_string(),
+            reporter: Box::new(reporter),
+            meter_map: Default::default(),
+            report_interval: Duration::from_secs(20),
+        }
+    }
+
+    pub fn service_name(&self) -> &str {
+        &self.service_name
+    }
+
+    pub fn instance_name(&self) -> &str {
+        &self.instance_name
+    }
+
+    pub fn set_report_interval(&mut self, report_interval: Duration) {
+        self.report_interval = report_interval;
+    }
+
+    pub fn register<T: Transform + 'static>(&mut self, transform: T) -> Arc<T> {
+        let transform = Arc::new(transform);
+        self.meter_map
+            .insert(transform.meter_id(), transform.clone());
+        transform
+    }
+
+    pub fn boot(self) -> Booting {
+        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
+
+        let handle = spawn(async move {
+            let mut ticker = interval(self.report_interval);
+            let metricer = Arc::new(self);
+            loop {
+                let metricer_ = metricer.clone();
+                let _ = spawn_blocking(move || {
+                    for trans in metricer_.meter_map.values() {
+                        metricer_
+                            .reporter
+                            .report(CollectItem::Meter(trans.transform(&metricer_)));
+                    }
+                })
+                .await;
+
+                select! {
+                    _ = ticker.tick() => {}
+                    _ = shutdown_rx.recv() => { return; }
+                }
+            }
+        });
+        Booting {
+            handle,
+            shutdown_tx,
+        }
+    }
+}
+
+pub struct Booting {
+    handle: JoinHandle<()>,
+    shutdown_tx: mpsc::Sender<()>,
+}
+
+impl Booting {
+    pub async fn shutdown(self) -> crate::Result<()> {
+        self.shutdown_tx.send(()).await.unwrap();
+        Ok(self.await?)
+    }
+}
+
+impl Future for Booting {
+    type Output = Result<(), JoinError>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        Pin::new(&mut self.handle).poll(cx)
+    }
+}
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
new file mode 100644
index 0000000..3c67e08
--- /dev/null
+++ b/src/metrics/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+pub mod meter;
+pub mod metricer;
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index eb92517..fcea63d 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -18,7 +18,8 @@
     reporter::{CollectItem, Report},
     skywalking_proto::v3::{
         log_report_service_client::LogReportServiceClient,
-        trace_segment_report_service_client::TraceSegmentReportServiceClient, LogData,
+        meter_report_service_client::MeterReportServiceClient,
+        trace_segment_report_service_client::TraceSegmentReportServiceClient, LogData, MeterData,
         SegmentObject,
     },
 };
@@ -102,6 +103,7 @@
 struct Inner<P, C> {
     trace_client: Mutex<TraceSegmentReportServiceClient<Channel>>,
     log_client: Mutex<LogReportServiceClient<Channel>>,
+    meter_client: Mutex<MeterReportServiceClient<Channel>>,
     producer: P,
     consumer: Mutex<Option<C>>,
     is_reporting: AtomicBool,
@@ -135,7 +137,8 @@
         Self {
             inner: Arc::new(Inner {
                 trace_client: Mutex::new(TraceSegmentReportServiceClient::new(channel.clone())),
-                log_client: Mutex::new(LogReportServiceClient::new(channel)),
+                log_client: Mutex::new(LogReportServiceClient::new(channel.clone())),
+                meter_client: Mutex::new(MeterReportServiceClient::new(channel)),
                 producer,
                 consumer: Mutex::new(Some(consumer)),
                 is_reporting: Default::default(),
@@ -168,9 +171,10 @@
         Reporting {
             rb: ReporterAndBuffer {
                 inner: Arc::clone(&self.inner),
+                status_handle: None,
                 trace_buffer: Default::default(),
                 log_buffer: Default::default(),
-                status_handle: None,
+                meter_buffer: Default::default(),
             },
             shutdown_signal: Box::pin(pending()),
             consumer: self.inner.consumer.lock().await.take().unwrap(),
@@ -201,9 +205,10 @@
 
 struct ReporterAndBuffer<P, C> {
     inner: Arc<Inner<P, C>>,
+    status_handle: Option<Box<dyn Fn(tonic::Status) + Send + 'static>>,
     trace_buffer: LinkedList<SegmentObject>,
     log_buffer: LinkedList<LogData>,
-    status_handle: Option<Box<dyn Fn(tonic::Status) + Send + 'static>>,
+    meter_buffer: LinkedList<MeterData>,
 }
 
 impl<P: CollectItemProduce, C: ColletcItemConsume> ReporterAndBuffer<P, C> {
@@ -216,6 +221,9 @@
             CollectItem::Log(item) => {
                 self.log_buffer.push_back(item);
             }
+            CollectItem::Meter(item) => {
+                self.meter_buffer.push_back(item);
+            }
         }
 
         if !self.trace_buffer.is_empty() {
@@ -248,6 +256,22 @@
                 }
             }
         }
+
+        if !self.meter_buffer.is_empty() {
+            let buffer = take(&mut self.meter_buffer);
+            if let Err(e) = self
+                .inner
+                .meter_client
+                .lock()
+                .await
+                .collect(stream::iter(buffer))
+                .await
+            {
+                if let Some(status_handle) = &self.status_handle {
+                    status_handle(e);
+                }
+            }
+        }
     }
 }
 
diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs
index 044afa2..79735bf 100644
--- a/src/reporter/mod.rs
+++ b/src/reporter/mod.rs
@@ -17,7 +17,7 @@
 pub mod grpc;
 pub mod print;
 
-use crate::skywalking_proto::v3::{LogData, SegmentObject};
+use crate::skywalking_proto::v3::{LogData, MeterData, SegmentObject};
 use serde::{Deserialize, Serialize};
 use std::{ops::Deref, sync::Arc};
 use tokio::sync::OnceCell;
@@ -27,6 +27,7 @@
 pub enum CollectItem {
     Trace(SegmentObject),
     Log(LogData),
+    Meter(MeterData),
 }
 
 pub(crate) type DynReport = dyn Report + Send + Sync + 'static;
diff --git a/src/reporter/print.rs b/src/reporter/print.rs
index 6b355e5..e5b640f 100644
--- a/src/reporter/print.rs
+++ b/src/reporter/print.rs
@@ -36,11 +36,11 @@
 impl Report for PrintReporter {
     fn report(&self, items: CollectItem) {
         match items {
-            CollectItem::Trace(segment) => {
+            CollectItem::Trace(data) => {
                 if self.use_stderr {
-                    eprintln!("trace segment={:?}", segment);
+                    eprintln!("trace segment={:?}", data);
                 } else {
-                    println!("trace segment={:?}", segment);
+                    println!("trace segment={:?}", data);
                 }
             }
             CollectItem::Log(data) => {
@@ -50,6 +50,13 @@
                     println!("log data={:?}", data);
                 }
             }
+            CollectItem::Meter(data) => {
+                if self.use_stderr {
+                    eprintln!("meter data={:?}", data);
+                } else {
+                    println!("meter data={:?}", data);
+                }
+            }
         }
     }
 }
diff --git a/tests/metrics.rs b/tests/metrics.rs
new file mode 100644
index 0000000..f1d95d0
--- /dev/null
+++ b/tests/metrics.rs
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use skywalking::{
+    metrics::{
+        meter::{Counter, Gauge, Histogram},
+        metricer::Metricer,
+    },
+    reporter::{CollectItem, Report},
+    skywalking_proto::v3::{
+        meter_data::Metric, Label, MeterBucketValue, MeterData, MeterHistogram, MeterSingleValue,
+    },
+};
+use std::{
+    collections::LinkedList,
+    sync::{Arc, Mutex},
+};
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn metrics() {
+    let reporter = Arc::new(MockReporter::default());
+
+    {
+        let mut metricer = Metricer::new("service_name", "instance_name", reporter.clone());
+        let counter = metricer.register(
+            Counter::new("instance_trace_count")
+                .add_label("region", "us-west")
+                .add_labels([("az", "az-1")]),
+        );
+        counter.increment(100.);
+
+        metricer.boot().shutdown().await.unwrap();
+
+        assert_eq!(
+            reporter.pop(),
+            MeterData {
+                timestamp: 10,
+                service: "service_name".to_owned(),
+                service_instance: "instance_name".to_owned(),
+                metric: Some(Metric::SingleValue(MeterSingleValue {
+                    name: "instance_trace_count".to_owned(),
+                    labels: vec![
+                        Label {
+                            name: "region".to_owned(),
+                            value: "us-west".to_owned()
+                        },
+                        Label {
+                            name: "az".to_owned(),
+                            value: "az-1".to_owned()
+                        },
+                    ],
+                    value: 100.
+                })),
+            }
+        );
+    }
+
+    {
+        let mut metricer = Metricer::new("service_name", "instance_name", reporter.clone());
+        metricer.register(
+            Gauge::new("instance_trace_count", || 100.)
+                .add_label("region", "us-west")
+                .add_labels([("az", "az-1")]),
+        );
+
+        metricer.boot().shutdown().await.unwrap();
+
+        assert_eq!(
+            reporter.pop(),
+            MeterData {
+                timestamp: 10,
+                service: "service_name".to_owned(),
+                service_instance: "instance_name".to_owned(),
+                metric: Some(Metric::SingleValue(MeterSingleValue {
+                    name: "instance_trace_count".to_owned(),
+                    labels: vec![
+                        Label {
+                            name: "region".to_owned(),
+                            value: "us-west".to_owned()
+                        },
+                        Label {
+                            name: "az".to_owned(),
+                            value: "az-1".to_owned()
+                        },
+                    ],
+                    value: 100.
+                })),
+            }
+        );
+    }
+
+    {
+        let mut metricer = Metricer::new("service_name", "instance_name", reporter.clone());
+        let histogram = metricer.register(
+            Histogram::new("instance_trace_count", vec![1., 2.])
+                .add_label("region", "us-west")
+                .add_labels([("az", "az-1")]),
+        );
+        histogram.add_value(1.);
+        histogram.add_value(1.5);
+        histogram.add_value(2.);
+
+        metricer.boot().shutdown().await.unwrap();
+
+        assert_eq!(
+            reporter.pop(),
+            MeterData {
+                timestamp: 10,
+                service: "service_name".to_owned(),
+                service_instance: "instance_name".to_owned(),
+                metric: Some(Metric::Histogram(MeterHistogram {
+                    name: "instance_trace_count".to_owned(),
+                    labels: vec![
+                        Label {
+                            name: "region".to_owned(),
+                            value: "us-west".to_owned()
+                        },
+                        Label {
+                            name: "az".to_owned(),
+                            value: "az-1".to_owned()
+                        },
+                    ],
+                    values: vec![
+                        MeterBucketValue {
+                            bucket: 1.,
+                            count: 2,
+                            is_negative_infinity: false
+                        },
+                        MeterBucketValue {
+                            bucket: 2.,
+                            count: 1,
+                            is_negative_infinity: false
+                        },
+                    ]
+                })),
+            }
+        );
+    }
+}
+
+#[derive(Default, Clone)]
+struct MockReporter {
+    items: Arc<Mutex<LinkedList<MeterData>>>,
+}
+
+impl MockReporter {
+    fn pop(&self) -> MeterData {
+        self.items.try_lock().unwrap().pop_back().unwrap()
+    }
+}
+
+impl Report for MockReporter {
+    fn report(&self, item: CollectItem) {
+        match item {
+            CollectItem::Meter(data) => {
+                self.items.try_lock().unwrap().push_back(data);
+            }
+            _ => {}
+        }
+    }
+}