WIP: v1.0.0

Moritz Ruth 2025-04-03 18:54:16 +02:00
parent c2d7a1aba7
commit 96a8ea72f1
Signed by: moritzruth (GPG key ID: C9BBAB79405EE56D)
5 changed files with 182 additions and 99 deletions

Cargo.lock (generated)

@@ -1332,7 +1332,6 @@ dependencies = [
 name = "minna_caos"
 version = "0.1.0"
 dependencies = [
- "async-trait",
  "axum",
  "camino",
  "color-eyre",
@@ -1351,6 +1350,7 @@ dependencies = [
  "serde",
  "serde_json",
  "sqlx",
+ "strum",
  "tokio",
  "tokio-util",
  "validator",
@@ -2404,6 +2404,28 @@ version = "0.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
 
+[[package]]
+name = "strum"
+version = "0.27.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32"
+dependencies = [
+ "strum_macros",
+]
+
+[[package]]
+name = "strum_macros"
+version = "0.27.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn",
+]
+
 [[package]]
 name = "subtle"
 version = "2.6.1"

Cargo.toml

@@ -26,6 +26,6 @@ camino = { version = "1.1.9", features = ["serde1"] }
 dashmap = "7.0.0-rc2"
 tokio-util = "0.7.14"
 replace_with = "0.1.7"
-async-trait = "0.1.88"
 rand = "0.9.0"
-futures = "0.3.31"
+futures = "0.3.31"
+strum = { version = "0.27.1", features = ["derive"] }


@@ -13,13 +13,13 @@ use std::sync::Arc;
 #[derive(Debug)]
 struct ContextInner {
-    pub upload_manager: UploadManager,
+    pub upload_manager: Arc<UploadManager>,
     pub api_secret: FStr<64>,
 }
 
 type Context = Arc<ContextInner>;
 
-pub async fn start_http_api_server(upload_manager: UploadManager, address: IpAddr, port: u16, api_secret: FStr<64>) -> Result<()> {
+pub async fn start_http_api_server(upload_manager: Arc<UploadManager>, address: IpAddr, port: u16, api_secret: FStr<64>) -> Result<()> {
     let router = Router::new()
         .nest("/uploads", create_uploads_router())
         .with_state(Arc::new(ContextInner { upload_manager, api_secret }));

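The server now receives an `Arc<UploadManager>` instead of an owned `UploadManager`; the upload_manager changes further down give each `UnfinishedUpload` a `Weak` back-reference to its manager. A minimal sketch of that ownership pattern, using illustrative names rather than the crate's actual API:

```rust
use std::sync::{Arc, Weak};

// Hypothetical stand-ins for UploadManager and UnfinishedUpload.
struct Manager {
    label: String,
}

struct Upload {
    // Weak avoids a reference cycle: the manager owns its uploads,
    // and each upload points back without keeping the manager alive.
    manager: Weak<Manager>,
}

impl Upload {
    fn manager(&self) -> Arc<Manager> {
        // Panics if the manager was dropped; acceptable when the
        // manager is known to outlive every upload, as in this commit.
        self.manager.upgrade().unwrap()
    }
}

fn main() {
    let manager = Arc::new(Manager { label: "m1".into() });
    let upload = Upload { manager: Arc::downgrade(&manager) };
    assert_eq!(upload.manager().label, "m1");
}
```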

@@ -2,7 +2,7 @@ use crate::http_api::Context;
 use crate::http_api::api_error::ApiError;
 use crate::http_api::headers::{HeaderMapExt, HeaderValueExt, upload_headers};
 use crate::http_api::upload::{PARTIAL_UPLOAD_MEDIA_TYPE, UploadCompleteResponseHeader, UploadOffsetResponseHeader};
-use crate::upload_manager::{FileReference, UnfinishedUpload, UploadId, UploadManager};
+use crate::upload_manager::{AnyStageUpload, FileReference, UnfinishedUpload, UploadFailureReason, UploadId, UploadManager};
 use crate::util::acquirable::Acquisition;
 use axum::Json;
 use axum::body::{Body, BodyDataStream};
@@ -26,9 +26,10 @@ pub(super) struct AppendToUploadPathParameters {
 }
 
 #[derive(Debug)]
-enum HandleAppendOutcome {
+enum AppendToUploadOutcome {
     RequestSuperseded,
     UploadAlreadyComplete,
+    Failed(UploadFailureReason),
     UploadOffsetMismatch { expected: u64, provided: u64 },
     InconsistentUploadLength { expected: u64, detail: Cow<'static, str> },
     ContentStreamStoppedUnexpectedly,
@@ -37,10 +38,10 @@ enum HandleAppendOutcome {
     UploadComplete,
 }
 
-impl IntoResponse for HandleAppendOutcome {
+impl IntoResponse for AppendToUploadOutcome {
     fn into_response(self) -> Response {
         match self {
-            HandleAppendOutcome::RequestSuperseded => (
+            AppendToUploadOutcome::RequestSuperseded => (
                 StatusCode::CONFLICT,
                 UploadCompleteResponseHeader(false),
                 Json(json!({
@@ -49,7 +50,7 @@ impl IntoResponse for HandleAppendOutcome {
                 })),
             )
             .into_response(),
-            HandleAppendOutcome::UploadAlreadyComplete => (
+            AppendToUploadOutcome::UploadAlreadyComplete => (
                 StatusCode::CONFLICT,
                 Json(json!({
                     "type": "https://iana.org/assignments/http-problem-types#completed-upload",
@@ -57,7 +58,16 @@ impl IntoResponse for HandleAppendOutcome {
                 })),
             )
             .into_response(),
-            HandleAppendOutcome::UploadOffsetMismatch { expected, provided } => (
+            AppendToUploadOutcome::Failed(reason) => (
+                StatusCode::GONE,
+                Json(json!({
+                    "type": "https://minna.media/api-problems/caos/request-superseded",
+                    "title": "The upload was cancelled or failed.",
+                    "reason": reason.to_string()
+                })),
+            )
+            .into_response(),
+            AppendToUploadOutcome::UploadOffsetMismatch { expected, provided } => (
                 StatusCode::CONFLICT,
                 UploadCompleteResponseHeader(false),
                 UploadOffsetResponseHeader(expected),
@@ -69,7 +79,7 @@ impl IntoResponse for HandleAppendOutcome {
                 })),
             )
             .into_response(),
-            HandleAppendOutcome::InconsistentUploadLength { expected, detail } => (
+            AppendToUploadOutcome::InconsistentUploadLength { expected, detail } => (
                 StatusCode::CONFLICT,
                 UploadCompleteResponseHeader(false),
                 UploadOffsetResponseHeader(expected),
@@ -80,7 +90,7 @@ impl IntoResponse for HandleAppendOutcome {
                 })),
             )
             .into_response(),
-            HandleAppendOutcome::ContentStreamStoppedUnexpectedly => (
+            AppendToUploadOutcome::ContentStreamStoppedUnexpectedly => (
                 StatusCode::BAD_REQUEST,
                 UploadCompleteResponseHeader(false),
                 Json(json!({
@@ -89,7 +99,7 @@ impl IntoResponse for HandleAppendOutcome {
                 })),
             )
             .into_response(),
-            HandleAppendOutcome::TooMuchContent => (
+            AppendToUploadOutcome::TooMuchContent => (
                 StatusCode::BAD_REQUEST,
                 UploadCompleteResponseHeader(false),
                 Json(json!({
@@ -98,14 +108,14 @@ impl IntoResponse for HandleAppendOutcome {
                 })),
             )
             .into_response(),
-            HandleAppendOutcome::UploadIncomplete { offset } => (
+            AppendToUploadOutcome::UploadIncomplete { offset } => (
                 StatusCode::NO_CONTENT,
                 UploadCompleteResponseHeader(false),
                 UploadOffsetResponseHeader(offset),
                 Body::empty(),
             )
             .into_response(),
-            HandleAppendOutcome::UploadComplete => (StatusCode::NO_CONTENT, UploadCompleteResponseHeader(true), Body::empty()).into_response(),
+            AppendToUploadOutcome::UploadComplete => (StatusCode::NO_CONTENT, UploadCompleteResponseHeader(true), Body::empty()).into_response(),
         }
     }
 }
@@ -116,24 +126,28 @@ pub(super) async fn append_to_upload(
     headers: HeaderMap,
     request_body: Body,
 ) -> Result<impl IntoResponse, (UploadCompleteResponseHeader, ApiError)> {
-    let parameters = parse_request_parameters(&context.upload_manager, upload_id, &headers)
+    let parameters = match parse_request_parameters(&context.upload_manager, upload_id, &headers)
         .await
-        .map_err(|e| (UploadCompleteResponseHeader(false), e))?;
+        .map_err(|e| (UploadCompleteResponseHeader(false), e))?
+    {
+        Ok(p) => p,
+        Err(o) => return Ok(o),
+    };
 
     {
         let state = parameters.upload.state().read().await;
         if state.is_complete() {
-            return Ok(HandleAppendOutcome::UploadAlreadyComplete);
+            return Ok(AppendToUploadOutcome::UploadAlreadyComplete);
         }
     }
 
     let mut file_acquisition = if let Some(a) = parameters.upload.acquire_file().await {
         a
     } else {
-        return Ok(HandleAppendOutcome::RequestSuperseded);
+        return Ok(AppendToUploadOutcome::RequestSuperseded);
     };
 
-    let outcome = do_append(&context.upload_manager, &mut file_acquisition, parameters, request_body.into_data_stream())
+    let outcome = do_append(&mut file_acquisition, parameters, request_body.into_data_stream())
         .await
         .map_err(|report| {
             (
@@ -155,9 +169,17 @@ struct RequestParameters {
     pub supplied_upload_complete: bool,
 }
 
-async fn parse_request_parameters(upload_manager: &UploadManager, upload_id: UploadId, headers: &HeaderMap) -> Result<RequestParameters, ApiError> {
-    let upload = if let Some(upload) = upload_manager.get_upload_by_id(&upload_id) {
-        upload
+async fn parse_request_parameters(
+    upload_manager: &UploadManager,
+    upload_id: UploadId,
+    headers: &HeaderMap,
+) -> Result<Result<RequestParameters, AppendToUploadOutcome>, ApiError> {
+    let upload = if let Some(upload) = upload_manager.get_upload_by_id(&upload_id).await? {
+        match upload {
+            AnyStageUpload::Unfinished(u) => u,
+            AnyStageUpload::Finished => return Ok(Err(AppendToUploadOutcome::UploadAlreadyComplete)),
+            AnyStageUpload::Failed(reason) => return Ok(Err(AppendToUploadOutcome::Failed(reason))),
+        }
     } else {
         return Err(ApiError::UnknownResource {
             resource_type: "upload".into(),
@@ -191,29 +213,35 @@ async fn parse_request_parameters(upload_manager: &UploadManager, upload_id: Upl
         .get_exactly_once(&upload_headers::UPLOAD_COMPLETE)?
         .get_boolean(&upload_headers::UPLOAD_COMPLETE)?;
 
-    Ok(RequestParameters {
+    Ok(Ok(RequestParameters {
         upload,
         supplied_content_length,
         supplied_upload_offset,
         supplied_upload_complete,
-    })
+    }))
 }
 
 async fn do_append(
-    upload_manager: &UploadManager,
     file_acquisition: &mut Acquisition<FileReference>,
     parameters: RequestParameters,
     content_stream: BodyDataStream,
-) -> Result<HandleAppendOutcome, Report> {
+) -> Result<AppendToUploadOutcome, Report> {
+    let mut upload_state = parameters.upload.state().write().await;
+
     let release_request_token = file_acquisition.release_request_token();
     let mut file = file_acquisition.inner().get_or_open().await?;
     let total_size = parameters.upload.total_size();
     let current_offset = file.stream_position().await?;
+
+    if current_offset < upload_state.current_size() {
+        parameters.upload.fail(UploadFailureReason::MissingData).await?;
+        return Ok(AppendToUploadOutcome::Failed(UploadFailureReason::MissingData));
+    }
+
     let remaining_content_length = total_size - current_offset;
 
     if parameters.supplied_upload_offset != current_offset {
-        return Ok(HandleAppendOutcome::UploadOffsetMismatch {
+        return Ok(AppendToUploadOutcome::UploadOffsetMismatch {
             expected: current_offset,
             provided: parameters.supplied_upload_offset,
         });
@@ -222,7 +250,7 @@ async fn do_append(
     let payload_length_limit = if let Some(supplied_content_length) = parameters.supplied_content_length {
         if parameters.supplied_upload_complete {
             if remaining_content_length != supplied_content_length {
-                return Ok(HandleAppendOutcome::InconsistentUploadLength {
+                return Ok(AppendToUploadOutcome::InconsistentUploadLength {
                     expected: total_size,
                     detail: "Upload-Complete is set to true, and Content-Length is set, \
                         but the value of Content-Length does not equal the length of the remaining content."
@@ -231,7 +259,7 @@ async fn do_append(
             }
         } else {
             if supplied_content_length >= remaining_content_length {
-                return Ok(HandleAppendOutcome::InconsistentUploadLength {
+                return Ok(AppendToUploadOutcome::InconsistentUploadLength {
                     expected: total_size,
                     detail: "Upload-Complete is set to false, and Content-Length is set, \
                         but the value of Content-Length is not smaller than the length of the remaining content."
@@ -260,7 +288,6 @@ async fn do_append(
     file.sync_all().await?;
 
     let new_size = file.stream_position().await?;
-    let mut upload_state = parameters.upload.state().write().await;
     upload_state.set_current_size(new_size);
 
     let is_upload_complete = if let Some(StreamToFileOutcome::Success) = outcome {
@@ -272,25 +299,25 @@ async fn do_append(
     if is_upload_complete {
         upload_state.set_complete();
         parameters.upload.save_to_database(&upload_state).await?;
-        upload_manager.queue_upload_for_processing(Arc::clone(&parameters.upload)).await;
+        parameters.upload.enqueue_for_processing(&upload_state).await;
     } else {
         parameters.upload.save_to_database(&upload_state).await?;
     }
 
     Ok(if let Some(outcome) = outcome {
         match outcome {
-            StreamToFileOutcome::StoppedUnexpectedly => HandleAppendOutcome::ContentStreamStoppedUnexpectedly,
-            StreamToFileOutcome::TooMuchContent => HandleAppendOutcome::TooMuchContent,
+            StreamToFileOutcome::StoppedUnexpectedly => AppendToUploadOutcome::ContentStreamStoppedUnexpectedly,
+            StreamToFileOutcome::TooMuchContent => AppendToUploadOutcome::TooMuchContent,
             StreamToFileOutcome::Success => {
                 if is_upload_complete {
-                    HandleAppendOutcome::UploadComplete
+                    AppendToUploadOutcome::UploadComplete
                 } else {
-                    HandleAppendOutcome::UploadIncomplete { offset: new_size }
+                    AppendToUploadOutcome::UploadIncomplete { offset: new_size }
                 }
             }
         }
     } else {
-        HandleAppendOutcome::RequestSuperseded
+        AppendToUploadOutcome::RequestSuperseded
    })
 }

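`parse_request_parameters` now returns `Result<Result<RequestParameters, AppendToUploadOutcome>, ApiError>`: the outer `Err` carries a malformed-request `ApiError`, while the inner `Err` carries a well-formed request that should short-circuit into a normal protocol response (the upload is already finished or has failed). A stripped-down sketch of this two-layer pattern, using placeholder types rather than the real ones:

```rust
// Placeholder types standing in for ApiError, AppendToUploadOutcome,
// and RequestParameters from the diff above.
#[derive(Debug)]
enum ApiError { UnknownResource }

#[derive(Debug)]
enum Outcome { AlreadyComplete }

struct Parameters { offset: u64 }

// Outer Result: request-level errors. Inner Result: valid requests
// that nevertheless short-circuit to a protocol outcome.
fn parse(upload_exists: bool, finished: bool) -> Result<Result<Parameters, Outcome>, ApiError> {
    if !upload_exists {
        return Err(ApiError::UnknownResource);
    }
    if finished {
        return Ok(Err(Outcome::AlreadyComplete));
    }
    Ok(Ok(Parameters { offset: 0 }))
}

fn main() -> Result<(), ApiError> {
    // `?` propagates only the outer ApiError; the inner Err becomes an
    // early, successful response, mirroring `Err(o) => return Ok(o)`.
    let parameters = match parse(true, true)? {
        Ok(p) => p,
        Err(outcome) => {
            println!("short-circuit: {outcome:?}");
            return Ok(());
        }
    };
    println!("proceed with offset {}", parameters.offset);
    Ok(())
}
```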

@@ -1,14 +1,16 @@
 use crate::processing_worker::do_processing_work;
 use crate::util::acquirable::{Acquirable, Acquisition};
-use crate::util::file_reference::FileReference;
+pub(crate) use crate::util::file_reference::FileReference;
 use crate::util::id::generate_id;
 use camino::Utf8PathBuf;
-use color_eyre::Result;
+use color_eyre::{Report, Result};
 use dashmap::DashMap;
 use fstr::FStr;
-use sqlx::SqlitePool;
+use sqlx::{Row, SqlitePool};
 use std::fmt::Debug;
-use std::sync::Arc;
+use std::str::FromStr;
+use std::sync::{Arc, Weak};
+use strum::{Display, EnumString, IntoStaticStr};
 use tokio::fs::{File, OpenOptions};
 use tokio::sync::RwLock;
@@ -26,49 +28,46 @@ pub struct UploadManager {
 }
 
 impl UploadManager {
-    pub async fn create(database: SqlitePool, staging_directory_path: Utf8PathBuf) -> Result<Self> {
+    pub async fn create(database: SqlitePool, staging_directory_path: Utf8PathBuf) -> Result<Arc<Self>> {
         log::info!("Loading uploads…");
 
-        let mut complete_uploads = Vec::new();
-        let unfinished_uploads = sqlx::query!("SELECT id, current_size, total_size, is_complete FROM unfinished_uploads")
-            .map(|row| {
-                let staging_file_path = staging_directory_path.join(&row.id);
-                let id = UploadId::from_str_lossy(&row.id, b'_');
-                let is_complete = row.is_complete != 0;
-
-                let upload = Arc::new(UnfinishedUpload {
-                    database: database.clone(),
-                    id,
-                    total_size: row.total_size as u64,
-                    state: RwLock::new(UnfinishedUploadState {
-                        current_size: row.current_size as u64,
-                        is_complete,
-                    }),
-                    file: Acquirable::new(FileReference::new(staging_file_path)),
-                });
-
-                if is_complete {
-                    complete_uploads.push(Arc::clone(&upload));
-                }
-
-                (id, upload)
-            })
-            .fetch_all(&database)
-            .await?;
-
         let (small_file_processing_tasks_sender, small_file_processing_tasks_receiver) = tokio::sync::mpsc::unbounded_channel();
         let (large_file_processing_tasks_sender, large_file_processing_tasks_receiver) = tokio::sync::mpsc::unbounded_channel();
 
-        let manager = UploadManager {
+        let manager = Arc::new(UploadManager {
             database: database.clone(),
-            staging_directory_path,
-            unfinished_uploads: DashMap::from_iter(unfinished_uploads.into_iter()),
+            staging_directory_path: staging_directory_path.clone(),
+            unfinished_uploads: DashMap::new(),
             small_file_processing_tasks_sender,
             large_file_processing_tasks_sender,
-        };
+        });
 
-        log::info!("Found {} unprocessed upload(s).", complete_uploads.len());
-        for upload in complete_uploads {
-            manager.queue_upload_for_processing(upload).await;
+        let unfinished_upload_rows = sqlx::query!("SELECT id, current_size, total_size, is_complete FROM unfinished_uploads")
+            .fetch_all(&database)
+            .await?;
+
+        for row in unfinished_upload_rows {
+            let staging_file_path = staging_directory_path.join(&row.id);
+            let id = UploadId::from_str_lossy(&row.id, b'_');
+            let is_complete = row.is_complete != 0;
+
+            let upload = Arc::new(UnfinishedUpload {
+                manager: Arc::downgrade(&manager),
+                id,
+                total_size: row.total_size as u64,
+                state: RwLock::new(UnfinishedUploadState {
+                    current_size: row.current_size as u64,
+                    is_complete,
+                }),
+                file: Acquirable::new(FileReference::new(staging_file_path)),
+            });
+
+            if is_complete {
+                let state = upload.state.read().await;
+                upload.enqueue_for_processing(&state).await;
+            }
+
+            manager.unfinished_uploads.insert(id, upload);
         }
 
         log::info!("Starting upload processing…");
@@ -78,7 +77,7 @@ impl UploadManager {
         Ok(manager)
     }
 
-    pub async fn create_upload(&self, total_size: u64) -> Result<Arc<UnfinishedUpload>> {
+    pub async fn create_upload(self: &Arc<Self>, total_size: u64) -> Result<Arc<UnfinishedUpload>> {
         let id: UploadId = generate_id();
 
         {
@@ -94,7 +93,7 @@ impl UploadManager {
         }
 
         let upload = Arc::new(UnfinishedUpload {
-            database: self.database.clone(),
+            manager: Arc::downgrade(&self),
             id,
             total_size,
             state: RwLock::new(UnfinishedUploadState {
@@ -109,31 +108,36 @@ impl UploadManager {
         Ok(upload)
     }
 
-    pub async fn queue_upload_for_processing(&self, upload: Arc<UnfinishedUpload>) {
-        {
-            let metadata = upload.state.read().await;
-            assert!(metadata.is_complete);
-        }
-
-        if upload.total_size <= LARGE_FILE_SIZE_THRESHOLD {
-            self.small_file_processing_tasks_sender.send(upload).unwrap()
+    pub async fn get_upload_by_id(&self, id: &str) -> Result<Option<AnyStageUpload>, Report> {
+        if let Some(unfinished_uploads) = self.unfinished_uploads.get(id).map(|a| Arc::clone(a.value())) {
+            Ok(Some(AnyStageUpload::Unfinished(unfinished_uploads)))
         } else {
-            self.large_file_processing_tasks_sender.send(upload).unwrap()
+            Ok(sqlx::query!(
+                "SELECT reason FROM (SELECT id, '' AS reason FROM finished_uploads UNION SELECT id, reason FROM failed_uploads) WHERE id = ?",
+                id
+            )
+            .map(|row| {
+                if row.reason.is_empty() {
+                    AnyStageUpload::Finished
+                } else {
+                    AnyStageUpload::Failed(UploadFailureReason::from_str(&row.reason).unwrap())
+                }
+            })
+            .fetch_optional(&self.database)
+            .await?)
         }
     }
-
-    pub fn get_upload_by_id(&self, id: &str) -> Option<Arc<UnfinishedUpload>> {
-        self.unfinished_uploads.get(id).map(|a| Arc::clone(a.value()))
-    }
-
-    pub fn remove_failed_upload(&self, id: &UploadId) {
-        self.unfinished_uploads.remove(id);
-    }
 }
 
+pub enum AnyStageUpload {
+    Unfinished(Arc<UnfinishedUpload>),
+    Finished,
+    Failed(UploadFailureReason),
+}
+
 #[derive(Debug)]
 pub struct UnfinishedUpload {
-    database: SqlitePool,
+    manager: Weak<UploadManager>,
     id: UploadId,
     total_size: u64,
     state: RwLock<UnfinishedUploadState>,
@@ -157,18 +161,47 @@ impl UnfinishedUpload {
         self.file.acquire().await
     }
 
-    pub async fn save_to_database(&self, state: &UnfinishedUploadState) -> Result<(), sqlx::Error> {
+    pub async fn enqueue_for_processing(self: &Arc<Self>, state: &UnfinishedUploadState) {
+        let manager = self.manager.upgrade().unwrap();
+        assert!(state.is_complete);
+
+        if self.total_size <= LARGE_FILE_SIZE_THRESHOLD {
+            manager.small_file_processing_tasks_sender.send(Arc::clone(&self)).unwrap()
+        } else {
+            manager.large_file_processing_tasks_sender.send(Arc::clone(&self)).unwrap()
+        }
+    }
+
+    pub async fn save_to_database(&self, state: &UnfinishedUploadState) -> Result<(), Report> {
         let id = self.id.to_string();
         let current_size = state.current_size() as i64;
 
         sqlx::query!("UPDATE unfinished_uploads SET current_size = ? WHERE id = ?", current_size, id)
-            .execute(&self.database)
+            .execute(&self.manager.upgrade().unwrap().database)
             .await?;
 
         Ok(())
     }
 
-    pub async fn fail(&self, reason: UploadFailureReason) {}
+    pub async fn fail(&self, reason: UploadFailureReason) -> Result<(), Report> {
+        let manager = self.manager.upgrade().unwrap();
+        manager.unfinished_uploads.remove(&self.id);
+
+        let mut tx = manager.database.begin().await?;
+
+        let id = self.id.to_string();
+        let reason = reason.to_string();
+        sqlx::query!("DELETE FROM unfinished_uploads WHERE id = ?", id).execute(&mut *tx).await?;
+        sqlx::query!("INSERT INTO failed_uploads (id, reason) VALUES (?, ?)", id, reason)
+            .execute(&mut *tx)
+            .await?;
+
+        tx.commit().await?;
+
+        Ok(())
+    }
 }
 
 #[derive(Debug)]
@@ -196,7 +229,8 @@ impl UnfinishedUploadState {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, EnumString, Display)]
+#[strum(serialize_all = "snake_case")]
 pub enum UploadFailureReason {
     CancelledByClient,
     Expired,
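
The `EnumString` and `Display` derives with `serialize_all = "snake_case"` are what let `fail()` persist the reason as a TEXT column and `get_upload_by_id` parse it back via `UploadFailureReason::from_str` (the `'' AS reason` sentinel in the UNION query marks finished uploads, since an empty string never matches a variant). A round-trip sketch under the same derives, assuming `strum = { version = "0.27.1", features = ["derive"] }`; the variant list mirrors only what this diff shows:

```rust
use std::str::FromStr;
use strum::{Display, EnumString};

// Mirrors UploadFailureReason as far as the diff shows it.
#[derive(Debug, PartialEq, EnumString, Display)]
#[strum(serialize_all = "snake_case")]
enum FailureReason {
    CancelledByClient,
    Expired,
    MissingData,
}

fn main() {
    // Display renders snake_case, suitable as a TEXT column value.
    assert_eq!(FailureReason::CancelledByClient.to_string(), "cancelled_by_client");

    // EnumString parses the stored string back into the enum.
    let parsed = FailureReason::from_str("missing_data").unwrap();
    assert_eq!(parsed, FailureReason::MissingData);
}
```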