1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
//! Telemetry Capturing is the process of recording the logs and traces happening during a request
//! to the binary engine, and rendering them in the response.
//!
//! The interaction diagram below (soorry width!) shows the different roles at play during telemetry
//! capturing. A textual explanatation follows it. For the sake of example a server environment
//! --the query-engine crate-- is assumed.
//! #                                                                                       ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐                                                                   
//! #                                                                                                                                                                                                   
//! #                                                                                       │              <<concurrent>>           │                                                                   
//! #                                                                                                                                                                                                   
//! #                                                           ╔═══════════════════════╗   │╔═══════════════╗                      │                                                                   
//! #                                                           ║<<SpanProcessor, Sync>>║    ║ <<ch sender>> ║    ╔════════════════╗  ╔═══════════════════╗                                             
//! #      ┌───────────────────┐                                ║       PROCESSOR       ║   │║    Sender     ║    ║    Storage     ║│ ║      TRACER       ║                                             
//! #      │      Server       │                                ╚═══════════╦═══════════╝    ╚══════╦════════╝    ╚═══════╦════════╝  ╚═════════╦═════════╝                                             
//! #      └─────────┬─────────┘                                            │               │       │                     │         │           │                                                       
//! #                │                                                      │                       │                     │                     │                                                       
//! #                │                                                      │               │       │                     │         │           │                                                       
//! #      POST      │                                                      │                       │                     │                     │                                                       
//! # (body, headers)│                                                      │               │       │                     │         │           │                                                       
//! #    ──────────▶┌┴┐                                                     │                       │                     │                     │                                                       
//! #        ┌─┐    │ │new(headers)╔════════════╗                           │               │       │                     │         │           │                                                       
//! #        │1│    │ ├───────────▶║s: Settings ║                           │                       │                     │                     │                                                       
//! #        └─┘    │ │            ╚════════════╝                           │               │       │                     │         │           │                                                       
//! #               │ │                                                     │                       │                     │                     │                                                       
//! #               │ │                    ╔═══════════════════╗            │               │       │                     │         │           │                                                       
//! #               │ │                    ║ Capturer::Enabled ║            │                       │                     │                     │                     ┌────────────┐                    
//! #               │ │                    ╚═══════════════════╝            │               │       │                     │         │           │                     │<<Somewhere>│                    
//! #               │ │                              │                      │                       │                     │                     │                     └──────┬─────┘                    
//! #               │ │   ┌─┐  new(trace_id, s)      │                      │               │       │                     │         │           │                            │                          
//! #               │ ├───┤2├───────────────────────▶│                      │                       │                     │                     │                            │                          
//! #               │ │   └─┘                        │                      │               │       │                     │         │           │                            │                          
//! #               │ │                              │                      │                       │                     │                     │                            │                          
//! #               │ │   ┌─┐ start_capturing()      │   start_capturing    │               │       │                     │         │           │                            │                          
//! #               │ ├───┤3├───────────────────────▶│    (trace_id, s)     │                       │                     │                     │                            │                          
//! #               │ │   └─┘                        │                      │               │       │                     │         │           │                            │                          
//! #               │ │                              ├─────────────────────▶│ send(StartCapturing,  │                     │                     │                            │                          
//! #               │ │                              │                      │      trace_id)│       │                     │         │           │                            │                          
//! #               │ │                              │                      │── ── ── ── ── ── ── ─▶│                     │                     │                            │                          
//! #               │ │                              │                      │        ┌─┐    │       │insert(trace_id, s)  │         │           │                            │                          
//! #               │ │                              │                      │        │4│            │────────────────────▶│                     │                            │                          
//! #               │ │                              │                      │        └─┘    │       │                     │         │  ┌─┐      │          process_query     │                          
//! #               │ │──────────────────────────────┼──────────────────────┼───────────────────────┼─────────────────────┼────────────┤5├──────┼──────────────────────────▶┌┴┐                         
//! #               │ │                              │                      │               │       │                     │         │  └─┘      │                           │ │                         
//! #               │ │                              │                      │                       │                     │                     │                           │ │                         
//! #               │ │                              │                      │               │       │                     │         │           │                           │ │  ┌─────────────────────┐
//! #               │ │                              │                      │                       │                     │                     │     log! / span!     ┌─┐  │ │  │ res: PrismaResponse │
//! #               │ │                              │                      │               │       │                     │         │           │◀─────────────────────┤6├──│ │  └──────────┬──────────┘
//! #               │ │                              │                      │                       │    on_end(span_data)│            ┌─┐      │                      └─┘  │ │   new       │           
//! #               │ │                              │                      │◀──────────────┼───────┼─────────────────────┼─────────┼──┤7├──────┤                           │ │────────────▶│           
//! #               │ │                              │                      │ send(SpanDataProcessed│                     │            └─┘      │                           │ │             │           
//! #               │ │                              │                      │      , trace_id)      │   append(trace_id,  │         │           │                           │ │             │           
//! #               │ │                              │                      │── ── ── ── ── ── ── ─▶│     logs, traces)   │                     │                           │ │             │           
//! #               │ │                              │                      │        ┌─┐    │       ├────────────────────▶│         │           │                           │ │             │           
//! #               │ │                              │                      │        │8│            │                     │                     │                           │ │             │           
//! #               │ │      res: PrismaResponse     │ ┌─┐                  │        └─┘    │       │                     │         │           │                           │ │             │           
//! #               │ │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┼ ┤9├ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─return ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─└┬┘             │           
//! #               │ │ ┌────┐ fetch_captures()      │ └─┘                  │               │       │                     │         │           │                            │              │           
//! #               │ ├─┤ 10 ├──────────────────────▶│      fetch_captures  │                       │                     │                     │                            │              │           
//! #               │ │ └────┘                       │        (trace_id)    │               │       │                     │         │           │                            │              │           
//! #               │ │                              ├─────────────────────▶│  send(FetchCaptures,  │                     │                     │                            x              │           
//! #               │ │                              │                      │       trace_id)       │                     │         │           │                                           │           
//! #               │ │                              │                      │── ── ── ── ── ── ── ─▶│   get logs/traces   │                     │                                           │           
//! #               │ │                              │                      │      ┌────┐   │       ├─────────────────────▶         │           │                                           │           
//! #               │ │                              │                      │      │ 11 │           │                     │                     │                                           │           
//! #               │ │                              │                      │      └────┘   │       │◁ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│         │           │                                           │           
//! #               │ │                              │                      │                       │                     │                     │                                           │           
//! #               │ │                              ◁ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│               │       │                     │         │           │                                           │           
//! #               │ │          logs, traces        │                      │                       │                     │                     │                                           │           
//! #               │ │◁─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│                      │               │       │                     │         │           │                                           │           
//! #               │ │                              x        ┌────┐        │                       │                     │                     │                res.set_extension(logs)    │           
//! #               │ ├───────────────────────────────────────┤ 12 ├────────┼───────────────┼───────┼─────────────────────┼─────────┼───────────┼──────────────────────────────────────────▶│           
//! #               │ │                                       └────┘        │                       │                     │                     │                res.set_extension(traces)  │           
//! #               │ ├─────────────────────────────────────────────────────┼───────────────┼───────┼─────────────────────┼─────────┼───────────┼──────────────────────────────────────────▶│           
//! #        ◀ ─ ─ ─└┬┘                                                     │                       │                     │                     │                                           x           
//! #     json!(res) │                                                                      │                                       │                                                                   
//! #        ┌────┐  │                                                                       ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                                                    
//! #        │ 13 │  │                                                                                                                                                                                  
//! #        └────┘                                                                                                                                                                                     
//! #                                                                                                                                                                                                   
//! #                                                                          ◀─────── call (pseudo-signatures)                                                                                        
//! #                                                                                                                                                                                                   
//! #                                                                          ◀─ ── ── async message passing (channels)                                                                                
//! #                                                                                                                                                                                                   
//! #                                                                          ◁─ ─ ─ ─ return                                                                                                          
//! #                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    
//!  
//!  In the diagram, you will see objects whose lifetime is static. The boxes for those have a double
//!  width margin. These are:
//!  
//!    - The `server` itself
//!    - The  global `TRACER`, which handles `log!` and `span!` and uses the global `PROCESSOR` to
//!     process the data constituting a trace `Span`s and log `Event`s
//!    - The global `PROCESSOR`, which manages the `Storage` set of data structures, holding logs,
//!     traces (and capture settings) per request.
//!  
//!  Then, through the request lifecycle, different objects are created and dropped:
//!  
//!    - When a request comes in, its headers are processed and a [`Settings`] object is built, this
//!     object determines, for the request, how logging and tracing are going to be captured: if only
//!     traces, logs, or both, and which log levels are going to be captured.
//!    - Based on the settings, a new `Capturer` is created; a capturer is nothing but an exporter
//!     wrapped to start capturing / fetch the captures for this particular request.
//!    - An asynchronous task is spawned to own the storage of telemetry data without needing to share
//!     memory accross threads. Communication with this task is done through channels. The `Sender`
//!     part of the channel is kept in a global, so it can be cloned and used by a) the Capturer
//!     (to start capturing / fetch the captures) or by the tracer's SpanProcessor, to extract
//!     tracing and logging information that's eventually displayed to the user.
//!  
//!  Then the capturing process works in this way:
//!    
//!    - The server receives a query **[1]**
//!    - It grabs the HTTP headers and builds a `Capture` object **[2]**, which is configured with the settings
//!      denoted by the `X-capture-telemetry`
//!    - Now the server tells the `Capturer` to start capturing all the logs and traces occurring on
//!     the request **[3]** (denoted by a `trace_id`) The `trace_id` is either carried on the `traceparent`
//!     header or implicitly created on the first span of the request.
//!    - The `Capturer` sends a message to the task owning the storage to start capturing **[4]**.
//!      The tasks creates a new entry in the storage for the given trace_id. Spans without a
//!     corresponding trace_id in the storage are ignored.
//!    - The server dispatches the request and _Somewhere_ else in the code, it is processed **[5]**.
//!    - There the code logs events and emits traces asynchronously, as part of the processing **[6]**
//!    - Traces and Logs arrive at the `TRACER`, and get hydrated as SpanData in the `PROCESSOR`
//!     **[7]**.
//!    - This SpanData is sent through a channel to the task running in parallel, **[8]**.
//!     The task transforms the SpanData into `TraceSpans` and `LogEvents` depending on the capture
//!     settings and stores those spans and events in the storage.
//!    - When the code that dispatches the request is done it returns a `PrismaResponse` to the
//!     server **[9]**.
//!    - Then the server asks the `PROCESSOR` to fetch the captures **[10]**
//!    - Like before, the `PROCESSOR` sends a message to the task running in parallel,
//!     to fetch the captures from the `Storage` **[11]**. At that time, although
//!     that's not represented in the diagram, the captures are deleted from the storage, thus
//!     freeing any memory used for capturing during the request
//!    - Finally, the server sets the `logs` and `traces` extensions in the `PrismaResponse`**[12]**,
//!     it serializes the extended response in json format and returns it as an HTTP Response
//!     blob **[13]**.
//!
#![allow(unused_imports, dead_code)]
pub use self::capturer::Capturer;
pub use self::settings::Settings;
pub use tx_ext::TxTraceExt;

