1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use super::{
    check::Check, database_inspection_results::DatabaseInspectionResults,
    unexecutable_step_check::UnexecutableStepCheck, warning_check::SqlMigrationWarningCheck,
};
use crate::flavour::SqlFlavour;
use schema_connector::{
    ConnectorError, ConnectorResult, DestructiveChangeDiagnostics, MigrationWarning, UnexecutableMigration,
};
use std::time::Duration;
use tokio::time::{error::Elapsed, timeout};

/// Upper bound on the time `DestructiveCheckPlan::execute` spends inspecting
/// the database. When it elapses, inspection is abandoned and diagnostics are
/// rendered from whatever results were gathered up to that point.
const DESTRUCTIVE_TIMEOUT_DURATION: Duration = Duration::from_secs(60);

/// A DestructiveCheckPlan is the collection of destructive change checks
/// ([`Check`]) for a given migration. It has an `execute` method that performs
/// database inspection and renders user-facing messages based on the checks,
/// and a `pure_check` method that renders them without any database access.
#[derive(Debug)]
pub(crate) struct DestructiveCheckPlan {
    /// Warning checks, each paired with the index of the migration step it
    /// was registered for.
    warnings: Vec<(SqlMigrationWarningCheck, usize)>,
    /// Unexecutable-step checks, each paired with the index of the migration
    /// step it was registered for.
    unexecutable_migrations: Vec<(UnexecutableStepCheck, usize)>,
}

impl DestructiveCheckPlan {
    pub(super) fn new() -> Self {
        DestructiveCheckPlan {
            warnings: Vec::new(),
            unexecutable_migrations: Vec::new(),
        }
    }

    pub(super) fn push_warning(&mut self, warning: SqlMigrationWarningCheck, step_index: usize) {
        self.warnings.push((warning, step_index))
    }

    pub(super) fn push_unexecutable(&mut self, unexecutable_migration: UnexecutableStepCheck, step_index: usize) {
        self.unexecutable_migrations.push((unexecutable_migration, step_index))
    }

    /// Inspect the current database state to qualify and render destructive change warnings and
    /// errors.
    ///
    /// For example, dropping a table that has 0 rows can be considered safe.
    #[tracing::instrument(skip(flavour), level = "debug")]
    pub(super) async fn execute(
        &self,
        flavour: &mut (dyn SqlFlavour + Send + Sync),
    ) -> ConnectorResult<DestructiveChangeDiagnostics> {
        let mut results = DatabaseInspectionResults::default();

        let inspection = async {
            for (unexecutable, _idx) in &self.unexecutable_migrations {
                self.inspect_for_check(unexecutable, flavour, &mut results).await?;
            }

            for (warning, _idx) in &self.warnings {
                self.inspect_for_check(warning, flavour, &mut results).await?;
            }

            Ok::<(), ConnectorError>(())
        };

        // Ignore the timeout error, we will still return useful warnings.
        match timeout(DESTRUCTIVE_TIMEOUT_DURATION, inspection).await {
            Ok(Ok(())) | Err(Elapsed { .. }) => (),
            Ok(Err(err)) => return Err(err),
        };

        let mut diagnostics = DestructiveChangeDiagnostics::new();

        for (unexecutable, step_index) in &self.unexecutable_migrations {
            if let Some(message) = unexecutable.evaluate(&results) {
                diagnostics.unexecutable_migrations.push(UnexecutableMigration {
                    description: message,
                    step_index: *step_index,
                })
            }
        }

        for (warning, step_index) in &self.warnings {
            if let Some(message) = warning.evaluate(&results) {
                diagnostics.warnings.push(MigrationWarning {
                    description: message,
                    step_index: *step_index,
                })
            }
        }

        Ok(diagnostics)
    }

    /// Perform the database inspection for a given [`Check`](trait.Check.html).
    pub(super) async fn inspect_for_check(
        &self,
        check: &(dyn Check + Send + Sync + 'static),
        flavour: &mut (dyn SqlFlavour + Send + Sync),
        results: &mut DatabaseInspectionResults,
    ) -> ConnectorResult<()> {
        if let Some(table) = check.needed_table_row_count() {
            if results.get_row_count(&table).is_none() {
                let count = flavour.count_rows_in_table(&table).await?;
                results.set_row_count(table.to_owned(), count)
            }
        }

        if let Some(column) = check.needed_column_value_count() {
            if let (_, None) = results.get_row_and_non_null_value_count(&column) {
                let count = flavour.count_values_in_column(&column).await?;
                results.set_value_count(column, count);
            }
        }

        Ok(())
    }

    /// Return hypothetical warnings and errors, without performing any database
    /// IO. This is useful when we want to return diagnostics in reference to a
    /// database we cannot check directly. For example when we want to emit
    /// warnings about the production database, when creating a migration in
    /// development.
    pub(super) fn pure_check(&self) -> DestructiveChangeDiagnostics {
        let results = DatabaseInspectionResults::default();
        let mut diagnostics = DestructiveChangeDiagnostics::new();

        for (unexecutable, step_index) in &self.unexecutable_migrations {
            if let Some(message) = unexecutable.evaluate(&results) {
                diagnostics.unexecutable_migrations.push(UnexecutableMigration {
                    description: message,
                    step_index: *step_index,
                })
            }
        }

        for (warning, step_index) in &self.warnings {
            if let Some(message) = warning.evaluate(&results) {
                diagnostics.warnings.push(MigrationWarning {
                    description: message,
                    step_index: *step_index,
                })
            }
        }

        diagnostics
    }
}