1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//! Provides an abstraction of several async runtimes
//!
//! This  allows OpenTelemetry to work with any current or future runtime. There are currently
//! builtin implementations for [Tokio] and [async-std].
//!
//! [Tokio]: https://crates.io/crates/tokio
//! [async-std]: https://crates.io/crates/async-std

use futures_util::{future::BoxFuture, stream::Stream};
use std::{future::Future, time::Duration};

/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
/// OpenTelemetry to work with any current and hopefully future runtime implementation.
///
/// [Tokio]: https://crates.io/crates/tokio
/// [async-std]: https://crates.io/crates/async-std
pub trait Runtime: Clone + Send + Sync + 'static {
    /// A future stream, which returns items in a previously specified interval. The item type is
    /// not important.
    type Interval: Stream + Send;

    /// A future, which resolves after a previously specified amount of time. The output type is
    /// not important.
    type Delay: Future + Send;

    /// Create a [Stream][futures_util::stream::Stream], which returns a new item every
    /// [Duration][std::time::Duration].
    fn interval(&self, duration: Duration) -> Self::Interval;

    /// Spawn a new task or thread, which executes the given future.
    ///
    /// # Note
    ///
    /// This is mainly used to run batch span processing in the background. Note, that the function
    /// does not return a handle. OpenTelemetry will use a different way to wait for the future to
    /// finish when TracerProvider gets shutdown. At the moment this happens by blocking the
    /// current thread. This means runtime implementations need to make sure they can still execute
    /// the given future even if the main thread is blocked.
    fn spawn(&self, future: BoxFuture<'static, ()>);

    /// Return a new future, which resolves after the specified [Duration][std::time::Duration].
    fn delay(&self, duration: Duration) -> Self::Delay;
}

/// Runtime implementation, which works with Tokio's multi thread runtime.
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
#[derive(Debug, Clone)]
pub struct Tokio;

#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
impl Runtime for Tokio {
    type Interval = tokio_stream::wrappers::IntervalStream;
    type Delay = tokio::time::Sleep;

    fn interval(&self, duration: Duration) -> Self::Interval {
        crate::util::tokio_interval_stream(duration)
    }

    fn spawn(&self, future: BoxFuture<'static, ()>) {
        let _ = tokio::spawn(future);
    }

    fn delay(&self, duration: Duration) -> Self::Delay {
        tokio::time::sleep(duration)
    }
}

/// Runtime implementation, which works with Tokio's current thread runtime.
#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
#[derive(Debug, Clone)]
pub struct TokioCurrentThread;

#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
impl Runtime for TokioCurrentThread {
    type Interval = tokio_stream::wrappers::IntervalStream;
    type Delay = tokio::time::Sleep;

    fn interval(&self, duration: Duration) -> Self::Interval {
        crate::util::tokio_interval_stream(duration)
    }

    fn spawn(&self, future: BoxFuture<'static, ()>) {
        // We cannot force push tracing in current thread tokio scheduler because we rely on
        // BatchSpanProcessor to export spans in a background task, meanwhile we need to block the
        // shutdown function so that the runtime will not finish the blocked task and kill any
        // remaining tasks. But there is only one thread to run task, so it's a deadlock
        //
        // Thus, we spawn the background task in a separate thread.
        std::thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("failed to create Tokio current thead runtime for OpenTelemetry batch processing");
            rt.block_on(future);
        });
    }

    fn delay(&self, duration: Duration) -> Self::Delay {
        tokio::time::sleep(duration)
    }
}

/// Runtime implementation, which works with async-std.
#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
#[derive(Debug, Clone)]
pub struct AsyncStd;

#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
impl Runtime for AsyncStd {
    type Interval = async_std::stream::Interval;
    type Delay = BoxFuture<'static, ()>;

    fn interval(&self, duration: Duration) -> Self::Interval {
        async_std::stream::interval(duration)
    }

    fn spawn(&self, future: BoxFuture<'static, ()>) {
        let _ = async_std::task::spawn(future);
    }

    fn delay(&self, duration: Duration) -> Self::Delay {
        Box::pin(async_std::task::sleep(duration))
    }
}