Add kafka reporter. (#88)

diff --git a/.github/workflows/pecl.yml b/.github/workflows/pecl.yml
index 5535c87..4384173 100644
--- a/.github/workflows/pecl.yml
+++ b/.github/workflows/pecl.yml
@@ -52,6 +52,9 @@
         version:
           - php: "8.2"
             swoole: "5.0.0"
+        option:
+          - 'enable-cargo-debug=\"no\" enable-kafka-reporter=\"no\"'
+          - 'enable-cargo-debug=\"no\" enable-kafka-reporter=\"yes\"'
 
     runs-on: ${{ matrix.os }}
     steps:
@@ -77,14 +80,14 @@
 
       - name: Install Rust Stable Globally
         run: |
-          curl https://sh.rustup.rs -sSf | sudo -E sh -s -- -y --no-modify-path
-          sudo ln -sf $CARGO_HOME/bin/rustup /usr/local/bin/rustup
-          sudo ln -sf $CARGO_HOME/bin/rustc /usr/local/bin/rustc
-          sudo ln -sf $CARGO_HOME/bin/cargo /usr/local/bin/cargo
+          curl https://sh.rustup.rs -sSf | sudo -E sh -s -- -y --default-toolchain none
 
       - name: PECL install
         run: |
-          sudo -E cargo run -p scripts --release -- create-package-xml --version 0.0.0 --notes "Just for TEST."
-          cat package.xml
-          printf "\n" | sudo -E pecl install package.xml
+          sudo bash -c "\
+            source /tmp/cargo/env && \
+            cargo version && \
+            cargo run -p scripts --release -- create-package-xml --version 0.0.0 --notes 'Just for TEST.' && \
+            cat package.xml && \
+            pecl install -D '${{ matrix.option }}' package.xml"
           php -d "extension=skywalking_agent" --ri skywalking_agent
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 73bd75c..4ca62e4 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -52,35 +52,48 @@
         os:
           - ubuntu-20.04
           - macos-12
-        version:
+        flag:
           # Many composer dependencies need PHP 7.2+
-          - php: "7.2"
-            swoole: "4.6.7"
+          - php_version: "7.2"
+            swoole_version: "4.6.7"
             enable_zend_observer: "Off"
-          - php: "7.3"
-            swoole: "4.7.1"
+            cargo_features: ""
+          - php_version: "7.3"
+            swoole_version: "4.7.1"
             enable_zend_observer: "Off"
-          - php: "7.4"
-            swoole: "4.8.10"
+            cargo_features: ""
+          - php_version: "7.4"
+            swoole_version: "4.8.10"
             enable_zend_observer: "Off"
-          - php: "8.0"
-            swoole: "5.0.0"
+            cargo_features: ""
+          - php_version: "8.0"
+            swoole_version: "5.0.0"
             enable_zend_observer: "Off"
-          - php: "8.0"
-            swoole: "5.0.0"
+            cargo_features: ""
+          - php_version: "8.0"
+            swoole_version: "5.0.0"
             enable_zend_observer: "On"
-          - php: "8.1"
-            swoole: "5.0.0"
+            cargo_features: ""
+          - php_version: "8.1"
+            swoole_version: "5.0.0"
             enable_zend_observer: "Off"
-          - php: "8.1"
-            swoole: "5.0.0"
+            cargo_features: ""
+          - php_version: "8.1"
+            swoole_version: "5.0.0"
             enable_zend_observer: "On"
-          - php: "8.2"
-            swoole: "5.0.0"
+            cargo_features: ""
+          - php_version: "8.2"
+            swoole_version: "5.0.0"
             enable_zend_observer: "Off"
-          - php: "8.2"
-            swoole: "5.0.0"
+            cargo_features: ""
+          - php_version: "8.2"
+            swoole_version: "5.0.0"
             enable_zend_observer: "On"
+            cargo_features: ""
+          - php_version: "8.2"
+            swoole_version: "5.0.0"
+            enable_zend_observer: "On"
+            cargo_features: "--features kafka-reporter"
 
     runs-on: ${{ matrix.os }}
     steps:
@@ -100,19 +113,19 @@
       - name: Setup PHP
         uses: shivammathur/setup-php@v2
         with:
-          php-version: ${{ matrix.version.php }}
+          php-version: ${{ matrix.flag.php_version }}
           tools: php-config, composer:v2
           extensions: >
             bcmath, calendar, ctype, dom, exif, gettext, iconv, intl, json, mbstring,
             mysqli, mysqlnd, opcache, pdo, pdo_mysql, phar, posix, readline, redis,
-            memcached, swoole-${{ matrix.version.swoole }}, xml, xmlreader, xmlwriter,
+            memcached, swoole-${{ matrix.flag.swoole_version }}, xml, xmlreader, xmlwriter,
             yaml, zip, mongodb
 
       - name: Setup php-fpm for Linux
         if: matrix.os == 'ubuntu-20.04'
         run: |
-          sudo apt-get install -y php${{ matrix.version.php }}-fpm
-          sudo ln -sf /usr/sbin/php-fpm${{ matrix.version.php }} /usr/sbin/php-fpm
+          sudo apt-get install -y php${{ matrix.flag.php_version }}-fpm
+          sudo ln -sf /usr/sbin/php-fpm${{ matrix.flag.php_version }} /usr/sbin/php-fpm
 
       - name: PHP version
         run: |
@@ -122,9 +135,9 @@
           php -r 'echo "Swoole version: " . phpversion("swoole") . "\n";'
           composer --version
 
-          [[ `php --version` == PHP\ ${{ matrix.version.php }}.* ]] || exit 1;
-          [[ `php-fpm --version` == PHP\ ${{ matrix.version.php }}.* ]] || exit 1;
-          [[ `php-config --version` == ${{ matrix.version.php }}.* ]] || exit 1;
+          [[ `php --version` == PHP\ ${{ matrix.flag.php_version }}.* ]] || exit 1;
+          [[ `php-fpm --version` == PHP\ ${{ matrix.flag.php_version }}.* ]] || exit 1;
+          [[ `php-config --version` == ${{ matrix.flag.php_version }}.* ]] || exit 1;
 
       - name: Install Rust
         uses: actions-rs/toolchain@v1
@@ -142,21 +155,21 @@
             ~/.cargo/registry/cache/
             ~/.cargo/git/db/
             target/
-          key: ${{ matrix.os }}-test-${{ matrix.version.php }}-${{ hashFiles('**/Cargo.lock') }}
+          key: ${{ matrix.os }}-test-${{ matrix.flag.php_version }}-${{ hashFiles('**/Cargo.lock') }}
 
       - name: Cargo clippy
         uses: actions-rs/cargo@v1
         with:
           toolchain: ${{ env.RUST_STABLE_TOOLCHAIN }}
           command: clippy
-          args: --release --workspace
+          args: --release --workspace ${{ matrix.flag.cargo_features }}
 
       - name: Cargo build
         uses: actions-rs/cargo@v1
         with:
           toolchain: ${{ env.RUST_STABLE_TOOLCHAIN }}
           command: build
-          args: --release --workspace
+          args: --release --workspace ${{ matrix.flag.cargo_features }}
 
       - name: Composer install
         run: composer install --working-dir=tests/php
@@ -178,9 +191,9 @@
         with:
           toolchain: ${{ env.RUST_STABLE_TOOLCHAIN }}
           command: test
-          args: --release --workspace
+          args: --release --workspace ${{ matrix.flag.cargo_features }}
         env:
-          ENABLE_ZEND_OBSERVER: ${{ matrix.version.enable_zend_observer }}
+          ENABLE_ZEND_OBSERVER: ${{ matrix.flag.enable_zend_observer }}
         continue-on-error: true
 
       # Rebuild the mixture when cargo test failed.
@@ -207,7 +220,7 @@
           command: test
           args: --release --workspace
         env:
-          ENABLE_ZEND_OBSERVER: ${{ matrix.version.enable_zend_observer }}
+          ENABLE_ZEND_OBSERVER: ${{ matrix.flag.enable_zend_observer }}
 
       - name: View logs
         if: always()
diff --git a/Cargo.lock b/Cargo.lock
index b92eaad..c123d6f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1207,6 +1207,27 @@
 ]
 
 [[package]]
