mod download;
pub(crate) mod options;
mod upload;
use std::sync::{atomic::AtomicBool, Arc};
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
use crate::{
bson::{doc, oid::ObjectId, Bson, DateTime, Document, RawBinaryRef},
cursor::Cursor,
error::{Error, ErrorKind, GridFsErrorKind, GridFsFileIdentifier, Result},
options::{CollectionOptions, FindOptions, ReadConcern, SelectionCriteria, WriteConcern},
Collection,
Database,
};
pub use download::GridFsDownloadStream;
pub(crate) use options::*;
pub use upload::GridFsUploadStream;
const DEFAULT_BUCKET_NAME: &str = "fs";
const DEFAULT_CHUNK_SIZE_BYTES: u32 = 255 * 1024;
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Chunk<'a> {
#[serde(rename = "_id")]
id: ObjectId,
files_id: Bson,
#[serde(serialize_with = "bson::serde_helpers::serialize_u32_as_i32")]
n: u32,
#[serde(borrow)]
data: RawBinaryRef<'a>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[skip_serializing_none]
#[non_exhaustive]
pub struct FilesCollectionDocument {
#[serde(rename = "_id")]
pub id: Bson,
pub length: u64,
#[serde(
rename = "chunkSize",
serialize_with = "bson::serde_helpers::serialize_u32_as_i32"
)]
pub chunk_size_bytes: u32,
pub upload_date: DateTime,
pub filename: Option<String>,
pub metadata: Option<Document>,
}
impl FilesCollectionDocument {
fn n(&self) -> u32 {
Self::n_from_vals(self.length, self.chunk_size_bytes)
}
fn n_from_vals(length: u64, chunk_size_bytes: u32) -> u32 {
let chunk_size_bytes = chunk_size_bytes as u64;
let n = length / chunk_size_bytes + u64::from(length % chunk_size_bytes != 0);
n as u32
}
fn expected_chunk_length(&self, n: u32) -> u32 {
Self::expected_chunk_length_from_vals(self.length, self.chunk_size_bytes, n)
}
fn expected_chunk_length_from_vals(length: u64, chunk_size_bytes: u32, n: u32) -> u32 {
let remainder = length % (chunk_size_bytes as u64);
if n == Self::n_from_vals(length, chunk_size_bytes) - 1 && remainder != 0 {
remainder as u32
} else {
chunk_size_bytes
}
}
}
#[derive(Debug)]
struct GridFsBucketInner {
db: Database,
options: GridFsBucketOptions,
files: Collection<FilesCollectionDocument>,
chunks: Collection<Chunk<'static>>,
created_indexes: AtomicBool,
}
#[derive(Debug, Clone)]
pub struct GridFsBucket {
inner: Arc<GridFsBucketInner>,
}
impl GridFsBucket {
pub(crate) fn new(db: Database, mut options: GridFsBucketOptions) -> GridFsBucket {
if options.read_concern.is_none() {
options.read_concern = db.read_concern().cloned();
}
if options.write_concern.is_none() {
options.write_concern = db.write_concern().cloned();
}
if options.selection_criteria.is_none() {
options.selection_criteria = db.selection_criteria().cloned();
}
let bucket_name = options
.bucket_name
.as_deref()
.unwrap_or(DEFAULT_BUCKET_NAME);
let collection_options = CollectionOptions::builder()
.read_concern(options.read_concern.clone())
.write_concern(options.write_concern.clone())
.selection_criteria(options.selection_criteria.clone())
.build();
let files = db.collection_with_options::<FilesCollectionDocument>(
&format!("{}.files", bucket_name),
collection_options.clone(),
);
let chunks = db.collection_with_options::<Chunk>(
&format!("{}.chunks", bucket_name),
collection_options,
);
GridFsBucket {
inner: Arc::new(GridFsBucketInner {
db: db.clone(),
options,
files,
chunks,
created_indexes: AtomicBool::new(false),
}),
}
}
pub(crate) fn client(&self) -> &crate::Client {
self.inner.files.client()
}
pub fn read_concern(&self) -> Option<&ReadConcern> {
self.inner.options.read_concern.as_ref()
}
pub fn write_concern(&self) -> Option<&WriteConcern> {
self.inner.options.write_concern.as_ref()
}
pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.inner.options.selection_criteria.as_ref()
}
fn chunk_size_bytes(&self) -> u32 {
self.inner
.options
.chunk_size_bytes
.unwrap_or(DEFAULT_CHUNK_SIZE_BYTES)
}
pub(crate) fn files(&self) -> &Collection<FilesCollectionDocument> {
&self.inner.files
}
pub(crate) fn chunks(&self) -> &Collection<Chunk<'static>> {
&self.inner.chunks
}
pub async fn delete(&self, id: Bson) -> Result<()> {
let delete_result = self
.files()
.delete_one(doc! { "_id": id.clone() }, None)
.await?;
self.chunks()
.delete_many(doc! { "files_id": id.clone() }, None)
.await?;
if delete_result.deleted_count == 0 {
return Err(ErrorKind::GridFs(GridFsErrorKind::FileNotFound {
identifier: GridFsFileIdentifier::Id(id),
})
.into());
}
Ok(())
}
pub async fn find(
&self,
filter: Document,
options: impl Into<Option<GridFsFindOptions>>,
) -> Result<Cursor<FilesCollectionDocument>> {
let find_options = options.into().map(FindOptions::from);
self.files().find(filter, find_options).await
}
pub async fn rename(&self, id: Bson, new_filename: impl AsRef<str>) -> Result<()> {
self.files()
.update_one(
doc! { "_id": id },
doc! { "$set": { "filename": new_filename.as_ref() } },
None,
)
.await?;
Ok(())
}
pub async fn drop(&self) -> Result<()> {
self.files().drop(None).await?;
self.chunks().drop(None).await?;
Ok(())
}
}
impl Error {
fn into_futures_io_error(self) -> futures_io::Error {
futures_io::Error::new(futures_io::ErrorKind::Other, self)
}
}