1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
use super::*;
use crate::{
    query_ast::*,
    query_graph::{Node, NodeRef, QueryGraph, QueryGraphDependency},
    ArgumentListLookup, ParsedField, ParsedInputList, ParsedInputMap,
};
use psl::datamodel_connector::ConnectorCapability;
use query_structure::{IntoFilter, Model};
use schema::{constants::args, QuerySchema};
use std::convert::TryInto;
use write_args_parser::*;

/// Creates a create record query and adds it to the query graph, together with it's nested queries and companion read query.
pub(crate) fn create_record(
    graph: &mut QueryGraph,
    query_schema: &QuerySchema,
    model: Model,
    mut field: ParsedField<'_>,
) -> QueryGraphBuilderResult<()> {
    let data_map = match field.arguments.lookup(args::DATA) {
        Some(data) => data.value.try_into()?,
        None => ParsedInputMap::default(),
    };

    if can_use_atomic_create(query_schema, &model, &data_map, &field) {
        let create_node = create::atomic_create_record_node(graph, query_schema, model, data_map, field)?;

        graph.add_result_node(&create_node);
    } else {
        graph.flag_transactional();

        let create_node = create::create_record_node(graph, query_schema, model.clone(), data_map)?;

        // Follow-up read query on the write
        let read_query = read::find_unique(field, model.clone(), query_schema)?;
        let read_node = graph.create_node(Query::Read(read_query));

        graph.add_result_node(&read_node);
        graph.create_edge(
            &create_node,
            &read_node,
            QueryGraphDependency::ProjectedDataDependency(
                model.primary_identifier(),
                Box::new(move |mut read_node, mut parent_ids| {
                    let parent_id = match parent_ids.pop() {
                        Some(pid) => Ok(pid),
                        None => Err(QueryGraphBuilderError::AssertionError(
                            "Expected a valid parent ID to be present for create follow-up read query.".to_string(),
                        )),
                    }?;

                    if let Node::Query(Query::Read(ReadQuery::RecordQuery(ref mut rq))) = read_node {
                        rq.add_filter(parent_id.filter());
                    };

                    Ok(read_node)
                }),
            ),
        )?;
    }

    Ok(())
}

/// Creates a create-many query node and adds it to the query graph.
///
/// Unlike `create_record`, no nested queries and no follow-up read query are built here: only
/// the parsed `args` of each `data` entry are used, and a single `CreateManyRecords` write node
/// is created. The graph is always flagged as transactional.
///
/// Arguments read from `field`:
/// - `args::DATA`: coerced into a list via `utils::coerce_vec`; an absent argument yields an
///   empty list.
/// - `args::SKIP_DUPLICATES`: defaults to `false` when absent.
///
/// # Errors
/// Returns an error if an argument cannot be converted into the expected type, or if parsing a
/// `data` entry into write arguments fails.
pub(crate) fn create_many_records(
    graph: &mut QueryGraph,
    _query_schema: &QuerySchema,
    model: Model,
    mut field: ParsedField<'_>,
) -> QueryGraphBuilderResult<()> {
    graph.flag_transactional();

    let data_list: ParsedInputList<'_> = field
        .arguments
        .lookup(args::DATA)
        .map(|data| utils::coerce_vec(data.value))
        .unwrap_or_default();

    let skip_duplicates: bool = match field.arguments.lookup(args::SKIP_DUPLICATES) {
        Some(arg) => arg.value.try_into()?,
        None => false,
    };

    // One set of write arguments per entry of the `data` list; the first failure aborts the build.
    let args = data_list
        .into_iter()
        .map(|data_value| {
            let data_map = data_value.try_into()?;
            let mut write_args = WriteArgsParser::from(&model, data_map)?.args;

            write_args.add_datetimes(&model);
            Ok(write_args)
        })
        .collect::<QueryGraphBuilderResult<Vec<_>>>()?;

    let query = CreateManyRecords {
        model,
        args,
        skip_duplicates,
    };

    graph.create_node(Query::Write(WriteQuery::CreateManyRecords(query)));
    Ok(())
}

pub fn create_record_node(
    graph: &mut QueryGraph,
    query_schema: &QuerySchema,
    model: Model,
    data_map: ParsedInputMap<'_>,
) -> QueryGraphBuilderResult<NodeRef> {
    let create_args = WriteArgsParser::from(&model, data_map)?;
    let mut args = create_args.args;

    args.add_datetimes(&model);

    let selected_fields = model.primary_identifier();
    let selection_order = selected_fields.db_names().collect();

    let cr = CreateRecord {
        // A regular create record is never used as a result node. Therefore, it's never serialized, so we don't need a name.
        name: String::new(),
        model,
        args,
        selected_fields,
        selection_order,
    };

    let create_node = graph.create_node(Query::Write(WriteQuery::CreateRecord(cr)));

    for (relation_field, data_map) in create_args.nested {
        nested::connect_nested_query(graph, query_schema, create_node, relation_field, data_map)?;
    }

    Ok(create_node)
}

/// Decides whether a create can be performed atomically, i.e. in a single operation.
/// An atomic create uses `INSERT ... RETURNING` when the connector supports it.
/// All of the following must hold:
/// 1. The connector has the `InsertReturning` capability
/// 2. There are no nested operations in `data_map`
/// 3. The selection set of `field` contains no nested selection
fn can_use_atomic_create(
    query_schema: &QuerySchema,
    model: &Model,
    data_map: &ParsedInputMap<'_>,
    field: &ParsedField<'_>,
) -> bool {
    // Without RETURNING support there is nothing to decide.
    let supports_returning = query_schema.has_capability(ConnectorCapability::InsertReturning);

    // Checks run in this order and stop at the first one that rules the atomic create out.
    supports_returning
        && !WriteArgsParser::has_nested_operation(model, data_map)
        && !field.has_nested_selection()
}

/// Creates a create record query that's done in a single operation and adds it to the query graph.
/// Translates to an `INSERT ... RETURNING` under the hood.
fn atomic_create_record_node(
    graph: &mut QueryGraph,
    query_schema: &QuerySchema,
    model: Model,
    data_map: ParsedInputMap<'_>,
    field: ParsedField<'_>,
) -> QueryGraphBuilderResult<NodeRef> {
    let create_args = WriteArgsParser::from(&model, data_map)?;
    let mut args = create_args.args;

    let nested_fields = field.nested_fields.unwrap().fields;
    let selection_order: Vec<String> = read::utils::collect_selection_order(&nested_fields);
    let selected_fields = read::utils::collect_selected_scalars(&nested_fields, &model);

    args.add_datetimes(&model);

    let cr = CreateRecord {
        name: field.name.clone(),
        model,
        args,
        selected_fields,
        selection_order,
    };

    let create_node = graph.create_node(Query::Write(WriteQuery::CreateRecord(cr)));

    for (relation_field, data_map) in create_args.nested {
        nested::connect_nested_query(graph, query_schema, create_node, relation_field, data_map)?;
    }

    Ok(create_node)
}