+name = "num_enum"
+version = "0.5.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9"
+dependencies = [
+ "num_enum_derive",
+]
+
+[[package]]
+name = "num_enum_derive"
+version = "0.5.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799"
+dependencies = [
+ "proc-macro-crate",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
+[[package]]
 name = "object"
 version = "0.31.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1534,6 +1555,16 @@
 ]
 
 [[package]]
+name = "proc-macro-crate"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919"
+dependencies = [
+ "once_cell",
+ "toml_edit",
+]
+
+[[package]]
 name = "proc-macro2"
 version = "1.0.66"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1642,6 +1673,36 @@
 ]
 
 [[package]]
+name = "rdkafka"
+version = "0.32.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8733bc5dc0b192d1a4b28073f9bff1326ad9e4fecd4d9b025d6fc358d1c3e79"
+dependencies = [
+ "futures-channel",
+ "futures-util",
+ "libc",
+ "log",
+ "rdkafka-sys",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "slab",
+ "tokio",
+]
+
+[[package]]
+name = "rdkafka-sys"
+version = "4.5.0+1.9.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1bb0676c2112342ac7165decdedbc4e7086c0af384479ccce534546b10687a5d"
+dependencies = [
+ "libc",
+ "libz-sys",
+ "num_enum",
+ "pkg-config",
+]
+
+[[package]]
 name = "redox_syscall"
 version = "0.3.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2000,6 +2061,7 @@
  "portable-atomic 0.3.20",
  "prost",
  "prost-derive",
+ "rdkafka",
  "serde",
  "systemstat",
  "thiserror",
@@ -2322,6 +2384,23 @@
 ]
 
 [[package]]
+name = "toml_datetime"
+version = "0.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b"
+
+[[package]]
+name = "toml_edit"
+version = "0.19.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
+dependencies = [
+ "indexmap 2.0.0",
+ "toml_datetime",
+ "winnow",
+]
+
+[[package]]
 name = "tonic"
 version = "0.8.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2911,6 +2990,15 @@
 checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
 
 [[package]]
+name = "winnow"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f46aab759304e4d7b2075a9aecba26228bb073ee8c50db796b2c72c676b5d807"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
 name = "winreg"
 version = "0.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 0345bd0..771a788 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,6 +35,9 @@
 name = "skywalking_agent"
 crate-type = ["lib", "cdylib"]
 
+[features]
+kafka-reporter = ["skywalking/kafka-reporter"]
+
 [dependencies]
 anyhow = { version = "1.0.72", features = ["backtrace"] }
 bincode = "1.3.3"
diff --git a/config.m4 b/config.m4
index b4f9560..edd4c0e 100644
--- a/config.m4
+++ b/config.m4
@@ -22,6 +22,9 @@
 PHP_ARG_ENABLE([cargo_debug], [whether to enable cargo debug mode],
 [  --enable-cargo-debug           Enable cargo debug], no, no)
 
+PHP_ARG_ENABLE([kafka_reporter], [whether to enable kafka reporter],
+[  --enable-kafka-reporter        Enable kafka reporter], no, no)
+
 if test "$PHP_THREAD_SAFETY" == "yes"; then
   AC_MSG_ERROR([skywalking_agent does not support ZTS])
 fi
@@ -43,19 +46,24 @@
 
   CARGO_MODE_FLAGS="--release"
   CARGO_MODE_DIR="release"
+  CARGO_FEATURES_FLAGS=""
 
   if test "$PHP_CARGO_DEBUG" != "no"; then
     CARGO_MODE_FLAGS=""
     CARGO_MODE_DIR="debug"
   fi
 
+  if test "$PHP_KAFKA_REPORTER" != "no"; then
+    CARGO_FEATURES_FLAGS="--features kafka-reporter"
+  fi
+
   cat >>Makefile.objects<< EOF
 all: cargo_build
 
 clean: cargo_clean
 
 cargo_build:
-	PHP_CONFIG=$PHP_PHP_CONFIG cargo build $CARGO_MODE_FLAGS
+	PHP_CONFIG=$PHP_PHP_CONFIG cargo build $CARGO_MODE_FLAGS $CARGO_FEATURES_FLAGS
 	if [[ -f ./target/$CARGO_MODE_DIR/libskywalking_agent.dylib ]] ; then \\
 		cp ./target/$CARGO_MODE_DIR/libskywalking_agent.dylib ./modules/skywalking_agent.so ; fi
 	if [[ -f ./target/$CARGO_MODE_DIR/libskywalking_agent.so ]] ; then \\
@@ -73,6 +81,7 @@
     Cargo.toml:Cargo.toml \
     build.rs:build.rs \
     docker-compose.yml:docker-compose.yml \
+    rust-toolchain.toml:rust-toolchain.toml \
     scripts:scripts \
     src:src \
     tests:tests \
