1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
//! SQL flavours implement behaviour specific to a given SQL implementation (PostgreSQL, SQLite...),
//! in order to avoid cluttering the connector with conditionals. This is a private implementation
//! detail of the SQL connector.

mod mssql;
mod mysql;
mod postgres;
mod sqlite;

pub(crate) use mssql::MssqlFlavour;
pub(crate) use mysql::MysqlFlavour;
pub(crate) use postgres::PostgresFlavour;
pub(crate) use sqlite::SqliteFlavour;

use crate::{
    sql_destructive_change_checker::DestructiveChangeCheckerFlavour, sql_renderer::SqlRenderer,
    sql_schema_calculator::SqlSchemaCalculatorFlavour, sql_schema_differ::SqlSchemaDifferFlavour,
};
use enumflags2::BitFlags;
use psl::{PreviewFeature, ValidatedSchema};
use quaint::prelude::{ConnectionInfo, Table};
use schema_connector::{
    migrations_directory::MigrationDirectory, BoxFuture, ConnectorError, ConnectorParams, ConnectorResult,
    IntrospectionContext, MigrationRecord, Namespaces, PersistenceNotInitializedError,
};
use sql_schema_describer::SqlSchema;
use std::fmt::Debug;

/// Connection state of a flavour. P is the params, C is a connection.
pub(crate) enum State<P, C> {
    /// No params have been set yet.
    Initial,
    /// Params have been set, but there is no connection.
    WithParams(P),
    /// Params have been set and a connection is held alongside them.
    Connected(P, C),
}

/// Hand-written so that neither the params nor the connection are ever printed: only the
/// variant name is written. As a side effect, no `Debug` bound is needed on `P` or `C`.
impl<P, C> Debug for State<P, C> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Each label names the actual variant, so that debug output can be matched to the code.
        let label = match self {
            State::Initial => "State::Initial",
            State::WithParams(_) => "State::WithParams(<CONFIDENTIAL>)",
            State::Connected(_, _) => "State::Connected(<CONFIDENTIAL>)",
        };

        f.write_str(label)
    }
}

impl<P, C> State<P, C>
where
    P: 'static,
    C: 'static,
{
    /// Borrow the params if they have been set, whether or not a connection exists.
    /// Returns `None` only in `State::Initial`.
    fn params(&self) -> Option<&P> {
        match self {
            State::Initial => None,
            State::WithParams(p) | State::Connected(p, _) => Some(p),
        }
    }

    /// Unwrap the state's params. We do not return an error because we want to trigger a panic if
    /// that happens, because it means an internal logic error.
    ///
    /// This is useful when you want the params, but you do not care if a connection has been
    /// started or not.
    ///
    /// # Panics
    ///
    /// Panics when called on `State::Initial`.
    #[track_caller]
    fn get_unwrapped_params(&self) -> &P {
        match self {
            State::Initial => panic!("Internal logic error: get_unwrapped_params() on State::Initial"),
            State::WithParams(p) | State::Connected(p, _) => p,
        }
    }

    /// Transition from `State::Initial` to `State::WithParams`.
    ///
    /// # Panics
    ///
    /// Panics if params were already set (`State::WithParams` or `State::Connected`).
    #[track_caller]
    fn set_params(&mut self, params: P) {
        match self {
            State::WithParams(_) | State::Connected(_, _) => panic!("state error"),
            State::Initial => *self = State::WithParams(params),
        }
    }

    /// Convenience wrapper to transition from WithParams to Connected.
    ///
    /// `f` receives the params and returns the future that establishes the connection. On
    /// success, the state becomes `State::Connected`. On error, the state stays
    /// `State::WithParams` and the error is returned.
    ///
    /// The params are only borrowed while `f`'s future is pending, and the state is only
    /// swapped once a connection has been obtained. This way the params are not lost if the
    /// returned future is dropped before completion, or if `f` panics.
    ///
    /// # Panics
    ///
    /// Panics when called on `State::Initial` or `State::Connected`.
    async fn try_connect(
        &mut self,
        f: impl for<'b> FnOnce(&'b P) -> BoxFuture<'b, ConnectorResult<C>>,
    ) -> ConnectorResult<()> {
        let connection = match &mut *self {
            State::Initial => panic!("Attempted to connect from State::Initial"),
            State::Connected(_, _) => panic!("Attempted to connect from State::Connected"),
            // An early return on error leaves `self` untouched, i.e. still `WithParams`.
            State::WithParams(params) => f(&*params).await?,
        };

        // No await point between taking the params out and storing the new state.
        match std::mem::replace(self, State::Initial) {
            State::WithParams(params) => {
                *self = State::Connected(params, connection);
                Ok(())
            }
            State::Initial | State::Connected(_, _) => {
                unreachable!("the state was checked to be State::WithParams before connecting")
            }
        }
    }
}

