use std::time::Duration;
use bson::{bson, Bson};
use serde::{Deserialize, Serialize};
use crate::{
bson::{oid::ObjectId, DateTime},
client::ClusterTime,
error::{Error, ErrorKind, Result},
hello::{HelloCommandResponse, HelloReply},
options::ServerAddress,
selection_criteria::TagSet,
serde_util,
};
const DRIVER_MIN_DB_VERSION: &str = "3.6";
const DRIVER_MIN_WIRE_VERSION: i32 = 6;
const DRIVER_MAX_WIRE_VERSION: i32 = 21;
#[derive(Debug, Deserialize, Clone, Copy, Eq, PartialEq, Serialize)]
#[non_exhaustive]
pub enum ServerType {
Standalone,
Mongos,
#[serde(rename = "RSPrimary")]
RsPrimary,
#[serde(rename = "RSSecondary")]
RsSecondary,
#[serde(rename = "RSArbiter")]
RsArbiter,
#[serde(rename = "RSOther")]
RsOther,
#[serde(rename = "RSGhost")]
RsGhost,
LoadBalancer,
#[serde(alias = "PossiblePrimary")]
Unknown,
}
impl ServerType {
pub(crate) fn can_auth(self) -> bool {
!matches!(self, ServerType::RsArbiter)
}
pub(crate) fn is_data_bearing(self) -> bool {
matches!(
self,
ServerType::Standalone
| ServerType::RsPrimary
| ServerType::RsSecondary
| ServerType::Mongos
| ServerType::LoadBalancer
)
}
pub(crate) fn is_available(self) -> bool {
!matches!(self, ServerType::Unknown)
}
}
impl Default for ServerType {
fn default() -> Self {
ServerType::Unknown
}
}
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub(crate) struct TopologyVersion {
pub(crate) process_id: ObjectId,
pub(crate) counter: i64,
}
impl TopologyVersion {
pub(crate) fn is_more_recent_than(&self, existing_tv: TopologyVersion) -> bool {
self.process_id != existing_tv.process_id || self.counter > existing_tv.counter
}
}
impl From<TopologyVersion> for Bson {
fn from(tv: TopologyVersion) -> Self {
bson!({
"processId": tv.process_id,
"counter": tv.counter
})
}
}
#[derive(Debug, Clone, Serialize)]
pub(crate) struct ServerDescription {
pub(crate) address: ServerAddress,
pub(crate) server_type: ServerType,
pub(crate) last_update_time: Option<DateTime>,
pub(crate) average_round_trip_time: Option<Duration>,
#[serde(serialize_with = "serde_util::serialize_result_error_as_string")]
pub(crate) reply: Result<Option<HelloReply>>,
}
fn hello_command_eq(a: &HelloCommandResponse, b: &HelloCommandResponse) -> bool {
a.server_type() == b.server_type()
&& a.min_wire_version == b.min_wire_version
&& a.max_wire_version == b.max_wire_version
&& a.me == b.me
&& a.hosts == b.hosts
&& a.passives == b.passives
&& a.arbiters == b.arbiters
&& a.tags == b.tags
&& a.set_name == b.set_name
&& a.set_version == b.set_version
&& a.election_id == b.election_id
&& a.primary == b.primary
&& a.logical_session_timeout_minutes == b.logical_session_timeout_minutes
&& a.topology_version == b.topology_version
}
impl PartialEq for ServerDescription {
fn eq(&self, other: &Self) -> bool {
if self.address != other.address || self.server_type != other.server_type {
return false;
}
match (self.reply.as_ref(), other.reply.as_ref()) {
(Ok(self_reply), Ok(other_reply)) => {
let self_response = self_reply.as_ref().map(|r| &r.command_response);
let other_response = other_reply.as_ref().map(|r| &r.command_response);
match (self_response, other_response) {
(Some(a), Some(b)) => hello_command_eq(a, b),
(None, None) => true,
_ => false,
}
}
(Err(self_err), Err(other_err)) => {
match (self_err.kind.as_ref(), other_err.kind.as_ref()) {
(
ErrorKind::Command(self_command_err),
ErrorKind::Command(other_command_err),
) => self_command_err.code == other_command_err.code,
_ => self_err.to_string() == other_err.to_string(),
}
}
_ => false,
}
}
}
impl ServerDescription {
pub(crate) fn new(address: ServerAddress) -> Self {
Self {
address: match address {
ServerAddress::Tcp { host, port } => ServerAddress::Tcp {
host: host.to_lowercase(),
port,
},
#[cfg(unix)]
ServerAddress::Unix { path } => ServerAddress::Unix { path },
},
server_type: Default::default(),
last_update_time: None,
reply: Ok(None),
average_round_trip_time: None,
}
}
pub(crate) fn new_from_hello_reply(
address: ServerAddress,
mut reply: HelloReply,
average_rtt: Duration,
) -> Self {
let mut description = Self::new(address);
description.average_round_trip_time = Some(average_rtt);
description.last_update_time = Some(DateTime::now());
description.server_type = reply.command_response.server_type();
if let Some(ref mut hosts) = reply.command_response.hosts {
let normalized_hostnames = hosts
.drain(..)
.map(|hostname| hostname.to_lowercase())
.collect();
*hosts = normalized_hostnames;
}
if let Some(ref mut passives) = reply.command_response.passives {
let normalized_hostnames = passives
.drain(..)
.map(|hostname| hostname.to_lowercase())
.collect();
*passives = normalized_hostnames;
}
if let Some(ref mut arbiters) = reply.command_response.arbiters {
let normalized_hostnames = arbiters
.drain(..)
.map(|hostname| hostname.to_lowercase())
.collect();
*arbiters = normalized_hostnames;
}
if let Some(ref mut me) = reply.command_response.me {
*me = me.to_lowercase();
}
description.reply = Ok(Some(reply));
description
}
pub(crate) fn new_from_error(address: ServerAddress, error: Error) -> Self {
let mut description = Self::new(address);
description.last_update_time = Some(DateTime::now());
description.average_round_trip_time = None;
description.reply = Err(error);
description
}
pub(crate) fn is_available(&self) -> bool {
self.server_type.is_available()
}
pub(crate) fn compatibility_error_message(&self) -> Option<String> {
if let Ok(Some(ref reply)) = self.reply {
let hello_min_wire_version = reply.command_response.min_wire_version.unwrap_or(0);
if hello_min_wire_version > DRIVER_MAX_WIRE_VERSION {
return Some(format!(
"Server at {} requires wire version {}, but this version of the MongoDB Rust \
driver only supports up to {}",
self.address, hello_min_wire_version, DRIVER_MAX_WIRE_VERSION,
));
}
let hello_max_wire_version = reply.command_response.max_wire_version.unwrap_or(0);
if hello_max_wire_version < DRIVER_MIN_WIRE_VERSION {
return Some(format!(
"Server at {} reports wire version {}, but this version of the MongoDB Rust \
driver requires at least {} (MongoDB {}).",
self.address,
hello_max_wire_version,
DRIVER_MIN_WIRE_VERSION,
DRIVER_MIN_DB_VERSION
));
}
}
None
}
pub(crate) fn set_name(&self) -> Result<Option<String>> {
let set_name = self
.reply
.as_ref()
.map_err(Clone::clone)?
.as_ref()
.and_then(|reply| reply.command_response.set_name.clone());
Ok(set_name)
}
pub(crate) fn known_hosts(&self) -> Result<impl Iterator<Item = &String>> {
let known_hosts = self
.reply
.as_ref()
.map_err(Clone::clone)?
.as_ref()
.map(|reply| {
let hosts = reply.command_response.hosts.as_ref();
let passives = reply.command_response.passives.as_ref();
let arbiters = reply.command_response.arbiters.as_ref();
hosts
.into_iter()
.flatten()
.chain(passives.into_iter().flatten())
.chain(arbiters.into_iter().flatten())
});
Ok(known_hosts.into_iter().flatten())
}
pub(crate) fn invalid_me(&self) -> Result<bool> {
if let Some(ref reply) = self.reply.as_ref().map_err(Clone::clone)? {
if let Some(ref me) = reply.command_response.me {
return Ok(&self.address.to_string() != me);
}
}
Ok(false)
}
pub(crate) fn set_version(&self) -> Result<Option<i32>> {
let me = self
.reply
.as_ref()
.map_err(Clone::clone)?
.as_ref()
.and_then(|reply| reply.command_response.set_version);
Ok(me)
}
pub(crate) fn election_id(&self) -> Result<Option<ObjectId>> {
let me = self
.reply
.as_ref()
.map_err(Clone::clone)?
.as_ref()
.and_then(|reply| reply.command_response.election_id);
Ok(me)
}
#[cfg(test)]
pub(crate) fn min_wire_version(&self) -> Result<Option<i32>> {
let me = self
.reply
.as_ref()
.map_err(Clone::clone)?
.as_ref()
.and_then(|reply| reply.command_response.min_wire_version);
Ok(me)
}
pub(crate) fn max_wire_version(&self) -> Result<Option<i32>> {
let me = self
.reply
.as_ref()
.map_err(Clone::clone)?
.as_ref()
.and_then(|reply| reply.command_response.max_wire_version);
Ok(me)
}
pub(crate) fn last_write_date(&self) -> Result<Option<DateTime>> {
match self.reply {
Ok(None) => Ok(None),
Ok(Some(ref reply)) => Ok(reply
.command_response
.last_write
.as_ref()
.map(|write| write.last_write_date)),
Err(ref e) => Err(e.clone()),
}
}
pub(crate) fn logical_session_timeout(&self) -> Result<Option<Duration>> {
match self.reply {
Ok(None) => Ok(None),
Ok(Some(ref reply)) => Ok(reply
.command_response
.logical_session_timeout_minutes
.map(|timeout| Duration::from_secs(timeout as u64 * 60))),
Err(ref e) => Err(e.clone()),
}
}
pub(crate) fn cluster_time(&self) -> Result<Option<ClusterTime>> {
match self.reply {
Ok(None) => Ok(None),
Ok(Some(ref reply)) => Ok(reply.cluster_time.clone()),
Err(ref e) => Err(e.clone()),
}
}
pub(crate) fn topology_version(&self) -> Option<TopologyVersion> {
match self.reply {
Ok(None) => None,
Ok(Some(ref reply)) => reply.command_response.topology_version,
Err(ref e) => e.topology_version(),
}
}
pub(crate) fn matches_tag_set(&self, tag_set: &TagSet) -> bool {
let reply = match self.reply.as_ref() {
Ok(Some(ref reply)) => reply,
_ => return false,
};
let server_tags = match reply.command_response.tags {
Some(ref tags) => tags,
None => return false,
};
tag_set
.iter()
.all(|(key, val)| server_tags.get(key) == Some(val))
}
}