diff --git a/dist-material/LICENSE b/dist-material/LICENSE
index 35c499b..70b7c5a 100644
--- a/dist-material/LICENSE
+++ b/dist-material/LICENSE
@@ -237,6 +237,15 @@
     https://crates.io/crates/wasi/0.11.0+wasi-snapshot-preview1 0.11.0+wasi-snapshot-preview1 Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT
 
 ========================================================================
+Apache-2.0 OR BSD-3-Clause OR MIT licenses
+========================================================================
+The following components are provided under the Apache-2.0 OR BSD-3-Clause OR MIT License. See project link for details.
+The text of each license is also included in licenses/LICENSE-[project].txt.
+
+    https://crates.io/crates/num_enum/0.5.11 0.5.11 Apache-2.0 OR BSD-3-Clause OR MIT
+    https://crates.io/crates/num_enum_derive/0.5.11 0.5.11 Apache-2.0 OR BSD-3-Clause OR MIT
+
+========================================================================
 Apache-2.0 OR BSL-1.0 licenses
 ========================================================================
 The following components are provided under the Apache-2.0 OR BSL-1.0 License. See project link for details.
@@ -374,6 +383,7 @@
     https://crates.io/crates/portable-atomic/1.4.2 1.4.2 Apache-2.0 OR MIT
     https://crates.io/crates/ppv-lite86/0.2.17 0.2.17 Apache-2.0 OR MIT
     https://crates.io/crates/prettyplease/0.1.25 0.1.25 Apache-2.0 OR MIT
+    https://crates.io/crates/proc-macro-crate/1.3.1 1.3.1 Apache-2.0 OR MIT
     https://crates.io/crates/proc-macro2/1.0.66 1.0.66 Apache-2.0 OR MIT
     https://crates.io/crates/quick-error/1.2.3 1.2.3 Apache-2.0 OR MIT
     https://crates.io/crates/quote/1.0.32 1.0.32 Apache-2.0 OR MIT
@@ -416,6 +426,8 @@
     https://crates.io/crates/time-core/0.1.1 0.1.1 Apache-2.0 OR MIT
     https://crates.io/crates/tokio-io-timeout/1.2.0 1.2.0 Apache-2.0 OR MIT
     https://crates.io/crates/tokio-rustls/0.23.4 0.23.4 Apache-2.0 OR MIT
+    https://crates.io/crates/toml_datetime/0.6.3 0.6.3 Apache-2.0 OR MIT
+    https://crates.io/crates/toml_edit/0.19.14 0.19.14 Apache-2.0 OR MIT
     https://crates.io/crates/trust-dns-proto/0.22.0 0.22.0 Apache-2.0 OR MIT
     https://crates.io/crates/trust-dns-resolver/0.22.0 0.22.0 Apache-2.0 OR MIT
     https://crates.io/crates/typenum/1.16.0 1.16.0 Apache-2.0 OR MIT
@@ -529,6 +541,8 @@
     https://crates.io/crates/phf_codegen/0.10.0 0.10.0 MIT
     https://crates.io/crates/phf_generator/0.10.0 0.10.0 MIT
     https://crates.io/crates/phf_shared/0.10.0 0.10.0 MIT
+    https://crates.io/crates/rdkafka/0.32.2 0.32.2 MIT
+    https://crates.io/crates/rdkafka-sys/4.5.0+1.9.2 4.5.0+1.9.2 MIT
     https://crates.io/crates/redox_syscall/0.3.5 0.3.5 MIT
     https://crates.io/crates/schannel/0.1.22 0.1.22 MIT
     https://crates.io/crates/sharded-slab/0.1.4 0.1.4 MIT
@@ -556,6 +570,7 @@
     https://crates.io/crates/valuable/0.1.0 0.1.0 MIT
     https://crates.io/crates/want/0.3.1 0.3.1 MIT
     https://crates.io/crates/which/4.4.0 4.4.0 MIT
+    https://crates.io/crates/winnow/0.5.3 0.5.3 MIT
     https://crates.io/crates/winreg/0.10.1 0.10.1 MIT
     https://crates.io/crates/winreg/0.50.0 0.50.0 MIT
 
diff --git a/docs/en/configuration/ini-settings.md b/docs/en/configuration/ini-settings.md
index bc48b4d..b8a24fd 100644
--- a/docs/en/configuration/ini-settings.md
+++ b/docs/en/configuration/ini-settings.md
@@ -2,21 +2,24 @@
 
 This is the configuration list supported in `php.ini`.
 
