1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
use crate::CoreError;
use connector::Transaction;
use crosstarget_utils::time::ElapsedTimeCounter;
use serde::Deserialize;
use std::fmt::Display;
use tokio::time::Duration;
mod actor_manager;
mod actors;
mod error;
mod messages;
pub use error::*;
pub(crate) use actor_manager::*;
pub(crate) use actors::*;
pub(crate) use messages::*;
/// How Interactive Transactions work
/// The Interactive Transactions (iTx) follow an actor model design. Where each iTx is created in its own process.
/// When a prisma client requests to start a new transaction, the Transaction Actor Manager spawns a new ITXServer. The ITXServer runs in its own
/// process and waits for messages to arrive via its receive channel to process.
/// The Transaction Actor Manager will also create an ITXClient and add it to hashmap managed by an RwLock. The ITXClient is the only way to communicate
/// with the ITXServer.
/// Once Prisma Client receives the iTx Id it can perform database operations using that iTx id. When an operation request is received by the
/// TransactionActorManager, it looks for the client in the hashmap and passes the operation to the client. The ITXClient sends a message to the
/// ITXServer and waits for a response. The ITXServer will then perform the operation and return the result. The ITXServer will perform one
/// operation at a time. All other operations will sit in the message queue waiting to be processed.
///
/// The ITXServer will handle all messages until:
/// - It transitions state, e.g "rollback" or "commit"
/// - It exceeds its timeout, in which case the iTx is rolledback and the connection to the database is closed.
/// Once the ITXServer is done handling messages from the iTx Client, it sends a last message to the Background Client list Actor to say that it is completed and then shuts down.
/// The Background Client list Actor removes the client from the list of active clients and keeps in cache the iTx id of the closed transaction.
/// We keep a list of closed transactions so that if any further messages are received for this iTx id,
/// the TransactionActorManager can reply with a helpful error message which explains that no operation can be performed on a closed transaction
/// rather than an error message stating that the transaction does not exist.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize)]
pub struct TxId(String);
const MINIMUM_TX_ID_LENGTH: usize = 24;
impl Default for TxId {
fn default() -> Self {
#[allow(deprecated)]
Self(cuid::cuid().unwrap())
}
}
impl<T> From<T> for TxId
where
T: Into<String>,
{
fn from(s: T) -> Self {
let contents = s.into();
// This postcondition is to ensure that the TxId is long enough as to be able to derive
// a TraceId from it.
assert!(
contents.len() >= MINIMUM_TX_ID_LENGTH,
"minimum length for a TxId ({}) is {}, but was {}",
contents,
MINIMUM_TX_ID_LENGTH,
contents.len()
);
Self(contents)
}
}
impl Display for TxId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.0, f)
}
}
pub enum CachedTx<'a> {
Open(Box<dyn Transaction + 'a>),
Committed,
RolledBack,
Expired,
}
impl Display for CachedTx<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CachedTx::Open(_) => f.write_str("Open"),
CachedTx::Committed => f.write_str("Committed"),
CachedTx::RolledBack => f.write_str("Rolled back"),
CachedTx::Expired => f.write_str("Expired"),
}
}
}
impl<'a> CachedTx<'a> {
/// Requires this cached TX to be `Open`, else an error will be raised that it is no longer valid.
pub(crate) fn as_open(&mut self) -> crate::Result<&mut Box<dyn Transaction + 'a>> {
if let Self::Open(ref mut otx) = self {
Ok(otx)
} else {
let reason = format!("Transaction is no longer valid. Last state: '{self}'");
Err(CoreError::from(TransactionError::Closed { reason }))
}
}
pub(crate) fn to_closed(&self, start_time: ElapsedTimeCounter, timeout: Duration) -> Option<ClosedTx> {
match self {
CachedTx::Open(_) => None,
CachedTx::Committed => Some(ClosedTx::Committed),
CachedTx::RolledBack => Some(ClosedTx::RolledBack),
CachedTx::Expired => Some(ClosedTx::Expired { start_time, timeout }),
}
}
}
pub(crate) enum ClosedTx {
Committed,
RolledBack,
Expired {
start_time: ElapsedTimeCounter,
timeout: Duration,
},
}