1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
use tokio::sync::mpsc;

use super::Connection;
use crate::{
    bson::oid::ObjectId,
    error::Error,
    runtime::{AcknowledgedMessage, AcknowledgmentReceiver},
    sdam::BroadcastMessage,
};

pub(super) fn channel() -> (PoolManager, ManagementRequestReceiver) {
    let (sender, receiver) = mpsc::unbounded_channel();
    (
        PoolManager { sender },
        ManagementRequestReceiver { receiver },
    )
}

/// Struct used to make management requests to the pool (e.g. checking in a connection).
/// A PoolManager will NOT keep a pool from going out of scope and closing.
#[derive(Clone, Debug)]
pub(super) struct PoolManager {
    sender: mpsc::UnboundedSender<PoolManagementRequest>,
}

impl PoolManager {
    /// Lazily clear the pool.
    pub(super) async fn clear(&self, cause: Error, service_id: Option<ObjectId>) {
        let (message, acknowledgment_receiver) = AcknowledgedMessage::package(());
        if self
            .sender
            .send(PoolManagementRequest::Clear {
                _completion_handler: message,
                cause,
                service_id,
            })
            .is_ok()
        {
            acknowledgment_receiver.wait_for_acknowledgment().await;
        }
    }

    /// Mark the pool as "ready" as per the CMAP specification.
    pub(super) async fn mark_as_ready(&self) {
        let (message, listener) = AcknowledgedMessage::package(());
        if self
            .sender
            .send(PoolManagementRequest::MarkAsReady {
                completion_handler: message,
            })
            .is_ok()
        {
            let _ = listener.wait_for_acknowledgment().await;
        }
    }

    /// Check in the given connection to the pool.
    /// This returns an error containing the connection if the pool has been dropped already.
    pub(crate) fn check_in(&self, connection: Connection) -> std::result::Result<(), Connection> {
        if let Err(request) = self
            .sender
            .send(PoolManagementRequest::CheckIn(Box::new(connection)))
        {
            let conn = request.0.unwrap_check_in();
            return Err(conn);
        }
        Ok(())
    }

    /// Notify the pool that establishing a connection failed.
    pub(super) fn handle_connection_failed(&self) {
        let _ = self
            .sender
            .send(PoolManagementRequest::HandleConnectionFailed);
    }

    /// Notify the pool that establishing a connection succeeded.
    pub(super) fn handle_connection_succeeded(&self, conn: ConnectionSucceeded) {
        let _ = self
            .sender
            .send(PoolManagementRequest::HandleConnectionSucceeded(conn));
    }

    pub(super) fn broadcast(&self, msg: BroadcastMessage) -> AcknowledgmentReceiver<()> {
        let (msg, ack) = AcknowledgedMessage::package(msg);
        let _ = self.sender.send(PoolManagementRequest::Broadcast(msg));
        ack
    }
}

#[derive(Debug)]
pub(super) struct ManagementRequestReceiver {
    pub(super) receiver: mpsc::UnboundedReceiver<PoolManagementRequest>,
}

impl ManagementRequestReceiver {
    pub(super) async fn recv(&mut self) -> Option<PoolManagementRequest> {
        self.receiver.recv().await
    }
}

#[derive(Debug)]
pub(super) enum PoolManagementRequest {
    /// Clear the pool, transitioning it to Paused.
    Clear {
        _completion_handler: AcknowledgedMessage<()>,
        cause: Error,
        service_id: Option<ObjectId>,
    },

    /// Mark the pool as Ready, allowing connections to be created and checked out.
    MarkAsReady {
        completion_handler: AcknowledgedMessage<()>,
    },

    /// Check in the given connection.
    CheckIn(Box<Connection>),

    /// Update the pool based on the given establishment error.
    HandleConnectionFailed,

    /// Update the pool after a successful connection, optionally populating the pool
    /// with the successful connection.
    HandleConnectionSucceeded(ConnectionSucceeded),

    /// Handle a broadcast message.
    Broadcast(AcknowledgedMessage<BroadcastMessage>),
}

impl PoolManagementRequest {
    fn unwrap_check_in(self) -> Connection {
        match self {
            PoolManagementRequest::CheckIn(conn) => *conn,
            _ => panic!("tried to unwrap checkin but got {:?}", self),
        }
    }
}

#[derive(Debug)]
pub(super) enum ConnectionSucceeded {
    ForPool(Box<Connection>),
    Used { service_id: Option<ObjectId> },
}

impl ConnectionSucceeded {
    pub(super) fn service_id(&self) -> Option<ObjectId> {
        match self {
            ConnectionSucceeded::ForPool(conn) => conn.generation.service_id(),
            ConnectionSucceeded::Used { service_id, .. } => *service_id,
        }
    }
}