-| Configuration Item                               | Description                                                                                                              | Default Value             |
-| ------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------ | ------------------------- |
-| skywalking_agent.enable                          | Enable skywalking_agent extension or not.                                                                                | Off                       |
-| skywalking_agent.log_file                        | Log file path.                                                                                                           | /tmp/skywalking-agent.log |
-| skywalking_agent.log_level                       | Log level: one of `OFF`, `TRACE`, `DEBUG`, `INFO`, `WARN`, `ERROR`.                                                      | INFO                      |
-| skywalking_agent.runtime_dir                     | Skywalking agent runtime directory.                                                                                      | /tmp/skywalking-agent     |
-| skywalking_agent.server_addr                     | Address of skywalking oap server.                                                                                        | 127.0.0.1:11800           |
-| skywalking_agent.service_name                    | Application service name.                                                                                                | hello-skywalking          |
-| skywalking_agent.skywalking_version              | Skywalking version, 8 or 9.                                                                                              | 8                         |
-| skywalking_agent.authentication                  | Skywalking authentication token, let it empty if the backend isn't enabled.                                              |                           |
-| skywalking_agent.worker_threads                  | Skywalking worker threads, 0 will auto set as the cpu core size.                                                         | 0                         |
-| skywalking_agent.enable_tls                      | Wether to enable tls for gPRC, default is false.                                                                         | Off                       |
-| skywalking_agent.ssl_trusted_ca_path             | The gRPC SSL trusted ca file.                                                                                            |                           |
-| skywalking_agent.ssl_key_path                    | The private key file. Enable mTLS when `ssl_key_path` and `ssl_cert_chain_path` exist.                                   |                           |
-| skywalking_agent.ssl_cert_chain_path             | The certificate file. Enable mTLS when `ssl_key_path` and `ssl_cert_chain_path` exist.                                   |                           |
-| skywalking_agent.heartbeat_period                | Agent heartbeat report period. Unit, second.                                                                             | 30                        |
-| skywalking_agent.properties_report_period_factor | The agent sends the instance properties to the backend every heartbeat_period * properties_report_period_factor seconds. | 10                        |
-| skywalking_agent.enable_zend_observer            | Whether to use `zend observer` instead of `zend_execute_ex` to hook the functions, this feature is only available for PHP8+.  | Off                       |
+| Configuration Item                               | Description                                                                                                                           | Default Value             |
+| ------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------- | ------------------------- |
+| skywalking_agent.enable                          | Enable skywalking_agent extension or not.                                                                                             | Off                       |
+| skywalking_agent.log_file                        | Log file path.                                                                                                                        | /tmp/skywalking-agent.log |
+| skywalking_agent.log_level                       | Log level: one of `OFF`, `TRACE`, `DEBUG`, `INFO`, `WARN`, `ERROR`.                                                                   | INFO                      |
+| skywalking_agent.runtime_dir                     | Skywalking agent runtime directory.                                                                                                   | /tmp/skywalking-agent     |
+| skywalking_agent.server_addr                     | Address of skywalking oap server. Only available when `reporter_type` is `grpc`.                                                      | 127.0.0.1:11800           |
+| skywalking_agent.service_name                    | Application service name.                                                                                                             | hello-skywalking          |
+| skywalking_agent.skywalking_version              | Skywalking version, 8 or 9.                                                                                                           | 8                         |
+| skywalking_agent.authentication                  | Skywalking authentication token, let it empty if the backend isn't enabled. Only available when `reporter_type` is `grpc`.            |                           |
+| skywalking_agent.worker_threads                  | Skywalking worker threads, 0 will auto set as the cpu core size.                                                                      | 0                         |
+| skywalking_agent.enable_tls                      | Wether to enable tls for gPRC, default is false. Only available when `reporter_type` is `grpc`.                                       | Off                       |
+| skywalking_agent.ssl_trusted_ca_path             | The gRPC SSL trusted ca file. Only available when `reporter_type` is `grpc`.                                                          |                           |
+| skywalking_agent.ssl_key_path                    | The private key file. Enable mTLS when `ssl_key_path` and `ssl_cert_chain_path` exist. Only available when `reporter_type` is `grpc`. |                           |
+| skywalking_agent.ssl_cert_chain_path             | The certificate file. Enable mTLS when `ssl_key_path` and `ssl_cert_chain_path` exist. Only available when `reporter_type` is `grpc`. |                           |
+| skywalking_agent.heartbeat_period                | Agent heartbeat report period. Unit, second.                                                                                          | 30                        |
+| skywalking_agent.properties_report_period_factor | The agent sends the instance properties to the backend every heartbeat_period * properties_report_period_factor seconds.              | 10                        |
+| skywalking_agent.enable_zend_observer            | Whether to use `zend observer` instead of `zend_execute_ex` to hook the functions, this feature is only available for PHP8+.          | Off                       |
+| skywalking_agent.reporter_type                   | Reporter type, optional values are `grpc` and `kafka`.                                                                                | grpc                      |
+| skywalking_agent.kafka_bootstrap_servers         | A list of host/port pairs to use for connect to the Kafka cluster. Only available when `reporter_type` is `kafka`.                    |                           |
+| skywalking_agent.kafka_producer_config           | Configure Kafka Producer configuration in JSON format `{"key": "value}`. Only available when `reporter_type` is `kafka`.              | {}                        |
diff --git a/docs/en/reporter/kafka-reporter.md b/docs/en/reporter/kafka-reporter.md
new file mode 100644
index 0000000..10338c3
--- /dev/null
+++ b/docs/en/reporter/kafka-reporter.md
@@ -0,0 +1,56 @@
+# Kafka reporter
+
+By default, the configuration option `skywalking_agent.reporter_type` is `grpc`, means that the skywalking agent will report the traces, metrics, logs etc. to SkyWalking OAP Server by gPRC protocol.
+
+At the same time, SkyWalking also supports kafka-fetcher, so you can report traces, metrics, logs, etc. by kafka.
+
+But the skywalking agent does not compile the `kafka-reporter` feature by default, you need to enable the it.
+
+## Steps
+
+1. Compile the skywalking agent with feature `kafka-reporter`.
+
+   For pecl:
+
+   ```shell
+   pecl install skywalking_agent
+   ```
+
+   Enable the kafka reporter interactively:
+
+   ```txt
+   68 source files, building
+   running: phpize
+   Configuring for:
+   PHP Api Version:         20220829
+   Zend Module Api No:      20220829
+   Zend Extension Api No:   420220829
+   enable cargo debug? [no] : 
+   enable kafka reporter? [no] : yes
+   ```
+
+   Or, build from sources:
+
+   ```shell
+   phpize
+   ./configure --enable-kafka-reporter
+   make
+   make install
+   ```
+
+2. Config `php.ini`.
+
+   Switch to use kafka reporter.
+
+   ```ini
+   [skywalking_agent]
+   extension = skywalking_agent.so
+   skywalking_agent.reporter_type = kafka
+   skywalking_agent.kafka_bootstrap_servers = 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092
+   ```
+
+   If you want to custom the kafka reporter properties, you can specify it by JSON format:
+
+   ```ini
+   skywalking_agent.kafka_producer_config = {"delivery.timeout.ms": "12000"}
+   ```
diff --git a/docs/menu.yml b/docs/menu.yml
index 7e9c135..723db61 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -28,6 +28,10 @@
         path: "/en/configuration/ini-settings"
       - name: "Zend Observer"
         path: "/en/configuration/zend-observer"
+  - name: "Reporter"
+    catalog:
+      - name: "Kafka Reporter"
+        path: "/en/reporter/kafka-reporter"
   - name: "Contribution"
     catalog:
       - name: "Compiling Guidance"
diff --git a/package.tpl.xml b/package.tpl.xml
index 9a340cc..6c4bc2a 100644
--- a/package.tpl.xml
+++ b/package.tpl.xml
@@ -72,5 +72,6 @@
 	<providesextension>skywalking_agent</providesextension>
 	<extsrcrelease>
 		<configureoption default="no" name="enable-cargo-debug" prompt="enable cargo debug?" />
+		<configureoption default="no" name="enable-kafka-reporter" prompt="enable kafka reporter?" />
 	</extsrcrelease>
 </package>
diff --git a/src/lib.rs b/src/lib.rs
index 5a16850..25a56a0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -24,6 +24,7 @@
 mod execute;
 mod module;
 mod plugin;
+mod reporter;
 mod request;
 mod tag;
 mod util;
