blob: c927216f68ab78be46c3acd772bcef9f88a26493 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{channel, SKYWALKING_AGENT_SERVER_ADDR, SKYWALKING_AGENT_WORKER_THREADS};
use libc::{fork, prctl, PR_SET_PDEATHSIG, SIGTERM};
use phper::ini::Ini;
use skywalking::reporter::grpc::GrpcReporter;
use std::{
cmp::Ordering, num::NonZeroUsize, process::exit, thread::available_parallelism, time::Duration,
};
use tokio::{
runtime::{self, Runtime},
select,
signal::unix::{signal, SignalKind},
time::sleep,
};
use tonic::transport::{Channel, Endpoint};
use tracing::{debug, error, info, warn};
pub fn init_worker() {
let server_addr = Ini::get::<String>(SKYWALKING_AGENT_SERVER_ADDR).unwrap_or_default();
let worker_threads = worker_threads();
unsafe {
// TODO Shutdown previous worker before fork if threre is a PHP-FPM reload
// operation.
// TODO Chagne the worker process name.
let pid = fork();
match pid.cmp(&0) {
Ordering::Less => {
error!("fork failed");
}
Ordering::Equal => {
prctl(PR_SET_PDEATHSIG, SIGTERM);
let rt = new_tokio_runtime(worker_threads);
rt.block_on(start_worker(server_addr));
exit(0);
}
Ordering::Greater => {}
}
}
}
fn worker_threads() -> usize {
let worker_threads = Ini::get::<i64>(SKYWALKING_AGENT_WORKER_THREADS).unwrap_or(0);
if worker_threads <= 0 {
available_parallelism().map(NonZeroUsize::get).unwrap_or(1)
} else {
worker_threads as usize
}
}
fn new_tokio_runtime(worker_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(worker_threads)
.build()
.unwrap()
}
async fn start_worker(server_addr: String) {
debug!("Starting worker...");
// Graceful shutdown signal, put it on the top of program.
let mut sig = match signal(SignalKind::terminate()) {
Ok(signal) => signal,
Err(err) => {
error!(?err, "Signal terminate failed");
return;
}
};
let fut = async move {
let endpoint = match Endpoint::from_shared(server_addr) {
Ok(endpoint) => endpoint,
Err(err) => {
error!(?err, "Create endpoint failed");
return;
}
};
let channel = connect(endpoint).await;
let reporter = GrpcReporter::new_with_pc(channel, (), channel::Consumer);
// report_instance_properties(channel.clone()).await;
// mark_ready_for_request();
info!("Worker is ready...");
let handle = reporter
.reporting()
.await
// .with_graceful_shutdown(async move {
// sig.recv().await;
// info!("Shutdown signal received");
// })
.with_staus_handle(|status| {
warn!(?status, "Collect failed");
})
.spawn();
if let Err(err) = handle.await {
error!(?err, "Tracer reporting failed");
}
};
// TODO Do graceful shutdown, and wait 10s then force quit.
select! {
_ = sig.recv() => {}
_ = fut => {}
}
}
#[tracing::instrument(skip_all)]
async fn connect(endpoint: Endpoint) -> Channel {
let channel = loop {
match endpoint.connect().await {
Ok(channel) => break channel,
Err(err) => {
warn!(?err, "Connect to skywalking server failed, retry after 10s");
sleep(Duration::from_secs(10)).await;
}
}
};
let uri = &*endpoint.uri().to_string();
info!(uri, "Skywalking server connected");
channel
}
// #[tracing::instrument(skip_all)]
// async fn report_instance_properties(channel: Channel) {
// let mut manage_client = ManagementServiceClient::new(channel);
// loop {
// let properties = vec![
// KeyStringValuePair {
// key: "language".to_owned(),
// value: "php".to_owned(),
// },
// KeyStringValuePair {
// key: "OS Name".to_owned(),
// value: OS_NAME.to_owned(),
// },
// KeyStringValuePair {
// key: "hostname".to_owned(),
// value: HOST_NAME.to_owned(),
// },
// KeyStringValuePair {
// key: "Process No.".to_owned(),
// value: process::id().to_string(),
// },
// KeyStringValuePair {
// key: "ipv4".to_owned(),
// value: IPS.join(","),
// },
// KeyStringValuePair {
// key: "Start Time".to_owned(),
// value: current_formatted_time(),
// },
// ];
// let properties = InstanceProperties {
// service: SERVICE_NAME.clone(),
// service_instance: SERVICE_INSTANCE.clone(),
// properties: properties.clone(),
// layer: "".to_string(),
// };
// match manage_client
// .report_instance_properties(properties.clone())
// .await
// {
// Ok(_) => {
// debug!("Report instance properties, properties: {:?}",
// properties); break;
// }
// Err(e) => {
// warn!("Report instance properties failed, retry after 10s:
// {}", e); sleep(Duration::from_secs(10)).await;
// }
// }
// }
// }
// #[tracing::instrument(skip_all)]
// async fn receive_and_trace(channel: Channel) {
// warn!("Start");
// let mut report_client = TraceSegmentReportServiceClient::new(channel);
// loop {
// let f = async {
// let data = channel_receive()?;
// warn!(length = data.len(), "Channel received");
// // let segment: SegmentObject = Message::decode(&*data)?;
// // report_client
// // .collect(tokio_stream::iter(vec![segment]))
// // .await?;
// Ok::<_, anyhow::Error>(())
// };
// if let Err(e) = f.await {
// error!("Receive and trace failed: {}", e);
// sleep(Duration::from_secs(10)).await;
// }
// }
// }