/// Behaviour specific to a given SQL implementation. Methods with a body are defaults that
/// individual flavours can override.
pub(crate) trait SqlFlavour:
    DestructiveChangeCheckerFlavour
    + SqlRenderer
    + SqlSchemaDifferFlavour
    + SqlSchemaCalculatorFlavour
    + Send
    + Sync
    + Debug
{
    /// Acquire a lock.
    /// NOTE(review): presumably a database-level lock guarding against concurrent commands —
    /// confirm against the flavour implementations.
    fn acquire_lock(&mut self) -> BoxFuture<'_, ConnectorResult<()>>;

    /// Apply the migration script `script`, belonging to the migration `migration_name`.
    fn apply_migration_script<'a>(
        &'a mut self,
        migration_name: &'a str,
        script: &'a str,
    ) -> BoxFuture<'a, ConnectorResult<()>>;

    /// Check the database version against the given schema. The default implementation
    /// reports no incompatibility.
    fn check_database_version_compatibility(
        &self,
        _datamodel: &ValidatedSchema,
    ) -> Option<user_facing_errors::common::DatabaseVersionIncompatibility> {
        None
    }

    /// Check a schema for preview features not implemented in migrate/introspection.
    /// The default implementation accepts every schema.
    fn check_schema_features(&self, _schema: &psl::ValidatedSchema) -> ConnectorResult<()> {
        Ok(())
    }

    /// The connection string received in set_params().
    fn connection_string(&self) -> Option<&str>;

    /// See MigrationConnector::connector_type()
    fn connector_type(&self) -> &'static str;

    /// Create a database for the given URL on the server, if applicable.
    /// NOTE(review): the returned string is presumably the name of the created database —
    /// confirm against the flavour implementations.
    fn create_database(&mut self) -> BoxFuture<'_, ConnectorResult<String>>;

    /// Initialize the `_prisma_migrations` table.
    fn create_migrations_table(&mut self) -> BoxFuture<'_, ConnectorResult<()>>;

    /// The datamodel connector corresponding to the flavour
    fn datamodel_connector(&self) -> &'static dyn psl::datamodel_connector::Connector;

    /// Describe the database schema, for the given namespaces if any.
    fn describe_schema(&mut self, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<SqlSchema>>;

    /// Drop the database.
    fn drop_database(&mut self) -> BoxFuture<'_, ConnectorResult<()>>;

    /// Drop the migrations table
    fn drop_migrations_table(&mut self) -> BoxFuture<'_, ConnectorResult<()>>;

    /// List all visible tables in the given namespaces,
    /// including the search path.
    fn table_names(&mut self, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<Vec<String>>>;

    /// Return an empty database schema. This happens in the flavour, because we need
    /// SqlSchema::connector_data to be set.
    fn empty_database_schema(&self) -> SqlSchema {
        SqlSchema::default()
    }

    /// Check a connection to make sure it is usable by the schema engine.
    /// This can include some set up on the database, like ensuring that the
    /// schema we connect to exists.
    fn ensure_connection_validity(&mut self) -> BoxFuture<'_, ConnectorResult<()>>;

    /// Same as [`SqlFlavour::describe_schema`], but only called for introspection.
    /// The default implementation ignores the context and delegates to `describe_schema`.
    fn introspect<'a>(
        &'a mut self,
        namespaces: Option<Namespaces>,
        _ctx: &'a IntrospectionContext,
    ) -> BoxFuture<'a, ConnectorResult<SqlSchema>> {
        self.describe_schema(namespaces)
    }

    /// Load every row of the migrations table, ordered by ascending `started_at`.
    ///
    /// Returns `Ok(Err(PersistenceNotInitializedError))` when the query fails with a
    /// `TableDoesNotExist` or `InvalidModel` user facing error.
    ///
    /// # Errors
    ///
    /// Any other query error is returned as is. An error is also returned when a required
    /// column (`id`, `checksum`, `migration_name`, `started_at`, `applied_steps_count`) is
    /// missing from a row, holds a value of an unexpected type, or, for
    /// `applied_steps_count`, holds a value that does not fit in a `u32`.
    fn load_migrations_table(
        &mut self,
    ) -> BoxFuture<'_, ConnectorResult<Result<Vec<MigrationRecord>, PersistenceNotInitializedError>>> {
        use quaint::prelude::*;

        /// The error for a required column that could not be read from a row.
        fn unreadable_column(column: &str) -> ConnectorError {
            ConnectorError::from_msg(format!(
                "Failed to extract `{}` from `_prisma_migrations` row.",
                column
            ))
        }

        Box::pin(async move {
            let select = Select::from_table(self.migrations_table())
                .column("id")
                .column("checksum")
                .column("finished_at")
                .column("migration_name")
                .column("logs")
                .column("rolled_back_at")
                .column("started_at")
                .column("applied_steps_count")
                .order_by("started_at".ascend());

            let rows = match self.query(select.into()).await {
                Ok(result) => result,
                // These two errors are reported as "persistence not initialized" rather than
                // as a failure.
                Err(err)
                    if err.is_user_facing_error::<user_facing_errors::query_engine::TableDoesNotExist>()
                        || err.is_user_facing_error::<user_facing_errors::common::InvalidModel>() =>
                {
                    return Ok(Err(PersistenceNotInitializedError))
                }
                Err(err) => return Err(err),
            };

            let rows = rows
                .into_iter()
                .map(|row| -> ConnectorResult<_> {
                    Ok(MigrationRecord {
                        id: row
                            .get("id")
                            .and_then(|v| v.to_string())
                            .ok_or_else(|| unreadable_column("id"))?,
                        checksum: row
                            .get("checksum")
                            .and_then(|v| v.to_string())
                            .ok_or_else(|| unreadable_column("checksum"))?,
                        finished_at: row.get("finished_at").and_then(|v| v.as_datetime()),
                        migration_name: row
                            .get("migration_name")
                            .and_then(|v| v.to_string())
                            .ok_or_else(|| unreadable_column("migration_name"))?,
                        // `logs` is selected above, but it is not propagated into the record.
                        // NOTE(review): confirm this is intentional.
                        logs: None,
                        rolled_back_at: row.get("rolled_back_at").and_then(|v| v.as_datetime()),
                        started_at: row
                            .get("started_at")
                            .and_then(|v| v.as_datetime())
                            .ok_or_else(|| unreadable_column("started_at"))?,
                        // Checked conversion: an `as u32` cast would silently wrap negative or
                        // oversized values. The fully qualified path does not rely on `TryFrom`
                        // being in the prelude.
                        applied_steps_count: row
                            .get("applied_steps_count")
                            .and_then(|v| v.as_integer())
                            .and_then(|count| <u32 as std::convert::TryFrom<_>>::try_from(count).ok())
                            .ok_or_else(|| unreadable_column("applied_steps_count"))?,
                    })
                })
                .collect::<Result<Vec<_>, _>>()?;

            tracing::debug!("Found {} migrations in the migrations table.", rows.len());

            Ok(Ok(rows))
        })
    }

    /// Run a query built with the quaint AST and return its result set.
    fn query<'a>(
        &'a mut self,
        query: quaint::ast::Query<'a>,
    ) -> BoxFuture<'a, ConnectorResult<quaint::prelude::ResultSet>>;

    /// Run a raw SQL query with the given parameters and return its result set.
    fn query_raw<'a>(
        &'a mut self,
        sql: &'a str,
        params: &'a [quaint::prelude::Value<'a>],
    ) -> BoxFuture<'_, ConnectorResult<quaint::prelude::ResultSet>>;

    /// Run raw SQL without returning any rows.
    fn raw_cmd<'a>(&'a mut self, sql: &'a str) -> BoxFuture<'a, ConnectorResult<()>>;

    /// Drop the database and recreate it empty.
    fn reset(&mut self, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>>;

    /// Optionally scan a migration script that could have been altered by users and emit warnings.
    /// The default implementation does nothing.
    fn scan_migration_script(&self, _script: &str) {}

    /// Apply the given migration history to a shadow database, and return
    /// the final introspected SQL schema. The second parameter is an optional shadow database url
    /// in case there is one at this point of the command, but not earlier in set_params().
    fn sql_schema_from_migration_history<'a>(
        &'a mut self,
        migrations: &'a [MigrationDirectory],
        shadow_database_url: Option<String>,
        namespaces: Option<Namespaces>,
    ) -> BoxFuture<'a, ConnectorResult<SqlSchema>>;

    /// Receive and validate connector params.
    fn set_params(&mut self, connector_params: ConnectorParams) -> ConnectorResult<()>;

    /// Sets the preview features. This is currently useful for MultiSchema, as we want to
    /// grab the namespaces we're expected to diff/work on, which are generally set in
    /// the schema.
    /// WARNING: This may silently not do anything if the connector is in the initial state.
    /// If this is ever a problem, consider returning an indicator of success.
    fn set_preview_features(&mut self, preview_features: BitFlags<psl::PreviewFeature>);

    /// Table to store applied migrations. Defaults to `crate::MIGRATIONS_TABLE_NAME`.
    fn migrations_table(&self) -> Table<'static> {
        crate::MIGRATIONS_TABLE_NAME.into()
    }

    /// The version of the database, if any is reported.
    /// NOTE(review): the exact format of the string is flavour specific — confirm against the
    /// implementations.
    fn version(&mut self) -> BoxFuture<'_, ConnectorResult<Option<String>>>;

    /// The search path of the connection.
    /// NOTE(review): presumably the default schema/namespace name — confirm against the
    /// flavour implementations.
    fn search_path(&self) -> &str;
}

