1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//! What the executor module DOES:
//! - Defining an overarching executor trait, to be used by consumers of the core crate.
//! - Defining executor implementations that combine the different core modules into a coherent
//!   string of actions to execute a given query document.
//!
//! What the executor module DOES NOT DO:
//! - Define low level execution of queries. This is considered an implementation detail of the modules used by the executors.

mod execute_operation;
mod interpreting_executor;
mod pipeline;
mod request_context;

pub use self::{execute_operation::*, interpreting_executor::InterpretingExecutor};

pub(crate) use request_context::*;

use crate::{
    protocol::EngineProtocol, query_document::Operation, response_ir::ResponseData, schema::QuerySchemaRef,
    BatchDocumentTransaction, TxId,
};
use async_trait::async_trait;
use connector::Connector;
use serde::{Deserialize, Serialize};
use tracing::Dispatch;

/// Executor interface for running query operations, either one at a time or as a batch.
///
/// Every implementer must also implement [`TransactionManager`] (supertrait bound).
// `async_trait` is applied with `?Send` on wasm32 only, so the generated futures are not
// required to be `Send` on that target; on every other target they are.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait QueryExecutor: TransactionManager {
    /// Executes a single operation and returns its result.
    ///
    /// Implementers must honor the passed transaction ID and execute the operation on the transaction
    /// identified by `tx_id`. If `tx_id` is `None`, implementers are free to choose how to execute the query.
    async fn execute(
        &self,
        tx_id: Option<TxId>,
        operation: Operation,
        query_schema: QuerySchemaRef,
        trace_id: Option<String>,
        engine_protocol: EngineProtocol,
    ) -> crate::Result<ResponseData>;

    /// Executes a collection of operations as either a fanout of individual operations (non-transactional),
    /// or in series (transactional).
    ///
    /// Implementers must honor the passed transaction ID and execute the operations on the transaction
    /// identified by `tx_id`. If `tx_id` is `None`, implementers are free to choose how to execute the queries.
    ///
    /// Note that `transaction` is the legacy marker for transactional batches. It must be supported until
    /// the stabilization of ITXs.
    ///
    /// The outer `Result` covers the call as a whole, while each element of the returned vector carries
    /// its own `Result` (presumably one per operation, so operations can fail independently; confirm
    /// against the implementers).
    async fn execute_all(
        &self,
        tx_id: Option<TxId>,
        operations: Vec<Operation>,
        transaction: Option<BatchDocumentTransaction>,
        query_schema: QuerySchemaRef,
        trace_id: Option<String>,
        engine_protocol: EngineProtocol,
    ) -> crate::Result<Vec<crate::Result<ResponseData>>>;

    /// Returns a reference to the executor's primary [`Connector`] as a `Send + Sync` trait object.
    fn primary_connector(&self) -> &(dyn Connector + Send + Sync);
}

/// Options passed to [`TransactionManager::start_tx`] when opening a transaction.
///
/// The serialized field names differ from the Rust field names where a `serde(rename)` is present.
#[derive(Debug, Serialize, Deserialize)]
pub struct TransactionOptions {
    /// Maximum wait time for tx acquisition in milliseconds.
    /// (De)serialized under the key `max_wait`.
    #[serde(rename = "max_wait")]
    pub max_acquisition_millis: u64,

    /// Time in milliseconds after which the transaction rolls back automatically.
    /// (De)serialized under the key `timeout`.
    #[serde(rename = "timeout")]
    pub valid_for_millis: u64,

    /// Isolation level to use for the transaction.
    pub isolation_level: Option<String>,

    /// An optional pre-defined transaction id. A value may be provided when the id should be
    /// generated before the transaction is started.
    /// Never (de)serialized (`serde(skip)`), so deserialized values always start with the default, `None`.
    #[serde(skip)]
    pub new_tx_id: Option<TxId>,
}

impl TransactionOptions {
    pub fn new(max_acquisition_millis: u64, valid_for_millis: u64, isolation_level: Option<String>) -> Self {
        Self {
            max_acquisition_millis,
            valid_for_millis,
            isolation_level,
            new_tx_id: None,
        }
    }

    /// Generates a new transaction id before the transaction is started and returns a modified version
    /// of self with the new predefined_id set.
    pub fn with_new_transaction_id(&mut self) -> TxId {
        let tx_id: TxId = Default::default();
        self.new_tx_id = Some(tx_id.clone());
        tx_id
    }
}
/// Interface for controlling the lifecycle of transactions: start, commit and rollback.
// `async_trait` is applied with `?Send` on wasm32 only, so the generated futures are not
// required to be `Send` on that target; on every other target they are.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait TransactionManager {
    /// Starts a new transaction.
    ///
    /// Returns the ID of the newly opened transaction.
    /// Expected to return an error if no transaction could be opened within `opts.max_acquisition_millis`
    /// milliseconds.
    /// The new transaction must only live for `opts.valid_for_millis` milliseconds before it automatically
    /// rolls back. This rollback mechanism is an implementation detail of the trait implementer.
    async fn start_tx(
        &self,
        query_schema: QuerySchemaRef,
        engine_protocol: EngineProtocol,
        opts: TransactionOptions,
    ) -> crate::Result<TxId>;

    /// Commits the transaction identified by `tx_id`.
    async fn commit_tx(&self, tx_id: TxId) -> crate::Result<()>;

    /// Rolls back the transaction identified by `tx_id`.
    async fn rollback_tx(&self, tx_id: TxId) -> crate::Result<()>;
}

// With the node-api, when a future is spawned on a new thread via `tokio::spawn`, it will not
// use the current dispatcher and its logs will not be captured anymore. We can use this
// method to get the current dispatcher and combine it with `with_subscriber`
// let dispatcher = get_current_dispatcher();
// tokio::spawn(async {
//      my_async_ops.await
// }.with_subscriber(dispatcher));
//
//
// Finally, this can be replaced with with_current_collector
// https://github.com/tokio-rs/tracing/blob/master/tracing-futures/src/lib.rs#L234
// once this is in a release

pub fn get_current_dispatcher() -> Dispatch {
    tracing::dispatcher::get_default(|current| current.clone())
}