From 12d38fb1b093f1e3a1fe2b4a5d70f3db6dc2f9e5 Mon Sep 17 00:00:00 2001
From: Moritz Ruth
Date: Fri, 18 Apr 2025 00:08:36 +0200
Subject: [PATCH] WIP: v1.0.0

---
 README.md                               |   2 +-
 src/config.rs                           |   2 +-
 src/http_api/upload/append_to_upload.rs |  62 +++++++----
 src/http_api/upload/mod.rs              |   6 +-
 src/processing_worker.rs                |  23 ++--
 src/upload_manager.rs                   | 138 +++++++++++++-----------
 src/util/acquirable.rs                  |   3 +-
 7 files changed, 129 insertions(+), 107 deletions(-)

diff --git a/README.md b/README.md
index 880639d..fd82cb8 100644
--- a/README.md
+++ b/README.md
@@ -37,7 +37,7 @@ application.
 
 - app to caos: `POST /uploads` returns `{upload_id}`
 - client to caos: `PATCH /uploads/{upload_id}` with upload data
-- app to caos: `GET /uploads/{upload_id}?wait_until=finished`, returns metadata (including `{hash}`) as soon as the upload is finished.
+- app to caos: `GET /uploads/{upload_id}`, returns metadata (including `{hash}`) as soon as the upload is finished.
 - app to caos: `POST /uploads/{upload_id}/accept` with target bucket IDs
 
diff --git a/src/config.rs b/src/config.rs
index fe1156d..8fb647f 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -36,7 +36,7 @@ fn validate_buckets(buckets: &Vec<ConfigBucket>) -> Result<(), ValidationError>
     Ok(())
 }
 
-static BUCKET_ID_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new(r"^(a-zA-z0-9_)+$").unwrap());
+static BUCKET_ID_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9_]+$").unwrap());
 
 #[derive(Debug, Serialize, Deserialize, Validate)]
 pub struct ConfigBucket {
diff --git a/src/http_api/upload/append_to_upload.rs b/src/http_api/upload/append_to_upload.rs
index abf9c86..97efbfb 100644
--- a/src/http_api/upload/append_to_upload.rs
+++ b/src/http_api/upload/append_to_upload.rs
@@ -2,7 +2,7 @@ use crate::http_api::Context;
 use crate::http_api::api_error::{ApiError, ProblemJson};
 use crate::http_api::headers::{HeaderMapExt, HeaderValueExt, upload_headers};
 use crate::http_api::upload::{PARTIAL_UPLOAD_MEDIA_TYPE, UploadCompleteResponseHeader, UploadOffsetResponseHeader};
-use crate::upload_manager::{AnyStageUpload, FileReference, UnfinishedUpload, UploadFailureReason, UploadId, UploadManager};
+use crate::upload_manager::{AcquiredUnfinishedUpload, AnyStageUpload, FileReference, UnfinishedUpload, UploadFailureReason, UploadId, UploadManager};
 use crate::util::acquirable::Acquisition;
 use axum::body::{Body, BodyDataStream};
 use axum::extract::{Path, State};
@@ -133,20 +133,27 @@ pub(super) async fn append_to_upload(
         Err(o) => return Ok(o),
     };
 
-    let mut file_acquisition = if let Some(a) = parameters.upload.acquire_file().await {
+    let mut upload_acquisition = if let Some(a) = parameters.upload.acquire().await {
         a
     } else {
         return Ok(AppendToUploadOutcome::RequestSuperseded);
     };
 
-    {
-        let state = parameters.upload.state().read().await;
-        if state.is_complete() {
-            return Ok(AppendToUploadOutcome::UploadAlreadyComplete);
+    if let Some(supplied_upload_length) = parameters.supplied_upload_length {
+        let expected = upload_acquisition.inner().upload().total_size();
+        if supplied_upload_length != expected {
+            return Ok(AppendToUploadOutcome::InconsistentUploadLength {
+                expected,
+                detail: format!("Upload-Length is set to {supplied_upload_length}, but the actual length is {expected}.").into(),
+            });
         }
     }
 
-    let outcome = do_append(&mut file_acquisition, parameters, request_body.into_data_stream())
+    if parameters.upload.is_complete() {
+        return Ok(AppendToUploadOutcome::UploadAlreadyComplete);
+    }
+
+    let outcome = do_append(&mut upload_acquisition, parameters, request_body.into_data_stream())
         .await
        .map_err(|report| {
            (
@@ -157,7 +164,24 @@ pub(super) async fn append_to_upload(
            )
        })?;
 
-    file_acquisition.release().await;
+    match &outcome {
+        AppendToUploadOutcome::UploadComplete => {
+            upload_acquisition
+                .complete()
+                .await
+                .map_err(|e| (UploadCompleteResponseHeader(false), e.into()))?;
+        }
+        AppendToUploadOutcome::Failed(reason) => {
+            upload_acquisition
+                .fail(*reason)
+                .await
+                .map_err(|e| (UploadCompleteResponseHeader(false), e.into()))?;
+        }
+        _ => {
+            upload_acquisition.release().await;
+        }
+    }
 
     Ok(outcome)
 }
@@ -228,23 +252,23 @@ async fn parse_request_parameters(
 }
 
 async fn do_append(
-    file_acquisition: &mut Acquisition<FileReference>,
+    upload_acquisition: &mut Acquisition<AcquiredUnfinishedUpload>,
     parameters: RequestParameters,
     content_stream: BodyDataStream,
 ) -> Result<AppendToUploadOutcome, Report> {
-    let mut upload_state = parameters.upload.state().write().await;
-    let release_request_token = file_acquisition.release_request_token();
-    let mut file = file_acquisition.inner().get_or_open(true).await?;
+    let release_request_token = upload_acquisition.release_request_token();
+    let acquired_upload = upload_acquisition.inner();
+    let mut file = acquired_upload.file().get_or_open(true).await?;
 
     let total_size = parameters.upload.total_size();
     let current_offset = file.stream_position().await?;
-    if current_offset < upload_state.current_size() {
+    if current_offset < parameters.upload.current_size() {
         log::error!(
             "The upload ({}) failed because the file contains less data than expected.",
             parameters.upload.id()
         );
 
-        parameters.upload.fail(UploadFailureReason::MissingData).await?;
+        return Ok(AppendToUploadOutcome::Failed(UploadFailureReason::MissingData));
     }
@@ -298,7 +322,7 @@ async fn do_append(
     file.sync_all().await?;
 
     let new_size = file.stream_position().await?;
-    upload_state.set_current_size(new_size);
+    acquired_upload.set_current_size(new_size).await?;
 
     let is_upload_complete = if let Some(StreamToFileOutcome::Success) = outcome {
         parameters.supplied_upload_complete
     } else {
         false
     };
 
-    if is_upload_complete {
-        upload_state.set_complete();
-        parameters.upload.save_to_database(&upload_state).await?;
-        parameters.upload.enqueue_for_processing(&upload_state).await;
-    } else {
-        parameters.upload.save_to_database(&upload_state).await?;
-    }
-
     Ok(if let Some(outcome) = outcome {
         match outcome {
             StreamToFileOutcome::StoppedUnexpectedly => AppendToUploadOutcome::ContentStreamStoppedUnexpectedly,
diff --git a/src/http_api/upload/mod.rs b/src/http_api/upload/mod.rs
index c70c20b..d72d01b 100644
--- a/src/http_api/upload/mod.rs
+++ b/src/http_api/upload/mod.rs
@@ -150,13 +150,11 @@ async fn get_upload_metadata(
         id: upload_id.to_string().into_boxed_str(),
         state: match upload {
             AnyStageUpload::Unfinished(upload) => {
-                let state = upload.state().read().await;
-
-                if state.is_complete() {
+                if upload.is_complete() {
                     GetUploadMetadataResponseState::Complete { size: upload.total_size() }
                 } else {
                     GetUploadMetadataResponseState::Ongoing {
-                        current_size: state.current_size(),
+                        current_size: upload.current_size(),
                         total_size: upload.total_size(),
                     }
                 }
diff --git a/src/processing_worker.rs b/src/processing_worker.rs
index 21c6888..05ccbc8 100644
--- a/src/processing_worker.rs
+++ b/src/processing_worker.rs
@@ -1,5 +1,4 @@
-use crate::upload_manager::{FileReference, UnfinishedUpload};
-use crate::util::acquirable::Acquisition;
+use crate::upload_manager::{AcquiredUnfinishedUpload, UnfinishedUpload};
 use crate::util::hash_to_hex_string::HashExt;
 use crate::util::temporal_formatting::TemporalFormatting;
 use 
blake3::Hasher;
@@ -7,24 +6,19 @@
 use file_type::FileType;
 use fstr::FStr;
 use sqlx::SqlitePool;
 use std::io::SeekFrom;
-use std::sync::Arc;
 use temporal_rs::Now;
 use tokio::io::{AsyncReadExt, AsyncSeekExt};
 
 pub async fn do_processing_work(
-    mut tasks_receiver: tokio::sync::mpsc::UnboundedReceiver<Arc<UnfinishedUpload>>,
+    mut tasks_receiver: tokio::sync::mpsc::UnboundedReceiver<AcquiredUnfinishedUpload>,
     database: SqlitePool,
     enable_multithreaded_hashing: bool,
 ) {
-    while let Some(upload) = tasks_receiver.recv().await {
-        let mut file_acquisition = upload
-            .acquire_file()
-            .await
-            .expect("When an upload is marked as complete, requests no longer acquire the file.");
-
-        match process(enable_multithreaded_hashing, &mut file_acquisition).await {
+    while let Some(mut acquired_upload) = tasks_receiver.recv().await {
+        match process(enable_multithreaded_hashing, &mut acquired_upload).await {
             Ok(outcome) => {
                 let mut tx = database.begin().await.unwrap();
+                let upload = acquired_upload.upload();
                 let id = upload.id().as_str();
                 let hash = outcome.hash.as_str();
@@ -59,11 +53,10 @@ pub async fn do_processing_work(
                 log::debug!("Successfully processed upload ({}): {:?}", id, outcome);
             }
             Err(report) => {
+                let upload = acquired_upload.upload();
                 log::error!("Error during upload processing ({}): {:#}", upload.id(), report);
             }
         }
-
-        file_acquisition.destroy().await;
     }
 }
 
@@ -73,8 +66,8 @@ struct ProcessingOutcome {
     media_type: &'static str,
 }
 
-async fn process(enable_multithreaded_hashing: bool, file_acquisition: &mut Acquisition<FileReference>) -> Result<ProcessingOutcome, Report> {
-    let file_reference = file_acquisition.inner();
+async fn process(enable_multithreaded_hashing: bool, acquired_upload: &mut AcquiredUnfinishedUpload) -> Result<ProcessingOutcome, Report> {
+    let file_reference = acquired_upload.file();
     let path = file_reference.path().to_owned();
     let file = file_reference.get_or_open(false).await?;
     file.seek(SeekFrom::Start(0)).await?;
diff --git a/src/upload_manager.rs b/src/upload_manager.rs
index 93793d9..0faf9ac 100644
--- a/src/upload_manager.rs
+++ b/src/upload_manager.rs
@@ -10,6 +10,7 @@ use serde::Serialize;
 use sqlx::SqlitePool;
 use std::fmt::Debug;
 use std::str::FromStr;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::{Arc, Weak};
 use strum::{Display, EnumString};
 use tokio::sync::RwLock;
@@ -23,8 +24,8 @@ pub struct UploadManager {
     database: SqlitePool,
     staging_directory_path: Utf8PathBuf,
     unfinished_uploads: DashMap<UploadId, Arc<UnfinishedUpload>>,
-    small_file_processing_tasks_sender: tokio::sync::mpsc::UnboundedSender<Arc<UnfinishedUpload>>,
-    large_file_processing_tasks_sender: tokio::sync::mpsc::UnboundedSender<Arc<UnfinishedUpload>>,
+    small_file_processing_tasks_sender: tokio::sync::mpsc::UnboundedSender<AcquiredUnfinishedUpload>,
+    large_file_processing_tasks_sender: tokio::sync::mpsc::UnboundedSender<AcquiredUnfinishedUpload>,
 }
 
 impl UploadManager {
@@ -51,20 +52,20 @@ impl UploadManager {
             let id = UploadId::from_str_lossy(&row.id, b'_');
             let is_complete = row.is_complete != 0;
 
-            let upload = Arc::new(UnfinishedUpload {
+            let upload = Arc::new_cyclic(|upload| UnfinishedUpload {
                 manager: Arc::downgrade(&manager),
                 id,
                 total_size: row.total_size as u64,
-                state: RwLock::new(UnfinishedUploadState {
-                    current_size: row.current_size as u64,
-                    is_complete,
+                current_size: AtomicU64::new(row.current_size as u64),
+                is_complete: AtomicBool::new(false),
+                acquirable: Acquirable::new(AcquiredUnfinishedUpload {
+                    upload: upload.to_owned(),
+                    file: FileReference::new(staging_file_path),
                 }),
-                file: Acquirable::new(FileReference::new(staging_file_path)),
             });
 
             if is_complete {
-                let state = upload.state.read().await;
-                upload.enqueue_for_processing(&state).await;
+                upload.acquire().await.unwrap().complete().await?;
             }
 
             manager.unfinished_uploads.insert(id, upload);
@@ -96,15 +97,16 @@ impl UploadManager {
             .await?;
         }
 
-        let upload = Arc::new(UnfinishedUpload {
+        let upload = Arc::new_cyclic(|upload| UnfinishedUpload {
             manager: Arc::downgrade(&self),
             id,
             total_size,
-            state: RwLock::new(UnfinishedUploadState {
-                current_size: 0,
-                is_complete: false,
+            current_size: AtomicU64::new(0),
+            is_complete: AtomicBool::new(false),
+            acquirable: Acquirable::new(AcquiredUnfinishedUpload {
+                upload: upload.to_owned(),
+                file: FileReference::new(self.staging_directory_path.join(id.as_str())),
             }),
-            file: Acquirable::new(FileReference::new(self.staging_directory_path.join(id.as_str()))),
         });
 
         self.unfinished_uploads.insert(id, Arc::clone(&upload));
@@ -150,8 +152,15 @@ pub struct UnfinishedUpload {
     manager: Weak<UploadManager>,
     id: UploadId,
     total_size: u64,
-    state: RwLock<UnfinishedUploadState>,
-    file: Acquirable<FileReference>,
+    current_size: AtomicU64,
+    is_complete: AtomicBool,
+    acquirable: Acquirable<AcquiredUnfinishedUpload>,
+}
+
+#[derive(Debug)]
+pub struct AcquiredUnfinishedUpload {
+    upload: Weak<UnfinishedUpload>,
+    file: FileReference,
 }
 
 impl UnfinishedUpload {
@@ -163,43 +172,73 @@ impl UnfinishedUpload {
         self.total_size
     }
 
-    pub fn state(&self) -> &RwLock<UnfinishedUploadState> {
-        &self.state
+    pub fn current_size(&self) -> u64 {
+        self.current_size.load(Ordering::Relaxed)
     }
 
-    pub async fn acquire_file(&self) -> Option<Acquisition<FileReference>> {
-        self.file.acquire().await
+    pub fn is_complete(&self) -> bool {
+        self.is_complete.load(Ordering::Relaxed)
     }
 
-    pub async fn enqueue_for_processing(self: &Arc<Self>, state: &UnfinishedUploadState) {
-        let manager = self.manager.upgrade().unwrap();
-        assert!(state.is_complete);
+    pub async fn acquire(&self) -> Option<Acquisition<AcquiredUnfinishedUpload>> {
+        self.acquirable.acquire().await
+    }
+}
 
-        if self.total_size <= LARGE_FILE_SIZE_THRESHOLD {
-            manager.small_file_processing_tasks_sender.send(Arc::clone(&self)).unwrap()
-        } else {
-            manager.large_file_processing_tasks_sender.send(Arc::clone(&self)).unwrap()
-        }
+impl AcquiredUnfinishedUpload {
+    pub fn upload(&self) -> Arc<UnfinishedUpload> {
+        self.upload.upgrade().unwrap()
     }
 
-    pub async fn save_to_database(&self, state: &UnfinishedUploadState) -> Result<(), Report> {
-        let id = self.id.to_string();
-        let current_size = state.current_size() as i64;
+    pub fn file(&mut self) -> &mut FileReference {
+        &mut self.file
+    }
+
+    pub async fn set_current_size(&self, current_size: u64) -> Result<(), Report> {
+        let upload = self.upload.upgrade().unwrap();
+        let manager = upload.manager.upgrade().unwrap();
+        upload.current_size.store(current_size, Ordering::Relaxed);
+        let id = upload.id.to_string();
+        let current_size = current_size as i64;
 
         sqlx::query!("UPDATE unfinished_uploads SET current_size = ?
WHERE id = ?", current_size, id)
-            .execute(&self.manager.upgrade().unwrap().database)
+            .execute(&manager.database)
             .await?;
 
         Ok(())
     }
+}
 
-    pub async fn fail(&self, reason: UploadFailureReason) -> Result<(), Report> {
-        let manager = self.manager.upgrade().unwrap();
-        manager.unfinished_uploads.remove(&self.id);
+impl Acquisition<AcquiredUnfinishedUpload> {
+    pub async fn complete(self) -> Result<(), Report> {
+        let inner = self.destroy().await;
+        let upload = inner.upload.upgrade().unwrap();
+        let manager = upload.manager.upgrade().unwrap();
+        upload.is_complete.store(true, Ordering::Relaxed);
+
+        let id = upload.id.to_string();
+        sqlx::query!("UPDATE unfinished_uploads SET is_complete = 1 WHERE id = ?", id)
+            .execute(&manager.database)
+            .await?;
+
+        if upload.total_size <= LARGE_FILE_SIZE_THRESHOLD {
+            manager.small_file_processing_tasks_sender.send(inner).unwrap()
+        } else {
+            manager.large_file_processing_tasks_sender.send(inner).unwrap()
+        }
+
+        Ok(())
+    }
+
+    pub async fn fail(self, reason: UploadFailureReason) -> Result<(), Report> {
+        let inner = self.destroy().await;
+        let upload = inner.upload.upgrade().unwrap();
+        let manager = upload.manager.upgrade().unwrap();
+        manager.unfinished_uploads.remove(&upload.id);
 
         let mut tx = manager.database.begin().await?;
 
-        let id = self.id.to_string();
+        let id = upload.id().to_string();
         let reason = reason.to_string();
 
         sqlx::query!("DELETE FROM unfinished_uploads WHERE id = ?", id).execute(&mut *tx).await?;
@@ -214,32 +253,7 @@ impl UnfinishedUpload {
     }
 }
 
-#[derive(Debug)]
-pub struct UnfinishedUploadState {
-    current_size: u64,
-    is_complete: bool,
-}
-
-impl UnfinishedUploadState {
-    pub fn current_size(&self) -> u64 {
-        self.current_size
-    }
-
-    pub fn is_complete(&self) -> bool {
-        self.is_complete
-    }
-
-    pub fn set_current_size(&mut self, current_size: u64) {
-        assert!(current_size > self.current_size, "new size is greater than current size");
-        self.current_size = current_size;
-    }
-
-    pub fn set_complete(&mut self) {
-        self.is_complete = true
-    }
-}
-
-#[derive(Debug, EnumString, Display, Serialize)]
+#[derive(Debug, Clone, Copy, EnumString, Display, Serialize)]
 #[strum(serialize_all = "snake_case")]
 #[serde(rename_all = "snake_case")]
 pub enum UploadFailureReason {
diff --git a/src/util/acquirable.rs b/src/util/acquirable.rs
index a43140e..b8172ab 100644
--- a/src/util/acquirable.rs
+++ b/src/util/acquirable.rs
@@ -136,8 +136,9 @@ impl<T> Acquisition<T> {
     /// Consume the acquisition without releasing it. The corresponding Acquirable will forever stay in the acquired state.
     ///
     /// All outstanding calls to Acquirable::acquire will return None.
-    pub async fn destroy(self) {
+    pub async fn destroy(self) -> T {
         let mut state = self.acquirable_state.lock().await;
         *state = AcquirableState::Destroyed;
+        self.inner
     }
 }
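
Reviewer note on the `Arc::new_cyclic` construction in `src/upload_manager.rs`: the upload now owns its `Acquirable<AcquiredUnfinishedUpload>`, which in turn holds a `Weak` back-reference to the upload, so the struct needs a weak handle to itself while it is still being built. A minimal, standalone sketch of that standard-library pattern (the `Node` type and its fields are illustrative, not from this codebase):

```rust
use std::sync::{Arc, Weak};

struct Node {
    // Weak self-reference, upgradable to Arc<Node> after construction.
    me: Weak<Node>,
    value: u64,
}

impl Node {
    fn new(value: u64) -> Arc<Node> {
        // The closure receives a Weak pointing at the allocation that is
        // still being initialized, so the struct can store a handle to itself.
        Arc::new_cyclic(|me| Node { me: me.clone(), value })
    }

    fn self_arc(&self) -> Arc<Node> {
        // Upgrading succeeds as long as the Arc returned by `new` is alive.
        self.me.upgrade().expect("the Arc is still alive")
    }
}

fn main() {
    let node = Node::new(42);
    assert_eq!(node.self_arc().value, 42);
    assert!(Arc::ptr_eq(&node, &node.self_arc()));
}
```

Storing `Weak` rather than `Arc` in the cycle keeps the strong count honest, so dropping the last external `Arc<UnfinishedUpload>` still frees the upload.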
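The replacement of `RwLock<UnfinishedUploadState>` with `AtomicU64`/`AtomicBool` is sound only because the two fields are read and written independently; `Ordering::Relaxed` makes each individual load and store atomic but imposes no ordering between them. A standalone sketch of that trade-off (the `Progress` type is illustrative, not from this codebase):

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

// Two independent fields: Relaxed makes each access atomic on its own
// but does not order the fields relative to each other.
struct Progress {
    current_size: AtomicU64,
    is_complete: AtomicBool,
}

impl Progress {
    fn set_size(&self, size: u64) {
        // &self suffices: no lock and no &mut, so readers may run concurrently.
        self.current_size.store(size, Ordering::Relaxed);
    }

    fn snapshot(&self) -> (u64, bool) {
        // The two loads are not atomic *together*: a reader may observe a
        // fresh size alongside a stale completion flag, and callers must
        // tolerate that.
        (
            self.current_size.load(Ordering::Relaxed),
            self.is_complete.load(Ordering::Relaxed),
        )
    }
}

fn main() {
    let p = Progress {
        current_size: AtomicU64::new(0),
        is_complete: AtomicBool::new(false),
    };
    p.set_size(1024);
    assert_eq!(p.snapshot(), (1024, false));
}
```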
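`Acquisition::complete` and `Acquisition::fail` take `self` by value, and `destroy` now returns the inner `T`, so a terminal transition both rules out further use at compile time and yields the payload that is forwarded to the processing queue. A reduced sketch of this consume-on-terminal-transition shape (the `Guard` type is illustrative, not the crate's actual `Acquirable`):

```rust
// A guard whose terminal operation consumes it: once `finish` has been
// called, the borrow checker rejects any further use of the guard.
struct Guard<T> {
    inner: T,
}

impl<T> Guard<T> {
    fn new(inner: T) -> Self {
        Guard { inner }
    }

    // Taking `self` by value ends the guard's lifetime and hands the
    // guarded value back to the caller, mirroring `destroy(self) -> T`.
    fn finish(self) -> T {
        self.inner
    }
}

fn main() {
    let guard = Guard::new(String::from("payload"));
    let payload = guard.finish();
    // guard.finish(); // would not compile: `guard` was moved by `finish`
    assert_eq!(payload, "payload");
}
```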