| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| use anyhow::anyhow; |
| use once_cell::sync::OnceCell; |
| use skywalking::reporter::{CollectItem, Report}; |
| use std::{ |
| io::Write, mem::size_of, ops::DerefMut, os::unix::net::UnixStream, path::Path, sync::Mutex, |
| }; |
| use tokio::io::AsyncReadExt; |
| use tracing::error; |
| |
| fn channel_send<T>(data: CollectItem, mut sender: T) -> anyhow::Result<()> |
| where |
| T: DerefMut<Target = UnixStream>, |
| { |
| let content = bincode::serialize(&data)?; |
| |
| sender.write_all(&content.len().to_le_bytes())?; |
| sender.write_all(&content)?; |
| sender.flush()?; |
| |
| Ok(()) |
| } |
| |
| pub async fn channel_receive(receiver: &mut tokio::net::UnixStream) -> anyhow::Result<CollectItem> { |
| let mut size_buf = [0u8; size_of::<usize>()]; |
| receiver.read_exact(&mut size_buf).await?; |
| let size = usize::from_le_bytes(size_buf); |
| |
| let mut content = vec![0u8; size]; |
| receiver.read_exact(&mut content).await?; |
| |
| let item = bincode::deserialize(&content)?; |
| Ok(item) |
| } |
| |
| pub struct Reporter<T: AsRef<Path>> { |
| worker_addr: T, |
| stream: OnceCell<Mutex<UnixStream>>, |
| } |
| |
| impl<T: AsRef<Path>> Reporter<T> { |
| pub fn new(worker_addr: T) -> Self { |
| Self { |
| worker_addr, |
| stream: OnceCell::new(), |
| } |
| } |
| |
| fn try_report(&self, item: CollectItem) -> anyhow::Result<()> { |
| let stream = self |
| .stream |
| .get_or_try_init(|| UnixStream::connect(&self.worker_addr).map(Mutex::new))? |
| .lock() |
| .map_err(|_| anyhow!("Get Lock failed"))?; |
| |
| channel_send(item, stream) |
| } |
| } |
| |
| impl<T: AsRef<Path>> Report for Reporter<T> { |
| fn report(&self, item: CollectItem) { |
| if let Err(err) = self.try_report(item) { |
| error!(?err, "channel send failed"); |
| } |
| } |
| } |