From 3709f6efc455f9fd775b3906845790991c341feb Mon Sep 17 00:00:00 2001 From: Moritz Ruth Date: Sun, 20 Apr 2025 23:30:12 +0200 Subject: [PATCH] WIP: v1.0.0 --- .gitignore | 3 +- README.md | 5 +- migrations/20250321201214_initial.sql | 3 +- src/http_api/api_error.rs | 28 ++- src/http_api/headers.rs | 13 ++ src/http_api/uploads/append_to_upload.rs | 244 +++++++++++------------ src/http_api/uploads/mod.rs | 126 ++++++++++-- src/processing_worker.rs | 10 +- src/upload_manager.rs | 142 +++++++++---- src/util/acquirable.rs | 71 ++++--- 10 files changed, 422 insertions(+), 223 deletions(-) diff --git a/.gitignore b/.gitignore index e310c9d..505aad1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /target .idea/ +.sqlx/ /run/ -*.env \ No newline at end of file +*.env diff --git a/README.md b/README.md index 68f10c6..6278379 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,7 @@ application. Even while this process is still running, the object data is already accessible at `/objects/OBJECT_HASH`. + ### Filesystem requirements minna-caos uses the local filesystem as a staging area for uploads. (This is the special `staging` bucket seen in the example above.) @@ -87,7 +88,9 @@ The filesystem containing the staging directory must… ## Roadmap - metadata endpoints -- endpoint for settling an upload with the hash of an existing object +- support non-resumable clients +- send the draft version header +- support upload cancellation - upload expiration - garbage-collect failed uploads - add code comments diff --git a/migrations/20250321201214_initial.sql b/migrations/20250321201214_initial.sql index 5c9acc3..45652fe 100644 --- a/migrations/20250321201214_initial.sql +++ b/migrations/20250321201214_initial.sql @@ -20,7 +20,8 @@ create table unfinished_uploads ( id text not null, current_size integer not null, -- in bytes - total_size integer not null, -- in bytes + total_size integer, -- in bytes + max_size integer, -- in bytes is_complete integer not null, -- boolean primary key (id) ) without rowid, strict; diff --git a/src/http_api/api_error.rs b/src/http_api/api_error.rs index 4b734c5..5604220 100644 --- a/src/http_api/api_error.rs +++ b/src/http_api/api_error.rs @@ -46,11 +46,12 @@ pub enum ApiError { CaosUploadRequestSuperseded, CaosUploadFailed { reason: UploadFailureReason }, CaosUploadOffsetMismatch { expected: u64, provided: u64 }, - CaosInconsistentUploadLength { expected: u64, detail: Cow<'static, str> }, - CaosUploadNotFinished, + CaosInconsistentUploadLength { detail: Cow<'static, str> }, + CaosWrongUploadStage { expected: Cow<'static, str>, actual: Cow<'static, str> }, CaosUnknownBucket { bucket_id: Cow<'static, str> }, CaosNoReplicaAvailable, CaosStagingAreaFull, + CaosUploadSizeLimitExceeded { limit: u64, provided: u64 }, } impl From for ApiError { @@ -164,21 +165,22 @@ impl IntoResponse for ApiError { })), ) .into_response(), - ApiError::CaosInconsistentUploadLength { expected, detail } => ( + ApiError::CaosInconsistentUploadLength { detail } => ( StatusCode::CONFLICT, - UploadOffsetResponseHeader(expected), ProblemJson(json!({ "type": "https://iana.org/assignments/http-problem-types#inconsistent-uploads-length", - "title": "The provided uploads lengths are inconsistent with one another or a previously established total length.", + "title": "The provided upload lengths are inconsistent with one another or a previously established total length.", "detail": detail, })), ) .into_response(), - ApiError::CaosUploadNotFinished => ( + ApiError::CaosWrongUploadStage { expected, actual } 
=> ( StatusCode::CONFLICT, ProblemJson(json!({ - "type": "https://minna.media/api-problems/caos/uploads-not-finished", - "title": "The uploads is not finished yet." + "type": "https://minna.media/api-problems/caos/wrong-upload-stage", + "title": "The upload is not in the expected stage.", + "expected": expected, + "actual": actual, })), ) .into_response(), @@ -209,6 +211,16 @@ impl IntoResponse for ApiError { })), ) .into_response(), + ApiError::CaosUploadSizeLimitExceeded { limit, provided } => ( + StatusCode::BAD_REQUEST, + ProblemJson(json!({ + "type": "https://minna.media/api-problems/caos/upload-size-limit-exceeded", + "title": "The data is longer than allowed for this upload.", + "limit": limit, + "provided": provided, + })), + ) + .into_response(), } } } diff --git a/src/http_api/headers.rs b/src/http_api/headers.rs index 27ff5cc..95f0c79 100644 --- a/src/http_api/headers.rs +++ b/src/http_api/headers.rs @@ -1,5 +1,6 @@ use crate::http_api::api_error::ApiError; use axum::http::{HeaderMap, HeaderName, HeaderValue, header}; +use std::num::NonZeroU64; pub mod upload_headers { use axum::http::HeaderName; @@ -50,6 +51,7 @@ impl HeaderMapExt for HeaderMap { pub trait HeaderValueExt { fn get_unsigned_decimal_number(&self, header_name_for_error: &HeaderName) -> Result; + fn get_positive_decimal_number(&self, header_name_for_error: &HeaderName) -> Result; fn get_boolean(&self, header_name_for_error: &HeaderName) -> Result; } @@ -65,6 +67,17 @@ impl HeaderValueExt for HeaderValue { }) } + fn get_positive_decimal_number(&self, header_name_for_error: &HeaderName) -> Result { + self.to_str() + .ok() + .map(|v| v.parse::().ok()) + .flatten() + .ok_or(ApiError::InvalidRequestHeader { + name: header_name_for_error.to_owned(), + message: "must be a positive 64-bit decimal number".into(), + }) + } + fn get_boolean(&self, header_name_for_error: &HeaderName) -> Result { if let Ok(value) = self.to_str() { if value == "?1" { diff --git a/src/http_api/uploads/append_to_upload.rs b/src/http_api/uploads/append_to_upload.rs index 2581671..ddcf8bc 100644 --- a/src/http_api/uploads/append_to_upload.rs +++ b/src/http_api/uploads/append_to_upload.rs @@ -3,7 +3,7 @@ use crate::http_api::api_error::ApiError; use crate::http_api::headers::{HeaderMapExt, HeaderValueExt, upload_headers}; use crate::http_api::uploads::headers::{UploadCompleteResponseHeader, UploadOffsetResponseHeader}; use crate::http_api::uploads::{PARTIAL_UPLOAD_MEDIA_TYPE, UploadPathParameters}; -use crate::upload_manager::{AcquiredUnfinishedUpload, UploadFailureReason}; +use crate::upload_manager::{AcquiredUnfinishedUpload, UnfinishedUploadMetadata, UnfinishedUploadStage, UploadFailureReason}; use crate::util::acquirable::Acquisition; use axum::body::{Body, BodyDataStream}; use axum::extract::{Path, State}; @@ -12,6 +12,7 @@ use axum::response::{IntoResponse, NoContent, Response}; use color_eyre::Report; use futures::TryStreamExt; use std::io::ErrorKind; +use std::num::NonZeroU64; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio_util::io::StreamReader; @@ -69,7 +70,7 @@ pub(super) async fn append_to_upload( headers: HeaderMap, request_body: Body, ) -> Result { - let mut tx = context.database.begin().await.map_err(Into::::into)?; + let mut tx = context.database.begin().await.map_err(Report::from)?; let upload = if let Some(upload) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await { upload @@ -85,7 +86,8 @@ pub(super) async fn append_to_upload( .into()); }; - let parameters = 
parse_request_parameters(&headers).await?; + let metadata = upload.metadata(&mut tx).await.map_err(Report::from)?; + let parameters = parse_request_parameters(&headers)?; let mut upload_acquisition = if let Some(a) = upload.acquire().await { a @@ -93,40 +95,136 @@ pub(super) async fn append_to_upload( return Err(ApiError::CaosUploadRequestSuperseded.into()); }; - if let Some(supplied_upload_length) = parameters.supplied_upload_length { - let expected = upload_acquisition.inner().upload().total_size(); - if supplied_upload_length != expected { - return Err(ApiError::CaosInconsistentUploadLength { - expected, - detail: format!("Upload-Length is set to {supplied_upload_length}, but the actual length is {expected}.").into(), + let new_total_size = if let Some(supplied_upload_length) = parameters.supplied_upload_length { + if let Some(expected) = metadata.total_size { + if supplied_upload_length != expected { + return Err(ApiError::CaosInconsistentUploadLength { + detail: format!("Upload-Length is set to {supplied_upload_length}, but the previously established length is {expected}.").into(), + } + .into()); } - .into()); + + None + } else { + Some(supplied_upload_length) } + } else { + None + }; + + if let Some(new_total_size) = new_total_size { + upload.set_total_size(&mut tx, new_total_size).await?; } - if upload.is_complete() { - return Err(ApiError::IanaUploadAlreadyComplete.into()); + match metadata.stage() { + UnfinishedUploadStage::Created => {} + UnfinishedUploadStage::Ongoing => {} + UnfinishedUploadStage::Complete => return Err(ApiError::IanaUploadAlreadyComplete.into()), } - let response = do_append(&mut upload_acquisition, parameters, request_body.into_data_stream()).await?; + let release_request_token = upload_acquisition.release_request_token(); + let acquired_upload = upload_acquisition.inner(); + let upload = acquired_upload.upload(); + let mut file = acquired_upload.file().get_or_open(true).await?; - match &response { - AppendToUploadResponse::UploadIncomplete { .. } => upload_acquisition.release().await, - AppendToUploadResponse::UploadComplete => upload_acquisition.complete().await?, - AppendToUploadResponse::UploadFailed { reason } => upload_acquisition.fail(*reason).await?, + let current_offset = file.stream_position().await?; + if current_offset < metadata.current_size { + log::error!("The upload ({}) failed because the file contains less data than expected.", upload.id()); + return Ok(AppendToUploadResponse::UploadFailed { + reason: UploadFailureReason::MissingData, + } + .into()); } - Ok(response) + let remaining_length = metadata.total_size.map(|s| s.get() - current_offset); + + if parameters.supplied_upload_offset != current_offset { + return Err(ApiError::CaosUploadOffsetMismatch { + expected: current_offset, + provided: parameters.supplied_upload_offset, + } + .into()); + } + + let payload_length_limit = if let Some(supplied_content_length) = parameters.supplied_content_length { + if let Some(remaining_length) = remaining_length { + if parameters.supplied_upload_complete { + if remaining_length != supplied_content_length { + return Err(ApiError::CaosInconsistentUploadLength { + detail: "Upload-Complete is set to true, and Content-Length is set, \ + but the value of Content-Length does not equal the length of the remaining content." 
+ .into(), + } + .into()); + } + } else { + if supplied_content_length >= remaining_length { + return Err(ApiError::CaosInconsistentUploadLength { + detail: "Upload-Complete is set to false, and Content-Length is set, \ + but the value of Content-Length is not smaller than the length of the remaining content." + .into(), + } + .into()); + } + } + } + + supplied_content_length + } else { + remaining_length.unwrap_or(u64::MAX) + }; + + let outcome = tokio::select! { + o = stream_to_file( + request_body.into_data_stream(), + &mut file, + remaining_length, + parameters.supplied_content_length, + parameters.supplied_upload_complete, + payload_length_limit + ) => Some(o?), + _ = release_request_token.cancelled() => None + }; + + file.sync_all().await?; + + let new_size = file.stream_position().await?; + acquired_upload.set_current_size(&mut tx, new_size).await?; + + let is_upload_complete = if let Some(StreamToFileOutcome::Success) = outcome { + parameters.supplied_upload_complete + } else { + false + }; + + if let Some(outcome) = outcome { + match outcome { + StreamToFileOutcome::StorageFull => Err(ApiError::CaosStagingAreaFull.into()), + StreamToFileOutcome::StoppedUnexpectedly => Err(ApiError::RequestBodyTooShort.into()), + StreamToFileOutcome::TooMuchContent => Err(ApiError::RequestBodyTooLong.into()), + StreamToFileOutcome::Success => { + if is_upload_complete { + upload_acquisition.complete().await?; + Ok(AppendToUploadResponse::UploadComplete) + } else { + Ok(AppendToUploadResponse::UploadIncomplete { offset: new_size }) + } + } + } + } else { + Err(ApiError::CaosUploadRequestSuperseded.into()) + } } +#[derive(Debug)] struct RequestParameters { pub supplied_content_length: Option, - pub supplied_upload_length: Option, + pub supplied_upload_length: Option, pub supplied_upload_offset: u64, pub supplied_upload_complete: bool, } -async fn parse_request_parameters(headers: &HeaderMap) -> Result { +fn parse_request_parameters(headers: &HeaderMap) -> Result { if !headers .get_exactly_once(&axum::http::header::CONTENT_TYPE)? .to_str() @@ -147,7 +245,7 @@ async fn parse_request_parameters(headers: &HeaderMap) -> Result Result, - parameters: RequestParameters, - content_stream: BodyDataStream, -) -> Result { - let release_request_token = upload_acquisition.release_request_token(); - let acquired_upload = upload_acquisition.inner(); - let upload = acquired_upload.upload(); - let mut file = acquired_upload.file().get_or_open(true).await?; - - let total_size = upload.total_size(); - let current_offset = file.stream_position().await?; - - if current_offset < upload.current_size() { - log::error!("The uploads ({}) failed because the file contains less data than expected.", upload.id()); - return Ok(AppendToUploadResponse::UploadFailed { - reason: UploadFailureReason::MissingData, - } - .into()); - } - - let remaining_content_length = total_size - current_offset; - - if parameters.supplied_upload_offset != current_offset { - return Err(ApiError::CaosUploadOffsetMismatch { - expected: current_offset, - provided: parameters.supplied_upload_offset, - } - .into()); - } - - let payload_length_limit = if let Some(supplied_content_length) = parameters.supplied_content_length { - if parameters.supplied_upload_complete { - if remaining_content_length != supplied_content_length { - return Err(ApiError::CaosInconsistentUploadLength { - expected: total_size, - detail: "Upload-Complete is set to true, and Content-Length is set, \ - but the value of Content-Length does not equal the length of the remaining content." 
- .into(), - } - .into()); - } - } else { - if supplied_content_length >= remaining_content_length { - return Err(ApiError::CaosInconsistentUploadLength { - expected: total_size, - detail: "Upload-Complete is set to false, and Content-Length is set, \ - but the value of Content-Length is not smaller than the length of the remaining content." - .into(), - } - .into()); - } - } - - supplied_content_length - } else { - remaining_content_length - }; - - let outcome = tokio::select! { - o = stream_to_file( - content_stream, - &mut file, - remaining_content_length, - parameters.supplied_content_length, - parameters.supplied_upload_complete, - payload_length_limit - ) => Some(o?), - _ = release_request_token.cancelled() => None - }; - - file.sync_all().await?; - - let new_size = file.stream_position().await?; - acquired_upload.set_current_size(new_size).await?; - - let is_upload_complete = if let Some(StreamToFileOutcome::Success) = outcome { - parameters.supplied_upload_complete - } else { - false - }; - - if let Some(outcome) = outcome { - match outcome { - StreamToFileOutcome::StorageFull => Err(ApiError::CaosStagingAreaFull.into()), - StreamToFileOutcome::StoppedUnexpectedly => Err(ApiError::RequestBodyTooShort.into()), - StreamToFileOutcome::TooMuchContent => Err(ApiError::RequestBodyTooLong.into()), - StreamToFileOutcome::Success => { - if is_upload_complete { - Ok(AppendToUploadResponse::UploadComplete) - } else { - Ok(AppendToUploadResponse::UploadIncomplete { offset: new_size }) - } - } - } - } else { - Err(ApiError::CaosUploadRequestSuperseded.into()) - } -} - #[derive(Debug)] pub enum StreamToFileOutcome { StorageFull, @@ -277,7 +275,7 @@ pub enum StreamToFileOutcome { async fn stream_to_file( content_stream: BodyDataStream, file: &mut File, - remaining_content_length: u64, + remaining_length: Option, supplied_content_length: Option, supplied_upload_complete: bool, payload_length_limit: u64, @@ -294,7 +292,7 @@ async fn stream_to_file( } } else { if supplied_upload_complete { - if n < remaining_content_length { + if n < remaining_length.unwrap_or(0) { return Ok(StreamToFileOutcome::StoppedUnexpectedly); } } diff --git a/src/http_api/uploads/mod.rs b/src/http_api/uploads/mod.rs index f17d1ec..eb909eb 100644 --- a/src/http_api/uploads/mod.rs +++ b/src/http_api/uploads/mod.rs @@ -3,14 +3,15 @@ use crate::http_api::api_error::ApiError; use crate::http_api::auth::AppAuthorization; use crate::http_api::headers::{CACHE_CONTROL_CACHE_FOREVER, CACHE_CONTROL_NEVER_CACHE, upload_headers}; use crate::http_api::uploads::append_to_upload::append_to_upload; -use crate::upload_manager::{UploadFailureReason, UploadId}; +use crate::upload_manager::{UnfinishedUploadStage, UploadFailureReason, UploadId}; use crate::util::id::BucketId; use axum::extract::{Path, State}; -use axum::http::HeaderValue; -use axum::response::{IntoResponse, Response}; +use axum::http::{HeaderMap, HeaderValue}; +use axum::response::{IntoResponse, NoContent, Response}; use axum::{Json, Router, routing}; use color_eyre::Report; use serde::{Deserialize, Serialize}; +use std::num::NonZeroU64; pub mod append_to_upload; pub mod headers; @@ -32,6 +33,7 @@ pub fn create_uploads_router() -> Router { .route("/", routing::post(create_upload)) .route("/{upload_id}", routing::get(get_upload_metadata).post(append_to_upload)) .route("/{upload_id}/accept", routing::post(accept_upload)) + .route("/{upload_id}/complete", routing::post(complete_upload_directly)) } async fn create_upload( @@ -66,13 +68,18 @@ struct GetUploadMetadataResponse { 
#[derive(Debug, Serialize)] #[serde(rename_all = "snake_case", tag = "state")] enum GetUploadMetadataResponseState { + // These three are unfinished + Created { + size_limit: u64, + }, Ongoing { current_size: u64, - total_size: u64, + total_size: Option, }, Complete { size: u64, }, + Finished { hash: String, size: u64, @@ -87,16 +94,26 @@ enum GetUploadMetadataResponseState { impl IntoResponse for GetUploadMetadataResponse { fn into_response(self) -> Response { match self.state { - GetUploadMetadataResponseState::Ongoing { current_size, total_size } => ( + GetUploadMetadataResponseState::Created { .. } => ( [ (upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static("?0")), - (upload_headers::UPLOAD_OFFSET, HeaderValue::from(current_size)), - (upload_headers::UPLOAD_LENGTH, HeaderValue::from(total_size)), + (upload_headers::UPLOAD_OFFSET, HeaderValue::from(0)), CACHE_CONTROL_NEVER_CACHE, ], Json(self), ) .into_response(), + GetUploadMetadataResponseState::Ongoing { current_size, total_size } => { + let mut headers = HeaderMap::new(); + headers.insert(upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static("?0")); + headers.insert(upload_headers::UPLOAD_OFFSET, HeaderValue::from(current_size)); + + if let Some(total_size) = total_size { + headers.insert(upload_headers::UPLOAD_LENGTH, HeaderValue::from(total_size.get())); + } + + (headers, [CACHE_CONTROL_NEVER_CACHE], Json(self)).into_response() + } GetUploadMetadataResponseState::Complete { size } => ( [ (upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static("?1")), @@ -129,15 +146,16 @@ impl IntoResponse for GetUploadMetadataResponse { async fn get_upload_metadata( State(context): State, Path(UploadPathParameters { upload_id }): Path, -) -> Result { +) -> Result { let mut tx = context.database.begin().await.map_err(Into::::into)?; let state: GetUploadMetadataResponseState = if let Some(upload) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await { - if upload.is_complete() { - GetUploadMetadataResponseState::Complete { size: upload.total_size() } + let metadata = upload.metadata(&mut tx).await?; + if metadata.is_complete { + GetUploadMetadataResponseState::Complete { size: metadata.current_size } } else { GetUploadMetadataResponseState::Ongoing { - current_size: upload.current_size(), - total_size: upload.total_size(), + current_size: metadata.current_size, + total_size: metadata.total_size, } } } else if let Some(reason) = context.upload_manager.get_upload_failure_reason_by_id(&mut tx, &upload_id).await? { @@ -172,10 +190,13 @@ async fn accept_upload( State(context): State, Path(UploadPathParameters { upload_id }): Path, Json(payload): Json, -) -> Result { - let mut tx = context.database.begin().await.map_err(Into::::into)?; +) -> Result { + let mut tx = context.database.begin().await.map_err(Report::from)?; let _hash = if let Some(_) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await { - return Err(ApiError::CaosUploadNotFinished); + return Err(ApiError::CaosWrongUploadStage { + expected: "finished".into(), + actual: "unfinished".into(), + }); } else if let Some(reason) = context.upload_manager.get_upload_failure_reason_by_id(&mut tx, &upload_id).await? { return Err(ApiError::CaosUploadFailed { reason }); } else if let Some(metadata) = context.upload_manager.get_finished_upload_metadata_by_id(&mut tx, &upload_id).await? 
{ @@ -198,7 +219,78 @@ async fn accept_upload( } context.upload_manager.accept_finished_upload(&mut tx, upload_id, payload.buckets).await?; - tx.commit().await.map_err(Into::::into)?; + tx.commit().await.map_err(Report::from)?; - Ok(()) + Ok(NoContent) +} + +#[derive(Debug, Deserialize)] +struct CompleteUploadDirectlyPayload { + existing_hash: String, +} + +async fn complete_upload_directly( + State(context): State, + Path(UploadPathParameters { upload_id }): Path, + Json(payload): Json, +) -> Result { + let mut tx = context.database.begin().await.map_err(Report::from)?; + if let Some(upload) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await { + let upload_acquisition = if let Some(a) = upload.acquire().await { + a + } else { + return Err(ApiError::CaosUploadRequestSuperseded); + }; + + let metadata = upload.metadata(&mut tx).await?; + + match metadata.stage() { + UnfinishedUploadStage::Created => { + let object_size = sqlx::query!("SELECT size FROM objects WHERE hash = ?", payload.existing_hash) + .map(|r| r.size as u64) + .fetch_optional(&mut *tx) + .await?; + + if let Some(object_size) = object_size { + if let Some(max_size) = metadata.max_size { + if object_size <= max_size.get() { + upload_acquisition.complete_directly(&mut tx, &payload.existing_hash).await?; + Ok(()) + } else { + Err(ApiError::CaosUploadSizeLimitExceeded { + limit: max_size.get(), + provided: object_size, + } + .into()) + } + } else { + Ok(()) + } + } else { + Err(ApiError::UnknownResource { + resource_type: "objects".into(), + id: payload.existing_hash.into(), + } + .into()) + } + } + stage => Err(ApiError::CaosWrongUploadStage { + expected: "created".into(), + actual: stage.to_string().into(), + }), + } + } else if let Some(reason) = context.upload_manager.get_upload_failure_reason_by_id(&mut tx, &upload_id).await? { + Err(ApiError::CaosUploadFailed { reason }) + } else if let Some(_) = context.upload_manager.get_finished_upload_metadata_by_id(&mut tx, &upload_id).await? 
{ + Err(ApiError::CaosWrongUploadStage { + expected: "created".into(), + actual: "finished".into(), + }) + } else { + Err(ApiError::UnknownResource { + resource_type: "uploads".into(), + id: upload_id.to_string().into(), + } + .into()) + } } diff --git a/src/processing_worker.rs b/src/processing_worker.rs index d3a5190..3d283c8 100644 --- a/src/processing_worker.rs +++ b/src/processing_worker.rs @@ -1,4 +1,4 @@ -use crate::upload_manager::AcquiredUnfinishedUpload; +use crate::upload_manager::{AcquiredUnfinishedUpload, UploadId}; use crate::util::hash_to_hex_string::HashExt; use crate::util::temporal_formatting::TemporalFormatting; use blake3::Hasher; @@ -10,22 +10,25 @@ use std::io::SeekFrom; use temporal_rs::Now; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncSeekExt}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; pub async fn do_processing_work( database: SqlitePool, enable_multithreaded_hashing: bool, staging_directory_path: Utf8PathBuf, - mut tasks_receiver: tokio::sync::mpsc::UnboundedReceiver, + mut tasks_receiver: UnboundedReceiver, + finished_upload_ids_sender: UnboundedSender, ) { while let Some(mut acquired_upload) = tasks_receiver.recv().await { match process(enable_multithreaded_hashing, &staging_directory_path, &mut acquired_upload).await { Ok(outcome) => { let mut tx = database.begin().await.unwrap(); let upload = acquired_upload.upload(); + let metadata = upload.metadata(&mut tx).await.unwrap(); let id = upload.id().as_str(); let hash = outcome.hash.as_str(); - let size = upload.total_size() as i64; + let size = metadata.current_size as i64; let creation_date = Now::zoneddatetime_iso(None).unwrap().to_string_with_defaults().unwrap(); // This is all in a transaction, so doing this first is fine. @@ -66,6 +69,7 @@ pub async fn do_processing_work( // This just removes the old link under the upload ID. 
fs::remove_file(acquired_upload.file().path()).await.unwrap(); + finished_upload_ids_sender.send(*upload.id()).unwrap(); log::info!("Successfully processed upload ({}): {:?}", id, outcome); } Err(report) => { diff --git a/src/upload_manager.rs b/src/upload_manager.rs index eb8d661..69b8740 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -9,6 +9,7 @@ use fstr::FStr; use serde::Serialize; use sqlx::{SqlitePool, SqliteTransaction}; use std::fmt::Debug; +use std::num::NonZeroU64; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Weak}; use strum::{Display, EnumString}; @@ -30,6 +31,7 @@ impl UploadManager { pub async fn create(database: SqlitePool, staging_directory_path: Utf8PathBuf, enable_multithreaded_hashing: bool) -> Result> { log::info!("Loading uploads…"); + let (finished_upload_ids_sender, mut finished_upload_ids_receiver) = tokio::sync::mpsc::unbounded_channel(); let (small_file_processing_tasks_sender, small_file_processing_tasks_receiver) = tokio::sync::mpsc::unbounded_channel(); let (large_file_processing_tasks_sender, large_file_processing_tasks_receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -53,9 +55,6 @@ impl UploadManager { let upload = Arc::new_cyclic(|upload| UnfinishedUpload { manager: Arc::downgrade(&manager), id, - total_size: row.total_size as u64, - current_size: AtomicU64::new(row.current_size as u64), - is_complete: AtomicBool::new(false), acquirable: Acquirable::new(AcquiredUnfinishedUpload { upload: upload.to_owned(), file: FileReference::new(staging_file_path), @@ -75,6 +74,7 @@ impl UploadManager { enable_multithreaded_hashing, staging_directory_path.clone(), small_file_processing_tasks_receiver, + finished_upload_ids_sender.clone(), )); tokio::spawn(do_processing_work( @@ -82,8 +82,19 @@ impl UploadManager { enable_multithreaded_hashing, staging_directory_path, large_file_processing_tasks_receiver, + finished_upload_ids_sender, )); + tokio::spawn({ + let manager = Arc::clone(&manager); + + async move { + while let Some(id) = finished_upload_ids_receiver.recv().await { + manager.unfinished_uploads.remove(&id); + } + } + }); + Ok(manager) } @@ -105,9 +116,6 @@ impl UploadManager { let upload = Arc::new_cyclic(|upload| UnfinishedUpload { manager: Arc::downgrade(&self), id, - total_size, - current_size: AtomicU64::new(0), - is_complete: AtomicBool::new(false), acquirable: Acquirable::new(AcquiredUnfinishedUpload { upload: upload.to_owned(), file: FileReference::new(self.staging_directory_path.join(id.as_str())), @@ -174,6 +182,7 @@ impl UploadManager { } } +#[derive(Debug)] pub struct FinishedUploadMetadata { pub hash: String, pub size: u64, @@ -185,38 +194,70 @@ pub struct FinishedUploadMetadata { pub struct UnfinishedUpload { manager: Weak, id: UploadId, - total_size: u64, - current_size: AtomicU64, - is_complete: AtomicBool, acquirable: Acquirable, } -#[derive(Debug)] -pub struct AcquiredUnfinishedUpload { - upload: Weak, - file: FileReference, -} - impl UnfinishedUpload { pub fn id(&self) -> &UploadId { &self.id } - pub fn total_size(&self) -> u64 { - self.total_size - } - - pub fn current_size(&self) -> u64 { - self.current_size.load(Ordering::Relaxed) - } - - pub fn is_complete(&self) -> bool { - self.is_complete.load(Ordering::Relaxed) + pub async fn metadata(&self, tx: &mut SqliteTransaction<'_>) -> Result { + let id = self.id.as_str(); + Ok(sqlx::query!( + "SELECT current_size, total_size, max_size, is_complete FROM unfinished_uploads WHERE id = ?", + id + ) + .map(|r| UnfinishedUploadMetadata { + 
current_size: r.current_size as u64, + total_size: r.total_size.map(|s| NonZeroU64::new(s as u64).unwrap()), + max_size: r.max_size.map(|s| NonZeroU64::new(s as u64).unwrap()), + is_complete: r.is_complete == 1, + }) + .fetch_one(&mut **tx) + .await?) } pub async fn acquire(&self) -> Option> { self.acquirable.acquire().await } + + pub async fn set_total_size(&self, tx: &mut SqliteTransaction<'_>, size: NonZeroU64) -> Result<()> { + let size = size.get() as i64; + let id = self.id.as_str(); + + sqlx::query!("UPDATE unfinished_uploads SET total_size = ? WHERE id = ?", size, id) + .execute(&mut **tx) + .await?; + + Ok(()) + } +} + +#[derive(Debug)] +pub struct UnfinishedUploadMetadata { + pub current_size: u64, + pub total_size: Option, + pub max_size: Option, + pub is_complete: bool, +} + +impl UnfinishedUploadMetadata { + pub fn stage(&self) -> UnfinishedUploadStage { + if self.is_complete { + UnfinishedUploadStage::Complete + } else if self.current_size == 0 { + UnfinishedUploadStage::Created + } else { + UnfinishedUploadStage::Ongoing + } + } +} + +#[derive(Debug)] +pub struct AcquiredUnfinishedUpload { + upload: Weak, + file: FileReference, } impl AcquiredUnfinishedUpload { @@ -228,17 +269,14 @@ impl AcquiredUnfinishedUpload { &mut self.file } - pub async fn set_current_size(&self, current_size: u64) -> Result<(), Report> { + pub async fn set_current_size(&self, tx: &mut SqliteTransaction<'_>, current_size: u64) -> Result<(), Report> { let upload = self.upload.upgrade().unwrap(); - let manager = upload.manager.upgrade().unwrap(); - - upload.current_size.store(current_size, Ordering::Relaxed); let id = upload.id.to_string(); let current_size = current_size as i64; sqlx::query!("UPDATE unfinished_uploads SET current_size = ? WHERE id = ?", current_size, id) - .execute(&manager.database) + .execute(&mut **tx) .await?; Ok(()) @@ -246,21 +284,40 @@ impl AcquiredUnfinishedUpload { } impl Acquisition { - pub async fn complete(self) -> Result<(), Report> { + async fn consume(self) -> (AcquiredUnfinishedUpload, Arc, Arc) { let inner = self.destroy().await; let upload = inner.upload.upgrade().unwrap(); let manager = upload.manager.upgrade().unwrap(); - upload.is_complete.store(true, Ordering::Relaxed); + manager.unfinished_uploads.remove(upload.id()); + (inner, upload, manager) + } - let id = upload.id.to_string(); - sqlx::query!("UPDATE unfinished_uploads SET is_complete = 1 WHERE id = ?", id) - .execute(&manager.database) + pub async fn complete_directly(self, tx: &mut SqliteTransaction<'_>, existing_hash: &str) -> Result<(), Report> { + let (_, upload, _) = self.consume().await; + + let id = upload.id().to_string(); + sqlx::query!("DELETE FROM unfinished_uploads WHERE id = ?", id).execute(&mut **tx).await?; + + sqlx::query!("INSERT INTO finished_uploads (id, hash) VALUES (?, ?)", id, existing_hash) + .execute(&mut **tx) .await?; - if upload.total_size <= LARGE_FILE_SIZE_THRESHOLD { - manager.small_file_processing_tasks_sender.send(inner).unwrap() + Ok(()) + } + + pub async fn complete(self) -> Result<(), Report> { + let (acquired_upload, upload, manager) = self.consume().await; + + let id = upload.id.to_string(); + let size = sqlx::query!("UPDATE unfinished_uploads SET is_complete = 1 WHERE id = ? 
RETURNING current_size", id) + .map(|r| r.current_size as u64) + .fetch_one(&manager.database) + .await?; + + if size <= LARGE_FILE_SIZE_THRESHOLD { + manager.small_file_processing_tasks_sender.send(acquired_upload).unwrap() } else { - manager.large_file_processing_tasks_sender.send(inner).unwrap() + manager.large_file_processing_tasks_sender.send(acquired_upload).unwrap() } Ok(()) @@ -289,6 +346,15 @@ impl Acquisition { } } +#[derive(Debug, Clone, Copy, Display, Serialize)] +#[strum(serialize_all = "snake_case")] +#[serde(rename_all = "snake_case")] +pub enum UnfinishedUploadStage { + Created, + Ongoing, + Complete, +} + #[derive(Debug, Clone, Copy, EnumString, Display, Serialize)] #[strum(serialize_all = "snake_case")] #[serde(rename_all = "snake_case")] diff --git a/src/util/acquirable.rs b/src/util/acquirable.rs index b8172ab..0ed4453 100644 --- a/src/util/acquirable.rs +++ b/src/util/acquirable.rs @@ -34,7 +34,7 @@ impl Acquirable { ( Outcome::Acquired(Acquisition { - inner, + inner: Some(inner), acquirable_state: Arc::clone(&self.state), release_request_token: release_request_token.clone(), }), @@ -68,7 +68,7 @@ impl Acquirable { match data { Ok((data, release_request_token)) => Some(Acquisition { - inner: data, + inner: Some(data), acquirable_state: Arc::clone(&self.state), release_request_token, }), @@ -94,51 +94,60 @@ pub enum AcquirableState { #[must_use] pub struct Acquisition { - inner: T, + inner: Option, // Only set to None when dropped or destroyed acquirable_state: Arc>>, release_request_token: CancellationToken, } impl Acquisition { pub fn inner(&mut self) -> &mut T { - &mut self.inner + // SAFETY: inner is only None when dropped or destroyed (and then dropped) + unsafe { self.inner.as_mut().unwrap_unchecked() } } pub fn release_request_token(&self) -> CancellationToken { self.release_request_token.clone() } - pub async fn release(self) { - let mut state = self.acquirable_state.lock().await; - - replace_with_or_abort(&mut *state, |state| match state { - AcquirableState::Acquired { - data_return_channel_sender, .. - } => { - let release_request_token = CancellationToken::new(); - match data_return_channel_sender.send((self.inner, release_request_token.clone())) { - Ok(_) => { - let (data_return_channel_sender, data_return_channel_receiver) = tokio::sync::oneshot::channel(); - drop(data_return_channel_receiver); - - AcquirableState::Acquired { - release_request_token, - data_return_channel_sender, - } - } - Err((data, _)) => AcquirableState::Available { inner: data }, - } - } - _ => unreachable!(), - }); - } - /// Consume the acquisition without releasing it. The corresponding Acquirable will forever stay in the acquired state. /// /// All outstanding calls to Acquirable::acquire will return None. - pub async fn destroy(self) -> T { + pub async fn destroy(mut self) -> T { let mut state = self.acquirable_state.lock().await; *state = AcquirableState::Destroyed; - self.inner + + unsafe { self.inner.take().unwrap_unchecked() } + } +} + +impl Drop for Acquisition { + fn drop(&mut self) { + let state = Arc::clone(&self.acquirable_state); + if let Some(inner) = self.inner.take() { + tokio::spawn(async move { + let mut state = state.lock().await; + + replace_with_or_abort(&mut *state, |state| match state { + AcquirableState::Acquired { + data_return_channel_sender, .. 
+ } => { + let release_request_token = CancellationToken::new(); + match data_return_channel_sender.send((inner, release_request_token.clone())) { + Ok(_) => { + let (data_return_channel_sender, data_return_channel_receiver) = tokio::sync::oneshot::channel(); + drop(data_return_channel_receiver); + + AcquirableState::Acquired { + release_request_token, + data_return_channel_sender, + } + } + Err((data, _)) => AcquirableState::Available { inner: data }, + } + } + _ => unreachable!(), + }); + }); + } } }
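
Note on the `Acquirable` change above: the explicit `Acquisition::release()` method is removed and replaced by a `Drop` implementation, so an acquisition that goes out of scope (for example when an append request is aborted mid-stream) hands its inner value back to the `Acquirable` on a spawned task instead of leaving the acquirable stuck in the acquired state. The following is a hedged usage sketch, not part of the patch: it assumes it runs inside this crate (so `crate::util::acquirable::Acquirable` resolves) and uses a plain integer as the inner value purely for illustration.

    use crate::util::acquirable::Acquirable;

    #[tokio::main]
    async fn main() {
        let acquirable = Acquirable::new(0u64);

        {
            // Exclusive access to the inner value while the acquisition is alive.
            let mut acquisition = acquirable.acquire().await.expect("nothing holds it yet");
            *acquisition.inner() += 1;
            // No explicit release(): dropping the acquisition spawns a task that
            // returns the value to the Acquirable (or sends it straight to a waiting acquirer).
        }

        // A later acquire() either finds the value already returned or waits on the
        // return channel until the drop task has handed it back.
        let mut acquisition = acquirable.acquire().await.expect("value was returned on drop");
        assert_eq!(*acquisition.inner(), 1);
    }

Because the hand-back happens on a spawned task, callers should not assume the value is available again in the same instant the acquisition is dropped; `acquire()` is the synchronization point.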