1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#![allow(unused_imports)]

use psl::{builtin_connectors::*, Datasource, PreviewFeatures};
use quaint::connector::ExternalConnector;
use query_core::{executor::InterpretingExecutor, Connector, QueryExecutor};
use sql_query_connector::*;
use std::collections::HashMap;
use std::env;
use std::marker::PhantomData;
use std::sync::Arc;
use url::Url;

/// Selects which kind of connector `load` should build an executor for.
///
/// The lifetime `'a` is the borrow of the [`Datasource`] carried by the `Rust` variant.
pub enum ConnectorKind<'a> {
    /// A connector compiled into the engine. Only available with the `native` feature.
    ///
    /// `url` is the connection string handed to the connector, and `datasource` is the
    /// parsed datasource whose `active_provider` decides which connector gets loaded.
    #[cfg(feature = "native")]
    Rust { url: String, datasource: &'a Datasource },
    /// A connector backed by an externally provided adapter implementing [`ExternalConnector`].
    Js {
        /// Shared handle to the external adapter.
        adapter: Arc<dyn ExternalConnector>,
        _phantom: PhantomData<&'a ()>, // required for WASM target, where JS is the only variant and lifetime gets unused
    },
}

/// Builds the query executor matching the requested [`ConnectorKind`].
///
/// * `ConnectorKind::Js` delegates to `driver_adapter` (requires the `driver-adapters` feature).
/// * `ConnectorKind::Rust` (requires the `native` feature) picks the native connector whose
///   provider matches `datasource.active_provider`.
///
/// # Errors
///
/// Returns `CoreError::ConfigurationError` when the active provider matches none of the
/// supported connectors, and forwards any error produced while constructing the connector.
///
/// # Panics
///
/// * When a `Js` connector is requested but the `driver-adapters` feature is not compiled in.
/// * When a `Rust` connector is requested while the `PRISMA_DISABLE_QUAINT_EXECUTORS`
///   environment variable is set to `1` or to `true` (compared case-insensitively).
pub async fn load(
    connector_kind: ConnectorKind<'_>,
    features: PreviewFeatures,
) -> query_core::Result<Box<dyn QueryExecutor + Send + Sync + 'static>> {
    match connector_kind {
        ConnectorKind::Js { adapter, _phantom } => {
            #[cfg(not(feature = "driver-adapters"))]
            panic!("Driver adapters are not enabled, but connector mode is set to JS");

            #[cfg(feature = "driver-adapters")]
            driver_adapter(adapter, features).await
        }

        #[cfg(feature = "native")]
        ConnectorKind::Rust { url, datasource } => {
            // An unset (or non-unicode) variable counts as "not disabled".
            let quaint_disabled = env::var("PRISMA_DISABLE_QUAINT_EXECUTORS")
                .map(|raw| matches!(raw.to_uppercase().as_str(), "TRUE" | "1"))
                .unwrap_or(false);

            if quaint_disabled {
                panic!("Quaint executors are disabled, as per env var PRISMA_DISABLE_QUAINT_EXECUTORS.");
            }

            let connection_string = url.as_str();
            let provider = datasource.active_provider;

            // Arms are tried top to bottom; the first connector claiming the provider wins.
            match provider {
                p if SQLITE.is_provider(p) => native::sqlite(datasource, connection_string, features).await,
                p if MYSQL.is_provider(p) => native::mysql(datasource, connection_string, features).await,
                p if POSTGRES.is_provider(p) => native::postgres(datasource, connection_string, features).await,
                p if MSSQL.is_provider(p) => native::mssql(datasource, connection_string, features).await,
                // CockroachDB is served by the Postgres loader.
                p if COCKROACH.is_provider(p) => native::postgres(datasource, connection_string, features).await,

                #[cfg(feature = "mongodb")]
                p if MONGODB.is_provider(p) => native::mongodb(datasource, connection_string, features).await,

                x => {
                    let message = format!("Unsupported connector type: {x}");
                    Err(query_core::CoreError::ConfigurationError(message))
                }
            }
        }
    }
}

/// Builds an executor on top of an externally supplied driver adapter.
///
/// The adapter is wrapped in a `Js` connector, which is then handed to `executor_for`
/// with `force_transactions` set to `false`.
///
/// # Errors
///
/// Propagates (converted into `CoreError`) any failure reported by `Js::new`.
#[cfg(feature = "driver-adapters")]
async fn driver_adapter(
    driver_adapter: Arc<dyn ExternalConnector>,
    features: PreviewFeatures,
) -> Result<Box<dyn QueryExecutor + Send + Sync>, query_core::CoreError> {
    Ok(executor_for(Js::new(driver_adapter, features).await?, false))
}

