use crate::CoreResult;
use schema_connector::{
migrations_directory::*, ConnectorError, DiffTarget, MigrationRecord, Namespaces, PersistenceNotInitializedError,
SchemaConnector,
};
use serde::{Deserialize, Serialize};
use std::path::Path;
/// Input parameters of [`diagnose_migration_history`].
///
/// Deserialized with camelCase field names (for example
/// `migrationsDirectoryPath`).
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct DiagnoseMigrationHistoryInput {
/// Path of the migrations directory. Its migrations are listed and compared
/// against the migration records stored in the database.
pub migrations_directory_path: String,
/// Gates drift detection and the validation of the directory's migrations:
/// when `false`, both are skipped and reported as `None` in the output.
/// NOTE(review): presumably those steps require a shadow database, as the
/// name suggests — confirm against the connector implementations.
pub opt_in_to_shadow_database: bool,
}
/// Result of [`diagnose_migration_history`].
///
/// Serialized with camelCase field names; `drift` and
/// `error_in_unapplied_migration` are excluded from serialization.
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct DiagnoseMigrationHistoryOutput {
/// Outcome of drift detection. `None` when no drift was found or when drift
/// detection was not run. Not serialized.
#[serde(skip)]
pub drift: Option<DriftDiagnostic>,
/// How the migrations directory and the database's migration records relate.
/// `None` when neither side has a migration the other one lacks.
pub history: Option<HistoryDiagnostic>,
/// Names of the database migration records that are neither finished nor
/// rolled back.
pub failed_migration_names: Vec<String>,
/// Names of the migrations whose script in the migrations directory no longer
/// matches the checksum recorded in the database.
pub edited_migration_names: Vec<String>,
/// Error returned when validating the migrations of the directory, if any.
/// `None` when validation passed or was not run. Not serialized.
#[serde(skip)]
pub error_in_unapplied_migration: Option<ConnectorError>,
/// `false` when the migration persistence reported that it is not
/// initialized, `true` otherwise.
pub has_migrations_table: bool,
}
impl DiagnoseMigrationHistoryOutput {
pub fn is_empty(&self) -> bool {
matches!(
self,
DiagnoseMigrationHistoryOutput {
drift,
history,
has_migrations_table: _,
failed_migration_names,
edited_migration_names,
error_in_unapplied_migration,
} if drift.is_none() && history.is_none() && failed_migration_names.is_empty() && edited_migration_names.is_empty() && error_in_unapplied_migration.is_none()
)
}
}
/// Compares the migrations of the migrations directory with the migration
/// records stored in the database and reports how the two relate.
///
/// The diagnosis proceeds in these steps:
///
/// 1. Run `error_on_changed_provider` for the directory and the connector's
///    type.
/// 2. List the migrations of the directory and the records of the database. A
///    `PersistenceNotInitializedError` is not an error here: it yields an
///    empty database history and `has_migrations_table == false`.
/// 3. Classify migrations as edited (checksum mismatch), failed (neither
///    finished nor rolled back), only in the directory, or only in the
///    database.
/// 4. Only when `input.opt_in_to_shadow_database` is set: diff the schema
///    derived from the applied migrations against the schema of the database
///    to detect drift, then validate the directory's migrations.
///
/// # Errors
///
/// Returns early when the provider check fails, when the directory or the
/// database records cannot be listed, or when a checksum comparison fails.
/// Failures during drift detection and validation are reported inside the
/// output (`drift`, `error_in_unapplied_migration`) instead of being returned.
pub async fn diagnose_migration_history(
    input: DiagnoseMigrationHistoryInput,
    namespaces: Option<Namespaces>,
    connector: &mut dyn SchemaConnector,
) -> CoreResult<DiagnoseMigrationHistoryOutput> {
    tracing::debug!("Diagnosing migration history");

    error_on_changed_provider(&input.migrations_directory_path, connector.connector_type())?;

    let fs_migrations = list_migrations(Path::new(&input.migrations_directory_path))?;

    // An uninitialized persistence is a reportable state, not a failure.
    let (db_migrations, has_migrations_table) = match connector.migration_persistence().list_migrations().await? {
        Ok(records) => (records, true),
        Err(PersistenceNotInitializedError {}) => (Vec::new(), false),
    };

    let mut diagnostics = Diagnostics::new(&fs_migrations);

    // Directory -> database: spot edited scripts and migrations the database
    // has no record of.
    for (position, fs_migration) in fs_migrations.iter().enumerate() {
        let recorded = db_migrations
            .iter()
            .find(|record| record.migration_name == fs_migration.migration_name());

        if let Some(record) = recorded {
            let checksum_matches = fs_migration
                .matches_checksum(&record.checksum)
                .map_err(ConnectorError::from)?;

            if !checksum_matches {
                diagnostics.edited_migrations.push(record);
            }
        } else {
            diagnostics.fs_migrations_not_in_db.push((position, fs_migration));
        }
    }

    // Database -> directory: spot failed records and records whose migration
    // is missing from the directory.
    for (position, record) in db_migrations.iter().enumerate() {
        let is_failed = record.finished_at.is_none() && record.rolled_back_at.is_none();
        if is_failed {
            diagnostics.failed_migrations.push(record);
        }

        let exists_in_directory = fs_migrations
            .iter()
            .any(|fs_migration| record.migration_name == fs_migration.migration_name());
        if !exists_in_directory {
            diagnostics.db_migrations_not_in_fs.push((position, record));
        }
    }

    // Directory migrations that have a finished, not rolled back record in
    // the database. They define the schema the database is expected to have.
    let applied_migrations: Vec<_> = fs_migrations
        .iter()
        .filter(|fs_migration| {
            db_migrations.iter().any(|record| {
                record.finished_at.is_some()
                    && record.rolled_back_at.is_none()
                    && record.migration_name == fs_migration.migration_name()
            })
        })
        .cloned()
        .collect();

    let (drift, error_in_unapplied_migration) = if input.opt_in_to_shadow_database {
        // Both schemas are always computed, even if the first one fails.
        let expected_schema = connector
            .database_schema_from_diff_target(DiffTarget::Migrations(&applied_migrations), None, namespaces.clone())
            .await;
        let actual_schema = connector
            .database_schema_from_diff_target(DiffTarget::Database, None, namespaces.clone())
            .await;

        // When both schemas failed, the error of the expected schema wins.
        let diffed = match (expected_schema, actual_schema) {
            (Ok(from), Ok(to)) => Ok(connector.diff(from, to)),
            (Err(error), _) => Err(error),
            (Ok(_), Err(error)) => Err(error),
        };

        let drift = match diffed {
            Ok(migration) if !connector.migration_is_empty(&migration) => Some(DriftDiagnostic::DriftDetected {
                summary: connector.migration_summary(&migration),
            }),
            Ok(_) => None,
            Err(error) => Some(DriftDiagnostic::MigrationFailedToApply { error }),
        };

        // Validation is skipped when drift detection itself already failed.
        let drift_detection_failed = matches!(drift, Some(DriftDiagnostic::MigrationFailedToApply { .. }));
        let error_in_unapplied_migration = if drift_detection_failed {
            None
        } else {
            connector
                .validate_migrations(&fs_migrations, namespaces)
                .await
                .err()
        };

        (drift, error_in_unapplied_migration)
    } else {
        (None, None)
    };

    Ok(DiagnoseMigrationHistoryOutput {
        drift,
        history: diagnostics.history(),
        failed_migration_names: diagnostics.failed_migration_names(),
        edited_migration_names: diagnostics.edited_migration_names(),
        error_in_unapplied_migration,
        has_migrations_table,
    })
}
/// Working state collected while comparing the migrations directory with the
/// database's migration records. All entries borrow from the two migration
/// lists instead of copying them.
#[derive(Debug)]
struct Diagnostics<'a> {
/// Directory migrations without a database record of the same name, each
/// paired with its index in `fs_migrations`.
fs_migrations_not_in_db: Vec<(usize, &'a MigrationDirectory)>,
/// Database records without a directory migration of the same name, each
/// paired with its index in the list of database records.
db_migrations_not_in_fs: Vec<(usize, &'a MigrationRecord)>,
/// Database records whose checksum does not match the directory migration of
/// the same name.
edited_migrations: Vec<&'a MigrationRecord>,
/// Database records that are neither finished nor rolled back.
failed_migrations: Vec<&'a MigrationRecord>,
/// All migrations of the migrations directory; used to look up the migration
/// preceding the first one that is missing from the database.
fs_migrations: &'a [MigrationDirectory],
}
impl<'a> Diagnostics<'a> {
fn new(fs_migrations: &'a [MigrationDirectory]) -> Self {
Diagnostics {
fs_migrations_not_in_db: Vec::new(),
db_migrations_not_in_fs: Vec::new(),
edited_migrations: Vec::new(),
failed_migrations: Vec::new(),
fs_migrations,
}
}
fn db_migration_names(&self) -> Vec<String> {
self.db_migrations_not_in_fs
.iter()
.map(|(_, migration)| migration.migration_name.clone())
.collect()
}
fn edited_migration_names(&self) -> Vec<String> {
self.edited_migrations
.iter()
.map(|migration| migration.migration_name.clone())
.collect()
}
fn failed_migration_names(&self) -> Vec<String> {
self.failed_migrations
.iter()
.map(|migration| migration.migration_name.clone())
.collect()
}
fn fs_migration_names(&self) -> Vec<String> {
self.fs_migrations_not_in_db
.iter()
.map(|(_, migration)| migration.migration_name().to_owned())
.collect()
}
fn history(&self) -> Option<HistoryDiagnostic> {
match (self.fs_migrations_not_in_db.len(), self.db_migrations_not_in_fs.len()) {
(0, 0) => None,
(_, 0) => Some(HistoryDiagnostic::DatabaseIsBehind {
unapplied_migration_names: self.fs_migration_names(),
}),
(0, _) => Some(HistoryDiagnostic::MigrationsDirectoryIsBehind {
unpersisted_migration_names: self.db_migration_names(),
}),
(_, _) => Some(HistoryDiagnostic::HistoriesDiverge {
last_common_migration_name: self.fs_migrations_not_in_db.first().and_then(|(idx, _)| {
if *idx == 0 {
None
} else {
Some(self.fs_migrations[idx - 1].migration_name().to_owned())
}
}),
unpersisted_migration_names: self.db_migration_names(),
unapplied_migration_names: self.fs_migration_names(),
}),
}
}
}
/// How the migrations directory and the database's migration records relate
/// when they are not in sync.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// camelCase under the `diagnostic` key, next to the variant's camelCase
/// fields.
#[derive(Debug, PartialEq, Serialize)]
#[serde(tag = "diagnostic", rename_all = "camelCase")]
pub enum HistoryDiagnostic {
/// The directory contains migrations the database has no record of, and every
/// database record has a matching directory migration.
#[serde(rename_all = "camelCase")]
DatabaseIsBehind {
/// Names of the directory migrations without a database record.
unapplied_migration_names: Vec<String>,
},
/// The database has records for migrations missing from the directory, and
/// every directory migration has a database record.
#[serde(rename_all = "camelCase")]
MigrationsDirectoryIsBehind {
/// Names of the database records without a directory migration.
unpersisted_migration_names: Vec<String>,
},
/// Each side has migrations the other one lacks.
#[serde(rename_all = "camelCase")]
HistoriesDiverge {
/// Name of the directory migration right before the first directory
/// migration that is missing from the database; `None` when that first
/// missing migration is the first migration of the directory.
last_common_migration_name: Option<String>,
/// Names of the database records without a directory migration.
unpersisted_migration_names: Vec<String>,
/// Names of the directory migrations without a database record.
unapplied_migration_names: Vec<String>,
},
}
/// Outcome of drift detection when it did not come back clean.
#[derive(Debug)]
pub enum DriftDiagnostic {
/// The diff between the schema derived from the applied migrations and the
/// schema of the database is not empty.
DriftDetected {
/// Summary of that diff, as rendered by the connector.
summary: String,
},
/// Drift could not be determined because computing one of the two schemas to
/// compare failed.
MigrationFailedToApply {
/// The error returned by the connector while computing the schema.
error: ConnectorError,
},
}
impl DriftDiagnostic {
pub fn unwrap_drift_detected(self) -> String {
match self {
DriftDiagnostic::DriftDetected { summary } => summary,
other => panic!("unwrap_drift_detected on {other:?}"),
}
}
}