1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use crate::CoreError;
use connector::Transaction;
use crosstarget_utils::time::ElapsedTimeCounter;
use serde::Deserialize;
use std::fmt::Display;
use tokio::time::Duration;

mod actor_manager;
mod actors;
mod error;
mod messages;

pub use error::*;

pub(crate) use actor_manager::*;
pub(crate) use actors::*;
pub(crate) use messages::*;

/// # How Interactive Transactions work
///
/// The Interactive Transactions (iTx) follow an actor model design, where each iTx is created in its own process.
/// When a Prisma Client requests to start a new transaction, the Transaction Actor Manager spawns a new ITXServer. The ITXServer runs in its own
/// process and waits for messages to arrive via its receive channel to process.
/// The Transaction Actor Manager will also create an ITXClient and add it to a hashmap managed by an RwLock. The ITXClient is the only way to communicate
/// with the ITXServer.
///
/// Once Prisma Client receives the iTx Id it can perform database operations using that iTx id. When an operation request is received by the
/// TransactionActorManager, it looks for the client in the hashmap and passes the operation to the client. The ITXClient sends a message to the
/// ITXServer and waits for a response. The ITXServer will then perform the operation and return the result. The ITXServer will perform one
/// operation at a time. All other operations will sit in the message queue waiting to be processed.
///
/// The ITXServer will handle all messages until:
/// - It transitions state, e.g. "rollback" or "commit"
/// - It exceeds its timeout, in which case the iTx is rolled back and the connection to the database is closed.
///
/// Once the ITXServer is done handling messages from the ITXClient, it sends a last message to the Background Client list Actor to say that it is completed and then shuts down.
/// The Background Client list Actor removes the client from the list of active clients and keeps the iTx id of the closed transaction in a cache.
///
/// We keep a list of closed transactions so that if any further messages are received for this iTx id,
/// the TransactionActorManager can reply with a helpful error message which explains that no operation can be performed on a closed transaction
/// rather than an error message stating that the transaction does not exist.

///
/// `TxId` is the identifier of an interactive transaction: a newtype over the id string.
///
/// Values built through `From` are asserted to be at least `MINIMUM_TX_ID_LENGTH` bytes long;
/// `Default` generates a fresh id via `cuid::cuid()`.
///
/// NOTE(review): the derived `Deserialize` presumably builds the id straight from the
/// incoming string without going through `From`, so the minimum-length assertion would
/// not apply to deserialized ids — confirm.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize)]
pub struct TxId(String);

/// Minimum length, in bytes, of the string wrapped by a `TxId` created through `From`.
///
/// The `From` implementation asserts this bound so that the id is long enough to derive
/// a TraceId from it.
const MINIMUM_TX_ID_LENGTH: usize = 24;

impl Default for TxId {
    fn default() -> Self {
        #[allow(deprecated)]
        Self(cuid::cuid().unwrap())
    }
}

impl<T> From<T> for TxId
where
    T: Into<String>,
{
    fn from(s: T) -> Self {
        let contents = s.into();
        // This postcondition is to ensure that the TxId is long enough as to be able to derive
        // a TraceId from it.
        assert!(
            contents.len() >= MINIMUM_TX_ID_LENGTH,
            "minimum length for a TxId ({}) is {}, but was {}",
            contents,
            MINIMUM_TX_ID_LENGTH,
            contents.len()
        );
        Self(contents)
    }
}

impl Display for TxId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Display::fmt(&self.0, f)
    }
}

/// State of a cached transaction.
///
/// Only `Open` carries a usable [`Transaction`]; `as_open` rejects every other variant with
/// a "no longer valid" error and `to_closed` maps those variants to a `ClosedTx`.
pub enum CachedTx<'a> {
    /// The transaction is open; owns the boxed connector transaction.
    Open(Box<dyn Transaction + 'a>),
    /// Closed state, displayed as "Committed".
    Committed,
    /// Closed state, displayed as "Rolled back".
    RolledBack,
    /// Closed state, displayed as "Expired"; `to_closed` attaches the start time and timeout.
    Expired,
}

impl Display for CachedTx<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CachedTx::Open(_) => f.write_str("Open"),
            CachedTx::Committed => f.write_str("Committed"),
            CachedTx::RolledBack => f.write_str("Rolled back"),
            CachedTx::Expired => f.write_str("Expired"),
        }
    }
}

impl<'a> CachedTx<'a> {
    /// Returns mutable access to the underlying transaction when the state is `Open`.
    ///
    /// # Errors
    ///
    /// For any other state, returns a `CoreError` built from `TransactionError::Closed`
    /// whose reason names the last state of the transaction.
    pub(crate) fn as_open(&mut self) -> crate::Result<&mut Box<dyn Transaction + 'a>> {
        match self {
            Self::Open(otx) => Ok(otx),
            Self::Committed | Self::RolledBack | Self::Expired => {
                let reason = format!("Transaction is no longer valid. Last state: '{self}'");
                Err(CoreError::from(TransactionError::Closed { reason }))
            }
        }
    }

    /// Converts a non-open state into the matching `ClosedTx`.
    ///
    /// Returns `None` while the transaction is still `Open`. `start_time` and `timeout`
    /// are only used for the `Expired` state, where they are stored in the result.
    pub(crate) fn to_closed(&self, start_time: ElapsedTimeCounter, timeout: Duration) -> Option<ClosedTx> {
        let closed = match self {
            Self::Open(_) => return None,
            Self::Committed => ClosedTx::Committed,
            Self::RolledBack => ClosedTx::RolledBack,
            Self::Expired => ClosedTx::Expired { start_time, timeout },
        };
        Some(closed)
    }
}

/// Closed state of a transaction, as produced by `CachedTx::to_closed` for every
/// state other than `Open`.
pub(crate) enum ClosedTx {
    /// Produced from `CachedTx::Committed`.
    Committed,
    /// Produced from `CachedTx::RolledBack`.
    RolledBack,
    /// Produced from `CachedTx::Expired`; carries the values passed to `to_closed`.
    Expired {
        /// Elapsed-time counter supplied by the caller of `to_closed`.
        start_time: ElapsedTimeCounter,
        /// Timeout supplied by the caller of `to_closed`.
        timeout: Duration,
    },
}