1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use std::time::Duration;

use bson::{doc, Document};

use crate::{
    cmap::{Command, RawCommandResponse, StreamDescription},
    error::Result,
    operation::{append_options, remove_empty_write_concern, OperationWithDefaults, Retryability},
    options::{Acknowledgment, TransactionOptions, WriteConcern},
};

use super::WriteConcernOnlyBody;

pub(crate) struct CommitTransaction {
    options: Option<TransactionOptions>,
}

impl CommitTransaction {
    pub(crate) fn new(options: Option<TransactionOptions>) -> Self {
        Self { options }
    }
}

impl OperationWithDefaults for CommitTransaction {
    type O = ();
    type Command = Document;

    const NAME: &'static str = "commitTransaction";

    fn build(&mut self, _description: &StreamDescription) -> Result<Command> {
        let mut body = doc! {
            Self::NAME: 1,
        };

        remove_empty_write_concern!(self.options);
        append_options(&mut body, self.options.as_ref())?;

        Ok(Command::new(
            Self::NAME.to_string(),
            "admin".to_string(),
            body,
        ))
    }

    fn handle_response(
        &self,
        response: RawCommandResponse,
        _description: &StreamDescription,
    ) -> Result<Self::O> {
        let response: WriteConcernOnlyBody = response.body()?;
        response.validate()
    }

    fn write_concern(&self) -> Option<&WriteConcern> {
        self.options
            .as_ref()
            .and_then(|opts| opts.write_concern.as_ref())
    }

    fn retryability(&self) -> Retryability {
        Retryability::Write
    }

    // Updates the write concern to use w: majority and a w_timeout of 10000 if w_timeout is not
    // already set. The write concern on a commitTransaction command should be updated if a
    // commit is being retried internally or by the user.
    fn update_for_retry(&mut self) {
        let options = self
            .options
            .get_or_insert_with(|| TransactionOptions::builder().build());
        match &mut options.write_concern {
            Some(write_concern) => {
                write_concern.w = Some(Acknowledgment::Majority);
                if write_concern.w_timeout.is_none() {
                    write_concern.w_timeout = Some(Duration::from_millis(10000));
                }
            }
            None => {
                options.write_concern = Some(
                    WriteConcern::builder()
                        .w(Acknowledgment::Majority)
                        .w_timeout(Duration::from_millis(10000))
                        .build(),
                );
            }
        }
    }
}