1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466
use super::group_by_builder::*;
use crate::{
constants::*,
cursor::{CursorBuilder, CursorData},
filter::{FilterPrefix, MongoFilterVisitor},
join::JoinStage,
orderby::OrderByBuilder,
query_strings::Aggregate,
root_queries::observing,
vacuum_cursor, BsonTransform, IntoBson,
};
use connector_interface::AggregationSelection;
use itertools::Itertools;
use mongodb::{
bson::{doc, Document},
options::AggregateOptions,
ClientSession, Collection,
};
use query_structure::{FieldSelection, Filter, Model, QueryArguments, ScalarFieldRef, VirtualSelection};
use std::convert::TryFrom;
// Mongo Driver broke usage of the simple API, can't be used by us anymore.
// As such the read query will always be based on aggregation pipeline
// such pipeline will have different stages. See
// https://www.mongodb.com/docs/manual/core/aggregation-pipeline/
pub struct ReadQuery {
pub(crate) stages: Vec<Document>,
}
impl ReadQuery {
pub async fn execute(
self,
on_collection: Collection<Document>,
with_session: &mut ClientSession,
) -> crate::Result<Vec<Document>> {
let opts = AggregateOptions::builder().allow_disk_use(true).build();
let query_string_builder = Aggregate::new(&self.stages, on_collection.name());
let cursor = observing(&query_string_builder, || {
on_collection.aggregate_with_session(self.stages.clone(), opts, with_session)
})
.await?;
vacuum_cursor(cursor, with_session).await
}
}
/// Translated query arguments ready to use in mongo find or aggregation queries.
#[derive(Debug)]
pub(crate) struct MongoReadQueryBuilder {
pub(crate) model: Model,
/// Pre-join, "normal" filters.
pub(crate) query: Option<Document>,
/// Join stages.
pub(crate) joins: Vec<JoinStage>,
/// Filters that can only be applied after the joins
/// or aggregations added the required data to execute them.
pub(crate) join_filters: Vec<Document>,
/// Aggregation-related stages.
pub(crate) aggregations: Vec<Document>,
/// Filters that can only be applied after the aggregations
/// transformed the documents.
pub(crate) aggregation_filters: Vec<Document>,
/// Order by builder for deferred processing.
order_builder: Option<OrderByBuilder>,
/// Finalized ordering: Order document.
pub(crate) order: Option<Document>,
/// Finalized ordering: Necessary Joins
/// Kept separate as cursor building needs to consider them seperately.
pub(crate) order_joins: Vec<JoinStage>,
/// Finalized ordering aggregation computed from the joins
pub(crate) order_aggregate_projections: Vec<Document>,
/// Cursor builder for deferred processing.
cursor_builder: Option<CursorBuilder>,
/// Struct containing data required to build cursor queries.
pub(crate) cursor_data: Option<CursorData>,
/// Skip a number of documents at the start of the result.
pub(crate) skip: Option<u64>,
/// Take only a certain number of documents from the result.
pub(crate) limit: Option<i64>,
/// Projection document to scope down return fields.
pub(crate) projection: Option<Document>,
/// Switch to indicate the underlying aggregation is a `group_by` query.
/// This is due to legacy drift in how `aggregate` and `group_by` work in
/// the API and will hopefully be merged again in the future.
pub(crate) is_group_by_query: bool,
}
impl MongoReadQueryBuilder {
pub fn new(model: Model) -> Self {
Self {
model,
query: None,
joins: vec![],
join_filters: vec![],
aggregations: vec![],
aggregation_filters: vec![],
order_builder: None,
order: None,
order_joins: vec![],
order_aggregate_projections: vec![],
cursor_builder: None,
cursor_data: None,
skip: None,
limit: None,
projection: None,
is_group_by_query: false,
}
}
pub(crate) fn from_args(args: QueryArguments) -> crate::Result<MongoReadQueryBuilder> {
let reverse_order = args.take.map(|t| t < 0).unwrap_or(false);
let order_by = args.order_by;
let order_builder = Some(OrderByBuilder::new(order_by.clone(), reverse_order));
let cursor_builder = args.cursor.map(|c| CursorBuilder::new(c, order_by, reverse_order));
let mut post_filters = vec![];
let mut joins = vec![];
let query = match args.filter {
Some(filter) => {
// If a filter comes with joins, it needs to be run _after_ the initial filter query / $matches.
let (filter, filter_joins) = MongoFilterVisitor::new(FilterPrefix::default(), false)
.visit(filter)?
.render();
if !filter_joins.is_empty() {
joins.extend(filter_joins);
post_filters.push(filter);
None
} else {
Some(filter)
}
}
None => None,
};
Ok(MongoReadQueryBuilder {
model: args.model,
query,
join_filters: post_filters,
joins,
order_builder,
cursor_builder,
skip: skip(args.skip.map(|i| i as u64), args.ignore_skip),
limit: take(args.take, args.ignore_take),
aggregations: vec![],
aggregation_filters: vec![],
order: None,
order_joins: vec![],
order_aggregate_projections: vec![],
cursor_data: None,
projection: None,
is_group_by_query: false,
})
}
/// Finalizes the builder and builds a `MongoQuery`.
pub(crate) fn build(mut self) -> crate::Result<ReadQuery> {
self.finalize()?;
Ok(self.build_pipeline_query())
}
/// Aggregation-pipeline based query. A distinction must be made between cursored and uncursored queries,
/// as they require different stage shapes (see individual fns for details).
fn build_pipeline_query(self) -> ReadQuery {
let stages = if self.cursor_data.is_none() {
self.into_pipeline_stages()
} else {
self.cursored_pipeline_stages()
};
ReadQuery { stages }
}
fn into_pipeline_stages(self) -> Vec<Document> {
let mut stages = vec![];
// Initial $matches
if let Some(query) = self.query {
stages.push(doc! { "$match": { "$expr": query } })
};
// Joins ($lookup)
let joins = self.joins.into_iter().chain(self.order_joins);
let mut unwinds: Vec<Document> = vec![];
for join_stage in joins {
let (join, unwind) = join_stage.build();
if let Some(u) = unwind {
unwinds.push(u);
}
stages.push(join);
}
// Order by aggregate computed from joins ($addFields)
stages.extend(self.order_aggregate_projections);
// Post-join $matches
stages.extend(
self.join_filters
.into_iter()
.map(|filter| doc! { "$match": { "$expr": filter } }),
);
// If the query is a group by, then skip, take, sort all apply to the _groups_, not the documents
// before. If it is a plain aggregation, then the aggregate stages need to be _after_ these, because
// they apply to the documents to be aggregated, not the aggregations (legacy meh).
if self.is_group_by_query {
// Aggregates
stages.extend(self.aggregations.clone());
// Aggregation filters
stages.extend(
self.aggregation_filters
.clone()
.into_iter()
.map(|filter| doc! { "$match": { "$expr": filter } }),
);
}
// Join's $unwind placed before sorting
// because Mongo does not support sorting multiple arrays
// https://jira.mongodb.org/browse/SERVER-32859
stages.extend(unwinds);
// $sort
if let Some(order) = self.order {
stages.push(doc! { "$sort": order })
};
// $skip
if let Some(skip) = self.skip {
stages.push(doc! { "$skip": i64::try_from(skip).unwrap() });
};
// $limit
if let Some(limit) = self.limit {
stages.push(doc! { "$limit": limit });
};
// $project
if let Some(projection) = self.projection {
stages.push(doc! { "$project": projection });
};
if !self.is_group_by_query {
// Aggregates
stages.extend(self.aggregations);
// Aggregation filters
stages.extend(
self.aggregation_filters
.into_iter()
.map(|filter| doc! { "$match": { "$expr": filter } }),
);
}
stages
}
/// Pipeline query with a cursor. Requires special building to form a query that first
/// pins a cursor and then builds cursor conditions based on that cursor document
/// and the orderings that the query defined.
/// The stages have the form:
/// ```text
/// testModel.aggregate([
/// { $match: { <filter finding exactly one document (cursor doc)> }},
/// { $lookup: { <if present, join that are required for orderBy relations> }}
/// { ... more joins if necessary ... }
/// {
/// $lookup: <"self join" testModel and execute non-cursor query with cursor filter here.>
/// }
/// ])
/// ```
/// Expressed in words, this query first makes the cursor document (that defines all values
/// to make cursor comparators work) available for the inner pipeline to build the filters.
/// The inner pipeline is basically what an non-cursor query would look like with added cursor
/// conditions. The inner join stage is refered to as a self-join here because it joins the cursor document
/// to it's collection again to pull in all documents for filtering, but technically it doesn't
/// actually join anything.
///
/// Todo concrete example
fn cursored_pipeline_stages(mut self) -> Vec<Document> {
let coll_name = self.model.db_name().to_owned();
let cursor_data = self.cursor_data.take().unwrap();
// For now we assume that simply putting the cursor condition into the join conditions is enough
// to let them run in the correct place.
self.join_filters.push(cursor_data.cursor_condition);
let order_join_stages = self
.order_joins
.clone()
.into_iter()
.map(|nested_stage| {
let (join, _) = nested_stage.build();
join
})
.collect_vec();
// Outer query to pin the cursor document.
let mut outer_stages = vec![];
// First match the cursor, then add required ordering joins.
outer_stages.push(doc! { "$match": { "$expr": cursor_data.cursor_filter } });
outer_stages.extend(order_join_stages);
outer_stages.extend(self.order_aggregate_projections.clone());
// Self-"join" collection
let inner_stages = self.into_pipeline_stages();
outer_stages.push(doc! {
"$lookup": {
"from": coll_name,
"let": cursor_data.bindings,
"pipeline": inner_stages,
"as": "cursor_inner",
}
});
outer_stages.push(doc! { "$unwind": "$cursor_inner" });
outer_stages.push(doc! { "$replaceRoot": { "newRoot": "$cursor_inner" } });
outer_stages
}
/// Adds a final projection onto the fields specified by the `FieldSelection`.
pub fn with_model_projection(mut self, selected_fields: FieldSelection) -> crate::Result<Self> {
let projection = selected_fields.into_bson()?.into_document()?;
self.projection = Some(projection);
Ok(self)
}
/// Adds the necessary joins and the associated selections to the projection
pub fn with_virtual_fields<'a>(
mut self,
virtual_selections: impl Iterator<Item = &'a VirtualSelection>,
) -> crate::Result<Self> {
for aggr in virtual_selections {
let join = match aggr {
VirtualSelection::RelationCount(rf, filter) => {
let filter = filter
.as_ref()
.map(|f| MongoFilterVisitor::new(FilterPrefix::default(), false).visit(f.clone()))
.transpose()?;
JoinStage {
source: rf.clone(),
alias: Some(aggr.db_alias()),
nested: vec![],
filter,
}
}
};
let projection = doc! {
aggr.db_alias(): { "$size": format!("${}", aggr.db_alias()) }
};
self.joins.push(join);
self.projection = self.projection.map_or(Some(projection.clone()), |mut p| {
p.extend(projection);
Some(p)
});
}
Ok(self)
}
/// Adds group-by fields with their aggregations to this query.
pub fn with_groupings(
mut self,
by_fields: Vec<ScalarFieldRef>,
selections: &[AggregationSelection],
having: Option<Filter>,
) -> crate::Result<Self> {
if !by_fields.is_empty() {
self.is_group_by_query = true;
}
let mut group_by = GroupByBuilder::new();
group_by.with_selections(selections);
if let Some(having) = having {
group_by.with_having_filter(&having);
// Having filters can only appear in group by queries.
// All group by fields go into the UNDERSCORE_ID key of the result document.
// As it is the only place where the flat scalars are contained for the group,
// we need to refer to that object.
let prefix = FilterPrefix::from(group_by::UNDERSCORE_ID);
let (filter_doc, _) = MongoFilterVisitor::new(prefix, false).visit(having)?.render();
self.aggregation_filters.push(filter_doc);
}
let (grouping_stage, project_stage) = group_by.render(by_fields);
self.aggregations.push(doc! { "$group": grouping_stage });
if let Some(project_stage) = project_stage {
self.aggregations.push(doc! { "$project": project_stage });
}
Ok(self)
}
/// Runs last transformations on `self` to execute steps dependent on base args.
fn finalize(&mut self) -> crate::Result<()> {
// Cursor building depends on the ordering, so it must come first.
if let Some(order_builder) = self.order_builder.take() {
let (order, order_aggregate_projections, joins) = order_builder.build(self.is_group_by_query);
self.order_joins.extend(joins);
self.order = order;
self.order_aggregate_projections = order_aggregate_projections;
}
if let Some(cursor_builder) = self.cursor_builder.take() {
let cursor_data = cursor_builder.build()?;
self.cursor_data = Some(cursor_data);
}
Ok(())
}
}
fn skip(skip: Option<u64>, ignore: bool) -> Option<u64> {
if ignore {
None
} else {
skip
}
}
fn take(take: Option<i64>, ignore: bool) -> Option<i64> {
if ignore {
None
} else {
take.map(|t| if t < 0 { -t } else { t })
}
}