const METRIC_TARGET: &str = "qe_metrics";
const METRIC_COUNTER: &str = "counter";
const METRIC_GAUGE: &str = "gauge";
const METRIC_HISTOGRAM: &str = "histogram";
const METRIC_DESCRIPTION: &str = "description";
mod common;
mod formatters;
mod recorder;
mod registry;
use once_cell::sync::Lazy;
use recorder::*;
pub use registry::MetricRegistry;
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Once;
pub extern crate metrics;
pub use metrics::{
absolute_counter, decrement_gauge, describe_counter, describe_gauge, describe_histogram, gauge, histogram,
increment_counter, increment_gauge,
};
pub const PRISMA_CLIENT_QUERIES_TOTAL: &str = "prisma_client_queries_total"; pub const PRISMA_DATASOURCE_QUERIES_TOTAL: &str = "prisma_datasource_queries_total"; pub const PRISMA_CLIENT_QUERIES_ACTIVE: &str = "prisma_client_queries_active"; pub const PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS: &str = "prisma_client_queries_duration_histogram_ms"; pub const PRISMA_DATASOURCE_QUERIES_DURATION_HISTOGRAM_MS: &str = "prisma_datasource_queries_duration_histogram_ms"; const MOBC_POOL_CONNECTIONS_OPENED_TOTAL: &str = "mobc_pool_connections_opened_total"; const MOBC_POOL_CONNECTIONS_CLOSED_TOTAL: &str = "mobc_pool_connections_closed_total"; const MOBC_POOL_CONNECTIONS_OPEN: &str = "mobc_pool_connections_open"; const MOBC_POOL_CONNECTIONS_BUSY: &str = "mobc_pool_connections_busy"; const MOBC_POOL_CONNECTIONS_IDLE: &str = "mobc_pool_connections_idle"; const MOBC_POOL_WAIT_COUNT: &str = "mobc_client_queries_wait"; const MOBC_POOL_WAIT_DURATION: &str = "mobc_client_queries_wait_histogram_ms"; pub const ACCEPT_LIST: &[&str] = &[
PRISMA_CLIENT_QUERIES_TOTAL,
PRISMA_DATASOURCE_QUERIES_TOTAL,
PRISMA_CLIENT_QUERIES_ACTIVE,
PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS,
PRISMA_DATASOURCE_QUERIES_DURATION_HISTOGRAM_MS,
MOBC_POOL_CONNECTIONS_OPENED_TOTAL,
MOBC_POOL_CONNECTIONS_CLOSED_TOTAL,
MOBC_POOL_CONNECTIONS_OPEN,
MOBC_POOL_CONNECTIONS_BUSY,
MOBC_POOL_CONNECTIONS_IDLE,
MOBC_POOL_WAIT_COUNT,
MOBC_POOL_WAIT_DURATION,
];
#[rustfmt::skip]
static METRIC_RENAMES: Lazy<HashMap<&'static str, (&'static str, &'static str)>> = Lazy::new(|| {
HashMap::from([
(MOBC_POOL_CONNECTIONS_OPENED_TOTAL, ("prisma_pool_connections_opened_total", "The total number of pool connections opened")),
(MOBC_POOL_CONNECTIONS_CLOSED_TOTAL, ("prisma_pool_connections_closed_total", "The total number of pool connections closed")),
(MOBC_POOL_CONNECTIONS_OPEN, ("prisma_pool_connections_open", "The number of pool connections currently open")),
(MOBC_POOL_CONNECTIONS_BUSY, ("prisma_pool_connections_busy", "The number of pool connections currently executing datasource queries")),
(MOBC_POOL_CONNECTIONS_IDLE, ("prisma_pool_connections_idle", "The number of pool connections that are not busy running a query")),
(MOBC_POOL_WAIT_COUNT, ("prisma_client_queries_wait", "The number of datasource queries currently waiting for a free connection")),
(MOBC_POOL_WAIT_DURATION, ("prisma_client_queries_wait_histogram_ms", "The distribution of the time all datasource queries spent waiting for a free connection")),
])
});
pub fn setup() {
set_recorder();
initialize_metrics();
}
static METRIC_RECORDER: Once = Once::new();
fn set_recorder() {
METRIC_RECORDER.call_once(|| {
metrics::set_boxed_recorder(Box::new(MetricRecorder)).unwrap();
});
}
pub fn initialize_metrics() {
initialize_metrics_descriptions();
initialize_metrics_values();
}
fn initialize_metrics_descriptions() {
describe_counter!(
PRISMA_CLIENT_QUERIES_TOTAL,
"The total number of Prisma Client queries executed"
);
describe_counter!(
PRISMA_DATASOURCE_QUERIES_TOTAL,
"The total number of datasource queries executed"
);
describe_gauge!(
PRISMA_CLIENT_QUERIES_ACTIVE,
"The number of currently active Prisma Client queries"
);
describe_histogram!(
PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS,
"The distribution of the time Prisma Client queries took to run end to end"
);
describe_histogram!(
PRISMA_DATASOURCE_QUERIES_DURATION_HISTOGRAM_MS,
"The distribution of the time datasource queries took to run"
);
}
fn initialize_metrics_values() {
absolute_counter!(PRISMA_CLIENT_QUERIES_TOTAL, 0);
absolute_counter!(PRISMA_DATASOURCE_QUERIES_TOTAL, 0);
gauge!(PRISMA_CLIENT_QUERIES_ACTIVE, 0.0);
absolute_counter!(MOBC_POOL_CONNECTIONS_OPENED_TOTAL, 0);
absolute_counter!(MOBC_POOL_CONNECTIONS_CLOSED_TOTAL, 0);
gauge!(MOBC_POOL_CONNECTIONS_OPEN, 0.0);
gauge!(MOBC_POOL_CONNECTIONS_BUSY, 0.0);
gauge!(MOBC_POOL_CONNECTIONS_IDLE, 0.0);
gauge!(MOBC_POOL_WAIT_COUNT, 0.0);
}
pub(crate) const HISTOGRAM_BOUNDS: [f64; 10] = [0.0, 1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 50000.0];
#[derive(PartialEq, Eq, Debug, Deserialize)]
pub enum MetricFormat {
#[serde(alias = "json")]
Json,
#[serde(alias = "prometheus")]
Prometheus,
}
#[cfg(test)]
mod tests {
use super::*;
use metrics::{
absolute_counter, decrement_gauge, describe_counter, describe_gauge, describe_histogram, gauge, histogram,
increment_counter, increment_gauge, register_counter, register_gauge, register_histogram,
};
use serde_json::json;
use std::collections::HashMap;
use std::time::Duration;
use tracing::instrument::WithSubscriber;
use tracing::{trace, Dispatch};
use tracing_subscriber::layer::SubscriberExt;
use once_cell::sync::Lazy;
use tokio::runtime::Runtime;
static RT: Lazy<Runtime> = Lazy::new(|| {
set_recorder();
Runtime::new().unwrap()
});
const TESTING_ACCEPT_LIST: &[&str] = &[
"test_counter",
"another_counter",
"test_gauge",
"another_gauge",
"test_histogram",
"counter_1",
"counter_2",
"gauge_1",
"gauge_2",
"histogram_1",
"histogram_2",
"test.counter",
];
#[test]
fn test_counters() {
RT.block_on(async {
let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec());
let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone()));
async {
let counter1 = register_counter!("test_counter");
counter1.increment(1);
increment_counter!("test_counter");
increment_counter!("test_counter");
increment_counter!("another_counter");
let val = metrics.counter_value("test_counter").unwrap();
assert_eq!(val, 3);
let val2 = metrics.counter_value("another_counter").unwrap();
assert_eq!(val2, 1);
absolute_counter!("test_counter", 5);
let val3 = metrics.counter_value("test_counter").unwrap();
assert_eq!(val3, 5);
}
.with_subscriber(dispatch)
.await;
});
}
#[test]
fn test_gauges() {
RT.block_on(async {
let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec());
let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone()));
async {
let gauge1 = register_gauge!("test_gauge");
gauge1.increment(1.0);
increment_gauge!("test_gauge", 1.0);
increment_gauge!("test_gauge", 1.0);
increment_gauge!("another_gauge", 1.0);
let val = metrics.gauge_value("test_gauge").unwrap();
assert_eq!(val, 3.0);
let val2 = metrics.gauge_value("another_gauge").unwrap();
assert_eq!(val2, 1.0);
assert_eq!(None, metrics.counter_value("test_gauge"));
gauge!("test_gauge", 5.0);
let val3 = metrics.gauge_value("test_gauge").unwrap();
assert_eq!(val3, 5.0);
decrement_gauge!("test_gauge", 2.0);
let val4 = metrics.gauge_value("test_gauge").unwrap();
assert_eq!(val4, 3.0);
}
.with_subscriber(dispatch)
.await;
});
}
#[test]
fn test_no_panic_and_ignore_other_traces() {
RT.block_on(async {
let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec());
let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone()));
async {
trace!("a fake trace");
increment_gauge!("test_gauge", 1.0);
increment_counter!("test_counter");
trace!("another fake trace");
assert_eq!(1.0, metrics.gauge_value("test_gauge").unwrap());
assert_eq!(1, metrics.counter_value("test_counter").unwrap());
}
.with_subscriber(dispatch)
.await;
});
}
#[test]
fn test_ignore_non_accepted_metrics() {
RT.block_on(async {
let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec());
let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone()));
async {
increment_gauge!("not_accepted", 1.0);
increment_gauge!("test_gauge", 1.0);
assert_eq!(1.0, metrics.gauge_value("test_gauge").unwrap());
assert_eq!(None, metrics.gauge_value("not_accepted"));
}
.with_subscriber(dispatch)
.await;
});
}
#[test]
fn test_histograms() {
RT.block_on(async {
let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec());
let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone()));
async {
let hist = register_histogram!("test_histogram");
hist.record(Duration::from_millis(9));
histogram!("test_histogram", Duration::from_millis(100));
histogram!("test_histogram", Duration::from_millis(1));
histogram!("test_histogram", Duration::from_millis(1999));
histogram!("test_histogram", Duration::from_millis(3999));
histogram!("test_histogram", Duration::from_millis(610));
let hist = metrics.histogram_values("test_histogram").unwrap();
let expected: Vec<(f64, u64)> = Vec::from([
(0.0, 0),
(1.0, 1),
(5.0, 1),
(10.0, 2),
(50.0, 2),
(100.0, 3),
(500.0, 3),
(1000.0, 4),
(5000.0, 6),
(50000.0, 6),
]);
assert_eq!(hist.buckets(), expected);
}
.with_subscriber(dispatch)
.await;
});
}
#[test]
fn test_set_and_read_descriptions() {
RT.block_on(async {
let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec());
let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone()));
async {
describe_counter!("test_counter", "This is a counter");
let descriptions = metrics.get_descriptions();
let description = descriptions.get("test_counter").unwrap();
assert_eq!("This is a counter", description);
describe_gauge!("test_gauge", "This is a gauge");
let descriptions = metrics.get_descriptions();
let description = descriptions.get("test_gauge").unwrap();
assert_eq!("This is a gauge", description);
describe_histogram!("test_histogram", "This is a hist");
let descriptions = metrics.get_descriptions();
let description = descriptions.get("test_histogram").unwrap();
assert_eq!("This is a hist", description);
}
.with_subscriber(dispatch)
.await;
});
}
#[test]
fn test_to_json() {
RT.block_on(async {
let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec());
let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone()));
async {
let empty = json!({
"counters": [],
"gauges": [],
"histograms": []
});
assert_eq!(metrics.to_json(Default::default()), empty);
absolute_counter!("counter_1", 4, "label" => "one");
describe_counter!("counter_2", "this is a description for counter 2");
absolute_counter!("counter_2", 2, "label" => "one", "another_label" => "two");
describe_gauge!("gauge_1", "a description for gauge 1");
gauge!("gauge_1", 7.0);
gauge!("gauge_2", 3.0, "label" => "three");
describe_histogram!("histogram_1", "a description for histogram");
let hist = register_histogram!("histogram_1", "label" => "one", "hist_two" => "two");
hist.record(Duration::from_millis(9));
histogram!("histogram_2", Duration::from_millis(9));
histogram!("histogram_2", Duration::from_millis(1000));
histogram!("histogram_2", Duration::from_millis(40));
let json = metrics.to_json(Default::default());
let expected = json!({
"counters":[{
"key":"counter_1",
"labels":{"label":"one"},
"value":4,
"description":""
},{
"key":"counter_2",
"labels":{"label":"one","another_label":"two"},
"value":2,
"description":"this is a description for counter 2"
}],
"gauges":[{
"key":"gauge_1",
"labels":{},
"value":7.0,
"description":"a description for gauge 1"
},{
"key":"gauge_2",
"labels":{"label":"three"},
"value":3.0,
"description":""
}],
"histograms":[{
"key":"histogram_1",
"labels":{"label":"one","hist_two":"two"},
"value":{
"buckets": [[0.0,0],[1.0,0],[5.0,0],[10.0,1],[50.0,0],[100.0,0],[500.0,0],[1000.0,0],[5000.0,0],[50000.0,0]],
"sum":9.0,
"count":1
},
"description":"a description for histogram"},{
"key":"histogram_2",
"labels":{},
"value":{
"buckets":[[0.0,0],[1.0,0],[5.0,0],[10.0,1],[50.0,1],[100.0,0],[500.0,0],[1000.0,1],[5000.0,0],[50000.0,0]],
"sum":1049.0,
"count":3
},
"description":""
}]
});
assert_eq!(json, expected);
}
.with_subscriber(dispatch)
.await;
});
}
#[test]
fn test_global_and_metric_labels() {
RT.block_on(async {
let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec());
let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone()));
async {
let hist = register_histogram!("test_histogram", "label" => "one", "two" => "another");
hist.record(Duration::from_millis(9));
absolute_counter!("counter_1", 1);
let mut global_labels: HashMap<String, String> = HashMap::new();
global_labels.insert("global_one".to_string(), "one".to_string());
global_labels.insert("global_two".to_string(), "two".to_string());
let json = metrics.to_json(global_labels);
let expected = json!({
"counters":[{
"key":"counter_1",
"labels":{"global_one":"one","global_two":"two"},
"value":1,
"description":""
}],
"gauges":[],
"histograms":[{
"key":"test_histogram",
"labels":{"label":"one","two":"another","global_one":"one","global_two":"two"},
"value":{
"buckets": [[0.0,0],[1.0,0],[5.0,0],[10.0,1],[50.0,0],[100.0,0],[500.0,0],[1000.0,0],[5000.0,0],[50000.0,0]],
"sum": 9.0,
"count": 1
},
"description":""
}]
});
assert_eq!(expected, json);
}
.with_subscriber(dispatch)
.await;
});
}
#[test]
fn test_prometheus_format() {
RT.block_on(async {
let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec());
let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone()));
async {
absolute_counter!("counter_1", 4, "label" => "one");
describe_counter!("counter_2", "this is a description for counter 2");
absolute_counter!("counter_2", 2, "label" => "one", "another_label" => "two");
describe_gauge!("gauge_1", "a description for gauge 1");
gauge!("gauge_1", 7.0);
gauge!("gauge_2", 3.0, "label" => "three");
describe_histogram!("histogram_1", "a description for histogram");
let hist = register_histogram!("histogram_1", "label" => "one", "hist_two" => "two");
hist.record(Duration::from_millis(9));
histogram!("histogram_2", Duration::from_millis(1000));
let mut global_labels: HashMap<String, String> = HashMap::new();
global_labels.insert("global_two".to_string(), "two".to_string());
global_labels.insert("global_one".to_string(), "one".to_string());
let prometheus = metrics.to_prometheus(global_labels);
let snapshot = expect_test::expect![[r#"
# HELP counter_1
# TYPE counter_1 counter
counter_1{global_one="one",global_two="two",label="one"} 4
# HELP counter_2 this is a description for counter 2
# TYPE counter_2 counter
counter_2{another_label="two",global_one="one",global_two="two",label="one"} 2
# HELP gauge_1 a description for gauge 1
# TYPE gauge_1 gauge
gauge_1{global_one="one",global_two="two"} 7
# HELP gauge_2
# TYPE gauge_2 gauge
gauge_2{global_one="one",global_two="two",label="three"} 3
# HELP histogram_1 a description for histogram
# TYPE histogram_1 histogram
histogram_1_bucket{global_one="one",global_two="two",hist_two="two",label="one",le="0"} 0
histogram_1_bucket{global_one="one",global_two="two",hist_two="two",label="one",le="1"} 0
histogram_1_bucket{global_one="one",global_two="two",hist_two="two",label="one",le="5"} 0
histogram_1_bucket{global_one="one",global_two="two",hist_two="two",label="one",le="10"} 1
histogram_1_bucket{global_one="one",global_two="two",hist_two="two",label="one",le="50"} 1
histogram_1_bucket{global_one="one",global_two="two",hist_two="two",label="one",le="100"} 1
histogram_1_bucket{global_one="one",global_two="two",hist_two="two",label="one",le="500"} 1
histogram_1_bucket{global_one="one",global_two="two",hist_two="two",label="one",le="1000"} 1
histogram_1_bucket{global_one="one",global_two="two",hist_two="two",label="one",le="5000"} 1
histogram_1_bucket{global_one="one",global_two="two",hist_two="two",label="one",le="50000"} 1
histogram_1_bucket{global_one="one",global_two="two",hist_two="two",label="one",le="+Inf"} 1
histogram_1_sum{global_one="one",global_two="two",hist_two="two",label="one"} 9
histogram_1_count{global_one="one",global_two="two",hist_two="two",label="one"} 1
# HELP histogram_2
# TYPE histogram_2 histogram
histogram_2_bucket{global_one="one",global_two="two",le="0"} 0
histogram_2_bucket{global_one="one",global_two="two",le="1"} 0
histogram_2_bucket{global_one="one",global_two="two",le="5"} 0
histogram_2_bucket{global_one="one",global_two="two",le="10"} 0
histogram_2_bucket{global_one="one",global_two="two",le="50"} 0
histogram_2_bucket{global_one="one",global_two="two",le="100"} 0
histogram_2_bucket{global_one="one",global_two="two",le="500"} 0
histogram_2_bucket{global_one="one",global_two="two",le="1000"} 1
histogram_2_bucket{global_one="one",global_two="two",le="5000"} 1
histogram_2_bucket{global_one="one",global_two="two",le="50000"} 1
histogram_2_bucket{global_one="one",global_two="two",le="+Inf"} 1
histogram_2_sum{global_one="one",global_two="two"} 1000
histogram_2_count{global_one="one",global_two="two"} 1
"#]];
snapshot.assert_eq(&prometheus);
}
.with_subscriber(dispatch)
.await;
});
}
}