use self::capturer::Processor;
use once_cell::sync::Lazy;
use opentelemetry::{global, sdk, trace};
use tracing::subscriber;
use tracing_subscriber::{
    filter::filter_fn, layer::Layered, prelude::__tracing_subscriber_SubscriberExt, Layer, Registry,
};

static PROCESSOR: Lazy<capturer::Processor> = Lazy::new(Processor::default);

/// Creates a new capturer, which is configured to export traces and log events happening during a
/// particular request
pub fn capturer(trace_id: trace::TraceId, settings: Settings) -> Capturer {
    Capturer::new(PROCESSOR.to_owned(), trace_id, settings)
}

/// Adds a capturing layer to the given subscriber and installs the transformed subscriber as the
/// global, default subscriber
#[cfg(feature = "metrics")]
#[allow(clippy::type_complexity)]
pub fn install_capturing_layer(
    subscriber: Layered<
        Option<query_engine_metrics::MetricRegistry>,
        Layered<Box<dyn Layer<Registry> + Send + Sync>, Registry>,
    >,
    log_queries: bool,
) {
    // set a trace context propagator, so that the trace context is propagated via the
    // `traceparent` header from other systems
    global::set_text_map_propagator(sdk::propagation::TraceContextPropagator::new());
    // create a tracer provider that is configured to use our custom processor to process spans
    let provider = sdk::trace::TracerProvider::builder()
        .with_span_processor(PROCESSOR.to_owned())
        .build();
    // create a tracer out of the provider
    let tracer = opentelemetry::trace::TracerProvider::tracer(&provider, "opentelemetry");
    // set the provider as the global provider
    global::set_tracer_provider(provider);
    // create a layer that will filter initial events and spans based on the log level configuration
    // from the environment and a specific filter to discard things that we are not interested in
    // from a capturiong perspective
    let telemetry_layer = tracing_opentelemetry::layer()
        .with_tracer(tracer)
        .with_filter(crate::helpers::env_filter(
            log_queries,
            crate::helpers::QueryEngineLogLevel::FromEnv,
        ))
        .with_filter(filter_fn(helpers::span_and_event_filter));
    // decorate the given subscriber (more layers were added before this one) with the telemetry layer
    let subscriber = subscriber.with(telemetry_layer);
    // and finally set the subscriber as the global, default subscriber
    subscriber::set_global_default(subscriber).unwrap();
}

mod capturer;
mod helpers;
mod settings;
pub mod storage;
mod tx_ext;