1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
use tokio::sync::{mpsc, oneshot};
use super::Connection;
use crate::{
error::{Error, Result},
runtime::{AsyncJoinHandle, WorkerHandle},
};
/// Builds a connected requester/receiver pair on top of an unbounded channel.
///
/// `handle` is moved into the returned [`ConnectionRequester`]; the
/// [`ConnectionRequestReceiver`] owns the receiving half of the channel.
pub(super) fn channel(handle: WorkerHandle) -> (ConnectionRequester, ConnectionRequestReceiver) {
    let (tx, rx) = mpsc::unbounded_channel();

    let requester = ConnectionRequester {
        sender: tx,
        _handle: handle,
    };
    let request_receiver = ConnectionRequestReceiver { receiver: rx };

    (requester, request_receiver)
}
/// Handle for requesting Connections from the pool.
/// This requester will keep the pool alive. Once all requesters have been dropped,
/// the pool will stop servicing requests, drop its available connections, and close.
///
/// Cloning produces another requester that shares the same channel and holds its
/// own clone of the worker handle.
#[derive(Clone, Debug)]
pub(super) struct ConnectionRequester {
/// Sending half of the unbounded channel on which every `ConnectionRequest` is queued.
sender: mpsc::UnboundedSender<ConnectionRequest>,
/// Worker handle that is stored but never read by this type; holding it is what
/// keeps the pool alive, as described above.
_handle: WorkerHandle,
}
impl ConnectionRequester {
    /// Asks the pool that owns the receiving end of this requester for a connection
    /// and waits for its reply.
    ///
    /// The request is submitted with `warm_pool: false`.
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be queued or if the reply channel is closed
    /// without a reply. Both are expected to be impossible while this requester
    /// holds its worker handle.
    pub(super) async fn request(&self) -> ConnectionRequestResult {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = ConnectionRequest {
            sender: reply_tx,
            warm_pool: false,
        };

        // Queueing fails only when the receiving half has been dropped; the worker
        // handle we hold keeps the worker alive, so that cannot happen.
        self.sender.send(request).unwrap();

        // Likewise, awaiting the reply fails only when the reply sender is dropped,
        // which the handle also rules out.
        reply_rx.await.unwrap()
    }

    /// Creates a requester that shares this one's channel but carries no worker
    /// handle.
    pub(super) fn weak(&self) -> WeakConnectionRequester {
        let sender = self.sender.clone();
        WeakConnectionRequester { sender }
    }
}
/// Handle for requesting Connections from the pool. This does *not* keep the
/// pool alive.
///
/// Unlike `ConnectionRequester`, this type stores only the channel sender and no
/// worker handle.
#[derive(Clone, Debug)]
pub(super) struct WeakConnectionRequester {
/// Sending half of the request channel (a clone of the sender held by the
/// `ConnectionRequester` this was created from).
sender: mpsc::UnboundedSender<ConnectionRequest>,
}
impl WeakConnectionRequester {
    /// Submits a request flagged with `warm_pool: true` and waits for the reply.
    ///
    /// Returns `None` when the request cannot be queued or when the reply channel
    /// is closed without a reply; otherwise returns the reply wrapped in `Some`.
    pub(super) async fn request_warm_pool(&self) -> Option<ConnectionRequestResult> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = ConnectionRequest {
            sender: reply_tx,
            warm_pool: true,
        };

        // No worker handle is held here, so a failed send is an ordinary outcome
        // rather than a bug: bail out with `None`.
        self.sender.send(request).ok()?;

        reply_rx.await.ok()
    }
}
/// Receiving end of a given ConnectionRequester.
///
/// Wraps the receiving half of the unbounded channel that requesters send
/// `ConnectionRequest`s on.
#[derive(Debug)]
pub(super) struct ConnectionRequestReceiver {
/// Receiving half of the request channel.
receiver: mpsc::UnboundedReceiver<ConnectionRequest>,
}
impl ConnectionRequestReceiver {
    /// Waits for the next queued [`ConnectionRequest`].
    ///
    /// Forwards whatever the underlying tokio channel reports, including `None`
    /// when it has nothing further to deliver.
    pub(super) async fn recv(&mut self) -> Option<ConnectionRequest> {
        let Self { receiver } = self;
        receiver.recv().await
    }
}
/// Struct encapsulating a request for a connection.
///
/// Carries the one-shot reply channel for the result together with a flag that
/// marks pool-warming requests.
#[derive(Debug)]
pub(super) struct ConnectionRequest {
/// One-shot sender used to deliver the `ConnectionRequestResult` back to the requester.
sender: oneshot::Sender<ConnectionRequestResult>,
/// `true` for requests created by `WeakConnectionRequester::request_warm_pool`,
/// `false` for those created by `ConnectionRequester::request`.
warm_pool: bool,
}
impl ConnectionRequest {
/// Respond to the connection request, either with a pooled connection or one that is
/// establishing asynchronously.
pub(super) fn fulfill(
self,
result: ConnectionRequestResult,
) -> std::result::Result<(), ConnectionRequestResult> {
self.sender.send(result)
}
pub(super) fn is_warm_pool(&self) -> bool {
self.warm_pool
}
}
/// Reply delivered to a requester in response to a `ConnectionRequest`.
#[derive(Debug)]
pub(super) enum ConnectionRequestResult {
/// A connection that was already established and was simply checked out of the pool.
Pooled(Box<Connection>),
/// A new connection in the process of being established.
/// The handle can be awaited upon to receive the established connection.
Establishing(AsyncJoinHandle<Result<Connection>>),
/// The request was rejected because the pool was cleared before it could
/// be fulfilled. The error that caused the pool to be cleared is returned.
PoolCleared(Error),
/// The request set `warm_pool: true` and the pool has reached `min_pool_size`.
PoolWarmed,
}
impl ConnectionRequestResult {
    /// Extracts the connection from a [`ConnectionRequestResult::Pooled`] result.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any variant other than `Pooled`. The panic message names
    /// the variant that was actually encountered.
    pub(super) fn unwrap_pooled_connection(self) -> Connection {
        // The match is deliberately exhaustive: a newly added variant must be
        // handled here explicitly instead of falling into a catch-all arm whose
        // message would misreport it.
        match self {
            ConnectionRequestResult::Pooled(conn) => *conn,
            ConnectionRequestResult::Establishing(_) => {
                panic!("attempted to unwrap pooled connection when was establishing")
            }
            ConnectionRequestResult::PoolCleared(err) => {
                panic!(
                    "attempted to unwrap pooled connection when pool was cleared: {:?}",
                    err
                )
            }
            ConnectionRequestResult::PoolWarmed => {
                panic!("attempted to unwrap pooled connection when pool was warmed")
            }
        }
    }
}