use super::coerce::{coerce_record_with_json_relation, IndexedSelection};
use crate::{
column_metadata,
model_extensions::*,
query_arguments_ext::QueryArgumentsExt,
query_builder::{self, read},
Context, QueryExt, Queryable, SqlError,
};
use connector_interface::*;
use futures::stream::{FuturesUnordered, StreamExt};
use quaint::ast::*;
use query_structure::*;
/// Fetches at most one record of `model` that matches `filter`.
///
/// This is a dispatcher only: `RelationLoadStrategy::Join` is served by
/// [`get_single_record_joins`] and `RelationLoadStrategy::Query` by
/// [`get_single_record_wo_joins`]. The result (including any error) of the
/// chosen implementation is returned unchanged.
pub(crate) async fn get_single_record(
    conn: &dyn Queryable,
    model: &Model,
    filter: &Filter,
    selected_fields: &FieldSelection,
    relation_load_strategy: RelationLoadStrategy,
    ctx: &Context<'_>,
) -> crate::Result<Option<SingleRecord>> {
    let found = match relation_load_strategy {
        RelationLoadStrategy::Query => get_single_record_wo_joins(conn, model, filter, selected_fields, ctx).await?,
        RelationLoadStrategy::Join => get_single_record_joins(conn, model, filter, selected_fields, ctx).await?,
    };

    Ok(found)
}
/// Loads a single record for the `Join` relation load strategy.
///
/// One query is built with `SelectBuilder` from `model` and `filter` and run
/// through [`execute_find_one`]. When a row comes back, the record is passed
/// through `coerce_record_with_json_relation` together with the positions of
/// the relation and virtual selections among the selected column names.
///
/// Returns `Ok(None)` when [`execute_find_one`] reports no row.
pub(crate) async fn get_single_record_joins(
    conn: &dyn Queryable,
    model: &Model,
    filter: &Filter,
    selected_fields: &FieldSelection,
    ctx: &Context<'_>,
) -> crate::Result<Option<SingleRecord>> {
    let field_names: Vec<_> = selected_fields.db_names_grouping_virtuals().collect();
    let idents = selected_fields.type_identifiers_with_arities_grouping_virtuals();

    // Positions of the columns that hold relation / virtual selections.
    let relations: Vec<_> = selected_fields.relations().collect();
    let virtuals: Vec<_> = selected_fields.virtuals().collect();
    let indexes = get_selection_indexes(relations, virtuals, &field_names);

    let args = QueryArguments::from((model.clone(), filter.clone()));
    let query = query_builder::select::SelectBuilder::build(args, selected_fields, ctx);

    let found = execute_find_one(conn, query, &idents, &field_names, ctx).await?;

    match found {
        Some(mut record) => {
            coerce_record_with_json_relation(&mut record, &indexes)?;
            Ok(Some(SingleRecord { record, field_names }))
        }
        None => Ok(None),
    }
}
/// Loads a single record for the `Query` relation load strategy.
///
/// The incoming selection is first transformed with
/// `without_relations().into_virtuals_last()`; column names, type identifiers
/// and the projected columns are all derived from that transformed selection.
///
/// Returns `Ok(None)` when [`execute_find_one`] reports no row.
pub(crate) async fn get_single_record_wo_joins(
    conn: &dyn Queryable,
    model: &Model,
    filter: &Filter,
    selected_fields: &FieldSelection,
    ctx: &Context<'_>,
) -> crate::Result<Option<SingleRecord>> {
    let selection = selected_fields.without_relations().into_virtuals_last();

    let field_names: Vec<_> = selection.db_names().collect();
    let idents = selection.type_identifiers_with_arities();

    let query = read::get_records(
        model,
        ModelProjection::from(&selection).as_columns(ctx).mark_all_selected(),
        selection.virtuals(),
        filter,
        ctx,
    );

    let found = execute_find_one(conn, query, &idents, &field_names, ctx).await?;

    Ok(found.map(|record| SingleRecord { record, field_names }))
}
/// Runs `query` through `conn.find` and converts the resulting row into a `Record`.
///
/// `SqlError::RecordNotFoundForWhere` and `SqlError::RecordDoesNotExist` are
/// treated as "no row" and mapped to `Ok(None)`; every other error is returned
/// to the caller unchanged.
async fn execute_find_one(
    conn: &dyn Queryable,
    query: Select<'_>,
    idents: &[(TypeIdentifier, FieldArity)],
    field_names: &[String],
    ctx: &Context<'_>,
) -> crate::Result<Option<Record>> {
    let meta = column_metadata::create(field_names, idents);
    let outcome = conn.find(query, meta.as_slice(), ctx).await;

    match outcome {
        Ok(row) => Ok(Some(Record::from(row))),
        // A missing record is an expected outcome here, not a failure.
        Err(SqlError::RecordNotFoundForWhere(_) | SqlError::RecordDoesNotExist { .. }) => Ok(None),
        Err(other) => Err(other),
    }
}
/// Fetches the records of `model` selected by `query_arguments`.
///
/// This is a dispatcher only: `RelationLoadStrategy::Join` is served by
/// [`get_many_records_joins`] and `RelationLoadStrategy::Query` by
/// [`get_many_records_wo_joins`]. The result (including any error) of the
/// chosen implementation is returned unchanged.
pub(crate) async fn get_many_records(
    conn: &dyn Queryable,
    model: &Model,
    query_arguments: QueryArguments,
    selected_fields: &FieldSelection,
    relation_load_strategy: RelationLoadStrategy,
    ctx: &Context<'_>,
) -> crate::Result<ManyRecords> {
    let records = match relation_load_strategy {
        RelationLoadStrategy::Query => {
            get_many_records_wo_joins(conn, model, query_arguments, selected_fields, ctx).await?
        }
        RelationLoadStrategy::Join => {
            get_many_records_joins(conn, model, query_arguments, selected_fields, ctx).await?
        }
    };

    Ok(records)
}
/// Loads many records for the `Join` relation load strategy.
///
/// A single query is built with `SelectBuilder`; every returned row is turned
/// into a `Record` and passed through `coerce_record_with_json_relation`
/// together with the positions of the relation and virtual selections among
/// the selected column names.
///
/// Returns an empty `ManyRecords` without querying when `take` is `Some(0)`.
/// The collected records are reversed when
/// `query_arguments.needs_reversed_order()` is true.
///
/// # Errors
///
/// Returns `SqlError::QueryParameterLimitExceeded` when `ctx.max_bind_values`
/// is set and `query_arguments.should_batch` says the query would have to be
/// split: this path issues exactly one query. Errors from `conn.filter` and
/// from the coercion step are propagated.
pub(crate) async fn get_many_records_joins(
    conn: &dyn Queryable,
    _model: &Model,
    query_arguments: QueryArguments,
    selected_fields: &FieldSelection,
    ctx: &Context<'_>,
) -> crate::Result<ManyRecords> {
    let field_names: Vec<_> = selected_fields.db_names_grouping_virtuals().collect();
    let idents = selected_fields.type_identifiers_with_arities_grouping_virtuals();
    let meta = column_metadata::create(field_names.as_slice(), idents.as_slice());

    let indexes = get_selection_indexes(
        selected_fields.relations().collect(),
        selected_fields.virtuals().collect(),
        &field_names,
    );

    let mut records = ManyRecords::new(field_names.clone());

    if let Some(0) = query_arguments.take {
        return Ok(records);
    }

    if let Some(chunk_size) = ctx.max_bind_values {
        if query_arguments.should_batch(chunk_size) {
            return Err(SqlError::QueryParameterLimitExceeded(
                "Joined queries cannot be split into multiple queries.".to_string(),
            ));
        }
    }

    // Read the flag before the arguments are handed to the builder, so the
    // arguments can be moved instead of cloned.
    let reversed = query_arguments.needs_reversed_order();
    let query = query_builder::select::SelectBuilder::build(query_arguments, selected_fields, ctx);

    for item in conn.filter(query.into(), meta.as_slice(), ctx).await? {
        let mut record = Record::from(item);
        coerce_record_with_json_relation(&mut record, &indexes)?;
        records.push(record);
    }

    if reversed {
        records.reverse();
    }

    Ok(records)
}
/// Loads many records of `model` for the `Query` relation load strategy.
///
/// The incoming selection is first transformed with
/// `without_relations().into_virtuals_last()`; column names, type identifiers
/// and the projected columns are all derived from that transformed selection.
///
/// Returns an empty `ManyRecords` without querying when `take` is `Some(0)`.
///
/// When `ctx.max_bind_values` is set and `query_arguments.should_batch` asks for
/// it, the arguments are split with `batched(chunk_size)`, one query is issued
/// per batch, and the combined rows are ordered in memory with the ordering
/// that was taken out of the arguments. Otherwise a single query is issued.
///
/// # Errors
///
/// Returns `SqlError::QueryParameterLimitExceeded` when batching is required
/// but the arguments report an unbatchable ordering or unbatchable filters.
/// Errors from `conn.filter` are propagated.
pub(crate) async fn get_many_records_wo_joins(
conn: &dyn Queryable,
model: &Model,
mut query_arguments: QueryArguments,
selected_fields: &FieldSelection,
ctx: &Context<'_>,
) -> crate::Result<ManyRecords> {
let selected_fields = selected_fields.without_relations().into_virtuals_last();
// Captured up front: `query_arguments` is mutated (batched path) or moved
// (single-query path) further down.
let reversed = query_arguments.needs_reversed_order();
let field_names: Vec<_> = selected_fields.db_names().collect();
let idents = selected_fields.type_identifiers_with_arities();
let meta = column_metadata::create(field_names.as_slice(), idents.as_slice());
let mut records = ManyRecords::new(field_names.clone());
// `take == 0`: return the empty result without issuing a query.
if let Some(0) = query_arguments.take {
return Ok(records);
};
// Batch only when a bind-value limit is configured and `should_batch` reports
// that the arguments need to be split for it.
match ctx.max_bind_values {
Some(chunk_size) if query_arguments.should_batch(chunk_size) => {
if query_arguments.has_unbatchable_ordering() {
return Err(SqlError::QueryParameterLimitExceeded(
"Your query cannot be split into multiple queries because of the order by aggregation or relevance"
.to_string(),
));
}
if query_arguments.has_unbatchable_filters() {
return Err(SqlError::QueryParameterLimitExceeded(
"Parameter limits for this database provider require this query to be split into multiple queries, but the negation filters used prevent the query from being split. Please reduce the used values in the query."
.to_string(),
));
}
// Move the ordering out of the arguments (leaving the default value behind)
// so it can be applied to the merged records once all batches are in.
let order = std::mem::take(&mut query_arguments.order_by);
let batches = query_arguments.batched(chunk_size);
// Queue one query per batch. `FuturesUnordered` yields results in completion
// order, not submission order.
let mut futures = FuturesUnordered::new();
for args in batches.into_iter() {
let query = read::get_records(
model,
ModelProjection::from(&selected_fields)
.as_columns(ctx)
.mark_all_selected(),
selected_fields.virtuals(),
args,
ctx,
);
futures.push(conn.filter(query.into(), meta.as_slice(), ctx));
}
// The first failing batch aborts the whole call via `?`.
while let Some(result) = futures.next().await {
for item in result?.into_iter() {
records.push(Record::from(item))
}
}
// Apply the requested ordering over the combined rows of all batches.
if !order.is_empty() {
records.order_by(&order, reversed)
}
}
// No batching: a single query with the arguments as given.
_ => {
let query = read::get_records(
model,
ModelProjection::from(&selected_fields)
.as_columns(ctx)
.mark_all_selected(),
selected_fields.virtuals(),
query_arguments,
ctx,
);
for item in conn.filter(query.into(), meta.as_slice(), ctx).await?.into_iter() {
records.push(Record::from(item))
}
}
}
// Runs for both paths, using the flag computed before the arguments were touched.
if reversed {
records.reverse();
}
Ok(records)
}
/// Reads the table of the relation `from_field` belongs to and returns
/// `(parent id, child id)` pairs.
///
/// The query selects the columns from `from_field.related_field().m2m_columns(ctx)`
/// followed by those from `from_field.m2m_columns(ctx)`, filtered with
/// `query_builder::in_conditions` over the former and `from_record_ids`.
/// In each returned row, the first values (as many as the parent model's
/// primary identifier has scalar fields) form the parent id; the remaining
/// values form the child id.
///
/// # Panics
///
/// Panics if `as_scalar_fields()` returns `None` for the primary identifier of
/// either model.
pub(crate) async fn get_related_m2m_record_ids(
    conn: &dyn Queryable,
    from_field: &RelationFieldRef,
    from_record_ids: &[SelectionResult],
    ctx: &Context<'_>,
) -> crate::Result<Vec<(SelectionResult, SelectionResult)>> {
    let parent_model_id = from_field.model().primary_identifier();
    let child_model_id = from_field.related_model().primary_identifier();

    // Result column metadata: parent identifier columns first, then child ones.
    let idents: Vec<_> = ModelProjection::from(from_field.model().primary_identifier())
        .type_identifiers_with_arities()
        .into_iter()
        .chain(ModelProjection::from(from_field.related_model().primary_identifier()).type_identifiers_with_arities())
        .collect();
    let field_names: Vec<_> = parent_model_id
        .db_names()
        .chain(child_model_id.db_names())
        .collect();
    let meta = column_metadata::create(&field_names, &idents);

    let relation = from_field.relation();
    let m2m_table = relation.as_table(ctx);
    let parent_columns: Vec<_> = from_field.related_field().m2m_columns(ctx);
    let child_columns: Vec<_> = from_field.m2m_columns(ctx);

    let condition = query_builder::in_conditions(&parent_columns, from_record_ids, ctx);
    let select = Select::from_table(m2m_table)
        .so_that(condition)
        .columns(parent_columns.into_iter().chain(child_columns));

    let parent_fields: Vec<_> = parent_model_id
        .as_scalar_fields()
        .expect("Parent model ID has non-scalar fields.");
    let child_fields: Vec<_> = child_model_id
        .as_scalar_fields()
        .expect("Child model ID has non-scalar fields.");

    let rows = conn.filter(select.into(), meta.as_slice(), ctx).await?;
    let mut id_pairs = Vec::new();

    for row in rows {
        // Parent values come first; everything after them belongs to the child.
        let mut parent_values = row.values;
        let child_values = parent_values.split_off(parent_fields.len());

        let parent_pairs: Vec<_> = parent_fields
            .iter()
            .zip(parent_values)
            .map(|(field, value)| (field.clone(), value))
            .collect();
        let child_pairs: Vec<_> = child_fields
            .iter()
            .zip(child_values)
            .map(|(field, value)| (field.clone(), value))
            .collect();

        let parent: SelectionResult = parent_pairs.into();
        let child: SelectionResult = child_pairs.into();
        id_pairs.push((parent, child));
    }

    Ok(id_pairs)
}
/// Runs an aggregation over `model`.
///
/// With an empty `group_by`, [`plain_aggregate`] is used and its single result
/// row is wrapped in a one-element vector (`having` is not used on this path).
/// Otherwise the call is forwarded to [`group_by_aggregate`].
pub(crate) async fn aggregate(
    conn: &dyn Queryable,
    model: &Model,
    query_arguments: QueryArguments,
    selections: Vec<AggregationSelection>,
    group_by: Vec<ScalarFieldRef>,
    having: Option<Filter>,
    ctx: &Context<'_>,
) -> crate::Result<Vec<AggregationRow>> {
    if group_by.is_empty() {
        let row = plain_aggregate(conn, model, query_arguments, selections, ctx).await?;
        Ok(vec![row])
    } else {
        group_by_aggregate(conn, model, query_arguments, selections, group_by, having, ctx).await
    }
}
/// Runs an aggregation query without grouping and returns its result row.
///
/// The column metadata is built from the type identifier and arity of every
/// entry returned by `identifiers()` for each selection, in selection order.
///
/// # Panics
///
/// Panics if the query returns no row.
async fn plain_aggregate(
    conn: &dyn Queryable,
    model: &Model,
    query_arguments: QueryArguments,
    selections: Vec<AggregationSelection>,
    ctx: &Context<'_>,
) -> crate::Result<Vec<AggregationResult>> {
    let query = read::aggregate(model, &selections, query_arguments, ctx);

    // Only the type identifier and arity are needed; the first element is dropped.
    let mut idents = Vec::new();
    for selection in &selections {
        for (_, ident, arity) in selection.identifiers() {
            idents.push((ident, arity));
        }
    }

    let meta = column_metadata::create_anonymous(&idents);
    let mut rows = conn.filter(query.into(), meta.as_slice(), ctx).await?;

    let last_row = rows.pop();
    let row = last_row.expect("Expected exactly one return row for aggregation query.");

    Ok(row.into_aggregation_results(&selections))
}
/// Runs a grouped aggregation query and returns one aggregation row per
/// returned database row, in the order the rows were returned.
///
/// The column metadata is built from the type identifier and arity of every
/// entry returned by `identifiers()` for each selection, in selection order.
async fn group_by_aggregate(
    conn: &dyn Queryable,
    model: &Model,
    query_arguments: QueryArguments,
    selections: Vec<AggregationSelection>,
    group_by: Vec<ScalarFieldRef>,
    having: Option<Filter>,
    ctx: &Context<'_>,
) -> crate::Result<Vec<AggregationRow>> {
    let query = read::group_by_aggregate(model, query_arguments, &selections, group_by, having, ctx);

    // Only the type identifier and arity are needed; the first element is dropped.
    let mut idents = Vec::new();
    for selection in &selections {
        for (_, ident, arity) in selection.identifiers() {
            idents.push((ident, arity));
        }
    }

    let meta = column_metadata::create_anonymous(&idents);
    let rows = conn.filter(query.into(), meta.as_slice(), ctx).await?;

    let mut aggregated = Vec::new();
    for row in rows {
        aggregated.push(row.into_aggregation_results(&selections));
    }

    Ok(aggregated)
}
/// Pairs positions in `field_names` with the relation or virtual selection
/// stored at that position.
///
/// For every name, the first relation selection whose field name equals it is
/// used; if there is none, the first virtual selection whose serialized object
/// name equals it is used instead. Names that match neither are skipped.
/// The returned pairs follow the order of `field_names`.
fn get_selection_indexes<'a>(
    relations: Vec<&'a RelationSelection>,
    virtuals: Vec<&'a VirtualSelection>,
    field_names: &'a [String],
) -> Vec<(usize, IndexedSelection<'a>)> {
    let mut indexes = Vec::new();

    for (idx, field_name) in field_names.iter().enumerate() {
        let as_relation = relations
            .iter()
            .find_map(|rs| (rs.field.name() == field_name).then_some(IndexedSelection::Relation(rs)));

        // Relations take precedence; virtuals are only consulted as a fallback.
        let selection = match as_relation {
            Some(found) => Some(found),
            None => virtuals.iter().find_map(|vs| {
                let obj_name = vs.serialized_name().0;
                (obj_name == field_name).then_some(IndexedSelection::Virtual(obj_name))
            }),
        };

        if let Some(selection) = selection {
            indexes.push((idx, selection));
        }
    }

    indexes
}