Report instance properties and keep alived. (#46)

diff --git a/Cargo.lock b/Cargo.lock
index 740b8b7..b5345e2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1979,11 +1979,13 @@
  "cfg-if",
  "futures-core",
  "futures-util",
+ "hostname",
  "once_cell",
  "portable-atomic",
  "prost",
  "prost-derive",
  "serde",
+ "systemstat",
  "thiserror",
  "tokio",
  "tonic",
diff --git a/Cargo.toml b/Cargo.toml
index 1add6f5..36d9a18 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,7 +47,7 @@
 phper = "0.10.2"
 prost = "0.11.6"
 serde_json = { version = "1.0.91", features = ["preserve_order"] }
-skywalking = "0.5.0"
+skywalking = { version = "0.5.0", features = ["management"] }
 systemstat = "0.2.2"
 thiserror = "1.0.38"
 tokio = { version = "1.24.1", features = ["full"] }
diff --git a/src/channel.rs b/src/channel.rs
index fe802a0..e29617b 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -17,9 +17,14 @@
 use once_cell::sync::OnceCell;
 use skywalking::reporter::{CollectItem, Report};
 use std::{
-    io::Write, mem::size_of, ops::DerefMut, os::unix::net::UnixStream, path::Path, sync::Mutex,
+    io::Write,
+    mem::size_of,
+    ops::DerefMut,
+    os::unix::net::UnixStream,
+    path::{Path, PathBuf},
+    sync::Mutex,
 };
-use tokio::io::AsyncReadExt;
+use tokio::{io::AsyncReadExt, sync::mpsc};
 use tracing::error;
 
 fn channel_send<T>(data: CollectItem, mut sender: T) -> anyhow::Result<()>
@@ -47,15 +52,15 @@
     Ok(item)
 }
 
