use tokio::sync::mpsc;
use super::Connection;
use crate::{
bson::oid::ObjectId,
error::Error,
runtime::{AcknowledgedMessage, AcknowledgmentReceiver},
sdam::BroadcastMessage,
};
/// Creates a connected pair of pool-management handles.
///
/// The returned [`PoolManager`] owns the sending half of a fresh unbounded
/// channel and the returned [`ManagementRequestReceiver`] owns the receiving
/// half, so every request submitted through the manager is observed by the
/// receiver.
pub(super) fn channel() -> (PoolManager, ManagementRequestReceiver) {
    let (tx, rx) = mpsc::unbounded_channel();
    let manager = PoolManager { sender: tx };
    let requests = ManagementRequestReceiver { receiver: rx };
    (manager, requests)
}
/// Cloneable handle used to submit [`PoolManagementRequest`]s over an
/// unbounded channel.
///
/// Every method only enqueues a request; the request is acted upon by
/// whoever drains the paired [`ManagementRequestReceiver`].
#[derive(Clone, Debug)]
pub(super) struct PoolManager {
    /// Sending half of the management channel.
    sender: mpsc::UnboundedSender<PoolManagementRequest>,
}

impl PoolManager {
    /// Submits a `Clear` request carrying `cause` and `service_id`, then waits
    /// for the request to be acknowledged.
    ///
    /// If the request cannot be sent, this returns immediately without
    /// waiting.
    pub(super) async fn clear(&self, cause: Error, service_id: Option<ObjectId>) {
        let (completion, ack) = AcknowledgedMessage::package(());
        let request = PoolManagementRequest::Clear {
            _completion_handler: completion,
            cause,
            service_id,
        };

        // Nothing will ever acknowledge a request that was not delivered.
        if self.sender.send(request).is_err() {
            return;
        }
        ack.wait_for_acknowledgment().await;
    }

    /// Submits a `MarkAsReady` request, then waits for the request to be
    /// acknowledged.
    ///
    /// If the request cannot be sent, this returns immediately without
    /// waiting. The outcome of the acknowledgment is ignored.
    pub(super) async fn mark_as_ready(&self) {
        let (completion, ack) = AcknowledgedMessage::package(());
        let request = PoolManagementRequest::MarkAsReady {
            completion_handler: completion,
        };

        // Nothing will ever acknowledge a request that was not delivered.
        if self.sender.send(request).is_err() {
            return;
        }
        let _ = ack.wait_for_acknowledgment().await;
    }

    /// Submits `connection` as a `CheckIn` request.
    ///
    /// Returns `Ok(())` once the request has been enqueued. If the request
    /// cannot be sent, the connection is recovered from the rejected request
    /// and handed back to the caller as `Err(connection)`.
    pub(crate) fn check_in(&self, connection: Connection) -> std::result::Result<(), Connection> {
        let request = PoolManagementRequest::CheckIn(Box::new(connection));
        self.sender
            .send(request)
            // The send error wraps the very `CheckIn` request built above.
            .map_err(|rejected| rejected.0.unwrap_check_in())
    }

    /// Submits a `HandleConnectionFailed` request.
    ///
    /// Best-effort: a failure to send is ignored.
    pub(super) fn handle_connection_failed(&self) {
        let request = PoolManagementRequest::HandleConnectionFailed;
        let _ = self.sender.send(request);
    }

    /// Submits a `HandleConnectionSucceeded` request carrying `conn`.
    ///
    /// Best-effort: a failure to send is ignored.
    pub(super) fn handle_connection_succeeded(&self, conn: ConnectionSucceeded) {
        let request = PoolManagementRequest::HandleConnectionSucceeded(conn);
        let _ = self.sender.send(request);
    }

    /// Packages `msg` into a `Broadcast` request and submits it.
    ///
    /// The acknowledgment receiver paired with the packaged message is always
    /// returned, even when the request could not be sent; a send failure is
    /// otherwise ignored.
    pub(super) fn broadcast(&self, msg: BroadcastMessage) -> AcknowledgmentReceiver<()> {
        let (packaged, receiver) = AcknowledgedMessage::package(msg);
        let request = PoolManagementRequest::Broadcast(packaged);
        let _ = self.sender.send(request);
        receiver
    }
}
/// Receiving side of the pool-management channel; yields the
/// [`PoolManagementRequest`]s submitted through a `PoolManager`.
#[derive(Debug)]
pub(super) struct ManagementRequestReceiver {
    /// Receiving half of the management channel.
    pub(super) receiver: mpsc::UnboundedReceiver<PoolManagementRequest>,
}

impl ManagementRequestReceiver {
    /// Waits for the next management request.
    ///
    /// Forwards directly to the underlying channel's `recv`, so the result is
    /// whatever that call yields (`None` included).
    pub(super) async fn recv(&mut self) -> Option<PoolManagementRequest> {
        let Self { receiver } = self;
        receiver.recv().await
    }
}
/// Requests that can be submitted over the pool-management channel.
#[derive(Debug)]
pub(super) enum PoolManagementRequest {
    /// Request submitted by `PoolManager::clear`.
    Clear {
        /// Acknowledgment handle paired with the receiver that the sender of
        /// this request waits on.
        // NOTE(review): the leading underscore suggests the consumer never
        // reads this field by name — confirm against the worker.
        _completion_handler: AcknowledgedMessage<()>,
        /// Error supplied by the caller of `clear`.
        cause: Error,
        /// Optional service id supplied by the caller of `clear`.
        service_id: Option<ObjectId>,
    },

    /// Request submitted by `PoolManager::mark_as_ready`.
    MarkAsReady {
        /// Acknowledgment handle paired with the receiver that the sender of
        /// this request waits on.
        completion_handler: AcknowledgedMessage<()>,
    },

    /// Request submitted by `PoolManager::check_in`, carrying the boxed
    /// connection being checked in.
    CheckIn(Box<Connection>),

    /// Request submitted by `PoolManager::handle_connection_failed`.
    HandleConnectionFailed,

    /// Request submitted by `PoolManager::handle_connection_succeeded`.
    HandleConnectionSucceeded(ConnectionSucceeded),

    /// Request submitted by `PoolManager::broadcast`, wrapping the broadcast
    /// message together with its acknowledgment handle.
    Broadcast(AcknowledgedMessage<BroadcastMessage>),
}

impl PoolManagementRequest {
    /// Extracts the connection out of a `CheckIn` request.
    ///
    /// # Panics
    ///
    /// Panics if `self` is any variant other than `CheckIn`.
    fn unwrap_check_in(self) -> Connection {
        match self {
            Self::CheckIn(boxed) => *boxed,
            other => panic!("tried to unwrap checkin but got {:?}", other),
        }
    }
}
/// Payload of a `HandleConnectionSucceeded` request.
#[derive(Debug)]
pub(super) enum ConnectionSucceeded {
    /// Carries the boxed connection itself.
    // NOTE(review): presumably the connection is destined for the pool —
    // confirm against the consumer of this request.
    ForPool(Box<Connection>),

    /// Carries only the (optional) service id; no connection is attached.
    Used { service_id: Option<ObjectId> },
}

impl ConnectionSucceeded {
    /// Returns the service id associated with this payload: the stored value
    /// for `Used`, or the one reported by the connection's generation for
    /// `ForPool`.
    pub(super) fn service_id(&self) -> Option<ObjectId> {
        match self {
            Self::Used { service_id } => *service_id,
            Self::ForPool(pooled) => pooled.generation.service_id(),
        }
    }
}