1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
use std::time::Duration;
use bson::RawDocumentBuf;
use serde::{Deserialize, Serialize};
use crate::{
bson::{doc, oid::ObjectId, DateTime, Document, Timestamp},
client::{
options::{ServerAddress, ServerApi},
ClusterTime,
},
cmap::{Command, Connection},
error::Result,
sdam::{ServerType, TopologyVersion},
selection_criteria::TagSet,
};
/// The name of the legacy version of the `hello` command, which was deprecated in 5.0.
/// To limit usages of the legacy name in the codebase, this constant should be used
/// wherever possible.
pub(crate) const LEGACY_HELLO_COMMAND_NAME: &str = "isMaster";
/// The all-lowercase spelling of [`LEGACY_HELLO_COMMAND_NAME`].
pub(crate) const LEGACY_HELLO_COMMAND_NAME_LOWERCASE: &str = "ismaster";
/// Extra options that [`hello_command`] adds to the command body when present.
///
/// Supplying these options also marks the resulting command as exhaust-allowed.
#[derive(Debug, Clone, Copy)]
pub(crate) struct AwaitableHelloOptions {
/// Inserted into the command body as the `topologyVersion` field.
pub(crate) topology_version: TopologyVersion,
/// Inserted into the command body, converted to milliseconds, as the `maxAwaitTimeMS` field.
pub(crate) max_await_time: Duration,
}
/// Construct a hello or legacy hello command, depending on the circumstances.
///
/// If an API version is provided or `load_balanced` is true, `hello` will be used.
/// If the server indicated `helloOk: true`, then `hello` will also be used.
/// Otherwise, legacy hello will be used, and if it's unknown whether the server supports hello,
/// the command also will contain `helloOk: true`.
///
/// # Arguments
///
/// * `server_api` - the declared API version, if any; when present it is also attached to the
///   returned command.
/// * `load_balanced` - whether the connection is known to be in load-balanced mode.
/// * `hello_ok` - whether the server has indicated support for `hello`; `None` means unknown.
/// * `awaitable_options` - when present, `topologyVersion` and `maxAwaitTimeMS` are added to the
///   command body and the command is marked as exhaust-allowed.
///
/// The command is always targeted at the `admin` database.
pub(crate) fn hello_command(
    server_api: Option<&ServerApi>,
    load_balanced: Option<bool>,
    hello_ok: Option<bool>,
    awaitable_options: Option<AwaitableHelloOptions>,
) -> Command {
    let (mut command, command_name) = if server_api.is_some()
        || matches!(load_balanced, Some(true))
        || matches!(hello_ok, Some(true))
    {
        (doc! { "hello": 1 }, "hello")
    } else {
        let mut cmd = doc! { LEGACY_HELLO_COMMAND_NAME: 1 };
        // Only advertise `helloOk` while the server's support for `hello` is still unknown.
        if hello_ok.is_none() {
            cmd.insert("helloOk", true);
        }
        (cmd, LEGACY_HELLO_COMMAND_NAME)
    };

    if let Some(opts) = awaitable_options {
        command.insert("topologyVersion", opts.topology_version);
        // `as_millis` yields a `u128`. Clamp to `i64::MAX` before narrowing so that a very
        // large duration saturates instead of wrapping into an arbitrary (possibly negative)
        // value.
        let max_await_time_ms = opts.max_await_time.as_millis().min(i64::MAX as u128) as i64;
        command.insert("maxAwaitTimeMS", max_await_time_ms);
    }

    let mut command = Command::new(command_name.into(), "admin".into(), command);
    if let Some(server_api) = server_api {
        command.set_server_api(server_api);
    }
    command.exhaust_allowed = awaitable_options.is_some();
    command
}
/// Send the given hello (or legacy hello) command over `conn` and convert the raw response
/// into a [`HelloReply`].
///
/// # Errors
///
/// Returns the error from sending the command, or the error from converting the raw response
/// into a hello reply.
pub(crate) async fn run_hello(conn: &mut Connection, command: Command) -> Result<HelloReply> {
    let raw_response = conn.send_command(command, None).await?;
    raw_response.into_hello_reply()
}
/// The result of running a hello or legacy hello command (see [`run_hello`]).
#[derive(Debug, Clone, Serialize)]
pub(crate) struct HelloReply {
// NOTE(review): presumably the address of the server that produced this reply — confirm
// against the code that builds `HelloReply`.
pub(crate) server_address: ServerAddress,
/// The command response in its typed form.
pub(crate) command_response: HelloCommandResponse,
/// The command response as a raw BSON document.
pub(crate) raw_command_response: RawDocumentBuf,
// NOTE(review): presumably the cluster time reported alongside the reply, if any — confirm.
pub(crate) cluster_time: Option<ClusterTime>,
}
/// The response to a `hello` command.
///
/// Field names are mapped to their camelCase forms for (de)serialization unless a field
/// carries an explicit `rename`.
///
/// See the documentation [here](https://www.mongodb.com/docs/manual/reference/command/hello/) for more details.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct HelloCommandResponse {
/// Whether the server is writable. If true, this instance is a primary in a replica set, a
/// mongos instance, or a standalone mongod.
pub is_writable_primary: Option<bool>,
#[serde(rename = "ismaster")]
/// Legacy name for the `is_writable_primary` field; (de)serialized as `ismaster`.
pub is_master: Option<bool>,
/// Whether or not the server supports using the `hello` command for monitoring instead
/// of the legacy hello command.
pub hello_ok: Option<bool>,
/// The list of all hosts.
pub hosts: Option<Vec<String>>,
/// The list of all passives in a replica set.
pub passives: Option<Vec<String>>,
/// The list of all arbiters in a replica set.
pub arbiters: Option<Vec<String>>,
/// An optional message. This contains the value "isdbgrid" when returned from a mongos.
pub msg: Option<String>,
/// The address of the server that returned this `HelloCommandResponse`.
pub me: Option<String>,
#[serde(rename = "compression")]
/// The list of compatible compressors that the server returned; (de)serialized as
/// `compression`.
pub compressors: Option<Vec<String>>,
/// The current replica set config version.
pub set_version: Option<i32>,
/// The name of the current replica set.
pub set_name: Option<String>,
/// Whether the server is hidden.
pub hidden: Option<bool>,
/// Whether the server is a secondary.
pub secondary: Option<bool>,
/// Whether the server is an arbiter.
pub arbiter_only: Option<bool>,
#[serde(rename = "isreplicaset")]
/// Whether the server is a replica set; (de)serialized as `isreplicaset`.
pub is_replica_set: Option<bool>,
/// The time in minutes that a session remains active after its most recent use.
pub logical_session_timeout_minutes: Option<i64>,
/// Optime and date information for the server's most recent write operation.
pub last_write: Option<LastWrite>,
/// The minimum wire version that the server supports.
pub min_wire_version: Option<i32>,
/// The maximum wire version that the server supports.
pub max_wire_version: Option<i32>,
/// User-defined tags for a replica set member.
pub tags: Option<TagSet>,
/// A unique identifier for each election.
pub election_id: Option<ObjectId>,
/// The address of current primary member of the replica set.
pub primary: Option<String>,
/// A list of SASL mechanisms used to create the user's credential(s).
pub sasl_supported_mechs: Option<Vec<String>>,
/// The reply to speculative authentication done in the authentication handshake.
pub speculative_authenticate: Option<Document>,
/// The maximum permitted size of a BSON object in bytes.
///
/// Unlike most fields here, this one is not wrapped in `Option`.
pub max_bson_object_size: i64,
/// The maximum number of write operations permitted in a write batch.
pub max_write_batch_size: Option<i64>,
/// If the connection is to a load balancer, the id of the selected backend.
pub service_id: Option<ObjectId>,
/// For internal use.
pub topology_version: Option<TopologyVersion>,
/// The maximum permitted size of a BSON wire protocol message.
///
/// Unlike most fields here, this one is not wrapped in `Option`.
pub max_message_size_bytes: i32,
/// The server-generated ID for the connection the "hello" command was run on.
/// Present on server versions 4.2+.
pub connection_id: Option<i64>,
}
impl HelloCommandResponse {
    /// Classify the server that produced this response.
    ///
    /// The checks run in this order and the first match wins:
    ///
    /// 1. `msg` equal to `"isdbgrid"` gives `Mongos`.
    /// 2. No `set_name`: `RsGhost` when `is_replica_set` is `Some(true)`, otherwise
    ///    `Standalone`.
    /// 3. With a `set_name`: `hidden` gives `RsOther`; `is_writable_primary` or `is_master`
    ///    gives `RsPrimary`; `secondary` gives `RsSecondary`; `arbiter_only` gives
    ///    `RsArbiter`; anything else is `RsOther`.
    ///
    /// A flag only counts when it is exactly `Some(true)`.
    pub(crate) fn server_type(&self) -> ServerType {
        let enabled = |flag: Option<bool>| matches!(flag, Some(true));

        if matches!(self.msg.as_deref(), Some("isdbgrid")) {
            return ServerType::Mongos;
        }

        // Without a set name the server is either an uninitialized replica set member
        // or a standalone.
        if self.set_name.is_none() {
            return if enabled(self.is_replica_set) {
                ServerType::RsGhost
            } else {
                ServerType::Standalone
            };
        }

        // Hidden members are classified as "other" before any role flag is consulted.
        if enabled(self.hidden) {
            return ServerType::RsOther;
        }
        if enabled(self.is_writable_primary) || enabled(self.is_master) {
            return ServerType::RsPrimary;
        }
        if enabled(self.secondary) {
            return ServerType::RsSecondary;
        }
        if enabled(self.arbiter_only) {
            return ServerType::RsArbiter;
        }
        ServerType::RsOther
    }
}
/// The type of the `last_write` field of [`HelloCommandResponse`].
///
/// Field names are mapped to their camelCase forms for (de)serialization.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct LastWrite {
/// (De)serialized as `lastWriteDate`.
// NOTE(review): presumably the date of the server's most recent write — confirm.
pub last_write_date: DateTime,
}
/// A deserializable pair of a BSON timestamp (`ts`) and a 32-bit integer (`t`).
// NOTE(review): nothing in the visible part of this file uses this type; presumably it models
// a replication optime, with `t` being the election term — confirm before relying on it.
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub(crate) struct OpTime {
/// The timestamp component.
ts: Timestamp,
/// The integer component.
t: i32,
}