-pub struct Reporter<T: AsRef<Path>> {
-    worker_addr: T,
+pub struct Reporter {
+    worker_addr: PathBuf,
     stream: OnceCell<Mutex<UnixStream>>,
 }
 
-impl<T: AsRef<Path>> Reporter<T> {
-    pub fn new(worker_addr: T) -> Self {
+impl Reporter {
+    pub fn new(worker_addr: impl AsRef<Path>) -> Self {
         Self {
-            worker_addr,
+            worker_addr: worker_addr.as_ref().to_path_buf(),
             stream: OnceCell::new(),
         }
     }
@@ -71,10 +76,20 @@
     }
 }
 
-impl<T: AsRef<Path>> Report for Reporter<T> {
+impl Report for Reporter {
     fn report(&self, item: CollectItem) {
         if let Err(err) = self.try_report(item) {
             error!(?err, "channel send failed");
         }
     }
 }
+
+pub struct TxReporter(pub mpsc::Sender<CollectItem>);
+
+impl Report for TxReporter {
+    fn report(&self, item: CollectItem) {
+        if let Err(err) = self.0.try_send(item) {
+            error!(?err, "Send collect item failed");
+        }
+    }
+}
diff --git a/src/module.rs b/src/module.rs
index 2c1cdc0..364655b 100644
--- a/src/module.rs
+++ b/src/module.rs
@@ -28,7 +28,7 @@
     trace::tracer::{self, Tracer},
 };
 use std::{borrow::ToOwned, env, ffi::CStr, path::Path, str::FromStr, time::SystemTime};
-use tracing::{info, metadata::LevelFilter};
+use tracing::{error, info, metadata::LevelFilter};
 use tracing_subscriber::FmtSubscriber;
 
 pub static SERVICE_NAME: Lazy<String> = Lazy::new(|| {
@@ -72,6 +72,15 @@
         service_instance, skywalking_version, "Starting skywalking agent"
     );
 
+    // Skywalking version check
+    if *skywalking_version < 8 {
+        error!(
+            skywalking_version,
+            "The skywalking agent only supports versions after skywalking 8"
+        );
+        return;
+    }
+
     Lazy::force(&SOCKET_FILE_PATH);
     init_worker();
 
diff --git a/src/worker.rs b/src/worker.rs
index d4f52b4..1e6bb94 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -14,18 +14,24 @@
 // limitations under the License.
 
 use crate::{
-    channel, module::SOCKET_FILE_PATH, util::change_permission, SKYWALKING_AGENT_SERVER_ADDR,
-    SKYWALKING_AGENT_WORKER_THREADS,
+    channel::{self, TxReporter},
+    module::{SERVICE_INSTANCE, SERVICE_NAME, SOCKET_FILE_PATH},
+    util::change_permission,
+    SKYWALKING_AGENT_SERVER_ADDR, SKYWALKING_AGENT_WORKER_THREADS,
 };
+use anyhow::anyhow;
 use once_cell::sync::Lazy;
 use phper::ini::ini_get;
-use skywalking::reporter::{
-    grpc::{CollectItemConsume, GrpcReporter},
-    CollectItem,
+use skywalking::{
+    management::{instance::Properties, manager::Manager},
+    reporter::{
+        grpc::{CollectItemConsume, GrpcReporter},
+        CollectItem,
+    },
 };
 use std::{
-    cmp::Ordering, error::Error, ffi::CStr, fs, io, num::NonZeroUsize, process::exit,
-    thread::available_parallelism, time::Duration,
+    cmp::Ordering, error::Error, ffi::CStr, fs, io, marker::PhantomData, num::NonZeroUsize,
+    process::exit, thread::available_parallelism, time::Duration,
 };
 use tokio::{
     net::UnixListener,
@@ -51,7 +57,7 @@
     unsafe {
         // TODO Shutdown previous worker before fork if there is a PHP-FPM reload
         // operation.
-        // TODO Chagne the worker process name.
+        // TODO Change the worker process name.
 
         let pid = libc::fork();
         match pid.cmp(&0) {
@@ -63,9 +69,17 @@
                 #[cfg(target_os = "linux")]
                 libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGTERM);
 
+                // Run the worker in subprocess.
                 let rt = new_tokio_runtime(worker_threads);
-                rt.block_on(start_worker(server_addr));
-                exit(0);
+                match rt.block_on(start_worker(server_addr)) {
+                    Ok(_) => {
+                        exit(0);
+                    }
+                    Err(err) => {
+                        error!(?err, "worker exit unexpectedly");
+                        exit(1);
+                    }
+                }
             }
             Ordering::Greater => {}
         }
@@ -90,39 +104,25 @@
         .unwrap()
 }
 
-async fn start_worker(server_addr: String) {
+async fn start_worker(server_addr: String) -> anyhow::Result<()> {
     debug!("Starting worker...");
 
+    // Ensure to cleanup resources when worker exits.
+    let _guard = WorkerExitGuard::default();
+
     // Graceful shutdown signal, put it on the top of program.
-    let mut sig_term = match signal(SignalKind::terminate()) {
-        Ok(signal) => signal,
-        Err(err) => {
-            error!(?err, "Signal terminate failed");
-            return;
-        }
-    };
-    let mut sig_int = match signal(SignalKind::interrupt()) {
-        Ok(signal) => signal,
-        Err(err) => {
-            error!(?err, "Signal interrupt failed");
-            return;
-        }
-    };
+    let mut sig_term = signal(SignalKind::terminate())?;
+    let mut sig_int = signal(SignalKind::interrupt())?;
 
     let socket_file = SOCKET_FILE_PATH.as_str();
 
     let fut = async move {
         debug!(socket_file, "Bind unix stream");
-        let listener = match UnixListener::bind(socket_file) {
-            Ok(listener) => listener,
-            Err(err) => {
-                error!(?err, "Bind failed");
-                return;
-            }
-        };
+        let listener = UnixListener::bind(socket_file)?;
         change_permission(socket_file, 0o777);
 
-        let (tx, rx) = mpsc::channel::<Result<CollectItem, Box<dyn Error + Send>>>(255);
+        let (tx, rx) = mpsc::channel::<CollectItem>(255);
+        let tx_ = tx.clone();
         tokio::spawn(async move {
             loop {
                 match listener.accept().await {
@@ -139,13 +139,19 @@
                                             debug!("Leaving channel_receive loop");
                                             return;
                                         }
-                                        _ => Err(err.into()),
+                                        _ => {
+                                            error!(?err, "channel_receive failed");
+                                            continue;
+                                        }
                                     },
-                                    Ok(i) => Ok(i),
+                                    Ok(i) => i,
                                 };
 
+                                // Try send here, to prevent the ipc blocking caused by the channel
+                                // bursting (too late to report),
+                                // which affects the pool process of php-fpm.
                                 if let Err(err) = tx.try_send(r) {
-                                    error!(?err, "Send failed");
+                                    error!(?err, "Send collect item failed");
                                     if !matches!(err, TrySendError::Full(_)) {
                                         return;
                                     }
@@ -160,48 +166,42 @@
             }
         });
 
-        let endpoint = match Endpoint::from_shared(server_addr) {
-            Ok(endpoint) => endpoint,
-            Err(err) => {
-                error!(?err, "Create endpoint failed");
-                return;
-            }
-        };
+        let endpoint = Endpoint::from_shared(server_addr)?;
         let channel = connect(endpoint).await;
 
+        report_properties_and_keep_alive(TxReporter(tx_));
+
         let reporter = GrpcReporter::new_with_pc(channel, (), Consumer(rx));
 
-        // report_instance_properties(channel.clone()).await;
-        // mark_ready_for_request();
         info!("Worker is ready...");
 
         let handle = reporter
             .reporting()
             .await
-            // .with_graceful_shutdown(async move {
-            //     sig.recv().await;
-            //     info!("Shutdown signal received");
-            // })
             .with_status_handle(|status| {
                 warn!(?status, "Collect failed");
             })
             .spawn();
 
-        if let Err(err) = handle.await {
-            error!(?err, "Tracer reporting failed");
-        }
+        handle
+            .await
+            .map_err(|err| anyhow!("Tracer reporting failed: {:?}", err))?;
+
+        Ok::<_, anyhow::Error>(())
     };
 
     // TODO Do graceful shutdown, and wait 10s then force quit.
     select! {
         _ = sig_term.recv() => {}
         _ = sig_int.recv() => {}
-        _ = fut => {}
+        r = fut => {
+            r?;
+        }
     }
 
     info!("Start to shutdown skywalking grpc reporter");
 
-    worker_exit();
+    Ok(())
 }
 
 #[tracing::instrument(skip_all)]
@@ -222,36 +222,50 @@
     channel
 }
 
-struct Consumer(mpsc::Receiver<Result<CollectItem, Box<dyn Error + Send>>>);
+struct Consumer(mpsc::Receiver<CollectItem>);
 
 #[async_trait]
 impl CollectItemConsume for Consumer {
     async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        self.0
-            .recv()
-            .await
-            .map(|result| result.map(Some))
-            .unwrap_or(Ok(None))
+        Ok(self.0.recv().await)
     }
 
     async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        self.0
-            .try_recv()
-            .map(|result| result.map(Some))
-            .unwrap_or(Ok(None))
+        Ok(self.0.try_recv().ok())
     }
 }
 
