1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
//! Contains options for ChangeStreams.
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
use std::time::Duration;
use typed_builder::TypedBuilder;
use crate::{
bson::{Bson, Timestamp},
change_stream::event::ResumeToken,
collation::Collation,
concern::ReadConcern,
options::AggregateOptions,
selection_criteria::SelectionCriteria,
};
/// These are the valid options that can be passed to the `watch` method for creating a
/// [`ChangeStream`](crate::change_stream::ChangeStream).
#[skip_serializing_none]
#[derive(Clone, Debug, Default, Deserialize, Serialize, TypedBuilder)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ChangeStreamOptions {
#[rustfmt::skip]
/// Configures how the
/// [`ChangeStreamEvent::full_document`](crate::change_stream::event::ChangeStreamEvent::full_document)
/// field will be populated. By default, the field will be empty for updates.
#[builder(default)]
pub full_document: Option<FullDocumentType>,
/// Configures how the
/// [`ChangeStreamEvent::full_document_before_change`](
/// crate::change_stream::event::ChangeStreamEvent::full_document_before_change) field will be
/// populated. By default, the field will be empty for updates.
#[builder(default)]
pub full_document_before_change: Option<FullDocumentBeforeChangeType>,
/// Specifies the logical starting point for the new change stream. Note that if a watched
/// collection is dropped and recreated or newly renamed, `start_after` should be set instead.
/// `resume_after` and `start_after` cannot be set simultaneously.
///
/// For more information on resuming a change stream see the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-after)
#[builder(default)]
pub resume_after: Option<ResumeToken>,
/// The change stream will only provide changes that occurred at or after the specified
/// timestamp. Any command run against the server will return an operation time that can be
/// used here.
#[builder(default)]
pub start_at_operation_time: Option<Timestamp>,
/// Takes a resume token and starts a new change stream returning the first notification after
/// the token. This will allow users to watch collections that have been dropped and
/// recreated or newly renamed collections without missing any notifications.
///
/// This feature is only available on MongoDB 4.2+.
///
/// See the documentation [here](https://www.mongodb.com/docs/master/changeStreams/#change-stream-start-after) for more
/// information.
#[builder(default)]
pub start_after: Option<ResumeToken>,
/// If `true`, the change stream will monitor all changes for the given cluster.
#[builder(default, setter(skip))]
pub(crate) all_changes_for_cluster: Option<bool>,
/// The maximum amount of time for the server to wait on new documents to satisfy a change
/// stream query.
#[builder(default)]
#[serde(skip_serializing)]
pub max_await_time: Option<Duration>,
/// The number of documents to return per batch.
#[builder(default)]
#[serde(skip_serializing)]
pub batch_size: Option<u32>,
/// Specifies a collation.
#[builder(default)]
#[serde(skip_serializing)]
pub collation: Option<Collation>,
/// The read concern to use for the operation.
///
/// If none is specified, the read concern defined on the object executing this operation will
/// be used.
#[builder(default)]
#[serde(skip_serializing)]
pub read_concern: Option<ReadConcern>,
/// The criteria used to select a server for this operation.
///
/// If none is specified, the selection criteria defined on the object executing this operation
/// will be used.
#[builder(default)]
#[serde(skip_serializing)]
pub selection_criteria: Option<SelectionCriteria>,
/// Tags the query with an arbitrary [`Bson`] value to help trace the operation through the
/// database profiler, currentOp and logs.
///
/// The comment can be any [`Bson`] value on server versions 4.4+. On lower server versions,
/// the comment must be a [`Bson::String`] value.
#[builder(default)]
pub comment: Option<Bson>,
}
impl ChangeStreamOptions {
pub(crate) fn aggregate_options(&self) -> AggregateOptions {
AggregateOptions::builder()
.batch_size(self.batch_size)
.collation(self.collation.clone())
.max_await_time(self.max_await_time)
.read_concern(self.read_concern.clone())
.selection_criteria(self.selection_criteria.clone())
.comment_bson(self.comment.clone())
.build()
}
}
/// Describes the modes for configuring the
/// [`ChangeStreamEvent::full_document`](
/// crate::change_stream::event::ChangeStreamEvent::full_document) field.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub enum FullDocumentType {
/// The field will be populated with a copy of the entire document that was updated.
UpdateLookup,
/// The field will be populated for replace and update change events if the post-image for this
/// event is available.
WhenAvailable,
/// The same behavior as `WhenAvailable` except that an error is raised if the post-image is
/// not available.
Required,
/// User-defined other types for forward compatibility.
Other(String),
}
/// Describes the modes for configuring the
/// [`ChangeStreamEvent::full_document_before_change`](
/// crate::change_stream::event::ChangeStreamEvent::full_document_before_change) field.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub enum FullDocumentBeforeChangeType {
/// The field will be populated for replace, update, and delete change events if the pre-image
/// for this event is available.
WhenAvailable,
/// The same behavior as `WhenAvailable` except that an error is raised if the pre-image is
/// not available.
Required,
/// Do not send a value.
Off,
/// User-defined other types for forward compatibility.
Other(String),
}