@@ -88,6 +89,17 @@
 /// PHP8's jit.
 const SKYWALKING_AGENT_ENABLE_ZEND_OBSERVER: &str = "skywalking_agent.enable_zend_observer";
 
+/// Reporter type, optional values are `grpc` and `kafka`, default is `grpc`.
+const SKYWALKING_AGENT_REPORTER_TYPE: &str = "skywalking_agent.reporter_type";
+
+/// A list of host/port pairs to use for establishing the initial connection to
+/// the Kafka cluster. Only available when the reporter type is `kafka`.
+const SKYWALKING_AGENT_KAFKA_BOOTSTRAP_SERVERS: &str = "skywalking_agent.kafka_bootstrap_servers";
+
+/// Configure Kafka Producer configuration in JSON format.
+/// Only available when the reporter type is `kafka`.
+const SKYWALKING_AGENT_KAFKA_PRODUCER_CONFIG: &str = "skywalking_agent.kafka_producer_config";
+
 #[php_get_module]
 pub fn get_module() -> Module {
     let mut module = Module::new(
@@ -153,6 +165,21 @@
         Policy::System,
     );
     module.add_ini(SKYWALKING_AGENT_ENABLE_ZEND_OBSERVER, false, Policy::System);
+    module.add_ini(
+        SKYWALKING_AGENT_REPORTER_TYPE,
+        "grpc".to_string(),
+        Policy::System,
+    );
+    module.add_ini(
+        SKYWALKING_AGENT_KAFKA_BOOTSTRAP_SERVERS,
+        "".to_string(),
+        Policy::System,
+    );
+    module.add_ini(
+        SKYWALKING_AGENT_KAFKA_PRODUCER_CONFIG,
+        "{}".to_string(),
+        Policy::System,
+    );
 
     // Hooks.
     module.on_module_init(module::init);
diff --git a/src/module.rs b/src/module.rs
index 662044e..37b82b5 100644
--- a/src/module.rs
+++ b/src/module.rs
@@ -16,7 +16,7 @@
 use crate::{
     channel::Reporter,
     execute::{register_execute_functions, register_observer_handlers},
-    util::{get_sapi_module_name, IPS},
+    util::{get_sapi_module_name, get_str_ini_with_default, IPS},
     worker::init_worker,
     *,
 };
@@ -28,7 +28,6 @@
     trace::tracer::{self, Tracer},
 };
 use std::{
-    borrow::ToOwned,
     ffi::{CStr, OsStr},
     fs::{self, OpenOptions},
     os::unix::prelude::OsStrExt,
@@ -39,13 +38,30 @@
 use tracing::{debug, error, info, metadata::LevelFilter};
 use tracing_subscriber::FmtSubscriber;
 
-pub static SERVICE_NAME: Lazy<String> = Lazy::new(|| {
-    ini_get::<Option<&CStr>>(SKYWALKING_AGENT_SERVICE_NAME)
-        .and_then(|s| s.to_str().ok())
-        .map(ToOwned::to_owned)
-        .unwrap_or_default()
+static IS_ENABLE: Lazy<bool> = Lazy::new(|| {
+    if !ini_get::<bool>(SKYWALKING_AGENT_ENABLE) {
+        return false;
+    }
+
+    let sapi = get_sapi_module_name().to_bytes();
+
+    if sapi == b"fpm-fcgi" {
+        return true;
+    }
+
+    if sapi == b"cli" && get_module_registry().exists("swoole") {
+        return true;
+    }
+
+    false
 });
 
+pub static SERVER_ADDR: Lazy<String> =
+    Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_SERVER_ADDR));
+
+pub static SERVICE_NAME: Lazy<String> =
+    Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_SERVICE_NAME));
+
 pub static SERVICE_INSTANCE: Lazy<String> =
     Lazy::new(|| RandomGenerator::generate() + "@" + &IPS[0]);
 
@@ -76,35 +92,19 @@
     dir
 });
 
-pub static AUTHENTICATION: Lazy<String> = Lazy::new(|| {
-    ini_get::<Option<&CStr>>(SKYWALKING_AGENT_AUTHENTICATION)
-        .and_then(|s| s.to_str().ok())
-        .map(ToOwned::to_owned)
-        .unwrap_or_default()
-});
+pub static AUTHENTICATION: Lazy<String> =
+    Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_AUTHENTICATION));
 
 pub static ENABLE_TLS: Lazy<bool> = Lazy::new(|| ini_get::<bool>(SKYWALKING_AGENT_ENABLE_TLS));
 
-pub static SSL_TRUSTED_CA_PATH: Lazy<String> = Lazy::new(|| {
-    ini_get::<Option<&CStr>>(SKYWALKING_AGENT_SSL_TRUSTED_CA_PATH)
-        .and_then(|s| s.to_str().ok())
-        .map(ToOwned::to_owned)
-        .unwrap_or_default()
-});
+pub static SSL_TRUSTED_CA_PATH: Lazy<String> =
+    Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_SSL_TRUSTED_CA_PATH));
 
-pub static SSL_KEY_PATH: Lazy<String> = Lazy::new(|| {
-    ini_get::<Option<&CStr>>(SKYWALKING_AGENT_SSL_KEY_PATH)
-        .and_then(|s| s.to_str().ok())
-        .map(ToOwned::to_owned)
-        .unwrap_or_default()
-});
+pub static SSL_KEY_PATH: Lazy<String> =
+    Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_SSL_KEY_PATH));
 
-pub static SSL_CERT_CHAIN_PATH: Lazy<String> = Lazy::new(|| {
-    ini_get::<Option<&CStr>>(SKYWALKING_AGENT_SSL_CERT_CHAIN_PATH)
-        .and_then(|s| s.to_str().ok())
-        .map(ToOwned::to_owned)
-        .unwrap_or_default()
-});
+pub static SSL_CERT_CHAIN_PATH: Lazy<String> =
+    Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_SSL_CERT_CHAIN_PATH));
 
 pub static HEARTBEAT_PERIOD: Lazy<i64> =
     Lazy::new(|| ini_get::<i64>(SKYWALKING_AGENT_HEARTBEAT_PERIOD));
@@ -117,6 +117,18 @@
     sys::PHP_MAJOR_VERSION >= 8 && ini_get::<bool>(SKYWALKING_AGENT_ENABLE_ZEND_OBSERVER)
 });
 
