blob: b294270131e5d1c9ccfed7b23e14dedd09e19ded [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use futures::{Stream, StreamExt, channel::mpsc};
use opentelemetry_sdk::runtime::{Runtime, RuntimeChannel, TrySend};
use send_wrapper::SendWrapper;
use std::{pin::Pin, time::Duration};
/// Zero-sized handle that lets OpenTelemetry schedule its background work on the compio runtime.
///
/// The type carries no state, so cloning it is free; `Debug` is derived so the handle can be
/// embedded in (and logged from) types that themselves derive `Debug`.
#[derive(Clone, Debug)]
pub struct CompioRuntime;
/// Lets OpenTelemetry spawn tasks and create timers on the compio runtime.
impl Runtime for CompioRuntime {
    /// Spawns `future` on the compio runtime and immediately detaches it.
    ///
    /// This is mainly used to run batch span processing in the background. No handle is handed
    /// back to the caller: OpenTelemetry uses a different mechanism to wait for the future to
    /// finish when the caller shuts down.
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let task = compio::runtime::spawn(future);
        task.detach();
    }

    /// Returns a future that completes once `duration` has elapsed on the compio timer.
    ///
    /// The compio sleep future is wrapped in a `SendWrapper` so that the returned type
    /// satisfies the `Send` bound demanded by the trait signature.
    // NOTE(review): per the `send_wrapper` documentation the wrapper panics when it is accessed
    // from a thread other than the one that created it — confirm that the delay is only ever
    // polled and dropped on the compio thread that called `delay`.
    fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
        let sleep = compio::time::sleep(duration);
        SendWrapper::new(sleep)
    }
}
/// Sending half of the channel handed out by `CompioRuntime::batch_message_channel`.
///
/// Thin wrapper around an unbounded `futures` mpsc sender.
#[derive(Debug)]
pub struct CompioSender<T> {
// Underlying unbounded sender that every message is forwarded to.
sender: mpsc::UnboundedSender<T>,
}
impl<T> CompioSender<T> {
pub fn new(sender: mpsc::UnboundedSender<T>) -> Self {
Self { sender }
}
}
// SAFETY: `CompioSender<T>` only wraps `mpsc::UnboundedSender<T>`, which the `futures` crate
// documents as `Send + Sync` whenever `T: Send`, so forwarding those guarantees is sound.
// The impls are deliberately bounded on `T: Send`: an unbounded `impl<T>` would also mark a
// sender carrying a thread-bound payload (e.g. `Rc<_>`) as `Send + Sync`, letting safe code move
// that payload to another thread. Running on the single-threaded compio runtime does not prevent
// this, because nothing stops a `Send` value from being moved to a thread outside the runtime.
// The `Runtime` and `RuntimeChannel` traits only ever instantiate the sender with `T: Send`, so
// the bound does not restrict any supported use.
unsafe impl<T: Send> Send for CompioSender<T> {}
unsafe impl<T: Send> Sync for CompioSender<T> {}
/// Non-blocking send used by OpenTelemetry to enqueue batch messages.
impl<T: std::fmt::Debug + Send> TrySend for CompioSender<T> {
    type Message = T;

    /// Pushes `item` onto the unbounded queue without blocking.
    ///
    /// # Errors
    ///
    /// Returns `TrySendError::ChannelClosed` when the underlying send fails.
    fn try_send(
        &self,
        item: Self::Message,
    ) -> Result<(), opentelemetry_sdk::runtime::TrySendError> {
        match self.sender.unbounded_send(item) {
            Ok(()) => Ok(()),
            // Unbounded channels can only fail if disconnected, never full, so every failure is
            // reported as a closed channel.
            Err(_) => Err(opentelemetry_sdk::runtime::TrySendError::ChannelClosed),
        }
    }
}
/// Receiving half of the channel handed out by `CompioRuntime::batch_message_channel`.
///
/// Thin wrapper around an unbounded `futures` mpsc receiver. `Debug` is derived to match
/// `CompioSender`, so both halves of the channel can be inspected the same way.
#[derive(Debug)]
pub struct CompioReceiver<T> {
    /// Underlying unbounded receiver that messages are read from.
    receiver: mpsc::UnboundedReceiver<T>,
}
impl<T> CompioReceiver<T> {
pub fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
Self { receiver }
}
}
/// Exposes the receiver as a `Stream` by delegating to the wrapped unbounded receiver.
impl<T: std::fmt::Debug + Send> Stream for CompioReceiver<T> {
    type Item = T;

    /// Polls the wrapped receiver for the next message and returns its result unchanged.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // `Pin::get_mut` requires `Self: Unpin`, which holds as long as the wrapped receiver is
        // `Unpin`; the inner receiver can then simply be re-pinned for the delegated poll.
        let this = self.get_mut();
        Pin::new(&mut this.receiver).poll_next(cx)
    }
}
/// Provides the message channel OpenTelemetry uses for batch processing.
impl RuntimeChannel for CompioRuntime {
    type Receiver<T: std::fmt::Debug + Send> = CompioReceiver<T>;
    type Sender<T: std::fmt::Debug + Send> = CompioSender<T>;

    /// Creates a connected sender/receiver pair backed by an unbounded `futures` mpsc channel.
    ///
    /// `_capacity` is accepted to satisfy the trait signature but is intentionally ignored.
    fn batch_message_channel<T: std::fmt::Debug + Send>(
        &self,
        _capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        // Use the unbounded channel: this trait is used for batch processing, which naturally
        // limits the number of messages.
        // NOTE(review): because the requested capacity is dropped, nothing in this function caps
        // the queue length — confirm the consumer always drains fast enough for that to be safe.
        let (tx, rx) = mpsc::unbounded::<T>();
        let sender = CompioSender::new(tx);
        let receiver = CompioReceiver::new(rx);
        (sender, receiver)
    }
}