Add kafka reporter. (#88)
diff --git a/.github/workflows/pecl.yml b/.github/workflows/pecl.yml
index 5535c87..4384173 100644
--- a/.github/workflows/pecl.yml
+++ b/.github/workflows/pecl.yml
@@ -52,6 +52,9 @@
version:
- php: "8.2"
swoole: "5.0.0"
+ option:
+ - 'enable-cargo-debug=\"no\" enable-kafka-reporter=\"no\"'
+ - 'enable-cargo-debug=\"no\" enable-kafka-reporter=\"yes\"'
runs-on: ${{ matrix.os }}
steps:
@@ -77,14 +80,14 @@
- name: Install Rust Stable Globally
run: |
- curl https://sh.rustup.rs -sSf | sudo -E sh -s -- -y --no-modify-path
- sudo ln -sf $CARGO_HOME/bin/rustup /usr/local/bin/rustup
- sudo ln -sf $CARGO_HOME/bin/rustc /usr/local/bin/rustc
- sudo ln -sf $CARGO_HOME/bin/cargo /usr/local/bin/cargo
+ curl https://sh.rustup.rs -sSf | sudo -E sh -s -- -y --default-toolchain none
- name: PECL install
run: |
- sudo -E cargo run -p scripts --release -- create-package-xml --version 0.0.0 --notes "Just for TEST."
- cat package.xml
- printf "\n" | sudo -E pecl install package.xml
+ sudo bash -c "\
+ source /tmp/cargo/env && \
+ cargo version && \
+ cargo run -p scripts --release -- create-package-xml --version 0.0.0 --notes 'Just for TEST.' && \
+ cat package.xml && \
+ pecl install -D '${{ matrix.option }}' package.xml"
php -d "extension=skywalking_agent" --ri skywalking_agent
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 73bd75c..4ca62e4 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -52,35 +52,48 @@
os:
- ubuntu-20.04
- macos-12
- version:
+ flag:
# Many composer dependencies need PHP 7.2+
- - php: "7.2"
- swoole: "4.6.7"
+ - php_version: "7.2"
+ swoole_version: "4.6.7"
enable_zend_observer: "Off"
- - php: "7.3"
- swoole: "4.7.1"
+ cargo_features: ""
+ - php_version: "7.3"
+ swoole_version: "4.7.1"
enable_zend_observer: "Off"
- - php: "7.4"
- swoole: "4.8.10"
+ cargo_features: ""
+ - php_version: "7.4"
+ swoole_version: "4.8.10"
enable_zend_observer: "Off"
- - php: "8.0"
- swoole: "5.0.0"
+ cargo_features: ""
+ - php_version: "8.0"
+ swoole_version: "5.0.0"
enable_zend_observer: "Off"
- - php: "8.0"
- swoole: "5.0.0"
+ cargo_features: ""
+ - php_version: "8.0"
+ swoole_version: "5.0.0"
enable_zend_observer: "On"
- - php: "8.1"
- swoole: "5.0.0"
+ cargo_features: ""
+ - php_version: "8.1"
+ swoole_version: "5.0.0"
enable_zend_observer: "Off"
- - php: "8.1"
- swoole: "5.0.0"
+ cargo_features: ""
+ - php_version: "8.1"
+ swoole_version: "5.0.0"
enable_zend_observer: "On"
- - php: "8.2"
- swoole: "5.0.0"
+ cargo_features: ""
+ - php_version: "8.2"
+ swoole_version: "5.0.0"
enable_zend_observer: "Off"
- - php: "8.2"
- swoole: "5.0.0"
+ cargo_features: ""
+ - php_version: "8.2"
+ swoole_version: "5.0.0"
enable_zend_observer: "On"
+ cargo_features: ""
+ - php_version: "8.2"
+ swoole_version: "5.0.0"
+ enable_zend_observer: "On"
+ cargo_features: "--features kafka-reporter"
runs-on: ${{ matrix.os }}
steps:
@@ -100,19 +113,19 @@
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
- php-version: ${{ matrix.version.php }}
+ php-version: ${{ matrix.flag.php_version }}
tools: php-config, composer:v2
extensions: >
bcmath, calendar, ctype, dom, exif, gettext, iconv, intl, json, mbstring,
mysqli, mysqlnd, opcache, pdo, pdo_mysql, phar, posix, readline, redis,
- memcached, swoole-${{ matrix.version.swoole }}, xml, xmlreader, xmlwriter,
+ memcached, swoole-${{ matrix.flag.swoole_version }}, xml, xmlreader, xmlwriter,
yaml, zip, mongodb
- name: Setup php-fpm for Linux
if: matrix.os == 'ubuntu-20.04'
run: |
- sudo apt-get install -y php${{ matrix.version.php }}-fpm
- sudo ln -sf /usr/sbin/php-fpm${{ matrix.version.php }} /usr/sbin/php-fpm
+ sudo apt-get install -y php${{ matrix.flag.php_version }}-fpm
+ sudo ln -sf /usr/sbin/php-fpm${{ matrix.flag.php_version }} /usr/sbin/php-fpm
- name: PHP version
run: |
@@ -122,9 +135,9 @@
php -r 'echo "Swoole version: " . phpversion("swoole") . "\n";'
composer --version
- [[ `php --version` == PHP\ ${{ matrix.version.php }}.* ]] || exit 1;
- [[ `php-fpm --version` == PHP\ ${{ matrix.version.php }}.* ]] || exit 1;
- [[ `php-config --version` == ${{ matrix.version.php }}.* ]] || exit 1;
+ [[ `php --version` == PHP\ ${{ matrix.flag.php_version }}.* ]] || exit 1;
+ [[ `php-fpm --version` == PHP\ ${{ matrix.flag.php_version }}.* ]] || exit 1;
+ [[ `php-config --version` == ${{ matrix.flag.php_version }}.* ]] || exit 1;
- name: Install Rust
uses: actions-rs/toolchain@v1
@@ -142,21 +155,21 @@
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
- key: ${{ matrix.os }}-test-${{ matrix.version.php }}-${{ hashFiles('**/Cargo.lock') }}
+ key: ${{ matrix.os }}-test-${{ matrix.flag.php_version }}-${{ hashFiles('**/Cargo.lock') }}
- name: Cargo clippy
uses: actions-rs/cargo@v1
with:
toolchain: ${{ env.RUST_STABLE_TOOLCHAIN }}
command: clippy
- args: --release --workspace
+ args: --release --workspace ${{ matrix.flag.cargo_features }}
- name: Cargo build
uses: actions-rs/cargo@v1
with:
toolchain: ${{ env.RUST_STABLE_TOOLCHAIN }}
command: build
- args: --release --workspace
+ args: --release --workspace ${{ matrix.flag.cargo_features }}
- name: Composer install
run: composer install --working-dir=tests/php
@@ -178,9 +191,9 @@
with:
toolchain: ${{ env.RUST_STABLE_TOOLCHAIN }}
command: test
- args: --release --workspace
+ args: --release --workspace ${{ matrix.flag.cargo_features }}
env:
- ENABLE_ZEND_OBSERVER: ${{ matrix.version.enable_zend_observer }}
+ ENABLE_ZEND_OBSERVER: ${{ matrix.flag.enable_zend_observer }}
continue-on-error: true
# Rebuild the mixture when cargo test failed.
@@ -207,7 +220,7 @@
command: test
args: --release --workspace
env:
- ENABLE_ZEND_OBSERVER: ${{ matrix.version.enable_zend_observer }}
+ ENABLE_ZEND_OBSERVER: ${{ matrix.flag.enable_zend_observer }}
- name: View logs
if: always()
diff --git a/Cargo.lock b/Cargo.lock
index b92eaad..c123d6f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1207,6 +1207,27 @@
]
[[package]]
+name = "num_enum"
+version = "0.5.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9"
+dependencies = [
+ "num_enum_derive",
+]
+
+[[package]]
+name = "num_enum_derive"
+version = "0.5.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799"
+dependencies = [
+ "proc-macro-crate",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
+[[package]]
name = "object"
version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1534,6 +1555,16 @@
]
[[package]]
+name = "proc-macro-crate"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919"
+dependencies = [
+ "once_cell",
+ "toml_edit",
+]
+
+[[package]]
name = "proc-macro2"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1642,6 +1673,36 @@
]
[[package]]
+name = "rdkafka"
+version = "0.32.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8733bc5dc0b192d1a4b28073f9bff1326ad9e4fecd4d9b025d6fc358d1c3e79"
+dependencies = [
+ "futures-channel",
+ "futures-util",
+ "libc",
+ "log",
+ "rdkafka-sys",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "slab",
+ "tokio",
+]
+
+[[package]]
+name = "rdkafka-sys"
+version = "4.5.0+1.9.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1bb0676c2112342ac7165decdedbc4e7086c0af384479ccce534546b10687a5d"
+dependencies = [
+ "libc",
+ "libz-sys",
+ "num_enum",
+ "pkg-config",
+]
+
+[[package]]
name = "redox_syscall"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2000,6 +2061,7 @@
"portable-atomic 0.3.20",
"prost",
"prost-derive",
+ "rdkafka",
"serde",
"systemstat",
"thiserror",
@@ -2322,6 +2384,23 @@
]
[[package]]
+name = "toml_datetime"
+version = "0.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b"
+
+[[package]]
+name = "toml_edit"
+version = "0.19.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
+dependencies = [
+ "indexmap 2.0.0",
+ "toml_datetime",
+ "winnow",
+]
+
+[[package]]
name = "tonic"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2911,6 +2990,15 @@
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]]
+name = "winnow"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f46aab759304e4d7b2075a9aecba26228bb073ee8c50db796b2c72c676b5d807"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
name = "winreg"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 0345bd0..771a788 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,6 +35,9 @@
name = "skywalking_agent"
crate-type = ["lib", "cdylib"]
+[features]
+kafka-reporter = ["skywalking/kafka-reporter"]
+
[dependencies]
anyhow = { version = "1.0.72", features = ["backtrace"] }
bincode = "1.3.3"
diff --git a/config.m4 b/config.m4
index b4f9560..edd4c0e 100644
--- a/config.m4
+++ b/config.m4
@@ -22,6 +22,9 @@
PHP_ARG_ENABLE([cargo_debug], [whether to enable cargo debug mode],
[ --enable-cargo-debug Enable cargo debug], no, no)
+PHP_ARG_ENABLE([kafka_reporter], [whether to enable kafka reporter],
+[ --enable-kafka-reporter Enable kafka reporter], no, no)
+
if test "$PHP_THREAD_SAFETY" == "yes"; then
AC_MSG_ERROR([skywalking_agent does not support ZTS])
fi
@@ -43,19 +46,24 @@
CARGO_MODE_FLAGS="--release"
CARGO_MODE_DIR="release"
+ CARGO_FEATURES_FLAGS=""
if test "$PHP_CARGO_DEBUG" != "no"; then
CARGO_MODE_FLAGS=""
CARGO_MODE_DIR="debug"
fi
+ if test "$PHP_KAFKA_REPORTER" != "no"; then
+ CARGO_FEATURES_FLAGS="--features kafka-reporter"
+ fi
+
cat >>Makefile.objects<< EOF
all: cargo_build
clean: cargo_clean
cargo_build:
- PHP_CONFIG=$PHP_PHP_CONFIG cargo build $CARGO_MODE_FLAGS
+ PHP_CONFIG=$PHP_PHP_CONFIG cargo build $CARGO_MODE_FLAGS $CARGO_FEATURES_FLAGS
if [[ -f ./target/$CARGO_MODE_DIR/libskywalking_agent.dylib ]] ; then \\
cp ./target/$CARGO_MODE_DIR/libskywalking_agent.dylib ./modules/skywalking_agent.so ; fi
if [[ -f ./target/$CARGO_MODE_DIR/libskywalking_agent.so ]] ; then \\
@@ -73,6 +81,7 @@
Cargo.toml:Cargo.toml \
build.rs:build.rs \
docker-compose.yml:docker-compose.yml \
+ rust-toolchain.toml:rust-toolchain.toml \
scripts:scripts \
src:src \
tests:tests \
diff --git a/dist-material/LICENSE b/dist-material/LICENSE
index 35c499b..70b7c5a 100644
--- a/dist-material/LICENSE
+++ b/dist-material/LICENSE
@@ -237,6 +237,15 @@
https://crates.io/crates/wasi/0.11.0+wasi-snapshot-preview1 0.11.0+wasi-snapshot-preview1 Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT
========================================================================
+Apache-2.0 OR BSD-3-Clause OR MIT licenses
+========================================================================
+The following components are provided under the Apache-2.0 OR BSD-3-Clause OR MIT License. See project link for details.
+The text of each license is also included in licenses/LICENSE-[project].txt.
+
+ https://crates.io/crates/num_enum/0.5.11 0.5.11 Apache-2.0 OR BSD-3-Clause OR MIT
+ https://crates.io/crates/num_enum_derive/0.5.11 0.5.11 Apache-2.0 OR BSD-3-Clause OR MIT
+
+========================================================================
Apache-2.0 OR BSL-1.0 licenses
========================================================================
The following components are provided under the Apache-2.0 OR BSL-1.0 License. See project link for details.
@@ -374,6 +383,7 @@
https://crates.io/crates/portable-atomic/1.4.2 1.4.2 Apache-2.0 OR MIT
https://crates.io/crates/ppv-lite86/0.2.17 0.2.17 Apache-2.0 OR MIT
https://crates.io/crates/prettyplease/0.1.25 0.1.25 Apache-2.0 OR MIT
+ https://crates.io/crates/proc-macro-crate/1.3.1 1.3.1 Apache-2.0 OR MIT
https://crates.io/crates/proc-macro2/1.0.66 1.0.66 Apache-2.0 OR MIT
https://crates.io/crates/quick-error/1.2.3 1.2.3 Apache-2.0 OR MIT
https://crates.io/crates/quote/1.0.32 1.0.32 Apache-2.0 OR MIT
@@ -416,6 +426,8 @@
https://crates.io/crates/time-core/0.1.1 0.1.1 Apache-2.0 OR MIT
https://crates.io/crates/tokio-io-timeout/1.2.0 1.2.0 Apache-2.0 OR MIT
https://crates.io/crates/tokio-rustls/0.23.4 0.23.4 Apache-2.0 OR MIT
+ https://crates.io/crates/toml_datetime/0.6.3 0.6.3 Apache-2.0 OR MIT
+ https://crates.io/crates/toml_edit/0.19.14 0.19.14 Apache-2.0 OR MIT
https://crates.io/crates/trust-dns-proto/0.22.0 0.22.0 Apache-2.0 OR MIT
https://crates.io/crates/trust-dns-resolver/0.22.0 0.22.0 Apache-2.0 OR MIT
https://crates.io/crates/typenum/1.16.0 1.16.0 Apache-2.0 OR MIT
@@ -529,6 +541,8 @@
https://crates.io/crates/phf_codegen/0.10.0 0.10.0 MIT
https://crates.io/crates/phf_generator/0.10.0 0.10.0 MIT
https://crates.io/crates/phf_shared/0.10.0 0.10.0 MIT
+ https://crates.io/crates/rdkafka/0.32.2 0.32.2 MIT
+ https://crates.io/crates/rdkafka-sys/4.5.0+1.9.2 4.5.0+1.9.2 MIT
https://crates.io/crates/redox_syscall/0.3.5 0.3.5 MIT
https://crates.io/crates/schannel/0.1.22 0.1.22 MIT
https://crates.io/crates/sharded-slab/0.1.4 0.1.4 MIT
@@ -556,6 +570,7 @@
https://crates.io/crates/valuable/0.1.0 0.1.0 MIT
https://crates.io/crates/want/0.3.1 0.3.1 MIT
https://crates.io/crates/which/4.4.0 4.4.0 MIT
+ https://crates.io/crates/winnow/0.5.3 0.5.3 MIT
https://crates.io/crates/winreg/0.10.1 0.10.1 MIT
https://crates.io/crates/winreg/0.50.0 0.50.0 MIT
diff --git a/docs/en/configuration/ini-settings.md b/docs/en/configuration/ini-settings.md
index bc48b4d..b8a24fd 100644
--- a/docs/en/configuration/ini-settings.md
+++ b/docs/en/configuration/ini-settings.md
@@ -2,21 +2,24 @@
This is the configuration list supported in `php.ini`.
-| Configuration Item | Description | Default Value |
-| ------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------ | ------------------------- |
-| skywalking_agent.enable | Enable skywalking_agent extension or not. | Off |
-| skywalking_agent.log_file | Log file path. | /tmp/skywalking-agent.log |
-| skywalking_agent.log_level | Log level: one of `OFF`, `TRACE`, `DEBUG`, `INFO`, `WARN`, `ERROR`. | INFO |
-| skywalking_agent.runtime_dir | Skywalking agent runtime directory. | /tmp/skywalking-agent |
-| skywalking_agent.server_addr | Address of skywalking oap server. | 127.0.0.1:11800 |
-| skywalking_agent.service_name | Application service name. | hello-skywalking |
-| skywalking_agent.skywalking_version | Skywalking version, 8 or 9. | 8 |
-| skywalking_agent.authentication | Skywalking authentication token, let it empty if the backend isn't enabled. | |
-| skywalking_agent.worker_threads | Skywalking worker threads, 0 will auto set as the cpu core size. | 0 |
-| skywalking_agent.enable_tls | Wether to enable tls for gPRC, default is false. | Off |
-| skywalking_agent.ssl_trusted_ca_path | The gRPC SSL trusted ca file. | |
-| skywalking_agent.ssl_key_path | The private key file. Enable mTLS when `ssl_key_path` and `ssl_cert_chain_path` exist. | |
-| skywalking_agent.ssl_cert_chain_path | The certificate file. Enable mTLS when `ssl_key_path` and `ssl_cert_chain_path` exist. | |
-| skywalking_agent.heartbeat_period | Agent heartbeat report period. Unit, second. | 30 |
-| skywalking_agent.properties_report_period_factor | The agent sends the instance properties to the backend every heartbeat_period * properties_report_period_factor seconds. | 10 |
-| skywalking_agent.enable_zend_observer | Whether to use `zend observer` instead of `zend_execute_ex` to hook the functions, this feature is only available for PHP8+. | Off |
+| Configuration Item | Description | Default Value |
+| ------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------- | ------------------------- |
+| skywalking_agent.enable | Enable skywalking_agent extension or not. | Off |
+| skywalking_agent.log_file | Log file path. | /tmp/skywalking-agent.log |
+| skywalking_agent.log_level | Log level: one of `OFF`, `TRACE`, `DEBUG`, `INFO`, `WARN`, `ERROR`. | INFO |
+| skywalking_agent.runtime_dir | Skywalking agent runtime directory. | /tmp/skywalking-agent |
+| skywalking_agent.server_addr | Address of skywalking oap server. Only available when `reporter_type` is `grpc`. | 127.0.0.1:11800 |
+| skywalking_agent.service_name | Application service name. | hello-skywalking |
+| skywalking_agent.skywalking_version | Skywalking version, 8 or 9. | 8 |
+| skywalking_agent.authentication | Skywalking authentication token, let it empty if the backend isn't enabled. Only available when `reporter_type` is `grpc`. | |
+| skywalking_agent.worker_threads | Skywalking worker threads, 0 will auto set as the cpu core size. | 0 |
+| skywalking_agent.enable_tls | Wether to enable tls for gPRC, default is false. Only available when `reporter_type` is `grpc`. | Off |
+| skywalking_agent.ssl_trusted_ca_path | The gRPC SSL trusted ca file. Only available when `reporter_type` is `grpc`. | |
+| skywalking_agent.ssl_key_path | The private key file. Enable mTLS when `ssl_key_path` and `ssl_cert_chain_path` exist. Only available when `reporter_type` is `grpc`. | |
+| skywalking_agent.ssl_cert_chain_path | The certificate file. Enable mTLS when `ssl_key_path` and `ssl_cert_chain_path` exist. Only available when `reporter_type` is `grpc`. | |
+| skywalking_agent.heartbeat_period | Agent heartbeat report period. Unit, second. | 30 |
+| skywalking_agent.properties_report_period_factor | The agent sends the instance properties to the backend every heartbeat_period * properties_report_period_factor seconds. | 10 |
+| skywalking_agent.enable_zend_observer | Whether to use `zend observer` instead of `zend_execute_ex` to hook the functions, this feature is only available for PHP8+. | Off |
+| skywalking_agent.reporter_type | Reporter type, optional values are `grpc` and `kafka`. | grpc |
+| skywalking_agent.kafka_bootstrap_servers | A list of host/port pairs to use for connect to the Kafka cluster. Only available when `reporter_type` is `kafka`. | |
+| skywalking_agent.kafka_producer_config | Configure Kafka Producer configuration in JSON format `{"key": "value}`. Only available when `reporter_type` is `kafka`. | {} |
diff --git a/docs/en/reporter/kafka-reporter.md b/docs/en/reporter/kafka-reporter.md
new file mode 100644
index 0000000..10338c3
--- /dev/null
+++ b/docs/en/reporter/kafka-reporter.md
@@ -0,0 +1,56 @@
+# Kafka reporter
+
+By default, the configuration option `skywalking_agent.reporter_type` is `grpc`, means that the skywalking agent will report the traces, metrics, logs etc. to SkyWalking OAP Server by gPRC protocol.
+
+At the same time, SkyWalking also supports kafka-fetcher, so you can report traces, metrics, logs, etc. by kafka.
+
+But the skywalking agent does not compile the `kafka-reporter` feature by default, you need to enable the it.
+
+## Steps
+
+1. Compile the skywalking agent with feature `kafka-reporter`.
+
+ For pecl:
+
+ ```shell
+ pecl install skywalking_agent
+ ```
+
+ Enable the kafka reporter interactively:
+
+ ```txt
+ 68 source files, building
+ running: phpize
+ Configuring for:
+ PHP Api Version: 20220829
+ Zend Module Api No: 20220829
+ Zend Extension Api No: 420220829
+ enable cargo debug? [no] :
+ enable kafka reporter? [no] : yes
+ ```
+
+ Or, build from sources:
+
+ ```shell
+ phpize
+ ./configure --enable-kafka-reporter
+ make
+ make install
+ ```
+
+2. Config `php.ini`.
+
+ Switch to use kafka reporter.
+
+ ```ini
+ [skywalking_agent]
+ extension = skywalking_agent.so
+ skywalking_agent.reporter_type = kafka
+ skywalking_agent.kafka_bootstrap_servers = 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092
+ ```
+
+ If you want to custom the kafka reporter properties, you can specify it by JSON format:
+
+ ```ini
+ skywalking_agent.kafka_producer_config = {"delivery.timeout.ms": "12000"}
+ ```
diff --git a/docs/menu.yml b/docs/menu.yml
index 7e9c135..723db61 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -28,6 +28,10 @@
path: "/en/configuration/ini-settings"
- name: "Zend Observer"
path: "/en/configuration/zend-observer"
+ - name: "Reporter"
+ catalog:
+ - name: "Kafka Reporter"
+ path: "/en/reporter/kafka-reporter"
- name: "Contribution"
catalog:
- name: "Compiling Guidance"
diff --git a/package.tpl.xml b/package.tpl.xml
index 9a340cc..6c4bc2a 100644
--- a/package.tpl.xml
+++ b/package.tpl.xml
@@ -72,5 +72,6 @@
<providesextension>skywalking_agent</providesextension>
<extsrcrelease>
<configureoption default="no" name="enable-cargo-debug" prompt="enable cargo debug?" />
+ <configureoption default="no" name="enable-kafka-reporter" prompt="enable kafka reporter?" />
</extsrcrelease>
</package>
diff --git a/src/lib.rs b/src/lib.rs
index 5a16850..25a56a0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -24,6 +24,7 @@
mod execute;
mod module;
mod plugin;
+mod reporter;
mod request;
mod tag;
mod util;
@@ -88,6 +89,17 @@
/// PHP8's jit.
const SKYWALKING_AGENT_ENABLE_ZEND_OBSERVER: &str = "skywalking_agent.enable_zend_observer";
+/// Reporter type, optional values are `grpc` and `kafka`, default is `grpc`.
+const SKYWALKING_AGENT_REPORTER_TYPE: &str = "skywalking_agent.reporter_type";
+
+/// A list of host/port pairs to use for establishing the initial connection to
+/// the Kafka cluster. Only available when the reporter type is `kafka`.
+const SKYWALKING_AGENT_KAFKA_BOOTSTRAP_SERVERS: &str = "skywalking_agent.kafka_bootstrap_servers";
+
+/// Configure Kafka Producer configuration in JSON format.
+/// Only available when the reporter type is `kafka`.
+const SKYWALKING_AGENT_KAFKA_PRODUCER_CONFIG: &str = "skywalking_agent.kafka_producer_config";
+
#[php_get_module]
pub fn get_module() -> Module {
let mut module = Module::new(
@@ -153,6 +165,21 @@
Policy::System,
);
module.add_ini(SKYWALKING_AGENT_ENABLE_ZEND_OBSERVER, false, Policy::System);
+ module.add_ini(
+ SKYWALKING_AGENT_REPORTER_TYPE,
+ "grpc".to_string(),
+ Policy::System,
+ );
+ module.add_ini(
+ SKYWALKING_AGENT_KAFKA_BOOTSTRAP_SERVERS,
+ "".to_string(),
+ Policy::System,
+ );
+ module.add_ini(
+ SKYWALKING_AGENT_KAFKA_PRODUCER_CONFIG,
+ "{}".to_string(),
+ Policy::System,
+ );
// Hooks.
module.on_module_init(module::init);
diff --git a/src/module.rs b/src/module.rs
index 662044e..37b82b5 100644
--- a/src/module.rs
+++ b/src/module.rs
@@ -16,7 +16,7 @@
use crate::{
channel::Reporter,
execute::{register_execute_functions, register_observer_handlers},
- util::{get_sapi_module_name, IPS},
+ util::{get_sapi_module_name, get_str_ini_with_default, IPS},
worker::init_worker,
*,
};
@@ -28,7 +28,6 @@
trace::tracer::{self, Tracer},
};
use std::{
- borrow::ToOwned,
ffi::{CStr, OsStr},
fs::{self, OpenOptions},
os::unix::prelude::OsStrExt,
@@ -39,13 +38,30 @@
use tracing::{debug, error, info, metadata::LevelFilter};
use tracing_subscriber::FmtSubscriber;
-pub static SERVICE_NAME: Lazy<String> = Lazy::new(|| {
- ini_get::<Option<&CStr>>(SKYWALKING_AGENT_SERVICE_NAME)
- .and_then(|s| s.to_str().ok())
- .map(ToOwned::to_owned)
- .unwrap_or_default()
+static IS_ENABLE: Lazy<bool> = Lazy::new(|| {
+ if !ini_get::<bool>(SKYWALKING_AGENT_ENABLE) {
+ return false;
+ }
+
+ let sapi = get_sapi_module_name().to_bytes();
+
+ if sapi == b"fpm-fcgi" {
+ return true;
+ }
+
+ if sapi == b"cli" && get_module_registry().exists("swoole") {
+ return true;
+ }
+
+ false
});
+pub static SERVER_ADDR: Lazy<String> =
+ Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_SERVER_ADDR));
+
+pub static SERVICE_NAME: Lazy<String> =
+ Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_SERVICE_NAME));
+
pub static SERVICE_INSTANCE: Lazy<String> =
Lazy::new(|| RandomGenerator::generate() + "@" + &IPS[0]);
@@ -76,35 +92,19 @@
dir
});
-pub static AUTHENTICATION: Lazy<String> = Lazy::new(|| {
- ini_get::<Option<&CStr>>(SKYWALKING_AGENT_AUTHENTICATION)
- .and_then(|s| s.to_str().ok())
- .map(ToOwned::to_owned)
- .unwrap_or_default()
-});
+pub static AUTHENTICATION: Lazy<String> =
+ Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_AUTHENTICATION));
pub static ENABLE_TLS: Lazy<bool> = Lazy::new(|| ini_get::<bool>(SKYWALKING_AGENT_ENABLE_TLS));
-pub static SSL_TRUSTED_CA_PATH: Lazy<String> = Lazy::new(|| {
- ini_get::<Option<&CStr>>(SKYWALKING_AGENT_SSL_TRUSTED_CA_PATH)
- .and_then(|s| s.to_str().ok())
- .map(ToOwned::to_owned)
- .unwrap_or_default()
-});
+pub static SSL_TRUSTED_CA_PATH: Lazy<String> =
+ Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_SSL_TRUSTED_CA_PATH));
-pub static SSL_KEY_PATH: Lazy<String> = Lazy::new(|| {
- ini_get::<Option<&CStr>>(SKYWALKING_AGENT_SSL_KEY_PATH)
- .and_then(|s| s.to_str().ok())
- .map(ToOwned::to_owned)
- .unwrap_or_default()
-});
+pub static SSL_KEY_PATH: Lazy<String> =
+ Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_SSL_KEY_PATH));
-pub static SSL_CERT_CHAIN_PATH: Lazy<String> = Lazy::new(|| {
- ini_get::<Option<&CStr>>(SKYWALKING_AGENT_SSL_CERT_CHAIN_PATH)
- .and_then(|s| s.to_str().ok())
- .map(ToOwned::to_owned)
- .unwrap_or_default()
-});
+pub static SSL_CERT_CHAIN_PATH: Lazy<String> =
+ Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_SSL_CERT_CHAIN_PATH));
pub static HEARTBEAT_PERIOD: Lazy<i64> =
Lazy::new(|| ini_get::<i64>(SKYWALKING_AGENT_HEARTBEAT_PERIOD));
@@ -117,6 +117,18 @@
sys::PHP_MAJOR_VERSION >= 8 && ini_get::<bool>(SKYWALKING_AGENT_ENABLE_ZEND_OBSERVER)
});
+pub static WORKER_THREADS: Lazy<i64> =
+ Lazy::new(|| ini_get::<i64>(SKYWALKING_AGENT_WORKER_THREADS));
+
+pub static REPORTER_TYPE: Lazy<String> =
+ Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_REPORTER_TYPE));
+
+pub static KAFKA_BOOTSTRAP_SERVERS: Lazy<String> =
+ Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_KAFKA_BOOTSTRAP_SERVERS));
+
+pub static KAFKA_PRODUCER_CONFIG: Lazy<String> =
+ Lazy::new(|| get_str_ini_with_default(SKYWALKING_AGENT_KAFKA_PRODUCER_CONFIG));
+
/// For PHP 8.2+, zend observer api are now also called for internal functions.
///
/// Refer to this commit: <https://github.com/php/php-src/commit/625f1649639c2b9a9d76e4d42f88c264ddb8447d>
@@ -129,29 +141,43 @@
return;
}
+ // Initialize configuration properties.
+ Lazy::force(&SERVER_ADDR);
+ Lazy::force(&SERVICE_NAME);
+ Lazy::force(&SERVICE_INSTANCE);
+ Lazy::force(&SKYWALKING_VERSION);
+ Lazy::force(&RUNTIME_DIR);
+ Lazy::force(&SOCKET_FILE_PATH);
+ Lazy::force(&AUTHENTICATION);
+ Lazy::force(&ENABLE_TLS);
+ Lazy::force(&SSL_TRUSTED_CA_PATH);
+ Lazy::force(&SSL_KEY_PATH);
+ Lazy::force(&SSL_CERT_CHAIN_PATH);
+ Lazy::force(&HEARTBEAT_PERIOD);
+ Lazy::force(&PROPERTIES_REPORT_PERIOD_FACTOR);
+ Lazy::force(&ENABLE_ZEND_OBSERVER);
+ Lazy::force(&WORKER_THREADS);
+ Lazy::force(&REPORTER_TYPE);
+ Lazy::force(&KAFKA_BOOTSTRAP_SERVERS);
+ Lazy::force(&KAFKA_PRODUCER_CONFIG);
+
if let Err(err) = try_init_logger() {
eprintln!("skywalking_agent: initialize logger failed: {}", err);
}
// Skywalking agent info.
- let service_name = Lazy::force(&SERVICE_NAME);
- let service_instance = Lazy::force(&SERVICE_INSTANCE);
- let skywalking_version = Lazy::force(&SKYWALKING_VERSION);
- let authentication = Lazy::force(&AUTHENTICATION);
- let heartbeat_period = Lazy::force(&HEARTBEAT_PERIOD);
- let properties_report_period_factor = Lazy::force(&PROPERTIES_REPORT_PERIOD_FACTOR);
info!(
- service_name,
- service_instance,
- skywalking_version,
- authentication,
- heartbeat_period,
- properties_report_period_factor,
+ service_name = &*SERVICE_NAME,
+ service_instance = &*SERVICE_INSTANCE,
+ skywalking_version = &*SKYWALKING_VERSION,
+ heartbeat_period = &*HEARTBEAT_PERIOD,
+ properties_report_period_factor = &*PROPERTIES_REPORT_PERIOD_FACTOR,
"Starting skywalking agent"
);
// Skywalking version check.
- if *skywalking_version < 8 {
+ let skywalking_version = *SKYWALKING_VERSION;
+ if skywalking_version < 8 {
error!(
skywalking_version,
"The skywalking agent only supports versions after skywalking 8"
@@ -159,16 +185,6 @@
return;
}
- // Initialize TLS if enabled.
- let enable_tls = Lazy::force(&ENABLE_TLS);
- let ssl_trusted_ca_path = Lazy::force(&SSL_TRUSTED_CA_PATH);
- let ssl_key_path = Lazy::force(&SSL_KEY_PATH);
- let ssl_cert_chain_path = Lazy::force(&SSL_CERT_CHAIN_PATH);
- debug!(
- enable_tls,
- ssl_trusted_ca_path, ssl_key_path, ssl_cert_chain_path, "Skywalking TLS info"
- );
-
// Initialize runtime directory.
if RUNTIME_DIR.as_os_str().is_empty() {
error!("The skywalking agent runtime directory must not be empty");
@@ -180,12 +196,11 @@
}
// Initialize Agent worker.
- Lazy::force(&SOCKET_FILE_PATH);
init_worker();
tracer::set_global_tracer(Tracer::new(
- service_name,
- service_instance,
+ &*SERVICE_NAME,
+ &*SERVICE_INSTANCE,
Reporter::new(&*SOCKET_FILE_PATH),
));
@@ -243,27 +258,12 @@
Ok(())
}
+#[inline]
fn get_module_registry() -> &'static ZArr {
unsafe { ZArr::from_ptr(&sys::module_registry) }
}
+#[inline]
pub fn is_enable() -> bool {
- static IS_ENABLE: Lazy<bool> = Lazy::new(|| {
- if !ini_get::<bool>(SKYWALKING_AGENT_ENABLE) {
- return false;
- }
-
- let sapi = get_sapi_module_name().to_bytes();
-
- if sapi == b"fpm-fcgi" {
- return true;
- }
-
- if sapi == b"cli" && get_module_registry().exists("swoole") {
- return true;
- }
-
- false
- });
*IS_ENABLE
}
diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs
new file mode 100644
index 0000000..7e50175
--- /dev/null
+++ b/src/reporter/mod.rs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod reporter_grpc;
+mod reporter_kafka;
+
+use crate::module::REPORTER_TYPE;
+use anyhow::bail;
+use skywalking::reporter::{CollectItemConsume, CollectItemProduce};
+
+pub async fn run_reporter(
+ producer: impl CollectItemProduce, consumer: impl CollectItemConsume,
+) -> anyhow::Result<()> {
+ match REPORTER_TYPE.as_str() {
+ "grpc" => reporter_grpc::run_reporter(producer, consumer).await,
+ #[cfg(feature = "kafka-reporter")]
+ "kafka" => reporter_kafka::run_reporter(producer, consumer).await,
+ typ => bail!("unknown reporter type, {}", typ),
+ }
+}
diff --git a/src/reporter/reporter_grpc.rs b/src/reporter/reporter_grpc.rs
new file mode 100644
index 0000000..7948f57
--- /dev/null
+++ b/src/reporter/reporter_grpc.rs
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::module::{
+ AUTHENTICATION, ENABLE_TLS, SERVER_ADDR, SSL_CERT_CHAIN_PATH, SSL_KEY_PATH, SSL_TRUSTED_CA_PATH,
+};
+use anyhow::anyhow;
+use skywalking::reporter::{grpc::GrpcReporter, CollectItemConsume, CollectItemProduce};
+use std::time::Duration;
+use tokio::time::sleep;
+use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity};
+use tracing::{debug, info, warn};
+
+pub async fn run_reporter(
+ producer: impl CollectItemProduce, consumer: impl CollectItemConsume,
+) -> anyhow::Result<()> {
+ let endpoint = create_endpoint(&SERVER_ADDR).await?;
+ let channel = connect(endpoint).await;
+
+ let mut reporter = GrpcReporter::new_with_pc(channel, producer, consumer);
+
+ if !AUTHENTICATION.is_empty() {
+ reporter = reporter.with_authentication(&*AUTHENTICATION);
+ }
+
+ info!("Worker is ready...");
+
+ let handle = reporter
+ .reporting()
+ .await
+ .with_status_handle(|message, status| {
+ warn!(?status, "Collect failed: {}", message);
+ })
+ .spawn();
+
+ handle
+ .await
+ .map_err(|err| anyhow!("Tracer reporting failed: {:?}", err))?;
+
+ Ok(())
+}
+
+async fn create_endpoint(server_addr: &str) -> anyhow::Result<Endpoint> {
+ let scheme = if *ENABLE_TLS { "https" } else { "http" };
+
+ let url = format!("{}://{}", scheme, server_addr);
+ debug!(url, "Create Endpoint");
+ let mut endpoint = Endpoint::from_shared(url)?;
+
+ debug!(
+ enable_tls = *ENABLE_TLS,
+ ssl_trusted_ca_path = &*SSL_TRUSTED_CA_PATH,
+ ssl_key_path = &*SSL_KEY_PATH,
+ ssl_cert_chain_path = &*SSL_CERT_CHAIN_PATH,
+ "Skywalking TLS info"
+ );
+
+ if *ENABLE_TLS {
+ let domain_name = server_addr.split(':').next().unwrap_or_default();
+ debug!(domain_name, "Configure TLS domain");
+ let mut tls = ClientTlsConfig::new().domain_name(domain_name);
+
+ let ssl_trusted_ca_path = SSL_TRUSTED_CA_PATH.as_str();
+ if !ssl_trusted_ca_path.is_empty() {
+ debug!(ssl_trusted_ca_path, "Configure TLS CA");
+ let ca_cert = tokio::fs::read(&*SSL_TRUSTED_CA_PATH).await?;
+ let ca_cert = Certificate::from_pem(ca_cert);
+ tls = tls.ca_certificate(ca_cert);
+ }
+
+ let ssl_key_path = SSL_KEY_PATH.as_str();
+ let ssl_cert_chain_path = SSL_CERT_CHAIN_PATH.as_str();
+ if !ssl_key_path.is_empty() && !ssl_cert_chain_path.is_empty() {
+ debug!(ssl_trusted_ca_path, "Configure mTLS");
+ let client_cert = tokio::fs::read(&*SSL_CERT_CHAIN_PATH).await?;
+ let client_key = tokio::fs::read(&*SSL_KEY_PATH).await?;
+ let client_identity = Identity::from_pem(client_cert, client_key);
+ tls = tls.identity(client_identity);
+ }
+
+ endpoint = endpoint.tls_config(tls)?;
+ }
+
+ Ok(endpoint)
+}
+
+#[tracing::instrument(skip_all)]
+async fn connect(endpoint: Endpoint) -> Channel {
+ let channel = loop {
+ match endpoint.connect().await {
+ Ok(channel) => break channel,
+ Err(err) => {
+ warn!(?err, "Connect to skywalking server failed, retry after 10s");
+ sleep(Duration::from_secs(10)).await;
+ }
+ }
+ };
+
+ let uri = &*endpoint.uri().to_string();
+ info!(uri, "Skywalking server connected");
+
+ channel
+}
diff --git a/src/reporter/reporter_kafka.rs b/src/reporter/reporter_kafka.rs
new file mode 100644
index 0000000..61e4cf6
--- /dev/null
+++ b/src/reporter/reporter_kafka.rs
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![cfg(feature = "kafka-reporter")]
+
+use crate::module::{KAFKA_BOOTSTRAP_SERVERS, KAFKA_PRODUCER_CONFIG};
+use anyhow::{bail, Context};
+use skywalking::reporter::{
+ kafka::{KafkaReportBuilder, RDKafkaClientConfig},
+ CollectItemConsume, CollectItemProduce,
+};
+use std::collections::HashMap;
+
+pub async fn run_reporter(
+ producer: impl CollectItemProduce, consumer: impl CollectItemConsume,
+) -> anyhow::Result<()> {
+ let mut client_config = RDKafkaClientConfig::new();
+
+ client_config.set("bootstrap.servers", &*KAFKA_BOOTSTRAP_SERVERS);
+
+ let config = serde_json::from_str::<HashMap<String, String>>(&KAFKA_PRODUCER_CONFIG)
+ .context("parse kafka producer config failed")?;
+ for (key, value) in config {
+ client_config.set(key, value);
+ }
+
+ let (_, reporting) = KafkaReportBuilder::new_with_pc(client_config, producer, consumer)
+ .build()
+ .await?;
+ let handle = reporting.spawn();
+ if let Err(err) = handle.await {
+ bail!("wait handle failed: {:?}", err);
+ }
+
+ Ok(())
+}
diff --git a/src/util.rs b/src/util.rs
index 7dd575c..e9fb1ca 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -15,7 +15,7 @@
use anyhow::anyhow;
use once_cell::sync::Lazy;
-use phper::{sys, values::ZVal};
+use phper::{ini::ini_get, sys, values::ZVal};
use std::{
ffi::CStr,
os::unix::prelude::OsStrExt,
@@ -96,3 +96,10 @@
libc::chmod(path.as_ptr().cast(), mode);
}
}
+
+pub fn get_str_ini_with_default(name: &str) -> String {
+ ini_get::<Option<&CStr>>(name)
+ .and_then(|s| s.to_str().ok())
+ .map(ToOwned::to_owned)
+ .unwrap_or_default()
+}
diff --git a/src/worker.rs b/src/worker.rs
index 282e7d0..80c74a7 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -16,23 +16,22 @@
use crate::{
channel::{self, TxReporter},
module::{
- AUTHENTICATION, ENABLE_TLS, HEARTBEAT_PERIOD, PROPERTIES_REPORT_PERIOD_FACTOR,
- SERVICE_INSTANCE, SERVICE_NAME, SOCKET_FILE_PATH, SSL_CERT_CHAIN_PATH, SSL_KEY_PATH,
- SSL_TRUSTED_CA_PATH,
+ HEARTBEAT_PERIOD, PROPERTIES_REPORT_PERIOD_FACTOR, SERVICE_INSTANCE, SERVICE_NAME,
+ SOCKET_FILE_PATH, WORKER_THREADS,
},
+ reporter::run_reporter,
util::change_permission,
- SKYWALKING_AGENT_SERVER_ADDR, SKYWALKING_AGENT_WORKER_THREADS,
};
-use anyhow::anyhow;
+
use once_cell::sync::Lazy;
-use phper::ini::ini_get;
+
use skywalking::{
management::{instance::Properties, manager::Manager},
- reporter::{grpc::GrpcReporter, CollectItem, CollectItemConsume},
+ reporter::{CollectItem, CollectItemConsume},
};
use std::{
- cmp::Ordering, error::Error, ffi::CStr, fs, io, marker::PhantomData, num::NonZeroUsize,
- process::exit, thread::available_parallelism, time::Duration,
+ cmp::Ordering, error::Error, fs, io, marker::PhantomData, num::NonZeroUsize, process::exit,
+ thread::available_parallelism, time::Duration,
};
use tokio::{
net::UnixListener,
@@ -40,19 +39,11 @@
select,
signal::unix::{signal, SignalKind},
sync::mpsc::{self, error::TrySendError},
- time::sleep,
};
-use tonic::{
- async_trait,
- transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity},
-};
+use tonic::async_trait;
use tracing::{debug, error, info, warn};
pub fn init_worker() {
- let server_addr = ini_get::<Option<&CStr>>(SKYWALKING_AGENT_SERVER_ADDR)
- .and_then(|s| s.to_str().ok())
- .unwrap_or_default()
- .to_owned();
let worker_threads = worker_threads();
unsafe {
@@ -72,7 +63,7 @@
// Run the worker in subprocess.
let rt = new_tokio_runtime(worker_threads);
- match rt.block_on(start_worker(server_addr)) {
+ match rt.block_on(start_worker()) {
Ok(_) => {
exit(0);
}
@@ -88,7 +79,7 @@
}
fn worker_threads() -> usize {
- let worker_threads = ini_get::<i64>(SKYWALKING_AGENT_WORKER_THREADS);
+ let worker_threads = *WORKER_THREADS;
if worker_threads <= 0 {
available_parallelism().map(NonZeroUsize::get).unwrap_or(1)
} else {
@@ -105,7 +96,7 @@
.unwrap()
}
-async fn start_worker(server_addr: String) -> anyhow::Result<()> {
+async fn start_worker() -> anyhow::Result<()> {
debug!("Starting worker...");
// Ensure to cleanup resources when worker exits.
@@ -167,30 +158,10 @@
}
});
- let endpoint = create_endpoint(&server_addr).await?;
- let channel = connect(endpoint).await;
-
report_properties_and_keep_alive(TxReporter(tx_));
- let mut reporter = GrpcReporter::new_with_pc(channel, (), Consumer(rx));
-
- if !AUTHENTICATION.is_empty() {
- reporter = reporter.with_authentication(&*AUTHENTICATION);
- }
-
- info!("Worker is ready...");
-
- let handle = reporter
- .reporting()
- .await
- .with_status_handle(|message, status| {
- warn!(?status, "Collect failed: {}", message);
- })
- .spawn();
-
- handle
- .await
- .map_err(|err| anyhow!("Tracer reporting failed: {:?}", err))?;
+ // Run reporter with blocking.
+ run_reporter((), Consumer(rx)).await?;
Ok::<_, anyhow::Error>(())
};
@@ -209,60 +180,6 @@
Ok(())
}
-async fn create_endpoint(server_addr: &str) -> anyhow::Result<Endpoint> {
- let scheme = if *ENABLE_TLS { "https" } else { "http" };
-
- let url = format!("{}://{}", scheme, server_addr);
- debug!(url, "Create Endpoint");
- let mut endpoint = Endpoint::from_shared(url)?;
-
- if *ENABLE_TLS {
- let domain_name = server_addr.split(':').next().unwrap_or_default();
- debug!(domain_name, "Configure TLS domain");
- let mut tls = ClientTlsConfig::new().domain_name(domain_name);
-
- let ssl_trusted_ca_path = SSL_TRUSTED_CA_PATH.as_str();
- if !ssl_trusted_ca_path.is_empty() {
- debug!(ssl_trusted_ca_path, "Configure TLS CA");
- let ca_cert = tokio::fs::read(&*SSL_TRUSTED_CA_PATH).await?;
- let ca_cert = Certificate::from_pem(ca_cert);
- tls = tls.ca_certificate(ca_cert);
- }
-
- let ssl_key_path = SSL_KEY_PATH.as_str();
- let ssl_cert_chain_path = SSL_CERT_CHAIN_PATH.as_str();
- if !ssl_key_path.is_empty() && !ssl_cert_chain_path.is_empty() {
- debug!(ssl_trusted_ca_path, "Configure mTLS");
- let client_cert = tokio::fs::read(&*SSL_CERT_CHAIN_PATH).await?;
- let client_key = tokio::fs::read(&*SSL_KEY_PATH).await?;
- let client_identity = Identity::from_pem(client_cert, client_key);
- tls = tls.identity(client_identity);
- }
-
- endpoint = endpoint.tls_config(tls)?;
- }
-
- Ok(endpoint)
-}
-
-#[tracing::instrument(skip_all)]
-async fn connect(endpoint: Endpoint) -> Channel {
- let channel = loop {
- match endpoint.connect().await {
- Ok(channel) => break channel,
- Err(err) => {
- warn!(?err, "Connect to skywalking server failed, retry after 10s");
- sleep(Duration::from_secs(10)).await;
- }
- }
- };
-
- let uri = &*endpoint.uri().to_string();
- info!(uri, "Skywalking server connected");
-
- channel
-}
-
struct Consumer(mpsc::Receiver<CollectItem>);
#[async_trait]