+pub static WORKER_THREADS: Lazy<i64> =
+    Lazy::new(|| ini_get::<i64>(SKYWALKING_AGENT_WORKER_THREADS));
+
+pub static REPORTER_TYPE: Lazy<String> =
+    Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_REPORTER_TYPE));
+
+pub static KAFKA_BOOTSTRAP_SERVERS: Lazy<String> =
+    Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_KAFKA_BOOTSTRAP_SERVERS));
+
+pub static KAFKA_PRODUCER_CONFIG: Lazy<String> =
+    Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_KAFKA_PRODUCER_CONFIG));
+
 /// For PHP 8.2+, zend observer api are now also called for internal functions.
 ///
 /// Refer to this commit: <https://github.com/php/php-src/commit/625f1649639c2b9a9d76e4d42f88c264ddb8447d>
@@ -129,29 +141,43 @@
         return;
     }
 
+    // Initialize configuration properties.
+    Lazy::force(&SERVER_ADDR);
+    Lazy::force(&SERVICE_NAME);
+    Lazy::force(&SERVICE_INSTANCE);
+    Lazy::force(&SKYWALKING_VERSION);
+    Lazy::force(&RUNTIME_DIR);
+    Lazy::force(&SOCKET_FILE_PATH);
+    Lazy::force(&AUTHENTICATION);
+    Lazy::force(&ENABLE_TLS);
+    Lazy::force(&SSL_TRUSTED_CA_PATH);
+    Lazy::force(&SSL_KEY_PATH);
+    Lazy::force(&SSL_CERT_CHAIN_PATH);
+    Lazy::force(&HEARTBEAT_PERIOD);
+    Lazy::force(&PROPERTIES_REPORT_PERIOD_FACTOR);
+    Lazy::force(&ENABLE_ZEND_OBSERVER);
+    Lazy::force(&WORKER_THREADS);
+    Lazy::force(&REPORTER_TYPE);
+    Lazy::force(&KAFKA_BOOTSTRAP_SERVERS);
+    Lazy::force(&KAFKA_PRODUCER_CONFIG);
+
     if let Err(err) = try_init_logger() {
         eprintln!("skywalking_agent: initialize logger failed: {}", err);
     }
 
     // Skywalking agent info.
-    let service_name = Lazy::force(&SERVICE_NAME);
-    let service_instance = Lazy::force(&SERVICE_INSTANCE);
-    let skywalking_version = Lazy::force(&SKYWALKING_VERSION);
-    let authentication = Lazy::force(&AUTHENTICATION);
-    let heartbeat_period = Lazy::force(&HEARTBEAT_PERIOD);
-    let properties_report_period_factor = Lazy::force(&PROPERTIES_REPORT_PERIOD_FACTOR);
     info!(
-        service_name,
-        service_instance,
-        skywalking_version,
-        authentication,
-        heartbeat_period,
-        properties_report_period_factor,
+        service_name = &*SERVICE_NAME,
+        service_instance = &*SERVICE_INSTANCE,
+        skywalking_version = &*SKYWALKING_VERSION,
+        heartbeat_period = &*HEARTBEAT_PERIOD,
+        properties_report_period_factor = &*PROPERTIES_REPORT_PERIOD_FACTOR,
         "Starting skywalking agent"
     );
 
     // Skywalking version check.
-    if *skywalking_version < 8 {
+    let skywalking_version = *SKYWALKING_VERSION;
+    if skywalking_version < 8 {
         error!(
             skywalking_version,
             "The skywalking agent only supports versions after skywalking 8"
@@ -159,16 +185,6 @@
         return;
     }
 
-    // Initialize TLS if enabled.
-    let enable_tls = Lazy::force(&ENABLE_TLS);
-    let ssl_trusted_ca_path = Lazy::force(&SSL_TRUSTED_CA_PATH);
-    let ssl_key_path = Lazy::force(&SSL_KEY_PATH);
-    let ssl_cert_chain_path = Lazy::force(&SSL_CERT_CHAIN_PATH);
-    debug!(
-        enable_tls,
-        ssl_trusted_ca_path, ssl_key_path, ssl_cert_chain_path, "Skywalking TLS info"
-    );
-
     // Initialize runtime directory.
     if RUNTIME_DIR.as_os_str().is_empty() {
         error!("The skywalking agent runtime directory must not be empty");
@@ -180,12 +196,11 @@
     }
 
     // Initialize Agent worker.
-    Lazy::force(&SOCKET_FILE_PATH);
     init_worker();
 
     tracer::set_global_tracer(Tracer::new(
-        service_name,
-        service_instance,
+        &*SERVICE_NAME,
+        &*SERVICE_INSTANCE,
         Reporter::new(&*SOCKET_FILE_PATH),
     ));
 
@@ -243,27 +258,12 @@
     Ok(())
 }
 
+#[inline]
 fn get_module_registry() -> &'static ZArr {
     unsafe { ZArr::from_ptr(&sys::module_registry) }
 }
 
+#[inline]
 pub fn is_enable() -> bool {
-    static IS_ENABLE: Lazy<bool> = Lazy::new(|| {
-        if !ini_get::<bool>(SKYWALKING_AGENT_ENABLE) {
-            return false;
-        }
-
-        let sapi = get_sapi_module_name().to_bytes();
-
-        if sapi == b"fpm-fcgi" {
-            return true;
-        }
-
-        if sapi == b"cli" && get_module_registry().exists("swoole") {
-            return true;
-        }
-
-        false
-    });
     *IS_ENABLE
 }
diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs
new file mode 100644
index 0000000..7e50175
--- /dev/null
+++ b/src/reporter/mod.rs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod reporter_grpc;
+mod reporter_kafka;
+
+use crate::module::REPORTER_TYPE;
+use anyhow::bail;
+use skywalking::reporter::{CollectItemConsume, CollectItemProduce};
+
+pub async fn run_reporter(
+    producer: impl CollectItemProduce, consumer: impl CollectItemConsume,
+) -> anyhow::Result<()> {
+    match REPORTER_TYPE.as_str() {
+        "grpc" => reporter_grpc::run_reporter(producer, consumer).await,
+        #[cfg(feature = "kafka-reporter")]
+        "kafka" => reporter_kafka::run_reporter(producer, consumer).await,
+        typ => bail!("unknown reporter type, {}", typ),
+    }
+}
diff --git a/src/reporter/reporter_grpc.rs b/src/reporter/reporter_grpc.rs
new file mode 100644
index 0000000..7948f57
--- /dev/null
+++ b/src/reporter/reporter_grpc.rs
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::module::{
+    AUTHENTICATION, ENABLE_TLS, SERVER_ADDR, SSL_CERT_CHAIN_PATH, SSL_KEY_PATH, SSL_TRUSTED_CA_PATH,
+};
+use anyhow::anyhow;
+use skywalking::reporter::{grpc::GrpcReporter, CollectItemConsume, CollectItemProduce};
+use std::time::Duration;
+use tokio::time::sleep;
+use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity};
+use tracing::{debug, info, warn};
+
+pub async fn run_reporter(
+    producer: impl CollectItemProduce, consumer: impl CollectItemConsume,
+) -> anyhow::Result<()> {
+    let endpoint = create_endpoint(&SERVER_ADDR).await?;
+    let channel = connect(endpoint).await;
+
+    let mut reporter = GrpcReporter::new_with_pc(channel, producer, consumer);
+
+    if !AUTHENTICATION.is_empty() {
+        reporter = reporter.with_authentication(&*AUTHENTICATION);
+    }
+
+    info!("Worker is ready...");
+
+    let handle = reporter
+        .reporting()
+        .await
+        .with_status_handle(|message, status| {
+            warn!(?status, "Collect failed: {}", message);
+        })
+        .spawn();
+
+    handle
+        .await
+        .map_err(|err| anyhow!("Tracer reporting failed: {:?}", err))?;
+
+    Ok(())
+}
+
+async fn create_endpoint(server_addr: &str) -> anyhow::Result<Endpoint> {
+    let scheme = if *ENABLE_TLS { "https" } else { "http" };
+
+    let url = format!("{}://{}", scheme, server_addr);
+    debug!(url, "Create Endpoint");
+    let mut endpoint = Endpoint::from_shared(url)?;
+
+    debug!(
+        enable_tls = *ENABLE_TLS,
+        ssl_trusted_ca_path = &*SSL_TRUSTED_CA_PATH,
+        ssl_key_path = &*SSL_KEY_PATH,
+        ssl_cert_chain_path = &*SSL_CERT_CHAIN_PATH,
+        "Skywalking TLS info"
+    );
+
+    if *ENABLE_TLS {
+        let domain_name = server_addr.split(':').next().unwrap_or_default();
+        debug!(domain_name, "Configure TLS domain");
+        let mut tls = ClientTlsConfig::new().domain_name(domain_name);
+
+        let ssl_trusted_ca_path = SSL_TRUSTED_CA_PATH.as_str();
+        if !ssl_trusted_ca_path.is_empty() {
+            debug!(ssl_trusted_ca_path, "Configure TLS CA");
+            let ca_cert = tokio::fs::read(&*SSL_TRUSTED_CA_PATH).await?;
+            let ca_cert = Certificate::from_pem(ca_cert);
+            tls = tls.ca_certificate(ca_cert);
+        }
+
+        let ssl_key_path = SSL_KEY_PATH.as_str();
+        let ssl_cert_chain_path = SSL_CERT_CHAIN_PATH.as_str();
+        if !ssl_key_path.is_empty() && !ssl_cert_chain_path.is_empty() {
+            debug!(ssl_trusted_ca_path, "Configure mTLS");
+            let client_cert = tokio::fs::read(&*SSL_CERT_CHAIN_PATH).await?;
+            let client_key = tokio::fs::read(&*SSL_KEY_PATH).await?;
+            let client_identity = Identity::from_pem(client_cert, client_key);
+            tls = tls.identity(client_identity);
+        }
+
+        endpoint = endpoint.tls_config(tls)?;
+    }
+
+    Ok(endpoint)
+}
+
+#[tracing::instrument(skip_all)]
+async fn connect(endpoint: Endpoint) -> Channel {
+    let channel = loop {
+        match endpoint.connect().await {
+            Ok(channel) => break channel,
+            Err(err) => {
+                warn!(?err, "Connect to skywalking server failed, retry after 10s");
+                sleep(Duration::from_secs(10)).await;
+            }
+        }
+    };
+
+    let uri = &*endpoint.uri().to_string();
+    info!(uri, "Skywalking server connected");
+
+    channel
+}
diff --git a/src/reporter/reporter_kafka.rs b/src/reporter/reporter_kafka.rs
new file mode 100644
index 0000000..61e4cf6
--- /dev/null
+++ b/src/reporter/reporter_kafka.rs
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![cfg(feature = "kafka-reporter")]
+
+use crate::module::{KAFKA_BOOTSTRAP_SERVERS, KAFKA_PRODUCER_CONFIG};
+use anyhow::{bail, Context};
+use skywalking::reporter::{
+    kafka::{KafkaReportBuilder, RDKafkaClientConfig},
+    CollectItemConsume, CollectItemProduce,
+};
+use std::collections::HashMap;
+
+pub async fn run_reporter(
+    producer: impl CollectItemProduce, consumer: impl CollectItemConsume,
+) -> anyhow::Result<()> {
+    let mut client_config = RDKafkaClientConfig::new();
+
+    client_config.set("bootstrap.servers", &*KAFKA_BOOTSTRAP_SERVERS);
+
+    let config = serde_json::from_str::<HashMap<String, String>>(&KAFKA_PRODUCER_CONFIG)
+        .context("parse kafka producer config failed")?;
+    for (key, value) in config {
+        client_config.set(key, value);
+    }
+
+    let (_, reporting) = KafkaReportBuilder::new_with_pc(client_config, producer, consumer)
+        .build()
+        .await?;
+    let handle = reporting.spawn();
+    if let Err(err) = handle.await {
+        bail!("wait handle failed: {:?}", err);
+    }
+
+    Ok(())
+}
diff --git a/src/util.rs b/src/util.rs
index 7dd575c..e9fb1ca 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -15,7 +15,7 @@
 
 use anyhow::anyhow;
 use once_cell::sync::Lazy;
-use phper::{sys, values::ZVal};
+use phper::{ini::ini_get, sys, values::ZVal};
 use std::{
     ffi::CStr,
     os::unix::prelude::OsStrExt,
@@ -96,3 +96,10 @@
         libc::chmod(path.as_ptr().cast(), mode);
     }
 }
