From 96a8ea72f18023e59e810bc48c536eb2d33b9a01 Mon Sep 17 00:00:00 2001
From: Moritz Ruth
Date: Thu, 3 Apr 2025 18:54:16 +0200
Subject: [PATCH] WIP: v1.0.0

---
 Cargo.lock                              |  24 +++-
 Cargo.toml                              |   4 +-
 src/http_api/mod.rs                     |   4 +-
 src/http_api/upload/append_to_upload.rs |  93 +++++++-----
 src/upload_manager.rs                   | 156 +++++++++++++----------
 5 files changed, 182 insertions(+), 99 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 19da886..0c690ed 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1332,7 +1332,6 @@ dependencies = [
 name = "minna_caos"
 version = "0.1.0"
 dependencies = [
- "async-trait",
  "axum",
  "camino",
  "color-eyre",
@@ -1351,6 +1350,7 @@ dependencies = [
  "serde",
  "serde_json",
  "sqlx",
+ "strum",
  "tokio",
  "tokio-util",
  "validator",
@@ -2404,6 +2404,28 @@ version = "0.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
 
+[[package]]
+name = "strum"
+version = "0.27.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32"
+dependencies = [
+ "strum_macros",
+]
+
+[[package]]
+name = "strum_macros"
+version = "0.27.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn",
+]
+
 [[package]]
 name = "subtle"
 version = "2.6.1"
diff --git a/Cargo.toml b/Cargo.toml
index 8e97789..f045746 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,6 +26,6 @@ camino = { version = "1.1.9", features = ["serde1"] }
 dashmap = "7.0.0-rc2"
 tokio-util = "0.7.14"
 replace_with = "0.1.7"
-async-trait = "0.1.88"
 rand = "0.9.0"
-futures = "0.3.31"
\ No newline at end of file
+futures = "0.3.31"
+strum = { version = "0.27.1", features = ["derive"] }
\ No newline at end of file
diff --git a/src/http_api/mod.rs b/src/http_api/mod.rs
index c93f2e3..ea969ab 100644
--- a/src/http_api/mod.rs
+++ b/src/http_api/mod.rs
@@ -13,13 +13,13 @@ use std::sync::Arc;
 
 #[derive(Debug)]
 struct ContextInner {
-    pub upload_manager: UploadManager,
+    pub upload_manager: Arc<UploadManager>,
     pub api_secret: FStr<64>,
 }
 
 type Context = Arc<ContextInner>;
 
-pub async fn start_http_api_server(upload_manager: UploadManager, address: IpAddr, port: u16, api_secret: FStr<64>) -> Result<()> {
+pub async fn start_http_api_server(upload_manager: Arc<UploadManager>, address: IpAddr, port: u16, api_secret: FStr<64>) -> Result<()> {
     let router = Router::new()
         .nest("/uploads", create_uploads_router())
         .with_state(Arc::new(ContextInner { upload_manager, api_secret }));
diff --git a/src/http_api/upload/append_to_upload.rs b/src/http_api/upload/append_to_upload.rs
index 584f0a8..d38b04c 100644
--- a/src/http_api/upload/append_to_upload.rs
+++ b/src/http_api/upload/append_to_upload.rs
@@ -2,7 +2,7 @@ use crate::http_api::Context;
 use crate::http_api::api_error::ApiError;
 use crate::http_api::headers::{HeaderMapExt, HeaderValueExt, upload_headers};
 use crate::http_api::upload::{PARTIAL_UPLOAD_MEDIA_TYPE, UploadCompleteResponseHeader, UploadOffsetResponseHeader};
-use crate::upload_manager::{FileReference, UnfinishedUpload, UploadId, UploadManager};
+use crate::upload_manager::{AnyStageUpload, FileReference, UnfinishedUpload, UploadFailureReason, UploadId, UploadManager};
 use crate::util::acquirable::Acquisition;
 use axum::Json;
 use axum::body::{Body, BodyDataStream};
@@ -26,9 +26,10 @@ pub(super) struct AppendToUploadPathParameters {
 }
 
 #[derive(Debug)]
-enum HandleAppendOutcome {
+enum AppendToUploadOutcome {
     RequestSuperseded,
     UploadAlreadyComplete,
+    Failed(UploadFailureReason),
     UploadOffsetMismatch { expected: u64, provided: u64 },
     InconsistentUploadLength { expected: u64, detail: Cow<'static, str> },
     ContentStreamStoppedUnexpectedly,
@@ -37,10 +38,10 @@ enum HandleAppendOutcome {
     UploadComplete,
 }
 
-impl IntoResponse for HandleAppendOutcome {
+impl IntoResponse for AppendToUploadOutcome {
     fn into_response(self) -> Response {
         match self {
-            HandleAppendOutcome::RequestSuperseded => (
+            AppendToUploadOutcome::RequestSuperseded => (
                 StatusCode::CONFLICT,
                 UploadCompleteResponseHeader(false),
                 Json(json!({
@@ -49,7 +50,7 @@ impl IntoResponse for HandleAppendOutcome {
                 })),
             )
                 .into_response(),
-            HandleAppendOutcome::UploadAlreadyComplete => (
+            AppendToUploadOutcome::UploadAlreadyComplete => (
                 StatusCode::CONFLICT,
                 Json(json!({
                     "type": "https://iana.org/assignments/http-problem-types#completed-upload",
@@ -57,7 +58,16 @@ impl IntoResponse for HandleAppendOutcome {
                 })),
             )
                 .into_response(),
-            HandleAppendOutcome::UploadOffsetMismatch { expected, provided } => (
+            AppendToUploadOutcome::Failed(reason) => (
+                StatusCode::GONE,
+                Json(json!({
+                    "type": "https://minna.media/api-problems/caos/upload-failed",
+                    "title": "The upload was cancelled or failed.",
+                    "reason": reason.to_string()
+                })),
+            )
+                .into_response(),
+            AppendToUploadOutcome::UploadOffsetMismatch { expected, provided } => (
                 StatusCode::CONFLICT,
                 UploadCompleteResponseHeader(false),
                 UploadOffsetResponseHeader(expected),
@@ -69,7 +79,7 @@
                 })),
             )
                 .into_response(),
-            HandleAppendOutcome::InconsistentUploadLength { expected, detail } => (
+            AppendToUploadOutcome::InconsistentUploadLength { expected, detail } => (
                 StatusCode::CONFLICT,
                 UploadCompleteResponseHeader(false),
                 UploadOffsetResponseHeader(expected),
@@ -80,7 +90,7 @@
                 })),
             )
                 .into_response(),
-            HandleAppendOutcome::ContentStreamStoppedUnexpectedly => (
+            AppendToUploadOutcome::ContentStreamStoppedUnexpectedly => (
                 StatusCode::BAD_REQUEST,
                 UploadCompleteResponseHeader(false),
                 Json(json!({
@@ -89,7 +99,7 @@
                 })),
             )
                 .into_response(),
-            HandleAppendOutcome::TooMuchContent => (
+            AppendToUploadOutcome::TooMuchContent => (
                 StatusCode::BAD_REQUEST,
                 UploadCompleteResponseHeader(false),
                 Json(json!({
@@ -98,14 +108,14 @@
                 })),
             )
                 .into_response(),
-            HandleAppendOutcome::UploadIncomplete { offset } => (
+            AppendToUploadOutcome::UploadIncomplete { offset } => (
                 StatusCode::NO_CONTENT,
                 UploadCompleteResponseHeader(false),
                 UploadOffsetResponseHeader(offset),
                 Body::empty(),
             )
                 .into_response(),
-            HandleAppendOutcome::UploadComplete => (StatusCode::NO_CONTENT, UploadCompleteResponseHeader(true), Body::empty()).into_response(),
+            AppendToUploadOutcome::UploadComplete => (StatusCode::NO_CONTENT, UploadCompleteResponseHeader(true), Body::empty()).into_response(),
         }
     }
 }
@@ -116,24 +126,28 @@ pub(super) async fn append_to_upload(
     headers: HeaderMap,
     request_body: Body,
 ) -> Result<impl IntoResponse, (UploadCompleteResponseHeader, ApiError)> {
-    let parameters = parse_request_parameters(&context.upload_manager, upload_id, &headers)
+    let parameters = match parse_request_parameters(&context.upload_manager, upload_id, &headers)
         .await
-        .map_err(|e| (UploadCompleteResponseHeader(false), e))?;
+        .map_err(|e| (UploadCompleteResponseHeader(false), e))?
+    {
+        Ok(p) => p,
+        Err(o) => return Ok(o),
+    };
 
     {
         let state = parameters.upload.state().read().await;
         if state.is_complete() {
-            return Ok(HandleAppendOutcome::UploadAlreadyComplete);
+            return Ok(AppendToUploadOutcome::UploadAlreadyComplete);
         }
     }
 
     let mut file_acquisition = if let Some(a) = parameters.upload.acquire_file().await {
         a
     } else {
-        return Ok(HandleAppendOutcome::RequestSuperseded);
+        return Ok(AppendToUploadOutcome::RequestSuperseded);
     };
 
-    let outcome = do_append(&context.upload_manager, &mut file_acquisition, parameters, request_body.into_data_stream())
+    let outcome = do_append(&mut file_acquisition, parameters, request_body.into_data_stream())
         .await
         .map_err(|report| {
             (
@@ -155,9 +169,17 @@ struct RequestParameters {
     pub supplied_upload_complete: bool,
 }
 
-async fn parse_request_parameters(upload_manager: &UploadManager, upload_id: UploadId, headers: &HeaderMap) -> Result<RequestParameters, ApiError> {
-    let upload = if let Some(upload) = upload_manager.get_upload_by_id(&upload_id) {
-        upload
+async fn parse_request_parameters(
+    upload_manager: &UploadManager,
+    upload_id: UploadId,
+    headers: &HeaderMap,
+) -> Result<Result<RequestParameters, AppendToUploadOutcome>, ApiError> {
+    let upload = if let Some(upload) = upload_manager.get_upload_by_id(&upload_id).await? {
+        match upload {
+            AnyStageUpload::Unfinished(u) => u,
+            AnyStageUpload::Finished => return Ok(Err(AppendToUploadOutcome::UploadAlreadyComplete)),
+            AnyStageUpload::Failed(reason) => return Ok(Err(AppendToUploadOutcome::Failed(reason))),
+        }
     } else {
         return Err(ApiError::UnknownResource {
             resource_type: "upload".into(),
@@ -191,29 +213,35 @@ async fn parse_request_parameters(upload_manager: &UploadManager, upload_id: Upl
         .get_exactly_once(&upload_headers::UPLOAD_COMPLETE)?
         .get_boolean(&upload_headers::UPLOAD_COMPLETE)?;
 
-    Ok(RequestParameters {
+    Ok(Ok(RequestParameters {
         upload,
         supplied_content_length,
         supplied_upload_offset,
         supplied_upload_complete,
-    })
+    }))
 }
 
 async fn do_append(
-    upload_manager: &UploadManager,
     file_acquisition: &mut Acquisition<FileReference>,
     parameters: RequestParameters,
     content_stream: BodyDataStream,
-) -> Result<HandleAppendOutcome> {
+) -> Result<AppendToUploadOutcome> {
+    let mut upload_state = parameters.upload.state().write().await;
     let release_request_token = file_acquisition.release_request_token();
     let mut file = file_acquisition.inner().get_or_open().await?;
 
     let total_size = parameters.upload.total_size();
     let current_offset = file.stream_position().await?;
+
+    if current_offset < upload_state.current_size() {
+        parameters.upload.fail(UploadFailureReason::MissingData).await?;
+        return Ok(AppendToUploadOutcome::Failed(UploadFailureReason::MissingData));
+    }
+
     let remaining_content_length = total_size - current_offset;
 
     if parameters.supplied_upload_offset != current_offset {
-        return Ok(HandleAppendOutcome::UploadOffsetMismatch {
+        return Ok(AppendToUploadOutcome::UploadOffsetMismatch {
             expected: current_offset,
             provided: parameters.supplied_upload_offset,
         });
@@ -222,7 +250,7 @@ async fn do_append(
     let payload_length_limit = if let Some(supplied_content_length) = parameters.supplied_content_length {
         if parameters.supplied_upload_complete {
             if remaining_content_length != supplied_content_length {
-                return Ok(HandleAppendOutcome::InconsistentUploadLength {
+                return Ok(AppendToUploadOutcome::InconsistentUploadLength {
                     expected: total_size,
                     detail: "Upload-Complete is set to true, and Content-Length is set, \
                     but the value of Content-Length does not equal the length of the remaining content."
@@ -231,7 +259,7 @@ async fn do_append(
             }
         } else {
             if supplied_content_length >= remaining_content_length {
-                return Ok(HandleAppendOutcome::InconsistentUploadLength {
+                return Ok(AppendToUploadOutcome::InconsistentUploadLength {
                     expected: total_size,
                     detail: "Upload-Complete is set to false, and Content-Length is set, \
                     but the value of Content-Length is not smaller than the length of the remaining content."
@@ -260,7 +288,6 @@ async fn do_append(
     file.sync_all().await?;
 
     let new_size = file.stream_position().await?;
-    let mut upload_state = parameters.upload.state().write().await;
     upload_state.set_current_size(new_size);
 
     let is_upload_complete = if let Some(StreamToFileOutcome::Success) = outcome {
@@ -272,25 +299,25 @@ async fn do_append(
     if is_upload_complete {
         upload_state.set_complete();
         parameters.upload.save_to_database(&upload_state).await?;
-        upload_manager.queue_upload_for_processing(Arc::clone(&parameters.upload)).await;
+        parameters.upload.enqueue_for_processing(&upload_state).await;
     } else {
         parameters.upload.save_to_database(&upload_state).await?;
     }
 
     Ok(if let Some(outcome) = outcome {
         match outcome {
-            StreamToFileOutcome::StoppedUnexpectedly => HandleAppendOutcome::ContentStreamStoppedUnexpectedly,
-            StreamToFileOutcome::TooMuchContent => HandleAppendOutcome::TooMuchContent,
+            StreamToFileOutcome::StoppedUnexpectedly => AppendToUploadOutcome::ContentStreamStoppedUnexpectedly,
+            StreamToFileOutcome::TooMuchContent => AppendToUploadOutcome::TooMuchContent,
             StreamToFileOutcome::Success => {
                 if is_upload_complete {
-                    HandleAppendOutcome::UploadComplete
+                    AppendToUploadOutcome::UploadComplete
                 } else {
-                    HandleAppendOutcome::UploadIncomplete { offset: new_size }
+                    AppendToUploadOutcome::UploadIncomplete { offset: new_size }
                 }
             }
         }
     } else {
-        HandleAppendOutcome::RequestSuperseded
+        AppendToUploadOutcome::RequestSuperseded
     })
 }
diff --git a/src/upload_manager.rs b/src/upload_manager.rs
index 56d395c..e528767 100644
--- a/src/upload_manager.rs
+++ b/src/upload_manager.rs
@@ -1,14 +1,16 @@
 use crate::processing_worker::do_processing_work;
 use crate::util::acquirable::{Acquirable, Acquisition};
-use crate::util::file_reference::FileReference;
+pub(crate) use crate::util::file_reference::FileReference;
 use crate::util::id::generate_id;
 use camino::Utf8PathBuf;
-use color_eyre::Result;
+use color_eyre::{Report, Result};
 use dashmap::DashMap;
 use fstr::FStr;
-use sqlx::SqlitePool;
+use sqlx::{Row, SqlitePool};
 use std::fmt::Debug;
-use std::sync::Arc;
+use std::str::FromStr;
+use std::sync::{Arc, Weak};
+use strum::{Display, EnumString, IntoStaticStr};
 use tokio::fs::{File, OpenOptions};
 use tokio::sync::RwLock;
@@ -26,49 +28,46 @@ pub struct UploadManager {
 }
 
 impl UploadManager {
-    pub async fn create(database: SqlitePool, staging_directory_path: Utf8PathBuf) -> Result<UploadManager> {
+    pub async fn create(database: SqlitePool, staging_directory_path: Utf8PathBuf) -> Result<Arc<UploadManager>> {
         log::info!("Loading uploads…");
 
-        let mut complete_uploads = Vec::new();
-        let unfinished_uploads = sqlx::query!("SELECT id, current_size, total_size, is_complete FROM unfinished_uploads")
-            .map(|row| {
-                let staging_file_path = staging_directory_path.join(&row.id);
-                let id = UploadId::from_str_lossy(&row.id, b'_');
-                let is_complete = row.is_complete != 0;
-
-                let upload = Arc::new(UnfinishedUpload {
-                    database: database.clone(),
-                    id,
-                    total_size: row.total_size as u64,
-                    state: RwLock::new(UnfinishedUploadState {
-                        current_size: row.current_size as u64,
-                        is_complete,
-                    }),
-                    file: Acquirable::new(FileReference::new(staging_file_path)),
-                });
-
-                if is_complete {
-                    complete_uploads.push(Arc::clone(&upload));
-                }
-
-                (id, upload)
-            })
-            .fetch_all(&database)
-            .await?;
 
         let (small_file_processing_tasks_sender, small_file_processing_tasks_receiver) = tokio::sync::mpsc::unbounded_channel();
         let (large_file_processing_tasks_sender, large_file_processing_tasks_receiver) = tokio::sync::mpsc::unbounded_channel();
 
-        let manager = UploadManager {
+        let manager = Arc::new(UploadManager {
             database: database.clone(),
-            staging_directory_path,
-            unfinished_uploads: DashMap::from_iter(unfinished_uploads.into_iter()),
+            staging_directory_path: staging_directory_path.clone(),
+            unfinished_uploads: DashMap::new(),
             small_file_processing_tasks_sender,
             large_file_processing_tasks_sender,
-        };
+        });
 
-        log::info!("Found {} unprocessed upload(s).", complete_uploads.len());
-        for upload in complete_uploads {
-            manager.queue_upload_for_processing(upload).await;
+        let unfinished_upload_rows = sqlx::query!("SELECT id, current_size, total_size, is_complete FROM unfinished_uploads")
+            .fetch_all(&database)
+            .await?;
+
+        for row in unfinished_upload_rows {
+            let staging_file_path = staging_directory_path.join(&row.id);
+            let id = UploadId::from_str_lossy(&row.id, b'_');
+            let is_complete = row.is_complete != 0;
+
+            let upload = Arc::new(UnfinishedUpload {
+                manager: Arc::downgrade(&manager),
+                id,
+                total_size: row.total_size as u64,
+                state: RwLock::new(UnfinishedUploadState {
+                    current_size: row.current_size as u64,
+                    is_complete,
+                }),
+                file: Acquirable::new(FileReference::new(staging_file_path)),
+            });
+
+            if is_complete {
+                let state = upload.state.read().await;
+                upload.enqueue_for_processing(&state).await;
+            }
+
+            manager.unfinished_uploads.insert(id, upload);
         }
 
         log::info!("Starting upload processing…");
@@ -78,7 +77,7 @@ impl UploadManager {
         Ok(manager)
     }
 
-    pub async fn create_upload(&self, total_size: u64) -> Result<Arc<UnfinishedUpload>> {
+    pub async fn create_upload(self: &Arc<Self>, total_size: u64) -> Result<Arc<UnfinishedUpload>> {
         let id: UploadId = generate_id();
 
         {
@@ -94,7 +93,7 @@ impl UploadManager {
         }
 
         let upload = Arc::new(UnfinishedUpload {
-            database: self.database.clone(),
+            manager: Arc::downgrade(&self),
             id,
             total_size,
             state: RwLock::new(UnfinishedUploadState {
@@ -109,31 +108,36 @@ impl UploadManager {
         Ok(upload)
     }
 
-    pub async fn queue_upload_for_processing(&self, upload: Arc<UnfinishedUpload>) {
-        {
-            let metadata = upload.state.read().await;
-            assert!(metadata.is_complete);
-        }
-
-        if upload.total_size <= LARGE_FILE_SIZE_THRESHOLD {
-            self.small_file_processing_tasks_sender.send(upload).unwrap()
+    pub async fn get_upload_by_id(&self, id: &str) -> Result<Option<AnyStageUpload>, Report> {
+        if let Some(upload) = self.unfinished_uploads.get(id).map(|a| Arc::clone(a.value())) {
+            Ok(Some(AnyStageUpload::Unfinished(upload)))
         } else {
-            self.large_file_processing_tasks_sender.send(upload).unwrap()
+            Ok(sqlx::query!(
+                "SELECT reason FROM (SELECT id, '' AS reason FROM finished_uploads UNION SELECT id, reason FROM failed_uploads) WHERE id = ?",
+                id
+            )
+            .map(|row| {
+                if row.reason.is_empty() {
+                    AnyStageUpload::Finished
+                } else {
+                    AnyStageUpload::Failed(UploadFailureReason::from_str(&row.reason).unwrap())
+                }
+            })
+            .fetch_optional(&self.database)
+            .await?)
         }
     }
+}
 
-    pub fn get_upload_by_id(&self, id: &str) -> Option<Arc<UnfinishedUpload>> {
-        self.unfinished_uploads.get(id).map(|a| Arc::clone(a.value()))
-    }
-
-    pub fn remove_failed_upload(&self, id: &UploadId) {
-        self.unfinished_uploads.remove(id);
-    }
+pub enum AnyStageUpload {
+    Unfinished(Arc<UnfinishedUpload>),
+    Finished,
+    Failed(UploadFailureReason),
 }
 
 #[derive(Debug)]
 pub struct UnfinishedUpload {
-    database: SqlitePool,
+    manager: Weak<UploadManager>,
     id: UploadId,
     total_size: u64,
     state: RwLock<UnfinishedUploadState>,
@@ -157,18 +161,47 @@ impl UnfinishedUpload {
         self.file.acquire().await
     }
 
-    pub async fn save_to_database(&self, state: &UnfinishedUploadState) -> Result<(), sqlx::Error> {
+    pub async fn enqueue_for_processing(self: &Arc<Self>, state: &UnfinishedUploadState) {
+        let manager = self.manager.upgrade().unwrap();
+        assert!(state.is_complete);
+
+        if self.total_size <= LARGE_FILE_SIZE_THRESHOLD {
+            manager.small_file_processing_tasks_sender.send(Arc::clone(&self)).unwrap()
+        } else {
+            manager.large_file_processing_tasks_sender.send(Arc::clone(&self)).unwrap()
+        }
+    }
+
+    pub async fn save_to_database(&self, state: &UnfinishedUploadState) -> Result<(), Report> {
         let id = self.id.to_string();
         let current_size = state.current_size() as i64;
 
         sqlx::query!("UPDATE unfinished_uploads SET current_size = ? WHERE id = ?", current_size, id)
-            .execute(&self.database)
+            .execute(&self.manager.upgrade().unwrap().database)
             .await?;
 
         Ok(())
     }
 
-    pub async fn fail(&self, reason: UploadFailureReason) {}
+    pub async fn fail(&self, reason: UploadFailureReason) -> Result<(), Report> {
+        let manager = self.manager.upgrade().unwrap();
+        manager.unfinished_uploads.remove(&self.id);
+
+        let mut tx = manager.database.begin().await?;
+
+        let id = self.id.to_string();
+        let reason = reason.to_string();
+
+        sqlx::query!("DELETE FROM unfinished_uploads WHERE id = ?", id).execute(&mut *tx).await?;
+
+        sqlx::query!("INSERT INTO failed_uploads (id, reason) VALUES (?, ?)", id, reason)
+            .execute(&mut *tx)
+            .await?;
+
+        tx.commit().await?;
+
+        Ok(())
+    }
 }
 
 #[derive(Debug)]
@@ -196,7 +229,8 @@ impl UnfinishedUploadState {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, EnumString, Display)]
+#[strum(serialize_all = "snake_case")]
 pub enum UploadFailureReason {
     CancelledByClient,
     Expired,
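--

A note on what the new strum derives buy here: `Display` writes the snake_case
form that `fail()` stores in the `failed_uploads.reason` column, and
`EnumString` provides the `FromStr` impl that `get_upload_by_id` parses it back
with, so the `from_str(&row.reason).unwrap()` can only panic on rows written by
an incompatible schema version. The `Weak<UploadManager>` back-reference serves
the same move toward self-contained uploads: `UnfinishedUpload` now reaches the
database pool and the processing queues through its manager without forming an
`Arc` cycle. Below is a standalone sketch of the reason round trip, not part of
the patch; it uses only the variants visible in the final hunk plus
`MissingData`, which is assumed to exist because `do_append` constructs it.

use std::str::FromStr;
use strum::{Display, EnumString};

#[derive(Debug, PartialEq, EnumString, Display)]
#[strum(serialize_all = "snake_case")]
enum UploadFailureReason {
    CancelledByClient,
    Expired,
    MissingData,
}

fn main() {
    // Display renders the snake_case string that fail() would store
    // in the failed_uploads.reason column…
    let stored = UploadFailureReason::MissingData.to_string();
    assert_eq!(stored, "missing_data");

    // …and EnumString's FromStr impl parses it back, which is what makes
    // the unwrap() in get_upload_by_id safe for rows this code wrote itself.
    let parsed = UploadFailureReason::from_str(&stored).unwrap();
    assert_eq!(parsed, UploadFailureReason::MissingData);
}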