WIP: v1.0.0

Moritz Ruth 2025-04-18 00:08:36 +02:00
parent e5208d2e5f
commit 12d38fb1b0
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
7 changed files with 129 additions and 107 deletions


@@ -37,7 +37,7 @@ application.
 - app to caos: `POST /uploads` returns `{upload_id}`
 - client to caos: `PATCH /uploads/{upload_id}` with upload data
-- app to caos: `GET /uploads/{upload_id}?wait_until=finished`, returns metadata (including `{hash}`) as soon as the upload is finished.
+- app to caos: `GET /uploads/{upload_id}`, returns metadata (including `{hash}`) as soon as the upload is finished.
 - app to caos: `POST /uploads/{upload_id}/accept` with target bucket IDs
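
For illustration, a minimal client walking through this flow. It is a sketch, not part of the commit: it assumes a caos instance at http://localhost:8080, reqwest (with the json feature) and serde_json as dependencies, and JSON response bodies; only the endpoint paths and the upload_id/hash fields come from the protocol description above.

use serde_json::{Value, json};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let base = "http://localhost:8080"; // assumed address

    // app to caos: create the upload and receive its ID
    let created: Value = client.post(format!("{base}/uploads")).send().await?.json().await?;
    let upload_id = created["upload_id"].as_str().unwrap().to_owned();

    // client to caos: append the upload data
    client
        .patch(format!("{base}/uploads/{upload_id}"))
        .body("file contents")
        .send()
        .await?
        .error_for_status()?;

    // app to caos: fetch the metadata, available once the upload is finished
    let metadata: Value = client.get(format!("{base}/uploads/{upload_id}")).send().await?.json().await?;
    println!("hash = {}", metadata["hash"]);

    // app to caos: accept the upload into the target buckets
    client
        .post(format!("{base}/uploads/{upload_id}/accept"))
        .json(&json!({ "bucket_ids": ["some_bucket"] })) // field name assumed
        .send()
        .await?
        .error_for_status()?;

    Ok(())
}
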


@@ -36,7 +36,7 @@ fn validate_buckets(buckets: &Vec<ConfigBucket>) -> Result<(), ValidationError>
     Ok(())
 }
 
-static BUCKET_ID_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new(r"^(a-zA-z0-9_)+$").unwrap());
+static BUCKET_ID_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9_]+$").unwrap());
 
 #[derive(Debug, Serialize, Deserialize, Validate)]
 pub struct ConfigBucket {
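
The old pattern had two problems: parentheses form a group that matches those characters literally (not a character class), and the range A-z also covers the ASCII characters between Z and a, namely [, \, ], ^, _, and the backtick. A quick demonstration, assuming only the regex crate:

use regex::Regex;

fn main() {
    // `A-z` accidentally includes the characters between `Z` and `a`.
    let loose = Regex::new(r"^[a-zA-z0-9_]+$").unwrap();
    let strict = Regex::new(r"^[a-zA-Z0-9_]+$").unwrap();

    assert!(loose.is_match("my^bucket")); // `^` slips through the A-z range
    assert!(!strict.is_match("my^bucket")); // rejected once the range ends at Z
    assert!(strict.is_match("my_bucket_01"));
}
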


@@ -2,7 +2,7 @@ use crate::http_api::Context;
 use crate::http_api::api_error::{ApiError, ProblemJson};
 use crate::http_api::headers::{HeaderMapExt, HeaderValueExt, upload_headers};
 use crate::http_api::upload::{PARTIAL_UPLOAD_MEDIA_TYPE, UploadCompleteResponseHeader, UploadOffsetResponseHeader};
-use crate::upload_manager::{AnyStageUpload, FileReference, UnfinishedUpload, UploadFailureReason, UploadId, UploadManager};
+use crate::upload_manager::{AcquiredUnfinishedUpload, AnyStageUpload, FileReference, UnfinishedUpload, UploadFailureReason, UploadId, UploadManager};
 use crate::util::acquirable::Acquisition;
 use axum::body::{Body, BodyDataStream};
 use axum::extract::{Path, State};
@@ -133,20 +133,27 @@ pub(super) async fn append_to_upload(
         Err(o) => return Ok(o),
     };
 
-    let mut file_acquisition = if let Some(a) = parameters.upload.acquire_file().await {
+    let mut upload_acquisition = if let Some(a) = parameters.upload.acquire().await {
         a
     } else {
         return Ok(AppendToUploadOutcome::RequestSuperseded);
     };
 
-    {
-        let state = parameters.upload.state().read().await;
-        if state.is_complete() {
-            return Ok(AppendToUploadOutcome::UploadAlreadyComplete);
+    if let Some(supplied_upload_length) = parameters.supplied_upload_length {
+        let expected = upload_acquisition.inner().upload().total_size();
+
+        if supplied_upload_length != expected {
+            return Ok(AppendToUploadOutcome::InconsistentUploadLength {
+                expected,
+                detail: format!("Upload-Length is set to {supplied_upload_length}, but the actual length is {expected}.").into(),
+            });
         }
     }
 
-    let outcome = do_append(&mut file_acquisition, parameters, request_body.into_data_stream())
+    if parameters.upload.is_complete() {
+        return Ok(AppendToUploadOutcome::UploadAlreadyComplete);
+    }
+
+    let outcome = do_append(&mut upload_acquisition, parameters, request_body.into_data_stream())
         .await
         .map_err(|report| {
             (
@@ -157,7 +164,24 @@ pub(super) async fn append_to_upload(
             )
         })?;
 
-    file_acquisition.release().await;
+    match &outcome {
+        AppendToUploadOutcome::UploadComplete => {
+            upload_acquisition
+                .complete()
+                .await
+                .map_err(|e| (UploadCompleteResponseHeader(false), e.into()))?;
+        }
+        AppendToUploadOutcome::Failed(reason) => {
+            upload_acquisition
+                .fail(*reason)
+                .await
+                .map_err(|e| (UploadCompleteResponseHeader(false), e.into()))?;
+        }
+        _ => {
+            upload_acquisition.release().await;
+        }
+    }
 
     Ok(outcome)
 }
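
The shape of the new control flow, reduced to a self-contained sketch: exactly one of complete, fail, or release consumes the acquisition, chosen by the outcome of the append. All types here are stand-ins, and the comments describe what the diff's real implementations do.

#[derive(Debug, Clone, Copy)]
enum Reason { MissingData }

#[derive(Debug)]
enum Outcome {
    UploadComplete,
    Failed(Reason),
    MoreDataExpected,
}

struct Acquisition;

impl Acquisition {
    async fn complete(self) { /* persist is_complete, enqueue for processing */ }
    async fn fail(self, _reason: Reason) { /* delete the upload, record the reason */ }
    async fn release(self) { /* make the upload acquirable for the next request */ }
}

async fn finish(acquisition: Acquisition, outcome: &Outcome) {
    match outcome {
        Outcome::UploadComplete => acquisition.complete().await,
        Outcome::Failed(reason) => acquisition.fail(*reason).await,
        _ => acquisition.release().await,
    }
}

#[tokio::main]
async fn main() {
    finish(Acquisition, &Outcome::UploadComplete).await;
    finish(Acquisition, &Outcome::Failed(Reason::MissingData)).await;
    finish(Acquisition, &Outcome::MoreDataExpected).await;
}
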
@@ -228,23 +252,23 @@ async fn parse_request_parameters(
 }
 
 async fn do_append(
-    file_acquisition: &mut Acquisition<FileReference>,
+    upload_acquisition: &mut Acquisition<AcquiredUnfinishedUpload>,
     parameters: RequestParameters,
     content_stream: BodyDataStream,
 ) -> Result<AppendToUploadOutcome, Report> {
-    let mut upload_state = parameters.upload.state().write().await;
-    let release_request_token = file_acquisition.release_request_token();
-    let mut file = file_acquisition.inner().get_or_open(true).await?;
+    let release_request_token = upload_acquisition.release_request_token();
+    let acquired_upload = upload_acquisition.inner();
+    let mut file = acquired_upload.file().get_or_open(true).await?;
 
     let total_size = parameters.upload.total_size();
     let current_offset = file.stream_position().await?;
 
-    if current_offset < upload_state.current_size() {
+    if current_offset < parameters.upload.current_size() {
         log::error!(
             "The upload ({}) failed because the file contains less data than expected.",
             parameters.upload.id()
         );
-        parameters.upload.fail(UploadFailureReason::MissingData).await?;
         return Ok(AppendToUploadOutcome::Failed(UploadFailureReason::MissingData));
     }
@@ -298,7 +322,7 @@ async fn do_append(
     file.sync_all().await?;
 
     let new_size = file.stream_position().await?;
-    upload_state.set_current_size(new_size);
+    acquired_upload.set_current_size(new_size).await?;
 
     let is_upload_complete = if let Some(StreamToFileOutcome::Success) = outcome {
         parameters.supplied_upload_complete
@@ -306,14 +330,6 @@ async fn do_append(
         false
     };
 
-    if is_upload_complete {
-        upload_state.set_complete();
-        parameters.upload.save_to_database(&upload_state).await?;
-        parameters.upload.enqueue_for_processing(&upload_state).await;
-    } else {
-        parameters.upload.save_to_database(&upload_state).await?;
-    }
-
     Ok(if let Some(outcome) = outcome {
         match outcome {
             StreamToFileOutcome::StoppedUnexpectedly => AppendToUploadOutcome::ContentStreamStoppedUnexpectedly,


@@ -150,13 +150,11 @@ async fn get_upload_metadata(
         id: upload_id.to_string().into_boxed_str(),
         state: match upload {
             AnyStageUpload::Unfinished(upload) => {
-                let state = upload.state().read().await;
-                if state.is_complete() {
+                if upload.is_complete() {
                     GetUploadMetadataResponseState::Complete { size: upload.total_size() }
                 } else {
                     GetUploadMetadataResponseState::Ongoing {
-                        current_size: state.current_size(),
+                        current_size: upload.current_size(),
                         total_size: upload.total_size(),
                     }
                 }


@@ -1,5 +1,4 @@
-use crate::upload_manager::{FileReference, UnfinishedUpload};
-use crate::util::acquirable::Acquisition;
+use crate::upload_manager::{AcquiredUnfinishedUpload, UnfinishedUpload};
 use crate::util::hash_to_hex_string::HashExt;
 use crate::util::temporal_formatting::TemporalFormatting;
 use blake3::Hasher;
@@ -7,24 +6,19 @@ use file_type::FileType;
 use fstr::FStr;
 use sqlx::SqlitePool;
 use std::io::SeekFrom;
-use std::sync::Arc;
 use temporal_rs::Now;
 use tokio::io::{AsyncReadExt, AsyncSeekExt};
 
 pub async fn do_processing_work(
-    mut tasks_receiver: tokio::sync::mpsc::UnboundedReceiver<Arc<UnfinishedUpload>>,
+    mut tasks_receiver: tokio::sync::mpsc::UnboundedReceiver<AcquiredUnfinishedUpload>,
     database: SqlitePool,
     enable_multithreaded_hashing: bool,
 ) {
-    while let Some(upload) = tasks_receiver.recv().await {
-        let mut file_acquisition = upload
-            .acquire_file()
-            .await
-            .expect("When an upload is marked as complete, requests no longer acquire the file.");
-
-        match process(enable_multithreaded_hashing, &mut file_acquisition).await {
+    while let Some(mut acquired_upload) = tasks_receiver.recv().await {
+        match process(enable_multithreaded_hashing, &mut acquired_upload).await {
             Ok(outcome) => {
                 let mut tx = database.begin().await.unwrap();
 
+                let upload = acquired_upload.upload();
                 let id = upload.id().as_str();
                 let hash = outcome.hash.as_str();
@@ -59,11 +53,10 @@ pub async fn do_processing_work(
                 log::debug!("Successfully processed upload ({}): {:?}", id, outcome);
             }
             Err(report) => {
+                let upload = acquired_upload.upload();
                 log::error!("Error during upload processing ({}): {:#}", upload.id(), report);
             }
         }
-
-        file_acquisition.destroy().await;
     }
 }
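
The queue handoff feeding this worker, as a self-contained sketch: completed uploads are routed to a small-file or a large-file worker over unbounded channels, mirroring the two sender fields on UploadManager. The task type and the 64 MiB threshold are illustrative; only the routing scheme is taken from the diff.

use tokio::sync::mpsc;

const LARGE_FILE_SIZE_THRESHOLD: u64 = 64 * 1024 * 1024; // assumed value

#[derive(Debug)]
struct Task { id: u32, total_size: u64 }

async fn worker(label: &'static str, mut rx: mpsc::UnboundedReceiver<Task>) {
    while let Some(task) = rx.recv().await {
        println!("{label}: processing upload {} ({} bytes)", task.id, task.total_size);
    }
}

#[tokio::main]
async fn main() {
    let (small_tx, small_rx) = mpsc::unbounded_channel();
    let (large_tx, large_rx) = mpsc::unbounded_channel();
    let small = tokio::spawn(worker("small", small_rx));
    let large = tokio::spawn(worker("large", large_rx));

    for task in [Task { id: 1, total_size: 512 }, Task { id: 2, total_size: 1 << 30 }] {
        if task.total_size <= LARGE_FILE_SIZE_THRESHOLD {
            small_tx.send(task).unwrap();
        } else {
            large_tx.send(task).unwrap();
        }
    }

    drop((small_tx, large_tx)); // closing the senders lets the workers exit
    let _ = tokio::join!(small, large);
}
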
@@ -73,8 +66,8 @@ struct ProcessingOutcome {
     media_type: &'static str,
 }
 
-async fn process(enable_multithreaded_hashing: bool, file_acquisition: &mut Acquisition<FileReference>) -> Result<ProcessingOutcome, std::io::Error> {
-    let file_reference = file_acquisition.inner();
+async fn process(enable_multithreaded_hashing: bool, acquired_upload: &mut AcquiredUnfinishedUpload) -> Result<ProcessingOutcome, std::io::Error> {
+    let file_reference = acquired_upload.file();
     let path = file_reference.path().to_owned();
     let file = file_reference.get_or_open(false).await?;
     file.seek(SeekFrom::Start(0)).await?;


@@ -10,6 +10,7 @@ use serde::Serialize;
 use sqlx::SqlitePool;
 use std::fmt::Debug;
 use std::str::FromStr;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::{Arc, Weak};
 use strum::{Display, EnumString};
 use tokio::sync::RwLock;
@@ -23,8 +24,8 @@ pub struct UploadManager {
     database: SqlitePool,
     staging_directory_path: Utf8PathBuf,
     unfinished_uploads: DashMap<UploadId, Arc<UnfinishedUpload>>,
-    small_file_processing_tasks_sender: tokio::sync::mpsc::UnboundedSender<Arc<UnfinishedUpload>>,
-    large_file_processing_tasks_sender: tokio::sync::mpsc::UnboundedSender<Arc<UnfinishedUpload>>,
+    small_file_processing_tasks_sender: tokio::sync::mpsc::UnboundedSender<AcquiredUnfinishedUpload>,
+    large_file_processing_tasks_sender: tokio::sync::mpsc::UnboundedSender<AcquiredUnfinishedUpload>,
 }
 
 impl UploadManager {
impl UploadManager { impl UploadManager {
@@ -51,20 +52,20 @@ impl UploadManager {
             let id = UploadId::from_str_lossy(&row.id, b'_');
             let is_complete = row.is_complete != 0;
 
-            let upload = Arc::new(UnfinishedUpload {
+            let upload = Arc::new_cyclic(|upload| UnfinishedUpload {
                 manager: Arc::downgrade(&manager),
                 id,
                 total_size: row.total_size as u64,
-                state: RwLock::new(UnfinishedUploadState {
-                    current_size: row.current_size as u64,
-                    is_complete,
+                current_size: AtomicU64::new(row.current_size as u64),
+                is_complete: AtomicBool::new(false),
+                acquirable: Acquirable::new(AcquiredUnfinishedUpload {
+                    upload: upload.to_owned(),
+                    file: FileReference::new(staging_file_path),
                 }),
-                file: Acquirable::new(FileReference::new(staging_file_path)),
             });
 
             if is_complete {
-                let state = upload.state.read().await;
-                upload.enqueue_for_processing(&state).await;
+                upload.acquire().await.unwrap().complete().await;
             }
 
             manager.unfinished_uploads.insert(id, upload);
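
Arc::new_cyclic is used here because the acquirable payload needs a pointer back to the upload that owns it. A minimal form of the pattern, with illustrative types:

use std::sync::{Arc, Weak};

struct Upload {
    id: u32,
    payload: Payload,
}

struct Payload {
    upload: Weak<Upload>, // back-pointer to the owning Arc
}

fn main() {
    let upload = Arc::new_cyclic(|weak: &Weak<Upload>| Upload {
        id: 7,
        payload: Payload { upload: weak.clone() },
    });

    // The Weak upgrades as long as the owning Arc is alive.
    assert_eq!(upload.payload.upload.upgrade().unwrap().id, 7);
}
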
@@ -96,15 +97,16 @@ impl UploadManager {
             .await?;
         }
 
-        let upload = Arc::new(UnfinishedUpload {
+        let upload = Arc::new_cyclic(|upload| UnfinishedUpload {
             manager: Arc::downgrade(&self),
             id,
             total_size,
-            state: RwLock::new(UnfinishedUploadState {
-                current_size: 0,
-                is_complete: false,
+            current_size: AtomicU64::new(0),
+            is_complete: AtomicBool::new(false),
+            acquirable: Acquirable::new(AcquiredUnfinishedUpload {
+                upload: upload.to_owned(),
+                file: FileReference::new(self.staging_directory_path.join(id.as_str())),
             }),
-            file: Acquirable::new(FileReference::new(self.staging_directory_path.join(id.as_str()))),
         });
 
         self.unfinished_uploads.insert(id, Arc::clone(&upload));
@@ -150,8 +152,15 @@ pub struct UnfinishedUpload {
     manager: Weak<UploadManager>,
     id: UploadId,
     total_size: u64,
-    state: RwLock<UnfinishedUploadState>,
-    file: Acquirable<FileReference>,
+    current_size: AtomicU64,
+    is_complete: AtomicBool,
+    acquirable: Acquirable<AcquiredUnfinishedUpload>,
+}
+
+#[derive(Debug)]
+pub struct AcquiredUnfinishedUpload {
+    upload: Weak<UnfinishedUpload>,
+    file: FileReference,
 }
 
 impl UnfinishedUpload {
@@ -163,43 +172,73 @@ impl UnfinishedUpload {
         self.total_size
     }
 
-    pub fn state(&self) -> &RwLock<UnfinishedUploadState> {
-        &self.state
+    pub fn current_size(&self) -> u64 {
+        self.current_size.load(Ordering::Relaxed)
     }
 
-    pub async fn acquire_file(&self) -> Option<Acquisition<FileReference>> {
-        self.file.acquire().await
+    pub fn is_complete(&self) -> bool {
+        self.is_complete.load(Ordering::Relaxed)
     }
 
-    pub async fn enqueue_for_processing(self: &Arc<Self>, state: &UnfinishedUploadState) {
-        let manager = self.manager.upgrade().unwrap();
-        assert!(state.is_complete);
-
-        if self.total_size <= LARGE_FILE_SIZE_THRESHOLD {
-            manager.small_file_processing_tasks_sender.send(Arc::clone(&self)).unwrap()
-        } else {
-            manager.large_file_processing_tasks_sender.send(Arc::clone(&self)).unwrap()
-        }
+    pub async fn acquire(&self) -> Option<Acquisition<AcquiredUnfinishedUpload>> {
+        self.acquirable.acquire().await
+    }
+}
+
+impl AcquiredUnfinishedUpload {
+    pub fn upload(&self) -> Arc<UnfinishedUpload> {
+        self.upload.upgrade().unwrap()
     }
 
-    pub async fn save_to_database(&self, state: &UnfinishedUploadState) -> Result<(), Report> {
-        let id = self.id.to_string();
-        let current_size = state.current_size() as i64;
+    pub fn file(&mut self) -> &mut FileReference {
+        &mut self.file
+    }
+
+    pub async fn set_current_size(&self, current_size: u64) -> Result<(), Report> {
+        let upload = self.upload.upgrade().unwrap();
+        let manager = upload.manager.upgrade().unwrap();
+
+        upload.current_size.store(current_size, Ordering::Relaxed);
+
+        let id = upload.id.to_string();
+        let current_size = current_size as i64;
         sqlx::query!("UPDATE unfinished_uploads SET current_size = ? WHERE id = ?", current_size, id)
-            .execute(&self.manager.upgrade().unwrap().database)
+            .execute(&manager.database)
             .await?;
 
         Ok(())
     }
+}
 
-    pub async fn fail(&self, reason: UploadFailureReason) -> Result<(), Report> {
-        let manager = self.manager.upgrade().unwrap();
-        manager.unfinished_uploads.remove(&self.id);
+impl Acquisition<AcquiredUnfinishedUpload> {
+    pub async fn complete(self) -> Result<(), Report> {
+        let inner = self.destroy().await;
+        let upload = inner.upload.upgrade().unwrap();
+        let manager = upload.manager.upgrade().unwrap();
+
+        upload.is_complete.store(true, Ordering::Relaxed);
+
+        let id = upload.id.to_string();
+        sqlx::query!("UPDATE unfinished_uploads SET is_complete = 1 WHERE id = ?", id)
+            .execute(&manager.database)
+            .await?;
+
+        if upload.total_size <= LARGE_FILE_SIZE_THRESHOLD {
+            manager.small_file_processing_tasks_sender.send(inner).unwrap()
+        } else {
+            manager.large_file_processing_tasks_sender.send(inner).unwrap()
+        }
+
+        Ok(())
+    }
+
+    pub async fn fail(self, reason: UploadFailureReason) -> Result<(), Report> {
+        let inner = self.destroy().await;
+        let upload = inner.upload.upgrade().unwrap();
+        let manager = upload.manager.upgrade().unwrap();
+
+        manager.unfinished_uploads.remove(&upload.id);
 
         let mut tx = manager.database.begin().await?;
-        let id = self.id.to_string();
+        let id = upload.id().to_string();
         let reason = reason.to_string();
         sqlx::query!("DELETE FROM unfinished_uploads WHERE id = ?", id).execute(&mut *tx).await?;
@@ -214,32 +253,7 @@ impl UnfinishedUpload {
     }
 }
 
-#[derive(Debug)]
-pub struct UnfinishedUploadState {
-    current_size: u64,
-    is_complete: bool,
-}
-
-impl UnfinishedUploadState {
-    pub fn current_size(&self) -> u64 {
-        self.current_size
-    }
-
-    pub fn is_complete(&self) -> bool {
-        self.is_complete
-    }
-
-    pub fn set_current_size(&mut self, current_size: u64) {
-        assert!(current_size > self.current_size, "new size is greater than current size");
-        self.current_size = current_size;
-    }
-
-    pub fn set_complete(&mut self) {
-        self.is_complete = true
-    }
-}
-
-#[derive(Debug, EnumString, Display, Serialize)]
+#[derive(Debug, Clone, Copy, EnumString, Display, Serialize)]
 #[strum(serialize_all = "snake_case")]
 #[serde(rename_all = "snake_case")]
 pub enum UploadFailureReason {
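
The state migration in this file, reduced to a sketch: the RwLock-guarded UnfinishedUploadState becomes two independent atomics, and Relaxed ordering suffices because each field is a standalone counter or flag rather than a guard for other data. Names are illustrative:

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

struct UploadProgress {
    current_size: AtomicU64,
    is_complete: AtomicBool,
}

impl UploadProgress {
    fn set_current_size(&self, size: u64) {
        // No lock needed; readers and writers never block each other.
        self.current_size.store(size, Ordering::Relaxed);
    }

    fn mark_complete(&self) {
        self.is_complete.store(true, Ordering::Relaxed);
    }

    fn snapshot(&self) -> (u64, bool) {
        (self.current_size.load(Ordering::Relaxed), self.is_complete.load(Ordering::Relaxed))
    }
}

fn main() {
    let progress = UploadProgress {
        current_size: AtomicU64::new(0),
        is_complete: AtomicBool::new(false),
    };
    progress.set_current_size(1024);
    progress.mark_complete();
    assert_eq!(progress.snapshot(), (1024, true));
}
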


@@ -136,8 +136,9 @@ impl<T: 'static + Sync + Send> Acquisition<T> {
     /// Consume the acquisition without releasing it. The corresponding Acquirable will forever stay in the acquired state.
     ///
     /// All outstanding calls to Acquirable::acquire will return None.
-    pub async fn destroy(self) {
+    pub async fn destroy(self) -> T {
         let mut state = self.acquirable_state.lock().await;
         *state = AcquirableState::Destroyed;
+        self.inner
     }
 }
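
A toy version of the Acquirable/Acquisition idea, showing what the new -> T return enables: destroy moves the inner value out (the real code then sends it onward to a processing queue) while the acquirable stays acquired forever. This is an illustration with simplified semantics (a real implementation also coordinates waiting acquirers), not the crate's actual code.

use std::sync::Arc;
use tokio::sync::Mutex;

struct Acquirable<T> {
    slot: Arc<Mutex<Option<T>>>,
}

struct Acquisition<T> {
    slot: Arc<Mutex<Option<T>>>,
    inner: T,
}

impl<T> Acquirable<T> {
    fn new(value: T) -> Self {
        Self { slot: Arc::new(Mutex::new(Some(value))) }
    }

    // Returns None while the value is acquired or after it was destroyed.
    async fn acquire(&self) -> Option<Acquisition<T>> {
        let inner = self.slot.lock().await.take()?;
        Some(Acquisition { slot: Arc::clone(&self.slot), inner })
    }
}

impl<T> Acquisition<T> {
    // Hand the value back, making it acquirable again.
    async fn release(self) {
        let Self { slot, inner } = self;
        *slot.lock().await = Some(inner);
    }

    // Consume the acquisition without releasing it: the value moves out and
    // the slot stays empty forever.
    async fn destroy(self) -> T {
        self.inner
    }
}

#[tokio::main]
async fn main() {
    let acquirable = Acquirable::new(String::from("upload state"));

    let acquisition = acquirable.acquire().await.unwrap();
    let value = acquisition.destroy().await;

    assert!(acquirable.acquire().await.is_none()); // permanently acquired
    println!("moved out: {value}");
}
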
} }