#[cfg(feature = "rt-async-std")]
use crate::runtime::AsyncStd;
use crate::runtime::Runtime;
#[cfg(feature = "rt-tokio")]
use crate::runtime::Tokio;
#[cfg(feature = "rt-tokio-current-thread")]
use crate::runtime::TokioCurrentThread;
use crate::sdk::trace::BatchMessage;
use crate::trace::TraceError;
use futures_util::stream::Stream;
use std::fmt::Debug;
#[cfg(any(
feature = "rt-tokio",
feature = "rt-tokio-current-thread",
feature = "rt-async-std"
))]
const CHANNEL_FULL_ERROR: &str =
"cannot send span to the batch span processor because the channel is full";
#[cfg(any(
feature = "rt-tokio",
feature = "rt-tokio-current-thread",
feature = "rt-async-std"
))]
const CHANNEL_CLOSED_ERROR: &str =
"cannot send span to the batch span processor because the channel is closed";
pub trait TraceRuntime: Runtime {
type Receiver: Stream<Item = BatchMessage> + Send;
type Sender: TrySend + Debug;
fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver);
}
pub trait TrySend: Sync + Send {
fn try_send(&self, item: BatchMessage) -> Result<(), TraceError>;
}
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
impl TrySend for tokio::sync::mpsc::Sender<BatchMessage> {
fn try_send(&self, item: BatchMessage) -> Result<(), TraceError> {
self.try_send(item).map_err(|err| match err {
tokio::sync::mpsc::error::TrySendError::Full(_) => TraceError::from(CHANNEL_FULL_ERROR),
tokio::sync::mpsc::error::TrySendError::Closed(_) => {
TraceError::from(CHANNEL_CLOSED_ERROR)
}
})
}
}
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
impl TraceRuntime for Tokio {
type Receiver = tokio_stream::wrappers::ReceiverStream<BatchMessage>;
type Sender = tokio::sync::mpsc::Sender<BatchMessage>;
fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
(
sender,
tokio_stream::wrappers::ReceiverStream::new(receiver),
)
}
}
#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
impl TraceRuntime for TokioCurrentThread {
type Receiver = tokio_stream::wrappers::ReceiverStream<BatchMessage>;
type Sender = tokio::sync::mpsc::Sender<BatchMessage>;
fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
(
sender,
tokio_stream::wrappers::ReceiverStream::new(receiver),
)
}
}
#[cfg(feature = "rt-async-std")]
impl TrySend for async_std::channel::Sender<BatchMessage> {
fn try_send(&self, item: BatchMessage) -> Result<(), TraceError> {
self.try_send(item).map_err(|err| match err {
async_std::channel::TrySendError::Full(_) => TraceError::from(CHANNEL_FULL_ERROR),
async_std::channel::TrySendError::Closed(_) => TraceError::from(CHANNEL_CLOSED_ERROR),
})
}
}
#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
impl TraceRuntime for AsyncStd {
type Receiver = async_std::channel::Receiver<BatchMessage>;
type Sender = async_std::channel::Sender<BatchMessage>;
fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
async_std::channel::bounded(capacity)
}
}