/// Loaders for the connectors compiled into the engine, one function per provider.
///
/// Every loader builds its connector from the datasource and connection string, emits a
/// trace event before and after doing so, and wraps the connector via `executor_for`.
#[cfg(feature = "native")]
mod native {
    use super::*;
    use tracing::trace;

    /// Loads the SQLite connector and wraps it in an executor (`force_transactions = false`).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Sqlite::from_source`.
    pub(crate) async fn sqlite(
        source: &Datasource,
        url: &str,
        features: PreviewFeatures,
    ) -> query_core::Result<Box<dyn QueryExecutor + Send + Sync>> {
        trace!("Loading SQLite query connector...");
        let sqlite = Sqlite::from_source(source, url, features).await?;
        trace!("Loaded SQLite query connector.");
        Ok(executor_for(sqlite, false))
    }

    /// Loads the PostgreSQL connector and wraps it in an executor.
    ///
    /// The executor's `force_transactions` flag is taken from the `pgbouncer` query
    /// parameter of `url`: it is `true` only when that parameter parses as the boolean
    /// `true`; a missing or unparsable value yields `false`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `PostgreSql::from_source`, and returns
    /// `CoreError::ConfigurationError` when `url` cannot be parsed as a URL.
    pub(crate) async fn postgres(
        source: &Datasource,
        url: &str,
        features: PreviewFeatures,
    ) -> query_core::Result<Box<dyn QueryExecutor + Send + Sync>> {
        trace!("Loading Postgres query connector...");
        let psql = PostgreSql::from_source(source, url, features).await?;

        // The connector is built first, so its errors take precedence over URL parse errors.
        let parsed_url = Url::parse(url).map_err(|err| {
            query_core::CoreError::ConfigurationError(format!("Error parsing connection string: {err}"))
        })?;
        let params: HashMap<String, String> = parsed_url.query_pairs().into_owned().collect();

        let force_transactions = params
            .get("pgbouncer")
            .and_then(|flag| flag.parse().ok())
            .unwrap_or(false);
        trace!("Loaded Postgres query connector.");
        Ok(executor_for(psql, force_transactions))
    }

    /// Loads the MySQL connector and wraps it in an executor (`force_transactions = false`).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Mysql::from_source`.
    pub(crate) async fn mysql(
        source: &Datasource,
        url: &str,
        features: PreviewFeatures,
    ) -> query_core::Result<Box<dyn QueryExecutor + Send + Sync>> {
        // Emitted for parity with the other loaders, which all trace before connecting.
        trace!("Loading MySQL query connector...");
        let mysql = Mysql::from_source(source, url, features).await?;
        trace!("Loaded MySQL query connector.");
        Ok(executor_for(mysql, false))
    }

    /// Loads the SQL Server connector and wraps it in an executor (`force_transactions = false`).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Mssql::from_source`.
    pub(crate) async fn mssql(
        source: &Datasource,
        url: &str,
        features: PreviewFeatures,
    ) -> query_core::Result<Box<dyn QueryExecutor + Send + Sync>> {
        trace!("Loading SQL Server query connector...");
        let mssql = Mssql::from_source(source, url, features).await?;
        trace!("Loaded SQL Server query connector.");
        Ok(executor_for(mssql, false))
    }

    /// Loads the MongoDB connector and wraps it in an executor (`force_transactions = false`).
    ///
    /// Only compiled with the `mongodb` feature. `_features` is accepted so the signature
    /// matches the other loaders, but it is not used here.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `MongoDb::new`.
    #[cfg(feature = "mongodb")]
    pub(crate) async fn mongodb(
        source: &Datasource,
        url: &str,
        _features: PreviewFeatures,
    ) -> query_core::Result<Box<dyn QueryExecutor + Send + Sync>> {
        use mongodb_query_connector::MongoDb;

        trace!("Loading MongoDB query connector...");
        let mongo = MongoDb::new(source, url).await?;
        trace!("Loaded MongoDB query connector.");
        Ok(executor_for(mongo, false))
    }
}

/// Wraps `connector` in an `InterpretingExecutor` and returns it as a boxed
/// [`QueryExecutor`] trait object.
///
/// `force_transactions` is forwarded unchanged to `InterpretingExecutor::new`.
fn executor_for<T: Connector + Send + Sync + 'static>(
    connector: T,
    force_transactions: bool,
) -> Box<dyn QueryExecutor + Send + Sync> {
    let executor = InterpretingExecutor::new(connector, force_transactions);
    Box::new(executor)
}