From 486f30e4f38ee7f3633b3adb69ca1506832dd32d Mon Sep 17 00:00:00 2001
From: Moritz Ruth
Date: Fri, 18 Apr 2025 21:24:07 +0200
Subject: [PATCH] WIP: v1.0.0

---
 run/config.toml                         |   3 +-
 src/config.rs                           |  33 ++-
 src/http_api/api_error.rs               |  95 +++++++++
 src/http_api/auth.rs                    |   4 +-
 src/http_api/mod.rs                     |  16 +-
 src/http_api/upload/append_to_upload.rs | 266 +++++++++---------------
 src/http_api/upload/headers.rs          |  26 +++
 src/http_api/upload/mod.rs              | 113 ++++++----
 src/main.rs                             |  18 +-
 src/processing_worker.rs                |   2 +-
 src/upload_manager.rs                   |  73 ++++---
 src/util/id.rs                          |   2 +
 12 files changed, 373 insertions(+), 278 deletions(-)
 create mode 100644 src/http_api/upload/headers.rs

diff --git a/run/config.toml b/run/config.toml
index 4af7f40..fde3161 100644
--- a/run/config.toml
+++ b/run/config.toml
@@ -4,8 +4,7 @@ api_secret = "Xt99Hp%wU%zf&vczQ%bJPbr2$owC#wuM#7fxEy%Uc%pp4Thdk7V$4kxMJFupvNKk"
 database_file = "./database.sqlite"
 staging_directory = "./data/staging"
 
-[[buckets]]
-id = "local"
+[buckets.local]
 display_name = "Local"
 backend = "filesystem"
 path = "./data"
\ No newline at end of file
diff --git a/src/config.rs b/src/config.rs
index 8fb647f..cb03feb 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,3 +1,4 @@
+use crate::util::id::BucketId;
 use camino::Utf8PathBuf;
 use color_eyre::Result;
 use color_eyre::eyre::WrapErr;
@@ -7,7 +8,7 @@ use fstr::FStr;
 use once_cell::sync::Lazy;
 use regex::Regex;
 use serde::{Deserialize, Serialize};
-use std::collections::HashSet;
+use std::collections::HashMap;
 use std::net::IpAddr;
 use validator::{Validate, ValidationError};
 
@@ -21,36 +22,28 @@ pub struct Config {
     #[serde(default)]
     pub enable_multithreaded_hashing: bool,
     #[validate(nested, custom(function = "validate_buckets"))]
-    pub buckets: Vec<ConfigBucket>,
+    pub buckets: HashMap<BucketId, ConfigBucket>,
 }
 
-fn validate_buckets(buckets: &Vec<ConfigBucket>) -> Result<(), ValidationError> {
-    let mut ids = HashSet::new();
-
-    for bucket_config in buckets {
-        if !ids.insert(&bucket_config.id) {
-            return Err(ValidationError::new("duplicate_id").with_message(format!("There is more than one bucket with this ID: {}", bucket_config.id).into()));
-        };
-    }
-
-    Ok(())
-}
-
-static BUCKET_ID_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[a-zA-z0-9_]+$").unwrap());
+static BUCKET_ID_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9_]{1,32}$").unwrap());
 
 #[derive(Debug, Serialize, Deserialize, Validate)]
 pub struct ConfigBucket {
-    #[validate(length(min = 1, max = 32), regex(path = *BUCKET_ID_PATTERN), custom(function = "validate_config_bucket_id"))]
-    pub id: String,
     #[validate(length(min = 1, max = 128))]
     pub display_name: String,
     #[serde(flatten)]
     pub backend: ConfigBucketBackend,
 }
 
-fn validate_config_bucket_id(value: &str) -> Result<(), ValidationError> {
-    if value == "staging" {
-        return Err(ValidationError::new("reserved_bucket_id").with_message("Reserved bucket ID: staging".into()));
+fn validate_buckets(value: &HashMap<BucketId, ConfigBucket>) -> Result<(), ValidationError> {
+    for bucket_id in value.keys() {
+        if bucket_id.as_ref() == "staging" {
+            return Err(ValidationError::new("reserved_bucket_id").with_message("Reserved bucket ID: staging".into()));
+        }
+
+        if !BUCKET_ID_PATTERN.is_match(bucket_id) {
+            return Err(ValidationError::new("illegal_bucket_id").with_message("Bucket ID must match ^[a-zA-Z0-9_]{1,32}$".into()));
+        }
     }
 
     Ok(())
diff --git a/src/http_api/api_error.rs b/src/http_api/api_error.rs
index 1ae5412..69eb862 100644
--- a/src/http_api/api_error.rs
+++ b/src/http_api/api_error.rs
@@ -1,3 +1,5 @@
+use crate::http_api::upload::headers::UploadOffsetResponseHeader;
+use crate::upload_manager::UploadFailureReason;
 use axum::http::{HeaderName, HeaderValue, StatusCode, header};
 use axum::response::{IntoResponse, Response};
 use color_eyre::Report;
@@ -36,6 +38,17 @@ pub enum ApiError {
     InvalidRequestHeader { name: HeaderName, message: Cow<'static, str> },
     InvalidRequestContent { path: Cow<'static, str>, message: Cow<'static, str> },
     UnknownResource { resource_type: Cow<'static, str>, id: Cow<'static, str> },
+    RequestBodyTooLong,
+    RequestBodyTooShort,
+
+    IanaUploadAlreadyComplete,
+
+    CaosUploadRequestSuperseded,
+    CaosUploadFailed { reason: UploadFailureReason },
+    CaosUploadOffsetMismatch { expected: u64, provided: u64 },
+    CaosInconsistentUploadLength { expected: u64, detail: Cow<'static, str> },
+    CaosUploadNotFinished,
+    CaosUnknownBucket { bucket_id: Cow<'static, str> },
 }
 
 impl From<Report> for ApiError {
@@ -88,6 +101,88 @@ impl IntoResponse for ApiError {
                 })),
             )
                 .into_response(),
+            ApiError::RequestBodyTooLong => (
+                StatusCode::BAD_REQUEST,
+                ProblemJson(json!({
+                    "type": "https://minna.media/api-problems/general/request-body-too-long",
+                    "title": "The received request body is longer than expected.",
+                })),
+            )
+                .into_response(),
+            ApiError::RequestBodyTooShort => (
+                StatusCode::BAD_REQUEST,
+                ProblemJson(json!({
+                    "type": "https://minna.media/api-problems/general/request-body-too-short",
+                    "title": "The received request body is shorter than expected.",
+                })),
+            )
+                .into_response(),
+            ApiError::IanaUploadAlreadyComplete => (
+                StatusCode::CONFLICT,
+                // According to https://www.ietf.org/archive/id/draft-ietf-httpbis-resumable-upload-08.html#section-4.4.2-7,
+                // if the request did not (itself) complete the upload, the response must contain the `Upload-Complete: ?0` header.
+                // When the upload is *already* complete, the current request cannot possibly be the one that completes it.
+                ProblemJson(json!({
+                    "type": "https://iana.org/assignments/http-problem-types#completed-upload",
+                    "title": "The upload is already complete.",
+                })),
+            )
+                .into_response(),
+            ApiError::CaosUploadRequestSuperseded => (
+                StatusCode::CONFLICT,
+                ProblemJson(json!({
+                    "type": "https://minna.media/api-problems/caos/request-superseded",
+                    "title": "Another request superseded the current request.",
+                })),
+            )
+                .into_response(),
+            ApiError::CaosUploadFailed { reason } => (
+                StatusCode::GONE,
+                ProblemJson(json!({
+                    "type": "https://minna.media/api-problems/caos/upload-failed",
+                    "title": "The upload was cancelled or failed.",
+                    "reason": reason.to_string()
+                })),
+            )
+                .into_response(),
+            ApiError::CaosUploadOffsetMismatch { expected, provided } => (
+                StatusCode::CONFLICT,
+                UploadOffsetResponseHeader(expected),
+                ProblemJson(json!({
+                    "type": "https://iana.org/assignments/http-problem-types#mismatching-upload-offset",
+                    "title": "The upload offset provided in the request does not match the actual offset of the resource.",
+                    "expected-offset": expected,
+                    "provided-offset": provided,
+                })),
+            )
+                .into_response(),
+            ApiError::CaosInconsistentUploadLength { expected, detail } => (
+                StatusCode::CONFLICT,
+                UploadOffsetResponseHeader(expected),
+                ProblemJson(json!({
+                    "type": "https://iana.org/assignments/http-problem-types#inconsistent-upload-length",
+                    "title": "The provided upload lengths are inconsistent with one another or a previously established total length.",
+                    "detail": detail,
+                })),
+            )
+                .into_response(),
+            ApiError::CaosUploadNotFinished => (
+                StatusCode::CONFLICT,
+                ProblemJson(json!({
+                    "type": "https://minna.media/api-problems/caos/upload-not-finished",
+                    "title": "The upload is not finished yet."
+                })),
+            )
+                .into_response(),
+            ApiError::CaosUnknownBucket { bucket_id } => (
+                StatusCode::CONFLICT,
+                ProblemJson(json!({
+                    "type": "https://minna.media/api-problems/caos/unknown-bucket",
+                    "title": "There is no bucket with the specified ID.",
+                    "bucketId": bucket_id
+                })),
+            )
+                .into_response(),
         }
     }
 }
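All of the failure responses added above share the RFC 9457 problem-details shape, with the "type" URI acting as the machine-readable discriminator and "title" as a human-readable summary. A minimal client-side sketch of how a consumer might classify them (assuming only the serde_json crate; the sample body mirrors one of the documents emitted above):

    use serde_json::Value;

    /// Extract the problem "type" URI, the only field a client should
    /// switch on; "title" is informational and may change.
    fn classify_problem(body: &str) -> Option<String> {
        let problem: Value = serde_json::from_str(body).ok()?;
        problem.get("type")?.as_str().map(str::to_owned)
    }

    fn main() {
        let body = r#"{
            "type": "https://iana.org/assignments/http-problem-types#mismatching-upload-offset",
            "title": "The upload offset provided in the request does not match the actual offset of the resource.",
            "expected-offset": 1024,
            "provided-offset": 0
        }"#;
        assert_eq!(
            classify_problem(body).as_deref(),
            Some("https://iana.org/assignments/http-problem-types#mismatching-upload-offset")
        );
    }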
diff --git a/src/http_api/auth.rs b/src/http_api/auth.rs
index 9354f1d..4513513 100644
--- a/src/http_api/auth.rs
+++ b/src/http_api/auth.rs
@@ -9,7 +9,7 @@ pub struct AppAuthorization;
 impl FromRequestParts<Context> for AppAuthorization {
     type Rejection = ApiError;
 
-    async fn from_request_parts(parts: &mut Parts, state: &Context) -> Result<Self, Self::Rejection> {
+    async fn from_request_parts(parts: &mut Parts, context: &Context) -> Result<Self, Self::Rejection> {
         let provided_secret = parts
             .headers
             .get_at_most_once(&axum::http::header::AUTHORIZATION)?
@@ -19,7 +19,7 @@ impl FromRequestParts<Context> for AppAuthorization {
             .flatten()
             .take_if(|v| v.len() == 64);
 
-        let correct_secret = state.api_secret;
+        let correct_secret = context.config.api_secret;
         if let Some(provided_secret) = provided_secret {
             if constant_time_eq::constant_time_eq(provided_secret.as_bytes(), correct_secret.as_bytes()) {
                 return Ok(AppAuthorization);
diff --git a/src/http_api/mod.rs b/src/http_api/mod.rs
index ea969ab..9cbe19d 100644
--- a/src/http_api/mod.rs
+++ b/src/http_api/mod.rs
@@ -3,26 +3,30 @@ mod auth;
 mod headers;
 mod upload;
 
+use crate::config::Config;
 use crate::http_api::upload::create_uploads_router;
 use crate::upload_manager::UploadManager;
 use axum::Router;
 use color_eyre::Result;
-use fstr::FStr;
+use sqlx::SqlitePool;
 use std::net::IpAddr;
 use std::sync::Arc;
 
 #[derive(Debug)]
 struct ContextInner {
     pub upload_manager: Arc<UploadManager>,
-    pub api_secret: FStr<64>,
+    pub database: SqlitePool,
+    pub config: Arc<Config>,
 }
 
 type Context = Arc<ContextInner>;
 
-pub async fn start_http_api_server(upload_manager: Arc<UploadManager>, address: IpAddr, port: u16, api_secret: FStr<64>) -> Result<()> {
-    let router = Router::new()
-        .nest("/uploads", create_uploads_router())
-        .with_state(Arc::new(ContextInner { upload_manager, api_secret }));
+pub async fn start_http_api_server(upload_manager: Arc<UploadManager>, database: SqlitePool, config: Arc<Config>, address: IpAddr, port: u16) -> Result<()> {
+    let router = Router::new().nest("/uploads", create_uploads_router()).with_state(Arc::new(ContextInner {
+        upload_manager,
+        database,
+        config,
+    }));
 
     let listener = tokio::net::TcpListener::bind((address, port)).await?;
     axum::serve(listener, router).await?;
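The authorization check above deliberately goes through constant_time_eq rather than ==: a naive byte comparison returns at the first mismatch, so response timing would leak how many leading bytes of a guess were correct. A distilled sketch of the same check (secrets_match is a hypothetical helper, not part of the patch; it assumes the constant_time_eq crate used above):

    use constant_time_eq::constant_time_eq;

    /// Compare a provided secret against the expected one without
    /// short-circuiting on the first differing byte. The eager length
    /// check leaks nothing useful: the expected length (64) is public.
    fn secrets_match(provided: &str, expected: &str) -> bool {
        provided.len() == expected.len() && constant_time_eq(provided.as_bytes(), expected.as_bytes())
    }

    fn main() {
        assert!(secrets_match("a".repeat(64).as_str(), "a".repeat(64).as_str()));
        assert!(!secrets_match("short", "a".repeat(64).as_str()));
    }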
diff --git a/src/http_api/upload/append_to_upload.rs b/src/http_api/upload/append_to_upload.rs
index 97efbfb..9f73d59 100644
--- a/src/http_api/upload/append_to_upload.rs
+++ b/src/http_api/upload/append_to_upload.rs
@@ -1,8 +1,9 @@
 use crate::http_api::Context;
-use crate::http_api::api_error::{ApiError, ProblemJson};
+use crate::http_api::api_error::ApiError;
 use crate::http_api::headers::{HeaderMapExt, HeaderValueExt, upload_headers};
-use crate::http_api::upload::{PARTIAL_UPLOAD_MEDIA_TYPE, UploadCompleteResponseHeader, UploadOffsetResponseHeader};
-use crate::upload_manager::{AcquiredUnfinishedUpload, AnyStageUpload, FileReference, UnfinishedUpload, UploadFailureReason, UploadId, UploadManager};
+use crate::http_api::upload::PARTIAL_UPLOAD_MEDIA_TYPE;
+use crate::http_api::upload::headers::{UploadCompleteResponseHeader, UploadOffsetResponseHeader};
+use crate::upload_manager::{AcquiredUnfinishedUpload, UploadFailureReason, UploadId};
 use crate::util::acquirable::Acquisition;
 use axum::body::{Body, BodyDataStream};
 use axum::extract::{Path, State};
@@ -11,10 +12,7 @@ use axum::response::{IntoResponse, Response};
 use color_eyre::Report;
 use futures::TryStreamExt;
 use serde::Deserialize;
-use serde_json::json;
-use std::borrow::Cow;
 use std::io::ErrorKind;
-use std::sync::Arc;
 use tokio::fs::File;
 use tokio::io::{AsyncReadExt, AsyncSeekExt};
 use tokio_util::io::StreamReader;
@@ -25,96 +23,54 @@ pub(super) struct AppendToUploadPathParameters {
 }
 
 #[derive(Debug)]
-enum AppendToUploadOutcome {
-    RequestSuperseded,
-    UploadAlreadyComplete,
-    Failed(UploadFailureReason),
-    UploadOffsetMismatch { expected: u64, provided: u64 },
-    InconsistentUploadLength { expected: u64, detail: Cow<'static, str> },
-    ContentStreamStoppedUnexpectedly,
-    TooMuchContent,
+pub(super) enum AppendToUploadResponse {
     UploadIncomplete { offset: u64 },
+    UploadFailed { reason: UploadFailureReason },
     UploadComplete,
 }
 
-impl IntoResponse for AppendToUploadOutcome {
+impl IntoResponse for AppendToUploadResponse {
     fn into_response(self) -> Response {
         match self {
-            AppendToUploadOutcome::RequestSuperseded => (
-                StatusCode::CONFLICT,
-                UploadCompleteResponseHeader(false),
-                ProblemJson(json!({
-                    "type": "https://minna.media/api-problems/caos/request-superseded",
-                    "title": "Another request superseded the current request.",
-                })),
-            )
-                .into_response(),
-            AppendToUploadOutcome::UploadAlreadyComplete => (
-                StatusCode::CONFLICT,
-                ProblemJson(json!({
-                    "type": "https://iana.org/assignments/http-problem-types#completed-upload",
-                    "title": "The upload is already complete.",
-                })),
-            )
-                .into_response(),
-            AppendToUploadOutcome::Failed(reason) => (
-                StatusCode::GONE,
-                ProblemJson(json!({
-                    "type": "https://minna.media/api-problems/caos/request-superseded",
-                    "title": "The upload was cancelled or failed.",
-                    "reason": reason.to_string()
-                })),
-            )
-                .into_response(),
-            AppendToUploadOutcome::UploadOffsetMismatch { expected, provided } => (
-                StatusCode::CONFLICT,
-                UploadCompleteResponseHeader(false),
-                UploadOffsetResponseHeader(expected),
-                ProblemJson(json!({
-                    "type": "https://iana.org/assignments/http-problem-types#mismatching-upload-offset",
-                    "title": "The upload offset provided in the request does not match the actual offset of the resource.",
-                    "expected-offset": expected,
-                    "provided-offset": provided,
-                })),
-            )
-                .into_response(),
-            AppendToUploadOutcome::InconsistentUploadLength { expected, detail } => (
-                StatusCode::CONFLICT,
-                UploadCompleteResponseHeader(false),
-                UploadOffsetResponseHeader(expected),
-                ProblemJson(json!({
-                    "type": "https://iana.org/assignments/http-problem-types#inconsistent-upload-length",
-                    "title": "The provided upload lengths are inconsistent with one another or a previously established total length.",
-                    "detail": detail,
-                })),
-            )
-                .into_response(),
-            AppendToUploadOutcome::ContentStreamStoppedUnexpectedly => (
-                StatusCode::BAD_REQUEST,
-                UploadCompleteResponseHeader(false),
-                ProblemJson(json!({
-                    "type": "https://minna.media/api-problems/caos/content-stream-stopped-unexpectedly",
-                    "title": "The content stream stopped unexpectedly.",
-                })),
-            )
-                .into_response(),
-            AppendToUploadOutcome::TooMuchContent => (
-                StatusCode::BAD_REQUEST,
-                UploadCompleteResponseHeader(false),
-                ProblemJson(json!({
-                    "type": "https://minna.media/api-problems/caos/too-much-content",
-                    "title": "The request contained more content than it should.",
-                })),
-            )
-                .into_response(),
-            AppendToUploadOutcome::UploadIncomplete { offset } => (
+            AppendToUploadResponse::UploadIncomplete { offset } => (
                 StatusCode::NO_CONTENT,
                 UploadCompleteResponseHeader(false),
                 UploadOffsetResponseHeader(offset),
                 Body::empty(),
             )
                 .into_response(),
-            AppendToUploadOutcome::UploadComplete => (StatusCode::NO_CONTENT, UploadCompleteResponseHeader(true), Body::empty()).into_response(),
+            AppendToUploadResponse::UploadFailed { reason } => (UploadCompleteResponseHeader(false), ApiError::CaosUploadFailed { reason }).into_response(),
+            AppendToUploadResponse::UploadComplete => (StatusCode::NO_CONTENT, UploadCompleteResponseHeader(true), Body::empty()).into_response(),
         }
     }
 }
+
+pub(super) struct AppendToUploadError {
+    inner: ApiError,
+}
+
+impl IntoResponse for AppendToUploadError {
+    fn into_response(self) -> Response {
+        (UploadCompleteResponseHeader(false), self.inner).into_response()
+    }
+}
+
+impl From<ApiError> for AppendToUploadError {
+    fn from(value: ApiError) -> Self {
+        AppendToUploadError { inner: value }
+    }
+}
+
+impl From<Report> for AppendToUploadError {
+    fn from(value: Report) -> Self {
+        AppendToUploadError { inner: value.into() }
+    }
+}
+
+impl From<std::io::Error> for AppendToUploadError {
+    fn from(error: std::io::Error) -> Self {
+        AppendToUploadError {
+            inner: Report::new(error).into(),
+        }
+    }
+}
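The wrapper type above exists so the handler can use `?` freely while still guaranteeing that every error response carries `Upload-Complete: ?0`: each `From` impl funnels a different error source into the one type whose `IntoResponse` attaches the header. A condensed, self-contained model of that mechanism (`Wrapped` and `read_file` are illustrative stand-ins, not the real types):

    struct Wrapped(String);

    impl From<std::io::Error> for Wrapped {
        fn from(e: std::io::Error) -> Self {
            // In the patch, this conversion is the single choke point
            // where the header policy can be enforced: every `?` lands here.
            Wrapped(e.to_string())
        }
    }

    fn read_file() -> Result<String, Wrapped> {
        // `?` converts std::io::Error into Wrapped via the From impl.
        let text = std::fs::read_to_string("/nonexistent")?;
        Ok(text)
    }

    fn main() {
        assert!(read_file().is_err());
    }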
@@ -124,93 +80,65 @@ pub(super) async fn append_to_upload(
     State(context): State<Context>,
     Path(AppendToUploadPathParameters { upload_id }): Path<AppendToUploadPathParameters>,
     headers: HeaderMap,
     request_body: Body,
-) -> Result<AppendToUploadOutcome, (UploadCompleteResponseHeader, ApiError)> {
-    let parameters = match parse_request_parameters(&context.upload_manager, upload_id, &headers)
-        .await
-        .map_err(|e| (UploadCompleteResponseHeader(false), e))?
-    {
-        Ok(p) => p,
-        Err(o) => return Ok(o),
+) -> Result<AppendToUploadResponse, AppendToUploadError> {
+    let mut tx = context.database.begin().await.map_err(Into::<ApiError>::into)?;
+
+    let upload = if let Some(upload) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await {
+        upload
+    } else if let Some(reason) = context.upload_manager.get_upload_failure_reason_by_id(&mut tx, &upload_id).await? {
+        return Ok(AppendToUploadResponse::UploadFailed { reason }.into());
+    } else if let Some(_) = context.upload_manager.get_finished_upload_metadata_by_id(&mut tx, &upload_id).await? {
+        return Err(ApiError::IanaUploadAlreadyComplete.into());
+    } else {
+        return Err(ApiError::UnknownResource {
+            resource_type: "upload".into(),
+            id: upload_id.to_string().into(),
+        }
+        .into());
     };
 
-    let mut upload_acquisition = if let Some(a) = parameters.upload.acquire().await {
+    let parameters = parse_request_parameters(&headers).await?;
+
+    let mut upload_acquisition = if let Some(a) = upload.acquire().await {
         a
     } else {
-        return Ok(AppendToUploadOutcome::RequestSuperseded);
+        return Err(ApiError::CaosUploadRequestSuperseded.into());
     };
 
     if let Some(supplied_upload_length) = parameters.supplied_upload_length {
         let expected = upload_acquisition.inner().upload().total_size();
         if supplied_upload_length != expected {
-            return Ok(AppendToUploadOutcome::InconsistentUploadLength {
+            return Err(ApiError::CaosInconsistentUploadLength {
                 expected,
                 detail: format!("Upload-Length is set to {supplied_upload_length}, but the actual length is {expected}.").into(),
-            });
+            }
+            .into());
         }
     }
 
-    if parameters.upload.is_complete() {
-        return Ok(AppendToUploadOutcome::UploadAlreadyComplete);
+    if upload.is_complete() {
+        return Err(ApiError::IanaUploadAlreadyComplete.into());
     }
 
-    let outcome = do_append(&mut upload_acquisition, parameters, request_body.into_data_stream())
-        .await
-        .map_err(|report| {
-            (
-                UploadCompleteResponseHeader(false),
-                ApiError::Internal {
-                    report: report.wrap_err(format!("Error during file upload ({upload_id})")),
-                },
-            )
-        })?;
+    let response = do_append(&mut upload_acquisition, parameters, request_body.into_data_stream()).await?;
 
-    match &outcome {
-        AppendToUploadOutcome::UploadComplete => {
-            upload_acquisition
-                .complete()
-                .await
-                .map_err(|e| (UploadCompleteResponseHeader(false), e.into()))?;
-        }
-        AppendToUploadOutcome::Failed(reason) => {
-            upload_acquisition
-                .fail(*reason)
-                .await
-                .map_err(|e| (UploadCompleteResponseHeader(false), e.into()))?;
-        }
-        _ => {
-            upload_acquisition.release().await;
-        }
+    match &response {
+        AppendToUploadResponse::UploadIncomplete { .. } => upload_acquisition.release().await,
+        AppendToUploadResponse::UploadComplete => upload_acquisition.complete().await?,
+        AppendToUploadResponse::UploadFailed { reason } => upload_acquisition.fail(*reason).await?,
     }
 
-    Ok(outcome)
+    Ok(response)
 }
 
 struct RequestParameters {
-    pub upload: Arc<UnfinishedUpload>,
     pub supplied_content_length: Option<u64>,
     pub supplied_upload_length: Option<u64>,
     pub supplied_upload_offset: u64,
     pub supplied_upload_complete: bool,
 }
 
-async fn parse_request_parameters(
-    upload_manager: &UploadManager,
-    upload_id: UploadId,
-    headers: &HeaderMap,
-) -> Result<Result<RequestParameters, AppendToUploadOutcome>, ApiError> {
-    let upload = if let Some(upload) = upload_manager.get_upload_by_id(&upload_id).await? {
-        match upload {
-            AnyStageUpload::Unfinished(u) => u,
-            AnyStageUpload::Finished { .. } => return Ok(Err(AppendToUploadOutcome::UploadAlreadyComplete)),
-            AnyStageUpload::Failed(reason) => return Ok(Err(AppendToUploadOutcome::Failed(reason))),
-        }
-    } else {
-        return Err(ApiError::UnknownResource {
-            resource_type: "upload".into(),
-            id: upload_id.to_string().into(),
-        });
-    };
-
+async fn parse_request_parameters(headers: &HeaderMap) -> Result<RequestParameters, ApiError> {
     if !headers
         .get_exactly_once(&axum::http::header::CONTENT_TYPE)?
         .to_str()
@@ -242,63 +170,65 @@ async fn parse_request_parameters(
         .get_exactly_once(&upload_headers::UPLOAD_COMPLETE)?
         .get_boolean(&upload_headers::UPLOAD_COMPLETE)?;
 
-    Ok(Ok(RequestParameters {
-        upload,
+    Ok(RequestParameters {
         supplied_content_length,
         supplied_upload_length,
         supplied_upload_offset,
         supplied_upload_complete,
-    }))
+    })
 }
 
 async fn do_append(
     upload_acquisition: &mut Acquisition<AcquiredUnfinishedUpload>,
     parameters: RequestParameters,
     content_stream: BodyDataStream,
-) -> Result<AppendToUploadOutcome, Report> {
+) -> Result<AppendToUploadResponse, AppendToUploadError> {
     let release_request_token = upload_acquisition.release_request_token();
     let acquired_upload = upload_acquisition.inner();
+    let upload = acquired_upload.upload();
 
     let mut file = acquired_upload.file().get_or_open(true).await?;
-    let total_size = parameters.upload.total_size();
+    let total_size = upload.total_size();
     let current_offset = file.stream_position().await?;
 
-    if current_offset < parameters.upload.current_size() {
-        log::error!(
-            "The upload ({}) failed because the file contains less data than expected.",
-            parameters.upload.id()
-        );
-
-        return Ok(AppendToUploadOutcome::Failed(UploadFailureReason::MissingData));
+    if current_offset < upload.current_size() {
+        log::error!("The upload ({}) failed because the file contains less data than expected.", upload.id());
+        return Ok(AppendToUploadResponse::UploadFailed {
+            reason: UploadFailureReason::MissingData,
+        }
+        .into());
     }
 
     let remaining_content_length = total_size - current_offset;
 
     if parameters.supplied_upload_offset != current_offset {
-        return Ok(AppendToUploadOutcome::UploadOffsetMismatch {
+        return Err(ApiError::CaosUploadOffsetMismatch {
             expected: current_offset,
             provided: parameters.supplied_upload_offset,
-        });
+        }
+        .into());
     }
 
     let payload_length_limit = if let Some(supplied_content_length) = parameters.supplied_content_length {
         if parameters.supplied_upload_complete {
             if remaining_content_length != supplied_content_length {
-                return Ok(AppendToUploadOutcome::InconsistentUploadLength {
+                return Err(ApiError::CaosInconsistentUploadLength {
                     expected: total_size,
                     detail: "Upload-Complete is set to true, and Content-Length is set, \
                     but the value of Content-Length does not equal the length of the remaining content."
                         .into(),
-                });
+                }
+                .into());
             }
         } else {
             if supplied_content_length >= remaining_content_length {
-                return Ok(AppendToUploadOutcome::InconsistentUploadLength {
+                return Err(ApiError::CaosInconsistentUploadLength {
                     expected: total_size,
                     detail: "Upload-Complete is set to false, and Content-Length is set, \
                     but the value of Content-Length is not smaller than the length of the remaining content."
                        .into(),
-                });
+                }
+                .into());
            }
        }
@@ -330,21 +260,21 @@ async fn do_append(
             false
     };
 
-    Ok(if let Some(outcome) = outcome {
+    if let Some(outcome) = outcome {
         match outcome {
-            StreamToFileOutcome::StoppedUnexpectedly => AppendToUploadOutcome::ContentStreamStoppedUnexpectedly,
-            StreamToFileOutcome::TooMuchContent => AppendToUploadOutcome::TooMuchContent,
+            StreamToFileOutcome::StoppedUnexpectedly => Err(ApiError::RequestBodyTooShort.into()),
+            StreamToFileOutcome::TooMuchContent => Err(ApiError::RequestBodyTooLong.into()),
             StreamToFileOutcome::Success => {
                 if is_upload_complete {
-                    AppendToUploadOutcome::UploadComplete
+                    Ok(AppendToUploadResponse::UploadComplete)
                 } else {
-                    AppendToUploadOutcome::UploadIncomplete { offset: new_size }
+                    Ok(AppendToUploadResponse::UploadIncomplete { offset: new_size })
                 }
             }
         }
     } else {
-        AppendToUploadOutcome::RequestSuperseded
-    })
+        Err(ApiError::CaosUploadRequestSuperseded.into())
+    }
 }
 
 #[derive(Debug)]
diff --git a/src/http_api/upload/headers.rs b/src/http_api/upload/headers.rs
new file mode 100644
index 0000000..accf4c9
--- /dev/null
+++ b/src/http_api/upload/headers.rs
@@ -0,0 +1,26 @@
+use crate::http_api::headers::upload_headers;
+use axum::http::HeaderValue;
+use axum::response::{IntoResponseParts, ResponseParts};
+
+pub struct UploadCompleteResponseHeader(pub bool);
+
+impl IntoResponseParts for UploadCompleteResponseHeader {
+    type Error = ();
+
+    fn into_response_parts(self, mut res: ResponseParts) -> Result<ResponseParts, Self::Error> {
+        res.headers_mut()
+            .insert(upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static(if self.0 { "?1" } else { "?0" }));
+        Ok(res)
+    }
+}
+
+pub struct UploadOffsetResponseHeader(pub u64);
+
+impl IntoResponseParts for UploadOffsetResponseHeader {
+    type Error = ();
+
+    fn into_response_parts(self, mut res: ResponseParts) -> Result<ResponseParts, Self::Error> {
+        res.headers_mut().insert(upload_headers::UPLOAD_OFFSET, self.0.into());
+        Ok(res)
+    }
+}
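Both response headers in the new headers.rs are serialized in the structured-field syntax of RFC 8941, which the resumable-upload draft builds on: a boolean is exactly one of the two tokens `?1` and `?0`, which is why `UploadCompleteResponseHeader` emits those strings verbatim. A tiny decoder for the same wire format (a sketch; the request side presumably does the equivalent inside `get_boolean`):

    /// Parse an RFC 8941 structured-field boolean (`?1` / `?0`),
    /// the wire format of the `Upload-Complete` header.
    fn parse_sf_boolean(value: &str) -> Option<bool> {
        match value {
            "?1" => Some(true),
            "?0" => Some(false),
            _ => None, // "true", "1", etc. are not valid sf-booleans
        }
    }

    fn main() {
        assert_eq!(parse_sf_boolean("?1"), Some(true));
        assert_eq!(parse_sf_boolean("?0"), Some(false));
        assert_eq!(parse_sf_boolean("true"), None);
    }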
diff --git a/src/http_api/upload/mod.rs b/src/http_api/upload/mod.rs
index d72d01b..08e6f93 100644
--- a/src/http_api/upload/mod.rs
+++ b/src/http_api/upload/mod.rs
@@ -3,40 +3,20 @@ use crate::http_api::api_error::ApiError;
 use crate::http_api::auth::AppAuthorization;
 use crate::http_api::headers::{CACHE_CONTROL_CACHE_FOREVER, CACHE_CONTROL_NEVER_CACHE, upload_headers};
 use crate::http_api::upload::append_to_upload::append_to_upload;
-use crate::upload_manager::{AnyStageUpload, UploadFailureReason, UploadId};
+use crate::upload_manager::{UploadFailureReason, UploadId};
+use crate::util::id::BucketId;
 use axum::extract::{Path, State};
 use axum::http::HeaderValue;
-use axum::response::{IntoResponse, IntoResponseParts, Response, ResponseParts};
+use axum::response::{IntoResponse, Response};
 use axum::{Json, Router, routing};
+use color_eyre::Report;
 use serde::{Deserialize, Serialize};
 
 pub mod append_to_upload;
+pub mod headers;
 
 const PARTIAL_UPLOAD_MEDIA_TYPE: &'static str = "application/partial-upload";
 
-struct UploadCompleteResponseHeader(bool);
-
-impl IntoResponseParts for UploadCompleteResponseHeader {
-    type Error = ();
-
-    fn into_response_parts(self, mut res: ResponseParts) -> Result<ResponseParts, Self::Error> {
-        res.headers_mut()
-            .insert(upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static(if self.0 { "?1" } else { "?0" }));
-        Ok(res)
-    }
-}
-
-struct UploadOffsetResponseHeader(u64);
-
-impl IntoResponseParts for UploadOffsetResponseHeader {
-    type Error = ();
-
-    fn into_response_parts(self, mut res: ResponseParts) -> Result<ResponseParts, Self::Error> {
-        res.headers_mut().insert(upload_headers::UPLOAD_OFFSET, self.0.into());
-        Ok(res)
-    }
-}
-
 #[derive(Debug, Deserialize)]
 struct CreateUploadPayload {
     size: u64,
@@ -51,6 +31,7 @@ pub fn create_uploads_router() -> Router<Context> {
     Router::new()
         .route("/", routing::post(create_upload))
         .route("/{upload_id}", routing::get(get_upload_metadata).post(append_to_upload))
+        .route("/{upload_id}/accept", routing::post(accept_upload))
 }
 
 async fn create_upload(
@@ -137,30 +118,78 @@ async fn get_upload_metadata(
     State(context): State<Context>,
     Path(GetUploadMetadataPathParameters { upload_id }): Path<GetUploadMetadataPathParameters>,
 ) -> Result<GetUploadMetadataResponse, ApiError> {
-    let upload = if let Some(upload) = context.upload_manager.get_upload_by_id(&upload_id).await? {
-        upload
+    let mut tx = context.database.begin().await.map_err(Into::<ApiError>::into)?;
+
+    let state: GetUploadMetadataResponseState = if let Some(upload) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await {
+        if upload.is_complete() {
+            GetUploadMetadataResponseState::Complete { size: upload.total_size() }
+        } else {
+            GetUploadMetadataResponseState::Ongoing {
+                current_size: upload.current_size(),
+                total_size: upload.total_size(),
+            }
+        }
+    } else if let Some(reason) = context.upload_manager.get_upload_failure_reason_by_id(&mut tx, &upload_id).await? {
+        GetUploadMetadataResponseState::Failed { reason }
+    } else if let Some(metadata) = context.upload_manager.get_finished_upload_metadata_by_id(&mut tx, &upload_id).await? {
+        GetUploadMetadataResponseState::Finished {
+            size: metadata.size,
+            hash: metadata.hash,
+        }
     } else {
         return Err(ApiError::UnknownResource {
             resource_type: "upload".into(),
             id: upload_id.to_string().into(),
-        });
+        }
+        .into());
     };
 
     Ok(GetUploadMetadataResponse {
         id: upload_id.to_string().into_boxed_str(),
-        state: match upload {
-            AnyStageUpload::Unfinished(upload) => {
-                if upload.is_complete() {
-                    GetUploadMetadataResponseState::Complete { size: upload.total_size() }
-                } else {
-                    GetUploadMetadataResponseState::Ongoing {
-                        current_size: upload.current_size(),
-                        total_size: upload.total_size(),
-                    }
-                }
-            }
-            AnyStageUpload::Finished { size, hash } => GetUploadMetadataResponseState::Finished { size, hash },
-            AnyStageUpload::Failed(reason) => GetUploadMetadataResponseState::Failed { reason },
-        },
+        state,
     })
 }
+
+#[derive(Debug, Deserialize)]
+struct AcceptUploadPathParameters {
+    upload_id: UploadId,
+}
+
+#[derive(Debug, Deserialize)]
+struct AcceptUploadRequestBody {
+    bucket_ids: Vec<BucketId>,
+}
+
+async fn accept_upload(
+    State(context): State<Context>,
+    Path(AcceptUploadPathParameters { upload_id }): Path<AcceptUploadPathParameters>,
+    Json(body): Json<AcceptUploadRequestBody>,
+) -> Result<(), ApiError> {
+    let mut tx = context.database.begin().await.map_err(Into::<ApiError>::into)?;
+
+    let _hash = if let Some(_) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await {
+        return Err(ApiError::CaosUploadNotFinished);
+    } else if let Some(reason) = context.upload_manager.get_upload_failure_reason_by_id(&mut tx, &upload_id).await? {
+        return Err(ApiError::CaosUploadFailed { reason });
+    } else if let Some(metadata) = context.upload_manager.get_finished_upload_metadata_by_id(&mut tx, &upload_id).await? {
+        metadata.hash
+    } else {
+        return Err(ApiError::UnknownResource {
+            resource_type: "upload".into(),
+            id: upload_id.to_string().into(),
+        }
+        .into());
+    };
+
+    for bucket_id in &body.bucket_ids {
+        if !context.config.buckets.contains_key(bucket_id) {
+            return Err(ApiError::CaosUnknownBucket {
+                bucket_id: bucket_id.to_string().into(),
+            }
+            .into());
+        }
+    }
+
+    context.upload_manager.accept_finished_upload(&mut tx, upload_id, body.bucket_ids).await?;
+    tx.commit().await.map_err(Into::<ApiError>::into)?;
+
+    Ok(())
+}
diff --git a/src/main.rs b/src/main.rs
index 3f5f6e0..b7342da 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,11 +9,13 @@ mod util;
 use crate::config::{ConfigBucket, ConfigBucketBackend, load_config};
 use crate::http_api::start_http_api_server;
 use crate::upload_manager::UploadManager;
+use crate::util::id::BucketId;
 use camino::Utf8Path;
 use color_eyre::Result;
 use color_eyre::eyre::{WrapErr, eyre};
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::io::ErrorKind;
+use std::sync::Arc;
 use tokio::fs;
 
 #[tokio::main]
@@ -21,7 +23,7 @@ async fn main() -> Result<()> {
     color_eyre::install().unwrap();
     env_logger::init();
 
-    let config = load_config()?;
+    let config = Arc::new(load_config()?);
 
     log::debug!("Loaded configuration: {:#?}", config);
 
@@ -32,11 +34,11 @@ async fn main() -> Result<()> {
         .await
         .wrap_err("Failed to open the database connection.")?;
 
-    let upload_manager = UploadManager::create(database.clone(), config.staging_directory, config.enable_multithreaded_hashing).await?;
+    let upload_manager = UploadManager::create(database.clone(), &config.staging_directory, config.enable_multithreaded_hashing).await?;
 
     log::info!("Initialization successful.");
 
-    start_http_api_server(upload_manager, config.http_address, config.http_port, config.api_secret).await?;
+    start_http_api_server(upload_manager, database, Arc::clone(&config), config.http_address, config.http_port).await?;
 
     Ok(())
 }
@@ -53,11 +55,11 @@ async fn initialize_staging_directory(path: &Utf8Path) -> Result<()> {
     Ok(())
 }
 
-async fn initialize_buckets(bucket_configs: &Vec<ConfigBucket>) -> Result<()> {
+async fn initialize_buckets(bucket_configs: &HashMap<BucketId, ConfigBucket>) -> Result<()> {
     let mut filesystem_backend_paths = HashSet::new();
 
-    for bucket_config in bucket_configs {
-        log::info!("Initializing bucket: {}", bucket_config.id);
+    for (bucket_id, bucket_config) in bucket_configs {
+        log::info!("Initializing bucket: {}", bucket_id);
 
         match &bucket_config.backend {
             ConfigBucketBackend::Filesystem(filesystem_backend_config) => {
@@ -81,7 +83,7 @@ async fn initialize_buckets(bucket_configs: &HashMap<BucketId, ConfigBucket>) -> Result<()> {
 
                 check_directory_writable(&path)
                     .await
-                    .wrap_err_with(|| format!("The writable check for the {} bucket failed.", &bucket_config.id))?;
+                    .wrap_err_with(|| format!("The writable check for the {} bucket failed.", bucket_id))?;
 
                 filesystem_backend_paths.insert(path);
             }
diff --git a/src/processing_worker.rs b/src/processing_worker.rs
index 05ccbc8..5476435 100644
--- a/src/processing_worker.rs
+++ b/src/processing_worker.rs
@@ -1,4 +1,4 @@
-use crate::upload_manager::{AcquiredUnfinishedUpload, UnfinishedUpload};
+use crate::upload_manager::AcquiredUnfinishedUpload;
 use crate::util::hash_to_hex_string::HashExt;
 use crate::util::temporal_formatting::TemporalFormatting;
 use blake3::Hasher;
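The new accept endpoint is the hand-off from staging to permanent storage: the client names target buckets, the handler rejects unfinished, failed, or unknown uploads and unknown buckets, and the row moves (DELETE from finished_uploads, INSERT into object_replicas) share one transaction that only commits at the end. A hypothetical client call, assuming the reqwest crate with its blocking and json features; the URL, port, upload ID, and secret are placeholders, and the exact Authorization format is whatever auth.rs expects (a bare 64-character value is assumed here):

    use serde_json::json;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let response = reqwest::blocking::Client::new()
            .post("http://localhost:8080/uploads/aaaaaaaaaaaaaaaa/accept")
            // Placeholder: the real value is the configured api_secret.
            .header("Authorization", "0".repeat(64))
            .json(&json!({ "bucket_ids": ["local"] }))
            .send()?;

        // 2xx: accepted; 409 carries a problem document such as
        // caos/upload-not-finished or caos/unknown-bucket.
        println!("status: {}", response.status());
        Ok(())
    }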
diff --git a/src/upload_manager.rs b/src/upload_manager.rs
index 0faf9ac..e484f40 100644
--- a/src/upload_manager.rs
+++ b/src/upload_manager.rs
@@ -1,19 +1,17 @@
 use crate::processing_worker::do_processing_work;
 use crate::util::acquirable::{Acquirable, Acquisition};
 pub(crate) use crate::util::file_reference::FileReference;
-use crate::util::id::generate_id;
+use crate::util::id::{BucketId, generate_id};
 use camino::Utf8PathBuf;
 use color_eyre::{Report, Result};
 use dashmap::DashMap;
 use fstr::FStr;
 use serde::Serialize;
-use sqlx::SqlitePool;
+use sqlx::{SqlitePool, SqliteTransaction};
 use std::fmt::Debug;
-use std::str::FromStr;
 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::{Arc, Weak};
 use strum::{Display, EnumString};
-use tokio::sync::RwLock;
 
 const LARGE_FILE_SIZE_THRESHOLD: u64 = 1024;
 pub const UPLOAD_ID_LENGTH: usize = 16;
@@ -29,7 +27,7 @@ pub struct UploadManager {
 }
 
 impl UploadManager {
-    pub async fn create(database: SqlitePool, staging_directory_path: Utf8PathBuf, enable_multithreaded_hashing: bool) -> Result<Arc<Self>> {
+    pub async fn create(database: SqlitePool, staging_directory_path: &Utf8PathBuf, enable_multithreaded_hashing: bool) -> Result<Arc<Self>> {
         log::info!("Loading uploads…");
 
         let (small_file_processing_tasks_sender, small_file_processing_tasks_receiver) = tokio::sync::mpsc::unbounded_channel();
@@ -65,7 +63,7 @@ impl UploadManager {
             });
 
             if is_complete {
-                upload.acquire().await.unwrap().complete().await;
+                upload.acquire().await.unwrap().complete().await?;
             }
 
             manager.unfinished_uploads.insert(id, upload);
@@ -114,37 +112,52 @@ impl UploadManager {
         Ok(upload)
     }
 
-    pub async fn get_upload_by_id(&self, id: &str) -> Result<Option<AnyStageUpload>, Report> {
-        if let Some(upload) = self.unfinished_uploads.get(id).map(|a| Arc::clone(a.value())) {
-            Ok(Some(AnyStageUpload::Unfinished(upload)))
-        } else if let Some(upload) = sqlx::query!(
-            "SELECT id, finished_uploads.hash, objects.size FROM finished_uploads JOIN objects ON finished_uploads.hash = objects.hash WHERE id = ?",
+    pub async fn get_unfinished_upload_by_id(&self, id: &UploadId) -> Option<Arc<UnfinishedUpload>> {
+        self.unfinished_uploads.get(id).map(|a| Arc::clone(a.value()))
+    }
+
+    pub async fn get_upload_failure_reason_by_id(&self, tx: &mut SqliteTransaction<'_>, id: &UploadId) -> Result<Option<UploadFailureReason>> {
+        let id = id.to_string();
+        Ok(sqlx::query!("SELECT reason FROM failed_uploads WHERE id = ?", id)
+            .fetch_optional(&mut **tx)
+            .await?
+            .map(|r| r.reason.parse().unwrap()))
+    }
+
+    pub async fn get_finished_upload_metadata_by_id(&self, tx: &mut SqliteTransaction<'_>, id: &UploadId) -> Result<Option<FinishedUploadMetadata>> {
+        let id = id.to_string();
+        Ok(sqlx::query!(
+            "SELECT objects.hash, objects.size FROM finished_uploads JOIN objects ON finished_uploads.hash = objects.hash WHERE id = ?",
             id
         )
-        .map(|row| AnyStageUpload::Finished {
-            size: row.size as u64,
-            hash: row.hash.into_boxed_str(),
-        })
-        .fetch_optional(&self.database)
+        .fetch_optional(&mut **tx)
         .await?
-        {
-            Ok(Some(upload))
-        } else if let Some(upload) = sqlx::query!("SELECT reason FROM failed_uploads WHERE id = ?", id)
-            .map(|row| AnyStageUpload::Failed(UploadFailureReason::from_str(&row.reason).unwrap()))
-            .fetch_optional(&self.database)
+        .map(|r| FinishedUploadMetadata {
+            size: r.size as u64,
+            hash: r.hash.into_boxed_str(),
+        }))
+    }
+
+    pub async fn accept_finished_upload(&self, tx: &mut SqliteTransaction<'_>, id: UploadId, bucket_ids: Vec<BucketId>) -> Result<Option<()>> {
+        let id = id.to_string();
+        let hash = sqlx::query!("DELETE FROM finished_uploads WHERE id = ? RETURNING hash", id)
+            .fetch_one(&mut **tx)
             .await?
-        {
-            Ok(Some(upload))
-        } else {
-            Ok(None)
+            .hash;
+
+        for bucket_id in bucket_ids {
+            sqlx::query!("INSERT INTO object_replicas (hash, bucket_id, is_present) VALUES (?, ?, 0)", hash, bucket_id)
+                .execute(&mut **tx)
+                .await?;
         }
+
+        Ok(Some(()))
     }
 }
 
-pub enum AnyStageUpload {
-    Unfinished(Arc<UnfinishedUpload>),
-    Finished { size: u64, hash: Box<str> },
-    Failed(UploadFailureReason),
+pub struct FinishedUploadMetadata {
+    pub hash: Box<str>,
+    pub size: u64,
 }
 
 #[derive(Debug)]
@@ -197,7 +210,9 @@ impl AcquiredUnfinishedUpload {
     pub async fn set_current_size(&self, current_size: u64) -> Result<(), Report> {
         let upload = self.upload.upgrade().unwrap();
         let manager = upload.manager.upgrade().unwrap();
+
         upload.current_size.store(current_size, Ordering::Relaxed);
+
         let id = upload.id.to_string();
         let current_size = current_size as i64;
diff --git a/src/util/id.rs b/src/util/id.rs
index 3e62921..2e971a7 100644
--- a/src/util/id.rs
+++ b/src/util/id.rs
@@ -6,3 +6,5 @@ pub fn generate_id<const N: usize>() -> FStr<N> {
     let bytes: [u8; N] = std::array::from_fn(|_| rand::rng().sample(&Alphanumeric));
     unsafe { FStr::from_inner_unchecked(bytes) }
 }
+
+pub type BucketId = Box<str>;
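Finally, the bucket-ID rules that config.rs now enforces at load time, distilled into a standalone predicate (a sketch assuming the regex crate; is_valid_bucket_id is a hypothetical helper, not part of the patch):

    use regex::Regex;

    /// Mirrors BUCKET_ID_PATTERN plus the reserved-name rule:
    /// 1-32 characters of [a-zA-Z0-9_], and "staging" is refused
    /// because the staging area itself claims that name.
    fn is_valid_bucket_id(id: &str) -> bool {
        let pattern = Regex::new(r"^[a-zA-Z0-9_]{1,32}$").unwrap();
        id != "staging" && pattern.is_match(id)
    }

    fn main() {
        assert!(is_valid_bucket_id("local"));
        assert!(!is_valid_bucket_id("staging"));
        assert!(!is_valid_bucket_id("has-hyphen"));
    }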