+
+pub fn get_str_ini_with_default(name: &str) -> String {
+    ini_get::<Option<&CStr>>(name)
+        .and_then(|s| s.to_str().ok())
+        .map(ToOwned::to_owned)
+        .unwrap_or_default()
+}
diff --git a/src/worker.rs b/src/worker.rs
index 282e7d0..80c74a7 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -16,23 +16,22 @@
 use crate::{
     channel::{self, TxReporter},
     module::{
-        AUTHENTICATION, ENABLE_TLS, HEARTBEAT_PERIOD, PROPERTIES_REPORT_PERIOD_FACTOR,
-        SERVICE_INSTANCE, SERVICE_NAME, SOCKET_FILE_PATH, SSL_CERT_CHAIN_PATH, SSL_KEY_PATH,
-        SSL_TRUSTED_CA_PATH,
+        HEARTBEAT_PERIOD, PROPERTIES_REPORT_PERIOD_FACTOR, SERVICE_INSTANCE, SERVICE_NAME,
+        SOCKET_FILE_PATH, WORKER_THREADS,
     },
+    reporter::run_reporter,
     util::change_permission,
-    SKYWALKING_AGENT_SERVER_ADDR, SKYWALKING_AGENT_WORKER_THREADS,
 };
-use anyhow::anyhow;
+
 use once_cell::sync::Lazy;
-use phper::ini::ini_get;
+
 use skywalking::{
     management::{instance::Properties, manager::Manager},
-    reporter::{grpc::GrpcReporter, CollectItem, CollectItemConsume},
+    reporter::{CollectItem, CollectItemConsume},
 };
 use std::{
-    cmp::Ordering, error::Error, ffi::CStr, fs, io, marker::PhantomData, num::NonZeroUsize,
-    process::exit, thread::available_parallelism, time::Duration,
+    cmp::Ordering, error::Error, fs, io, marker::PhantomData, num::NonZeroUsize, process::exit,
+    thread::available_parallelism, time::Duration,
 };
 use tokio::{
     net::UnixListener,
@@ -40,19 +39,11 @@
     select,
     signal::unix::{signal, SignalKind},
     sync::mpsc::{self, error::TrySendError},
-    time::sleep,
 };
-use tonic::{
-    async_trait,
-    transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity},
-};
+use tonic::async_trait;
 use tracing::{debug, error, info, warn};
 
 pub fn init_worker() {
-    let server_addr = ini_get::<Option<&CStr>>(SKYWALKING_AGENT_SERVER_ADDR)
-        .and_then(|s| s.to_str().ok())
-        .unwrap_or_default()
-        .to_owned();
     let worker_threads = worker_threads();
 
     unsafe {
@@ -72,7 +63,7 @@
 
                 // Run the worker in subprocess.
                 let rt = new_tokio_runtime(worker_threads);
-                match rt.block_on(start_worker(server_addr)) {
+                match rt.block_on(start_worker()) {
                     Ok(_) => {
                         exit(0);
                     }
@@ -88,7 +79,7 @@
 }
 
 fn worker_threads() -> usize {
-    let worker_threads = ini_get::<i64>(SKYWALKING_AGENT_WORKER_THREADS);
+    let worker_threads = *WORKER_THREADS;
     if worker_threads <= 0 {
         available_parallelism().map(NonZeroUsize::get).unwrap_or(1)
     } else {
@@ -105,7 +96,7 @@
         .unwrap()
 }
 
-async fn start_worker(server_addr: String) -> anyhow::Result<()> {
+async fn start_worker() -> anyhow::Result<()> {
     debug!("Starting worker...");
 
     // Ensure to cleanup resources when worker exits.
@@ -167,30 +158,10 @@
             }
         });
 
-        let endpoint = create_endpoint(&server_addr).await?;
-        let channel = connect(endpoint).await;
-
         report_properties_and_keep_alive(TxReporter(tx_));
 
-        let mut reporter = GrpcReporter::new_with_pc(channel, (), Consumer(rx));
-
-        if !AUTHENTICATION.is_empty() {
-            reporter = reporter.with_authentication(&*AUTHENTICATION);
-        }
-
-        info!("Worker is ready...");
-
-        let handle = reporter
-            .reporting()
-            .await
-            .with_status_handle(|message, status| {
-                warn!(?status, "Collect failed: {}", message);
-            })
-            .spawn();
-
-        handle
-            .await
-            .map_err(|err| anyhow!("Tracer reporting failed: {:?}", err))?;
+        // Run reporter with blocking.
+        run_reporter((), Consumer(rx)).await?;
 
         Ok::<_, anyhow::Error>(())
     };
@@ -209,60 +180,6 @@
     Ok(())
 }
 
-async fn create_endpoint(server_addr: &str) -> anyhow::Result<Endpoint> {
-    let scheme = if *ENABLE_TLS { "https" } else { "http" };
-
-    let url = format!("{}://{}", scheme, server_addr);
-    debug!(url, "Create Endpoint");
-    let mut endpoint = Endpoint::from_shared(url)?;
-
-    if *ENABLE_TLS {
-        let domain_name = server_addr.split(':').next().unwrap_or_default();
-        debug!(domain_name, "Configure TLS domain");
-        let mut tls = ClientTlsConfig::new().domain_name(domain_name);
-
-        let ssl_trusted_ca_path = SSL_TRUSTED_CA_PATH.as_str();
-        if !ssl_trusted_ca_path.is_empty() {
-            debug!(ssl_trusted_ca_path, "Configure TLS CA");
-            let ca_cert = tokio::fs::read(&*SSL_TRUSTED_CA_PATH).await?;
-            let ca_cert = Certificate::from_pem(ca_cert);
-            tls = tls.ca_certificate(ca_cert);
-        }
-
-        let ssl_key_path = SSL_KEY_PATH.as_str();
-        let ssl_cert_chain_path = SSL_CERT_CHAIN_PATH.as_str();
-        if !ssl_key_path.is_empty() && !ssl_cert_chain_path.is_empty() {
-            debug!(ssl_trusted_ca_path, "Configure mTLS");
-            let client_cert = tokio::fs::read(&*SSL_CERT_CHAIN_PATH).await?;
-            let client_key = tokio::fs::read(&*SSL_KEY_PATH).await?;
-            let client_identity = Identity::from_pem(client_cert, client_key);
-            tls = tls.identity(client_identity);
-        }
-
-        endpoint = endpoint.tls_config(tls)?;
-    }
-
-    Ok(endpoint)
-}
-
-#[tracing::instrument(skip_all)]
-async fn connect(endpoint: Endpoint) -> Channel {
-    let channel = loop {
-        match endpoint.connect().await {
-            Ok(channel) => break channel,
-            Err(err) => {
-                warn!(?err, "Connect to skywalking server failed, retry after 10s");
-                sleep(Duration::from_secs(10)).await;
-            }
-        }
-    };
-
-    let uri = &*endpoint.uri().to_string();
-    info!(uri, "Skywalking server connected");
-
-    channel
-}
-
 struct Consumer(mpsc::Receiver<CollectItem>);
 
 #[async_trait]