1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use crate::{json_rpc::types::*, CoreError, CoreResult};
use schema_connector::{
    migrations_directory::{error_on_changed_provider, MigrationDirectory},
    SchemaConnector,
};
use std::path::Path;
use user_facing_errors::schema_engine::{MigrationAlreadyApplied, MigrationToMarkAppliedNotFound};

/// Mark a migration as applied.
///
/// The migration is located at the `input.migration_name` subdirectory of
/// `input.migrations_directory_path`. This function does not run the migration
/// script itself: it reads the script from disk and hands it, together with the
/// migration name, to the connector's migration persistence.
///
/// Steps, in order:
///
/// 1. Check the migrations directory against the connector type
///    (`error_on_changed_provider`) and acquire the connector lock.
/// 2. Read the migration script from the migration's directory.
/// 3. List the persisted migrations and keep the records whose name equals
///    `input.migration_name`. If the listing reports an inner error, call
///    `baseline_initialize` on the persistence and continue with no records.
/// 4. Fail if any of those records has `finished_at` set.
/// 5. Mark every remaining record that has neither `finished_at` nor
///    `rolled_back_at` set as rolled back, by id.
/// 6. Mark the migration as applied in the persistence.
///
/// # Errors
///
/// - `MigrationToMarkAppliedNotFound` (user-facing) when the migration script
///   cannot be read, whatever the underlying reason.
/// - `MigrationAlreadyApplied` (user-facing) when a persisted record with the
///   same name already has `finished_at` set.
/// - Any error propagated from the provider check, the lock acquisition or the
///   migration persistence calls.
pub async fn mark_migration_applied(
    input: MarkMigrationAppliedInput,
    connector: &mut dyn SchemaConnector,
) -> CoreResult<MarkMigrationAppliedOutput> {
    error_on_changed_provider(&input.migrations_directory_path, connector.connector_type())?;

    connector.acquire_lock().await?;

    let migration_path = Path::new(&input.migrations_directory_path).join(&input.migration_name);
    let migration_directory = MigrationDirectory::new(migration_path);

    // The underlying read error is deliberately discarded: every failure to
    // read the script is surfaced as "migration not found".
    let script = match migration_directory.read_migration_script() {
        Ok(contents) => contents,
        Err(_) => {
            return Err(CoreError::user_facing(MigrationToMarkAppliedNotFound {
                migration_name: input.migration_name.clone(),
            }));
        }
    };

    // The outer `?` propagates connector failures; the inner `Result` decides
    // whether we have records to inspect or must initialize the persistence.
    let listing = connector.migration_persistence().list_migrations().await?;
    let same_name_records: Vec<_> = match listing {
        Ok(records) => records
            .into_iter()
            .filter(|record| record.migration_name == input.migration_name)
            .collect(),
        Err(_) => {
            connector.migration_persistence().baseline_initialize().await?;

            Vec::new()
        }
    };

    let already_applied = same_name_records
        .iter()
        .any(|record| record.finished_at.is_some());

    if already_applied {
        return Err(CoreError::user_facing(MigrationAlreadyApplied {
            migration_name: input.migration_name.clone(),
        }));
    }

    // Earlier attempts that neither finished nor were rolled back get marked
    // as rolled back before the new "applied" entry is written.
    for record in &same_name_records {
        let still_open = record.finished_at.is_none() && record.rolled_back_at.is_none();

        if !still_open {
            continue;
        }

        connector
            .migration_persistence()
            .mark_migration_rolled_back_by_id(&record.id)
            .await?;
    }

    connector
        .migration_persistence()
        .mark_migration_applied(migration_directory.migration_name(), &script)
        .await?;

    Ok(MarkMigrationAppliedOutput {})
}