-fn worker_exit() {
-    match Lazy::get(&SOCKET_FILE_PATH) {
-        Some(socket_file) => {
-            info!(socket_file, "Remove socket file");
-            if let Err(err) = fs::remove_file(socket_file) {
-                error!(?err, "Remove socket file failed");
+#[derive(Default)]
+struct WorkerExitGuard(PhantomData<()>);
+
+impl Drop for WorkerExitGuard {
+    fn drop(&mut self) {
+        match Lazy::get(&SOCKET_FILE_PATH) {
+            Some(socket_file) => {
+                info!(socket_file, "Remove socket file");
+                if let Err(err) = fs::remove_file(socket_file) {
+                    error!(?err, "Remove socket file failed");
+                }
+            }
+            None => {
+                warn!("Socket file not created");
             }
         }
-        None => {
-            warn!("Socket file not created");
-        }
     }
 }
+
+fn report_properties_and_keep_alive(reporter: TxReporter) {
+    let manager = Manager::new(&*SERVICE_NAME, &*SERVICE_INSTANCE, reporter);
+
+    let mut props = Properties::new();
+    props.insert_os_info();
+    props.update(Properties::KEY_LANGUAGE, "php");
+    props.update(Properties::KEY_PROCESS_NO, unsafe {
+        libc::getppid().to_string()
+    });
+    debug!(?props, "Report instance properties");
+
+    manager.report_properties(props);
+
+    manager.keep_alive(Duration::from_secs(10));
+}