use opentelemetry::{sdk::export::trace::SpanData, KeyValue, Value};
use serde::Serialize;
use serde_json::json;
use std::{
borrow::Cow,
collections::HashMap,
time::{Duration, SystemTime},
};
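/// Span attributes that are carried over onto the resulting [`TraceSpan`]; all other
/// attributes are dropped during conversion.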
const ACCEPT_ATTRIBUTES: &[&str] = &["db.statement", "itx_id", "db.type"];
#[derive(Serialize, Debug, Clone, PartialEq, Eq)]
pub struct TraceSpan {
pub(super) trace_id: String,
pub(super) span_id: String,
pub(super) parent_span_id: String,
pub(super) name: String,
pub(super) start_time: HrTime,
pub(super) end_time: HrTime,
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub(super) attributes: HashMap<String, serde_json::Value>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub(super) events: Vec<Event>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub(super) links: Vec<Link>,
}
#[derive(Serialize, Debug, Clone, PartialEq, Eq)]
pub struct Link {
trace_id: String,
span_id: String,
}
impl TraceSpan {
pub fn split_events(self) -> (Vec<Event>, TraceSpan) {
(self.events, Self { events: vec![], ..self })
}
}
impl From<SpanData> for TraceSpan {
fn from(span: SpanData) -> Self {
let attributes: HashMap<String, serde_json::Value> =
span.attributes
.iter()
.fold(HashMap::default(), |mut map, (key, value)| {
if ACCEPT_ATTRIBUTES.contains(&key.as_str()) {
map.insert(key.to_string(), to_json_value(value));
}
map
});
        // TODO(fernandez@prisma.io) mongo query events and quaint query events
        // have different attributes. Both of them are queries, however quaint query
        // spans are named quaint:query while mongodb query spans are named
        // prisma:engine:db_query. Both of them are generated as spans, but quaint
        // spans contain the full query, while mongodb spans only contain the collection
        // name and the operation. For this reason, we treat them differently when
        // generating query events in log capturing and in other places.
        //
        // What we are currently doing is adding a quaint attribute to quaint queries
        // so we can transform the span containing the query into a query event. For
        // mongo this is not enough and we need to extract a particular event.
        //
        // If we unified these two ways of logging / tracing query information, we could
        // get rid of a lot of spaghetti code.
let is_quaint_query = matches!(span.name, Cow::Borrowed("quaint:query"));
let name: Cow<'_, str> = if is_quaint_query {
"prisma:engine:db_query".into()
} else {
span.name.clone()
};
let hr_start_time = convert_to_high_res_time(span.start_time.duration_since(SystemTime::UNIX_EPOCH).unwrap());
let hr_end_time = convert_to_high_res_time(span.end_time.duration_since(SystemTime::UNIX_EPOCH).unwrap());
let links = span
.links
.iter()
.map(|link| {
let ctx = link.span_context();
Link {
trace_id: ctx.trace_id().to_string(),
span_id: ctx.span_id().to_string(),
}
})
.collect();
let span_id = span.span_context.span_id().to_string();
let events = span
.events
.into_iter()
.map(|e| Event::from(e).with_span_id(span_id.clone()))
.collect();
Self {
span_id,
trace_id: span.span_context.trace_id().to_string(),
parent_span_id: span.parent_span_id.to_string(),
name: name.into_owned(),
start_time: hr_start_time,
end_time: hr_end_time,
attributes,
links,
events,
}
}
}
#[derive(Serialize, Debug, Clone, PartialEq, Eq)]
pub struct Event {
pub(super) span_id: Option<String>,
#[serde(skip_serializing_if = "String::is_empty")]
pub(super) name: String,
pub(super) level: String,
pub(super) timestamp: HrTime,
pub(super) attributes: HashMap<String, serde_json::Value>,
}
impl Event {
    pub(super) fn with_span_id(mut self, span_id: String) -> Self {
        self.span_id = Some(span_id);
self
}
}
impl From<opentelemetry::trace::Event> for Event {
fn from(event: opentelemetry::trace::Event) -> Self {
let name = event.name.to_string();
let timestamp = convert_to_high_res_time(event.timestamp.duration_since(SystemTime::UNIX_EPOCH).unwrap());
let mut attributes: HashMap<String, serde_json::Value> =
event
.attributes
.into_iter()
.fold(Default::default(), |mut map, KeyValue { key, value }| {
map.insert(key.to_string(), to_json_value(&value));
map
});
        // The "level" attribute is a JSON string; extract it directly instead of calling
        // `Value::to_string()` on it, which would keep the surrounding JSON quotes.
        let level = match attributes.remove("level") {
            Some(serde_json::Value::String(level)) => level.to_ascii_lowercase(),
            _ => "unknown".to_owned(),
        };
Self {
span_id: None, // already attached to the span
name,
level,
timestamp,
attributes,
}
}
}
/// Logs are modeled as span events.
pub type LogEvent = Event;
/// Metrics are modeled as span events.
pub type MetricEvent = Event;
pub type HrTime = [u64; 2];
/// Taken from the otel library's description of what the format for High-Resolution Time should be.
/// Defines High-Resolution Time.
///
/// The first number, HrTime[0], is UNIX Epoch time in seconds since 00:00:00 UTC on 1 January 1970.
/// The second number, HrTime[1], represents the partial second elapsed since Unix Epoch time represented by first number in nanoseconds.
/// For example, 2021-01-01T12:30:10.150Z in UNIX Epoch time in milliseconds is represented as 1609504210150.
/// The first number is calculated by converting and truncating the Epoch time in milliseconds to seconds:
/// HrTime[0] = Math.trunc(1609504210150 / 1000) = 1609504210.
/// The second number is calculated by converting the digits after the decimal point of the subtraction, (1609504210150 / 1000) - HrTime[0], to nanoseconds:
/// HrTime[1] = Number((1609504210.150 - HrTime[0]).toFixed(9)) * 1e9 = 150000000.
/// This is represented in HrTime format as [1609504210, 150000000].
fn convert_to_high_res_time(time: Duration) -> HrTime {
let secs = time.as_secs();
let partial = time.subsec_nanos();
[secs, partial as u64]
}
/// Transforms an [`opentelemetry::Value`] into a [`serde_json::Value`].
/// This is because we want to flatten the JSON representation of a value, which by default
/// would be a nested structure encoding the type. For instance, a float [`opentelemetry::Value`]
/// would be represented in JSON as `{"f64": 1.0}`. This function flattens it to just `1.0`.
fn to_json_value(value: &Value) -> serde_json::Value {
match value {
Value::String(s) => json!(s),
Value::F64(f) => json!(f),
Value::Bool(b) => json!(b),
Value::I64(i) => json!(i),
Value::Array(ary) => match ary {
opentelemetry::Array::Bool(b_vec) => json!(b_vec),
opentelemetry::Array::I64(i_vec) => json!(i_vec),
opentelemetry::Array::F64(f_vec) => json!(f_vec),
opentelemetry::Array::String(s_vec) => json!(s_vec),
},
}
}
#[cfg(test)]
mod tests {
use super::*;
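    // A minimal sketch of an additional test (not part of the original suite) showing what
    // `TraceSpan::split_events` does: it detaches the events from the span and returns the
    // span with an empty event list. All field values below are made up for illustration.
    #[test]
    fn test_split_events_detaches_events_from_the_span() {
        let event = Event {
            span_id: Some("span".to_owned()),
            name: "quaint:query".to_owned(),
            level: "query".to_owned(),
            timestamp: [1609504210, 150000000],
            attributes: Default::default(),
        };
        let span = TraceSpan {
            trace_id: "trace".to_owned(),
            span_id: "span".to_owned(),
            parent_span_id: "parent".to_owned(),
            name: "prisma:engine:db_query".to_owned(),
            start_time: [1609504210, 0],
            end_time: [1609504210, 150000000],
            attributes: Default::default(),
            events: vec![event.clone()],
            links: vec![],
        };
        let (events, span_without_events) = span.split_events();
        assert_eq!(vec![event], events);
        assert!(span_without_events.events.is_empty());
    }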
#[test]
fn test_high_resolution_time_works() {
// 2021-01-01T12:30:10.150Z in UNIX Epoch time in milliseconds
let time_val = Duration::from_millis(1609504210150);
assert_eq!([1609504210, 150000000], convert_to_high_res_time(time_val));
}
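    // A minimal sketch of an additional test (not part of the original suite) illustrating
    // the flattening performed by `to_json_value`: the typed otel representation is reduced
    // to a plain JSON value, e.g. an F64 becomes `1.5` rather than a nested `{"f64": 1.5}`.
    #[test]
    fn test_to_json_value_flattens_otel_values() {
        assert_eq!(serde_json::json!("hello"), to_json_value(&Value::String("hello".into())));
        assert_eq!(serde_json::json!(1.5), to_json_value(&Value::F64(1.5)));
        assert_eq!(serde_json::json!(true), to_json_value(&Value::Bool(true)));
        assert_eq!(serde_json::json!(42), to_json_value(&Value::I64(42)));
        assert_eq!(
            serde_json::json!([1, 2, 3]),
            to_json_value(&Value::Array(opentelemetry::Array::I64(vec![1, 2, 3])))
        );
    }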
}