/// Utility function shared by multiple flavours to compare shadow database and main connection.
///
/// Returns an error when both connection infos are the exact same string, `Ok(())` otherwise.
fn validate_connection_infos_do_not_match(previous: &str, next: &str) -> ConnectorResult<()> {
    if previous != next {
        return Ok(());
    }

    let message = "The shadow database you configured appears to be the same as the main database. Please specify another shadow database.";

    Err(ConnectorError::from_msg(message.into()))
}

/// Strip from the SqlSchema everything that belongs to a preview feature that is not enabled:
/// fulltext indexes are turned into normal indexes without `FullTextIndex`, and namespaces are
/// cleared without `MultiSchema`.
fn normalize_sql_schema(sql_schema: &mut SqlSchema, preview_features: BitFlags<PreviewFeature>) {
    let fulltext_index_enabled = preview_features.contains(PreviewFeature::FullTextIndex);
    let multi_schema_enabled = preview_features.contains(PreviewFeature::MultiSchema);

    // Remove this when the feature is GA
    if !fulltext_index_enabled {
        sql_schema.make_fulltext_indexes_normal();
    }

    if !multi_schema_enabled {
        sql_schema.clear_namespaces();
    }
}

fn quaint_error_to_connector_error(error: quaint::error::Error, connection_info: &ConnectionInfo) -> ConnectorError {
    match user_facing_errors::quaint::render_quaint_error(error.kind(), connection_info) {
        Some(user_facing_error) => user_facing_error.into(),
        None => {
            let msg = error
                .original_message()
                .map(String::from)
                .unwrap_or_else(|| error.to_string());
            ConnectorError::from_msg(msg)
        }
    }
}