mod change_stream;
#[cfg(test)]
mod test;
use crate::{
    bson::{doc, Bson, Document},
    bson_util,
    cmap::{Command, RawCommandResponse, StreamDescription},
    cursor::CursorSpecification,
    error::Result,
    operation::{append_options, remove_empty_write_concern, Retryability},
    options::{AggregateOptions, SelectionCriteria, WriteConcern},
    Namespace,
};
use super::{
    CursorBody,
    OperationWithDefaults,
    WriteConcernOnlyBody,
    SERVER_4_2_0_WIRE_VERSION,
    SERVER_4_4_0_WIRE_VERSION,
};
pub(crate) use change_stream::ChangeStreamAggregate;
#[derive(Debug)]
pub(crate) struct Aggregate {
    target: AggregateTarget,
    pipeline: Vec<Document>,
    options: Option<AggregateOptions>,
}
impl Aggregate {
    #[cfg(test)]
    fn empty() -> Self {
        Self::new(Namespace::empty(), Vec::new(), None)
    }
    pub(crate) fn new(
        target: impl Into<AggregateTarget>,
        pipeline: impl IntoIterator<Item = Document>,
        mut options: Option<AggregateOptions>,
    ) -> Self {
        if let Some(ref mut options) = options {
            if let Some(ref comment) = options.comment {
                if options.comment_bson.is_none() {
                    options.comment_bson = Some(comment.clone().into());
                }
            }
        }
        Self {
            target: target.into(),
            pipeline: pipeline.into_iter().collect(),
            options,
        }
    }
}
impl OperationWithDefaults for Aggregate {
    type O = CursorSpecification;
    type Command = Document;
    const NAME: &'static str = "aggregate";
    fn build(&mut self, _description: &StreamDescription) -> Result<Command> {
        let mut body = doc! {
            Self::NAME: self.target.to_bson(),
            "pipeline": bson_util::to_bson_array(&self.pipeline),
            "cursor": {}
        };
        remove_empty_write_concern!(self.options);
        append_options(&mut body, self.options.as_ref())?;
        if self.is_out_or_merge() {
            if let Ok(cursor_doc) = body.get_document_mut("cursor") {
                cursor_doc.remove("batchSize");
            }
        }
        Ok(Command::new_read(
            Self::NAME.to_string(),
            self.target.db_name().to_string(),
            self.options.as_ref().and_then(|o| o.read_concern.clone()),
            body,
        ))
    }
    fn extract_at_cluster_time(
        &self,
        response: &bson::RawDocument,
    ) -> Result<Option<bson::Timestamp>> {
        CursorBody::extract_at_cluster_time(response)
    }
    fn handle_response(
        &self,
        response: RawCommandResponse,
        description: &StreamDescription,
    ) -> Result<Self::O> {
        let cursor_response: CursorBody = response.body()?;
        if self.is_out_or_merge() {
            let wc_error_info = response.body::<WriteConcernOnlyBody>()?;
            wc_error_info.validate()?;
        };
        let comment = if description.max_wire_version.unwrap_or(0) < SERVER_4_4_0_WIRE_VERSION {
            None
        } else {
            self.options
                .as_ref()
                .and_then(|opts| opts.comment_bson.clone())
        };
        Ok(CursorSpecification::new(
            cursor_response.cursor,
            description.server_address.clone(),
            self.options.as_ref().and_then(|opts| opts.batch_size),
            self.options.as_ref().and_then(|opts| opts.max_await_time),
            comment,
        ))
    }
    fn selection_criteria(&self) -> Option<&SelectionCriteria> {
        self.options
            .as_ref()
            .and_then(|opts| opts.selection_criteria.as_ref())
    }
    fn supports_read_concern(&self, description: &StreamDescription) -> bool {
        !self.is_out_or_merge()
            || description.max_wire_version.unwrap_or(0) >= SERVER_4_2_0_WIRE_VERSION
    }
    fn write_concern(&self) -> Option<&WriteConcern> {
        self.options
            .as_ref()
            .and_then(|opts| opts.write_concern.as_ref())
    }
    fn retryability(&self) -> Retryability {
        if self.is_out_or_merge() {
            Retryability::None
        } else {
            Retryability::Read
        }
    }
}
impl Aggregate {
    fn is_out_or_merge(&self) -> bool {
        self.pipeline
            .last()
            .map(|stage| {
                let stage = bson_util::first_key(stage);
                stage == Some("$out") || stage == Some("$merge")
            })
            .unwrap_or(false)
    }
}
#[derive(Clone, Debug)]
pub(crate) enum AggregateTarget {
    Database(String),
    Collection(Namespace),
}
impl AggregateTarget {
    fn to_bson(&self) -> Bson {
        match self {
            AggregateTarget::Database(_) => Bson::Int32(1),
            AggregateTarget::Collection(ref ns) => Bson::String(ns.coll.to_string()),
        }
    }
    fn db_name(&self) -> &str {
        match self {
            AggregateTarget::Database(ref s) => s.as_str(),
            AggregateTarget::Collection(ref ns) => ns.db.as_str(),
        }
    }
}
impl From<Namespace> for AggregateTarget {
    fn from(ns: Namespace) -> Self {
        AggregateTarget::Collection(ns)
    }
}
impl From<String> for AggregateTarget {
    fn from(db_name: String) -> Self {
        AggregateTarget::Database(db_name)
    }
}