Report instance properties and keep alived. (#46)
diff --git a/Cargo.lock b/Cargo.lock
index 740b8b7..b5345e2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1979,11 +1979,13 @@
"cfg-if",
"futures-core",
"futures-util",
+ "hostname",
"once_cell",
"portable-atomic",
"prost",
"prost-derive",
"serde",
+ "systemstat",
"thiserror",
"tokio",
"tonic",
diff --git a/Cargo.toml b/Cargo.toml
index 1add6f5..36d9a18 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,7 +47,7 @@
phper = "0.10.2"
prost = "0.11.6"
serde_json = { version = "1.0.91", features = ["preserve_order"] }
-skywalking = "0.5.0"
+skywalking = { version = "0.5.0", features = ["management"] }
systemstat = "0.2.2"
thiserror = "1.0.38"
tokio = { version = "1.24.1", features = ["full"] }
diff --git a/src/channel.rs b/src/channel.rs
index fe802a0..e29617b 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -17,9 +17,14 @@
use once_cell::sync::OnceCell;
use skywalking::reporter::{CollectItem, Report};
use std::{
- io::Write, mem::size_of, ops::DerefMut, os::unix::net::UnixStream, path::Path, sync::Mutex,
+ io::Write,
+ mem::size_of,
+ ops::DerefMut,
+ os::unix::net::UnixStream,
+ path::{Path, PathBuf},
+ sync::Mutex,
};
-use tokio::io::AsyncReadExt;
+use tokio::{io::AsyncReadExt, sync::mpsc};
use tracing::error;
fn channel_send<T>(data: CollectItem, mut sender: T) -> anyhow::Result<()>
@@ -47,15 +52,15 @@
Ok(item)
}
-pub struct Reporter<T: AsRef<Path>> {
- worker_addr: T,
+pub struct Reporter {
+ worker_addr: PathBuf,
stream: OnceCell<Mutex<UnixStream>>,
}
-impl<T: AsRef<Path>> Reporter<T> {
- pub fn new(worker_addr: T) -> Self {
+impl Reporter {
+ pub fn new(worker_addr: impl AsRef<Path>) -> Self {
Self {
- worker_addr,
+ worker_addr: worker_addr.as_ref().to_path_buf(),
stream: OnceCell::new(),
}
}
@@ -71,10 +76,20 @@
}
}
-impl<T: AsRef<Path>> Report for Reporter<T> {
+impl Report for Reporter {
fn report(&self, item: CollectItem) {
if let Err(err) = self.try_report(item) {
error!(?err, "channel send failed");
}
}
}
+
+pub struct TxReporter(pub mpsc::Sender<CollectItem>);
+
+impl Report for TxReporter {
+ fn report(&self, item: CollectItem) {
+ if let Err(err) = self.0.try_send(item) {
+ error!(?err, "Send collect item failed");
+ }
+ }
+}
diff --git a/src/module.rs b/src/module.rs
index 2c1cdc0..364655b 100644
--- a/src/module.rs
+++ b/src/module.rs
@@ -28,7 +28,7 @@
trace::tracer::{self, Tracer},
};
use std::{borrow::ToOwned, env, ffi::CStr, path::Path, str::FromStr, time::SystemTime};
-use tracing::{info, metadata::LevelFilter};
+use tracing::{error, info, metadata::LevelFilter};
use tracing_subscriber::FmtSubscriber;
pub static SERVICE_NAME: Lazy<String> = Lazy::new(|| {
@@ -72,6 +72,15 @@
service_instance, skywalking_version, "Starting skywalking agent"
);
+ // Skywalking version check
+ if *skywalking_version < 8 {
+ error!(
+ skywalking_version,
+ "The skywalking agent only supports versions after skywalking 8"
+ );
+ return;
+ }
+
Lazy::force(&SOCKET_FILE_PATH);
init_worker();
diff --git a/src/worker.rs b/src/worker.rs
index d4f52b4..1e6bb94 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -14,18 +14,24 @@
// limitations under the License.
use crate::{
- channel, module::SOCKET_FILE_PATH, util::change_permission, SKYWALKING_AGENT_SERVER_ADDR,
- SKYWALKING_AGENT_WORKER_THREADS,
+ channel::{self, TxReporter},
+ module::{SERVICE_INSTANCE, SERVICE_NAME, SOCKET_FILE_PATH},
+ util::change_permission,
+ SKYWALKING_AGENT_SERVER_ADDR, SKYWALKING_AGENT_WORKER_THREADS,
};
+use anyhow::anyhow;
use once_cell::sync::Lazy;
use phper::ini::ini_get;
-use skywalking::reporter::{
- grpc::{CollectItemConsume, GrpcReporter},
- CollectItem,
+use skywalking::{
+ management::{instance::Properties, manager::Manager},
+ reporter::{
+ grpc::{CollectItemConsume, GrpcReporter},
+ CollectItem,
+ },
};
use std::{
- cmp::Ordering, error::Error, ffi::CStr, fs, io, num::NonZeroUsize, process::exit,
- thread::available_parallelism, time::Duration,
+ cmp::Ordering, error::Error, ffi::CStr, fs, io, marker::PhantomData, num::NonZeroUsize,
+ process::exit, thread::available_parallelism, time::Duration,
};
use tokio::{
net::UnixListener,
@@ -51,7 +57,7 @@
unsafe {
// TODO Shutdown previous worker before fork if there is a PHP-FPM reload
// operation.
- // TODO Chagne the worker process name.
+ // TODO Change the worker process name.
let pid = libc::fork();
match pid.cmp(&0) {
@@ -63,9 +69,17 @@
#[cfg(target_os = "linux")]
libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGTERM);
+ // Run the worker in subprocess.
let rt = new_tokio_runtime(worker_threads);
- rt.block_on(start_worker(server_addr));
- exit(0);
+ match rt.block_on(start_worker(server_addr)) {
+ Ok(_) => {
+ exit(0);
+ }
+ Err(err) => {
+ error!(?err, "worker exit unexpectedly");
+ exit(1);
+ }
+ }
}
Ordering::Greater => {}
}
@@ -90,39 +104,25 @@
.unwrap()
}
-async fn start_worker(server_addr: String) {
+async fn start_worker(server_addr: String) -> anyhow::Result<()> {
debug!("Starting worker...");
+ // Ensure to cleanup resources when worker exits.
+ let _guard = WorkerExitGuard::default();
+
// Graceful shutdown signal, put it on the top of program.
- let mut sig_term = match signal(SignalKind::terminate()) {
- Ok(signal) => signal,
- Err(err) => {
- error!(?err, "Signal terminate failed");
- return;
- }
- };
- let mut sig_int = match signal(SignalKind::interrupt()) {
- Ok(signal) => signal,
- Err(err) => {
- error!(?err, "Signal interrupt failed");
- return;
- }
- };
+ let mut sig_term = signal(SignalKind::terminate())?;
+ let mut sig_int = signal(SignalKind::interrupt())?;
let socket_file = SOCKET_FILE_PATH.as_str();
let fut = async move {
debug!(socket_file, "Bind unix stream");
- let listener = match UnixListener::bind(socket_file) {
- Ok(listener) => listener,
- Err(err) => {
- error!(?err, "Bind failed");
- return;
- }
- };
+ let listener = UnixListener::bind(socket_file)?;
change_permission(socket_file, 0o777);
- let (tx, rx) = mpsc::channel::<Result<CollectItem, Box<dyn Error + Send>>>(255);
+ let (tx, rx) = mpsc::channel::<CollectItem>(255);
+ let tx_ = tx.clone();
tokio::spawn(async move {
loop {
match listener.accept().await {
@@ -139,13 +139,19 @@
debug!("Leaving channel_receive loop");
return;
}
- _ => Err(err.into()),
+ _ => {
+ error!(?err, "channel_receive failed");
+ continue;
+ }
},
- Ok(i) => Ok(i),
+ Ok(i) => i,
};
+ // Try send here, to prevent the ipc blocking caused by the channel
+ // bursting (too late to report),
+ // which affects the pool process of php-fpm.
if let Err(err) = tx.try_send(r) {
- error!(?err, "Send failed");
+ error!(?err, "Send collect item failed");
if !matches!(err, TrySendError::Full(_)) {
return;
}
@@ -160,48 +166,42 @@
}
});
- let endpoint = match Endpoint::from_shared(server_addr) {
- Ok(endpoint) => endpoint,
- Err(err) => {
- error!(?err, "Create endpoint failed");
- return;
- }
- };
+ let endpoint = Endpoint::from_shared(server_addr)?;
let channel = connect(endpoint).await;
+ report_properties_and_keep_alive(TxReporter(tx_));
+
let reporter = GrpcReporter::new_with_pc(channel, (), Consumer(rx));
- // report_instance_properties(channel.clone()).await;
- // mark_ready_for_request();
info!("Worker is ready...");
let handle = reporter
.reporting()
.await
- // .with_graceful_shutdown(async move {
- // sig.recv().await;
- // info!("Shutdown signal received");
- // })
.with_status_handle(|status| {
warn!(?status, "Collect failed");
})
.spawn();
- if let Err(err) = handle.await {
- error!(?err, "Tracer reporting failed");
- }
+ handle
+ .await
+ .map_err(|err| anyhow!("Tracer reporting failed: {:?}", err))?;
+
+ Ok::<_, anyhow::Error>(())
};
// TODO Do graceful shutdown, and wait 10s then force quit.
select! {
_ = sig_term.recv() => {}
_ = sig_int.recv() => {}
- _ = fut => {}
+ r = fut => {
+ r?;
+ }
}
info!("Start to shutdown skywalking grpc reporter");
- worker_exit();
+ Ok(())
}
#[tracing::instrument(skip_all)]
@@ -222,36 +222,50 @@
channel
}
-struct Consumer(mpsc::Receiver<Result<CollectItem, Box<dyn Error + Send>>>);
+struct Consumer(mpsc::Receiver<CollectItem>);
#[async_trait]
impl CollectItemConsume for Consumer {
async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
- self.0
- .recv()
- .await
- .map(|result| result.map(Some))
- .unwrap_or(Ok(None))
+ Ok(self.0.recv().await)
}
async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
- self.0
- .try_recv()
- .map(|result| result.map(Some))
- .unwrap_or(Ok(None))
+ Ok(self.0.try_recv().ok())
}
}
-fn worker_exit() {
- match Lazy::get(&SOCKET_FILE_PATH) {
- Some(socket_file) => {
- info!(socket_file, "Remove socket file");
- if let Err(err) = fs::remove_file(socket_file) {
- error!(?err, "Remove socket file failed");
+#[derive(Default)]
+struct WorkerExitGuard(PhantomData<()>);
+
+impl Drop for WorkerExitGuard {
+ fn drop(&mut self) {
+ match Lazy::get(&SOCKET_FILE_PATH) {
+ Some(socket_file) => {
+ info!(socket_file, "Remove socket file");
+ if let Err(err) = fs::remove_file(socket_file) {
+ error!(?err, "Remove socket file failed");
+ }
+ }
+ None => {
+ warn!("Socket file not created");
}
}
- None => {
- warn!("Socket file not created");
- }
}
}
+
+fn report_properties_and_keep_alive(reporter: TxReporter) {
+ let manager = Manager::new(&*SERVICE_NAME, &*SERVICE_INSTANCE, reporter);
+
+ let mut props = Properties::new();
+ props.insert_os_info();
+ props.update(Properties::KEY_LANGUAGE, "php");
+ props.update(Properties::KEY_PROCESS_NO, unsafe {
+ libc::getppid().to_string()
+ });
+ debug!(?props, "Report instance properties");
+
+ manager.report_properties(props);
+
+ manager.keep_alive(Duration::from_secs(10));
+}