blob: fdccb33574d3ab6f70d5e0c1c10f54a3e9c4cc54 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{channel, SKYWALKING_AGENT_SERVER_ADDR, SKYWALKING_AGENT_WORKER_THREADS};
use phper::ini::Ini;
use skywalking::reporter::grpc::GrpcReporter;
use std::{
cmp::Ordering, num::NonZeroUsize, process::exit, thread::available_parallelism, time::Duration,
};
use tokio::{
runtime::{self, Runtime},
select,
signal::unix::{signal, SignalKind},
time::sleep,
};
use tonic::transport::{Channel, Endpoint};
use tracing::{debug, error, info, warn};
pub fn init_worker() {
let server_addr = Ini::get::<String>(SKYWALKING_AGENT_SERVER_ADDR).unwrap_or_default();
let worker_threads = worker_threads();
unsafe {
// TODO Shutdown previous worker before fork if threre is a PHP-FPM reload
// operation.
// TODO Chagne the worker process name.
let pid = libc::fork();
match pid.cmp(&0) {
Ordering::Less => {
error!("fork failed");
}
Ordering::Equal => {
let rt = new_tokio_runtime(worker_threads);
rt.block_on(start_worker(server_addr));
exit(0);
}
Ordering::Greater => {}
}
}
}
fn worker_threads() -> usize {
let worker_threads = Ini::get::<i64>(SKYWALKING_AGENT_WORKER_THREADS).unwrap_or(0);
if worker_threads <= 0 {
available_parallelism().map(NonZeroUsize::get).unwrap_or(1)
} else {
worker_threads as usize
}
}
fn new_tokio_runtime(worker_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.thread_name("sw: worker")
.enable_all()
.worker_threads(worker_threads)
.build()
.unwrap()
}
async fn start_worker(server_addr: String) {
debug!("Starting worker...");
// Graceful shutdown signal, put it on the top of program.
let mut sig = match signal(SignalKind::terminate()) {
Ok(signal) => signal,
Err(err) => {
error!(?err, "Signal terminate failed");
return;
}
};
let fut = async move {
let endpoint = match Endpoint::from_shared(server_addr) {
Ok(endpoint) => endpoint,
Err(err) => {
error!(?err, "Create endpoint failed");
return;
}
};
let channel = connect(endpoint).await;
let consumer = match channel::Consumer::new() {
Ok(consumer) => consumer,
Err(err) => {
error!(?err, "Create consumer failed");
return;
}
};
let reporter = GrpcReporter::new_with_pc(channel, (), consumer);
// report_instance_properties(channel.clone()).await;
// mark_ready_for_request();
info!("Worker is ready...");
let handle = reporter
.reporting()
.await
// .with_graceful_shutdown(async move {
// sig.recv().await;
// info!("Shutdown signal received");
// })
.with_staus_handle(|status| {
warn!(?status, "Collect failed");
})
.spawn();
if let Err(err) = handle.await {
error!(?err, "Tracer reporting failed");
}
};
// TODO Do graceful shutdown, and wait 10s then force quit.
select! {
_ = sig.recv() => {}
_ = fut => {}
}
}
#[tracing::instrument(skip_all)]
async fn connect(endpoint: Endpoint) -> Channel {
let channel = loop {
match endpoint.connect().await {
Ok(channel) => break channel,
Err(err) => {
warn!(?err, "Connect to skywalking server failed, retry after 10s");
sleep(Duration::from_secs(10)).await;
}
}
};
let uri = &*endpoint.uri().to_string();
info!(uri, "Skywalking server connected");
channel
}