#[cfg(test)]
pub(crate) mod test;
pub(crate) mod conn;
mod connection_requester;
pub(crate) mod establish;
mod manager;
pub(crate) mod options;
mod status;
mod worker;
use std::time::Instant;
use derivative::Derivative;
pub use self::conn::ConnectionInfo;
pub(crate) use self::{
conn::{Command, Connection, RawCommand, RawCommandResponse, StreamDescription},
status::PoolGenerationSubscriber,
worker::PoolGeneration,
};
use self::{
connection_requester::ConnectionRequestResult,
establish::ConnectionEstablisher,
options::ConnectionPoolOptions,
};
use crate::{
bson::oid::ObjectId,
error::{Error, Result},
event::cmap::{
CmapEvent,
CmapEventEmitter,
ConnectionCheckoutFailedEvent,
ConnectionCheckoutFailedReason,
ConnectionCheckoutStartedEvent,
PoolCreatedEvent,
},
options::ServerAddress,
runtime::AcknowledgmentReceiver,
sdam::{BroadcastMessage, TopologyUpdater},
};
use connection_requester::ConnectionRequester;
use manager::PoolManager;
use worker::ConnectionPoolWorker;
#[cfg(test)]
use crate::runtime::WorkerHandle;
/// Default maximum pool size.
///
/// NOTE(review): not referenced in this part of the file; presumably used as the cap on the
/// number of connections when the pool options do not specify one. Confirm at the use site.
pub(crate) const DEFAULT_MAX_POOL_SIZE: u32 = 10;
/// Handle to a connection pool for one server address.
///
/// The handle does not hold connections itself: `ConnectionPool::new` starts a
/// `ConnectionPoolWorker` and keeps the manager, connection requester, and generation
/// subscriber it returns. The type derives `Clone`, so handles can be duplicated.
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub(crate) struct ConnectionPool {
/// Address of the server this pool belongs to; included in emitted CMAP events.
address: ServerAddress,
/// Used to clear the pool, mark it as ready, and broadcast messages.
manager: PoolManager,
/// Used by `check_out` to request a connection.
connection_requester: ConnectionRequester,
/// Source of the pool generation returned by `generation`.
generation_subscriber: PoolGenerationSubscriber,
/// Emits CMAP events; excluded from the derived `Debug` output.
#[derivative(Debug = "ignore")]
event_emitter: CmapEventEmitter,
}
impl ConnectionPool {
pub(crate) fn new(
address: ServerAddress,
connection_establisher: ConnectionEstablisher,
server_updater: TopologyUpdater,
topology_id: ObjectId,
options: Option<ConnectionPoolOptions>,
) -> Self {
let event_handler = options
.as_ref()
.and_then(|opts| opts.cmap_event_handler.clone());
let event_emitter = CmapEventEmitter::new(event_handler, topology_id);
let (manager, connection_requester, generation_subscriber) = ConnectionPoolWorker::start(
address.clone(),
connection_establisher,
server_updater,
event_emitter.clone(),
options.clone(),
);
event_emitter.emit_event(|| {
CmapEvent::PoolCreated(PoolCreatedEvent {
address: address.clone(),
options: options.map(|o| o.to_event_options()),
})
});
Self {
address,
manager,
connection_requester,
generation_subscriber,
event_emitter,
}
}
#[cfg(test)]
pub(crate) fn new_mocked(address: ServerAddress) -> Self {
let (manager, _) = manager::channel();
let handle = WorkerHandle::new_mocked();
let (connection_requester, _) = connection_requester::channel(handle);
let (_, generation_subscriber) = status::channel(PoolGeneration::normal());
Self {
address,
manager,
connection_requester,
generation_subscriber,
event_emitter: CmapEventEmitter::new(None, ObjectId::new()),
}
}
pub(crate) async fn check_out(&self) -> Result<Connection> {
let time_started = Instant::now();
self.event_emitter.emit_event(|| {
ConnectionCheckoutStartedEvent {
address: self.address.clone(),
}
.into()
});
let response = self.connection_requester.request().await;
let conn = match response {
ConnectionRequestResult::Pooled(c) => Ok(*c),
ConnectionRequestResult::Establishing(task) => task.await,
ConnectionRequestResult::PoolCleared(e) => {
Err(Error::pool_cleared_error(&self.address, &e))
}
ConnectionRequestResult::PoolWarmed => {
Err(Error::internal("Invalid result from connection requester"))
}
};
match conn {
Ok(ref conn) => {
self.event_emitter
.emit_event(|| conn.checked_out_event(time_started).into());
}
Err(ref _err) => {
self.event_emitter.emit_event(|| {
ConnectionCheckoutFailedEvent {
address: self.address.clone(),
reason: ConnectionCheckoutFailedReason::ConnectionError,
#[cfg(feature = "tracing-unstable")]
error: Some(_err.clone()),
duration: Instant::now() - time_started,
}
.into()
});
}
}
conn
}
pub(crate) async fn clear(&self, cause: Error, service_id: Option<ObjectId>) {
self.manager.clear(cause, service_id).await
}
pub(crate) async fn mark_as_ready(&self) {
self.manager.mark_as_ready().await
}
pub(crate) fn generation(&self) -> PoolGeneration {
self.generation_subscriber.generation()
}
pub(crate) fn broadcast(&self, msg: BroadcastMessage) -> AcknowledgmentReceiver<()> {
self.manager.broadcast(msg)
}
}