1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use crate::{json_rpc::types::*, CoreError, CoreResult};
use schema_connector::{
    migrations_directory::{error_on_changed_provider, list_migrations, MigrationDirectory},
    ConnectorError, MigrationRecord, Namespaces, PersistenceNotInitializedError, SchemaConnector,
};
use std::{path::Path, time::Instant};
use tracing::Instrument;
use user_facing_errors::schema_engine::FoundFailedMigrations;

/// Applies every migration found in the migrations directory that the database has not
/// recorded yet.
///
/// The steps run in this order:
///
/// 1. `error_on_changed_provider` is called with the migrations directory path and the
///    connector type.
/// 2. The connector lock is acquired and the migration persistence is initialized with
///    `namespaces`.
/// 3. Migrations are listed from the filesystem and from the database;
///    `detect_failed_migrations` aborts the run if the database holds failed migrations.
/// 4. Each filesystem migration whose name does not match a database record that has not been
///    rolled back is applied, one at a time, inside its own tracing span.
///
/// On success, returns the names of the migrations applied by this call, in the order they
/// were applied.
///
/// # Errors
///
/// Returns the first error raised by any step. When a script fails to apply, the failure is
/// recorded through the migration persistence and the script error is returned; later
/// migrations are not attempted. If recording the failure itself fails, that error is
/// returned instead of the script error.
pub async fn apply_migrations(
    input: ApplyMigrationsInput,
    connector: &mut dyn SchemaConnector,
    namespaces: Option<Namespaces>,
) -> CoreResult<ApplyMigrationsOutput> {
    let started = Instant::now();

    error_on_changed_provider(&input.migrations_directory_path, connector.connector_type())?;

    connector.acquire_lock().await?;
    connector.migration_persistence().initialize(namespaces).await?;

    let fs_migrations = list_migrations(Path::new(&input.migrations_directory_path))?;
    let db_records = connector
        .migration_persistence()
        .list_migrations()
        .await?
        .map_err(PersistenceNotInitializedError::into_connector_error)?;

    detect_failed_migrations(&db_records)?;

    // No failed migrations in the history: work out what still has to run.
    tracing::debug!("Migration history is OK, applying unapplied migrations.");
    let pending: Vec<&MigrationDirectory> = fs_migrations
        .iter()
        .filter(|candidate| {
            // Rolled-back records do not count: such a migration is applied again.
            let already_recorded = db_records.iter().any(|record| {
                record.rolled_back_at.is_none()
                    && candidate.migration_name() == record.migration_name
            });
            !already_recorded
        })
        .collect();

    let analysis_duration_ms = started.elapsed().as_millis() as u64;
    tracing::info!(analysis_duration_ms, "Analysis run in {}ms", analysis_duration_ms,);

    let mut applied_migration_names: Vec<String> = Vec::with_capacity(pending.len());

    for migration in pending {
        let apply_one = async {
            let script = migration.read_migration_script().map_err(ConnectorError::from)?;

            tracing::info!(
                script = script.as_str(),
                "Applying `{}`",
                migration.migration_name()
            );

            // The start is persisted before the script runs, so a crash mid-script
            // leaves a record behind.
            let migration_id = connector
                .migration_persistence()
                .record_migration_started(migration.migration_name(), &script)
                .await?;

            let outcome = connector
                .apply_script(migration.migration_name(), &script)
                .await;

            match outcome {
                Err(err) => {
                    tracing::debug!("Failed to apply the script.");

                    let logs = err.to_string();

                    connector
                        .migration_persistence()
                        .record_failed_step(&migration_id, &logs)
                        .await?;

                    Err(err)
                }
                Ok(()) => {
                    tracing::debug!("Successfully applied the script.");
                    let persistence = connector.migration_persistence();
                    persistence.record_successful_step(&migration_id).await?;
                    persistence.record_migration_finished(&migration_id).await?;
                    applied_migration_names.push(migration.migration_name().to_owned());
                    Ok(())
                }
            }
        };

        let span = tracing::info_span!(
            "Applying migration",
            migration_name = migration.migration_name(),
        );

        // Stop at the first migration that fails; the remaining ones are left unapplied.
        apply_one.instrument(span).await?;
    }

    Ok(ApplyMigrationsOutput {
        applied_migration_names,
    })
}

/// Fails when the database contains migrations that neither finished nor were rolled back.
///
/// A record counts as failed when both its `finished_at` and `rolled_back_at` are `None`.
/// Returns `Ok(())` when there is no such record. Otherwise returns a user-facing
/// `FoundFailedMigrations` error whose `details` holds one line per failed migration,
/// followed by that migration's trimmed logs when they are non-empty.
fn detect_failed_migrations(migrations_from_database: &[MigrationRecord]) -> CoreResult<()> {
    use std::fmt::Write as _;

    tracing::debug!("Checking for failed migrations.");

    let mut details = String::new();

    for record in migrations_from_database {
        // Finished or rolled-back migrations are not failures.
        if record.finished_at.is_some() || record.rolled_back_at.is_some() {
            continue;
        }

        // Only mention the logs when there is something other than whitespace in them.
        let logs = match record.logs.as_deref().map(|text| text.trim()) {
            Some(s) if !s.is_empty() => format!(" with the following logs:\n{s}"),
            _ => String::new(),
        };

        writeln!(
            details,
            "The `{name}` migration started at {started_at} failed{logs}",
            name = record.migration_name,
            started_at = record.started_at,
        )
        .unwrap();
    }

    // Every failed migration writes at least one line, so an empty message means none failed.
    if details.is_empty() {
        return Ok(());
    }

    Err(CoreError::user_facing(FoundFailedMigrations { details }))
}