From 9718d0a5b4c7ef6880b3afec38d3190efeef6eaa Mon Sep 17 00:00:00 2001 From: Moritz Ruth Date: Thu, 17 Apr 2025 00:53:56 +0200 Subject: [PATCH] WIP: v1.0.0 --- README.md | 2 +- src/http_api/api_error.rs | 35 ++++++-- src/http_api/headers.rs | 37 ++++---- src/http_api/upload/append_to_upload.rs | 27 +++--- src/http_api/upload/mod.rs | 111 ++++++++++++++++++++++-- src/upload_manager.rs | 42 +++++---- 6 files changed, 197 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index de0f5f6..3653c7c 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ - set target buckets - delete -- outgoing webhooks +- deletion tombstones ## Upload steps diff --git a/src/http_api/api_error.rs b/src/http_api/api_error.rs index 50463bf..b4557cc 100644 --- a/src/http_api/api_error.rs +++ b/src/http_api/api_error.rs @@ -1,15 +1,40 @@ use axum::Json; -use axum::http::{HeaderName, StatusCode}; +use axum::http::{HeaderName, HeaderValue, StatusCode, header}; use axum::response::{IntoResponse, Response}; use color_eyre::Report; +use serde::Serialize; use serde_json::json; use std::borrow::Cow; +use tokio_util::bytes::{BufMut, BytesMut}; + +pub struct ProblemJson(pub T); + +impl IntoResponse for ProblemJson { + fn into_response(self) -> Response { + // Same as IntoResponse::into_response for Json, but the content type header is changed. + + let mut buf = BytesMut::with_capacity(128).writer(); + match serde_json::to_writer(&mut buf, &self.0) { + Ok(()) => ( + [(header::CONTENT_TYPE, HeaderValue::from_static("application/problem+json"))], + buf.into_inner().freeze(), + ) + .into_response(), + Err(err) => ( + StatusCode::INTERNAL_SERVER_ERROR, + [(header::CONTENT_TYPE, HeaderValue::from_static("text/plain"))], + err.to_string(), + ) + .into_response(), + } + } +} #[derive(Debug)] pub enum ApiError { Internal { report: Report }, Forbidden, - InvalidRequestHeader { name: &'static HeaderName, message: Cow<'static, str> }, + InvalidRequestHeader { name: HeaderName, message: Cow<'static, str> }, InvalidRequestContent { path: Cow<'static, str>, message: Cow<'static, str> }, UnknownResource { resource_type: Cow<'static, str>, id: Cow<'static, str> }, } @@ -36,7 +61,7 @@ impl IntoResponse for ApiError { ApiError::Forbidden => StatusCode::FORBIDDEN.into_response(), ApiError::InvalidRequestHeader { name, message } => ( StatusCode::UNPROCESSABLE_ENTITY, - Json(json!({ + ProblemJson(json!({ "type": "https://minna.media/api-problems/general/invalid-request-header", "title": "A specific request header value is invalid.", "detail": format!("The value of `{}` is invalid: {}", name.as_str(), message) @@ -45,7 +70,7 @@ impl IntoResponse for ApiError { .into_response(), ApiError::InvalidRequestContent { path, message } => ( StatusCode::UNPROCESSABLE_ENTITY, - Json(json!({ + ProblemJson(json!({ "type": "https://minna.media/api-problems/general/invalid-request-content", "title": "The request content is semantically invalid.", "detail": format!("`{path}`: {message}"), @@ -55,7 +80,7 @@ impl IntoResponse for ApiError { .into_response(), ApiError::UnknownResource { resource_type, id } => ( StatusCode::NOT_FOUND, - Json(json!({ + ProblemJson(json!({ "type": "https://minna.media/api-problems/general/unknown-resource", "title": "The requested resource is unknown.", "detail": format!("There is no {resource_type} resource with this ID: {id}"), diff --git a/src/http_api/headers.rs b/src/http_api/headers.rs index 914ef55..729841c 100644 --- a/src/http_api/headers.rs +++ b/src/http_api/headers.rs @@ -1,22 +1,22 @@ use crate::http_api::api_error::ApiError; -use axum::http::{HeaderMap, HeaderName, HeaderValue}; +use axum::http::{HeaderMap, HeaderName, HeaderValue, header}; +use axum::response::{IntoResponse, IntoResponseParts}; pub mod upload_headers { use axum::http::HeaderName; - use once_cell::sync::Lazy; - use std::str::FromStr; - pub static UPLOAD_OFFSET: Lazy = Lazy::new(|| HeaderName::from_str("upload-offset").unwrap()); - pub static UPLOAD_COMPLETE: Lazy = Lazy::new(|| HeaderName::from_str("upload-complete").unwrap()); + pub const UPLOAD_OFFSET: HeaderName = HeaderName::from_static("upload-offset"); + pub const UPLOAD_COMPLETE: HeaderName = HeaderName::from_static("upload-complete"); + pub const UPLOAD_LENGTH: HeaderName = HeaderName::from_static("upload-length"); } pub trait HeaderMapExt { - fn get_exactly_once(&self, key: &'static HeaderName) -> Result<&HeaderValue, ApiError>; - fn get_at_most_once(&self, key: &'static HeaderName) -> Result, ApiError>; + fn get_exactly_once(&self, key: &HeaderName) -> Result<&HeaderValue, ApiError>; + fn get_at_most_once(&self, key: &HeaderName) -> Result, ApiError>; } impl HeaderMapExt for HeaderMap { - fn get_exactly_once(&self, key: &'static HeaderName) -> Result<&HeaderValue, ApiError> { + fn get_exactly_once(&self, key: &HeaderName) -> Result<&HeaderValue, ApiError> { let mut values_iterator = self.get_all(key).into_iter(); if let Some(value) = values_iterator.next() { @@ -26,12 +26,12 @@ impl HeaderMapExt for HeaderMap { } Err(ApiError::InvalidRequestHeader { - name: key, + name: key.to_owned(), message: "must be specified exactly once".into(), }) } - fn get_at_most_once(&self, key: &'static HeaderName) -> Result, ApiError> { + fn get_at_most_once(&self, key: &HeaderName) -> Result, ApiError> { let mut values_iterator = self.get_all(key).into_iter(); if let Some(value) = values_iterator.next() { @@ -39,7 +39,7 @@ impl HeaderMapExt for HeaderMap { Ok(Some(value)) } else { Err(ApiError::InvalidRequestHeader { - name: key, + name: key.to_owned(), message: "must be specified at most once".into(), }) } @@ -50,23 +50,23 @@ impl HeaderMapExt for HeaderMap { } pub trait HeaderValueExt { - fn get_unsigned_decimal_number(&self, header_name_for_error: &'static HeaderName) -> Result; - fn get_boolean(&self, header_name_for_error: &'static HeaderName) -> Result; + fn get_unsigned_decimal_number(&self, header_name_for_error: &HeaderName) -> Result; + fn get_boolean(&self, header_name_for_error: &HeaderName) -> Result; } impl HeaderValueExt for HeaderValue { - fn get_unsigned_decimal_number(&self, header_name_for_error: &'static HeaderName) -> Result { + fn get_unsigned_decimal_number(&self, header_name_for_error: &HeaderName) -> Result { self.to_str() .ok() .map(|v| v.parse::().ok()) .flatten() .ok_or(ApiError::InvalidRequestHeader { - name: header_name_for_error, + name: header_name_for_error.to_owned(), message: "must be an unsigned 64-bit decimal number".into(), }) } - fn get_boolean(&self, header_name_for_error: &'static HeaderName) -> Result { + fn get_boolean(&self, header_name_for_error: &HeaderName) -> Result { if let Ok(value) = self.to_str() { if value == "?1" { return Ok(true); @@ -78,8 +78,11 @@ impl HeaderValueExt for HeaderValue { } Err(ApiError::InvalidRequestHeader { - name: header_name_for_error, + name: header_name_for_error.to_owned(), message: "must be `?1` (true) or `?0` (false)".into(), }) } } + +pub const CACHE_CONTROL_CACHE_FOREVER: (HeaderName, HeaderValue) = (header::CACHE_CONTROL, HeaderValue::from_static("public, max-age=31536000, immutable")); +pub const CACHE_CONTROL_NEVER_CACHE: (HeaderName, HeaderValue) = (header::CACHE_CONTROL, HeaderValue::from_static("no-store")); diff --git a/src/http_api/upload/append_to_upload.rs b/src/http_api/upload/append_to_upload.rs index b9e34bb..ca900b5 100644 --- a/src/http_api/upload/append_to_upload.rs +++ b/src/http_api/upload/append_to_upload.rs @@ -1,5 +1,5 @@ use crate::http_api::Context; -use crate::http_api::api_error::ApiError; +use crate::http_api::api_error::{ApiError, ProblemJson}; use crate::http_api::headers::{HeaderMapExt, HeaderValueExt, upload_headers}; use crate::http_api::upload::{PARTIAL_UPLOAD_MEDIA_TYPE, UploadCompleteResponseHeader, UploadOffsetResponseHeader}; use crate::upload_manager::{AnyStageUpload, FileReference, UnfinishedUpload, UploadFailureReason, UploadId, UploadManager}; @@ -44,7 +44,7 @@ impl IntoResponse for AppendToUploadOutcome { AppendToUploadOutcome::RequestSuperseded => ( StatusCode::CONFLICT, UploadCompleteResponseHeader(false), - Json(json!({ + ProblemJson(json!({ "type": "https://minna.media/api-problems/caos/request-superseded", "title": "Another request superseded the current request.", })), @@ -52,7 +52,7 @@ impl IntoResponse for AppendToUploadOutcome { .into_response(), AppendToUploadOutcome::UploadAlreadyComplete => ( StatusCode::CONFLICT, - Json(json!({ + ProblemJson(json!({ "type": "https://iana.org/assignments/http-problem-types#completed-upload", "title": "The upload is already complete.", })), @@ -60,7 +60,7 @@ impl IntoResponse for AppendToUploadOutcome { .into_response(), AppendToUploadOutcome::Failed(reason) => ( StatusCode::GONE, - Json(json!({ + ProblemJson(json!({ "type": "https://minna.media/api-problems/caos/request-superseded", "title": "The upload was cancelled or failed.", "reason": reason.to_string() @@ -71,7 +71,7 @@ impl IntoResponse for AppendToUploadOutcome { StatusCode::CONFLICT, UploadCompleteResponseHeader(false), UploadOffsetResponseHeader(expected), - Json(json!({ + ProblemJson(json!({ "type": "https://iana.org/assignments/http-problem-types#mismatching-upload-offset", "title": "The upload offset provided in the request does not match the actual offset of the resource.", "expected-offset": expected, @@ -83,7 +83,7 @@ impl IntoResponse for AppendToUploadOutcome { StatusCode::CONFLICT, UploadCompleteResponseHeader(false), UploadOffsetResponseHeader(expected), - Json(json!({ + ProblemJson(json!({ "type": "https://iana.org/assignments/http-problem-types#inconsistent-upload-length", "title": "The provided upload lengths are inconsistent with one another or a previously established total length.", "detail": detail, @@ -93,7 +93,7 @@ impl IntoResponse for AppendToUploadOutcome { AppendToUploadOutcome::ContentStreamStoppedUnexpectedly => ( StatusCode::BAD_REQUEST, UploadCompleteResponseHeader(false), - Json(json!({ + ProblemJson(json!({ "type": "https://minna.media/api-problems/caos/content-stream-stopped-unexpectedly", "title": "The content stream stopped unexpectedly.", })), @@ -102,7 +102,7 @@ impl IntoResponse for AppendToUploadOutcome { AppendToUploadOutcome::TooMuchContent => ( StatusCode::BAD_REQUEST, UploadCompleteResponseHeader(false), - Json(json!({ + ProblemJson(json!({ "type": "https://minna.media/api-problems/caos/too-much-content", "title": "The request contained more content than it should.", })), @@ -165,6 +165,7 @@ pub(super) async fn append_to_upload( struct RequestParameters { pub upload: Arc, pub supplied_content_length: Option, + pub supplied_upload_length: Option, pub supplied_upload_offset: u64, pub supplied_upload_complete: bool, } @@ -177,7 +178,7 @@ async fn parse_request_parameters( let upload = if let Some(upload) = upload_manager.get_upload_by_id(&upload_id).await? { match upload { AnyStageUpload::Unfinished(u) => u, - AnyStageUpload::Finished => return Ok(Err(AppendToUploadOutcome::UploadAlreadyComplete)), + AnyStageUpload::Finished { .. } => return Ok(Err(AppendToUploadOutcome::UploadAlreadyComplete)), AnyStageUpload::Failed(reason) => return Ok(Err(AppendToUploadOutcome::Failed(reason))), } } else { @@ -195,7 +196,7 @@ async fn parse_request_parameters( .unwrap_or(false) { return Err(ApiError::InvalidRequestHeader { - name: &axum::http::header::CONTENT_TYPE, + name: axum::http::header::CONTENT_TYPE, message: format!("must be {}", PARTIAL_UPLOAD_MEDIA_TYPE.to_string()).into(), }); } @@ -205,6 +206,11 @@ async fn parse_request_parameters( .map(|v| v.get_unsigned_decimal_number(&axum::http::header::CONTENT_LENGTH)) .transpose()?; + let supplied_upload_length = headers + .get_at_most_once(&upload_headers::UPLOAD_OFFSET)? + .map(|v| v.get_unsigned_decimal_number(&upload_headers::UPLOAD_OFFSET)) + .transpose()?; + let supplied_upload_offset = headers .get_exactly_once(&upload_headers::UPLOAD_OFFSET)? .get_unsigned_decimal_number(&upload_headers::UPLOAD_OFFSET)?; @@ -216,6 +222,7 @@ async fn parse_request_parameters( Ok(Ok(RequestParameters { upload, supplied_content_length, + supplied_upload_length, supplied_upload_offset, supplied_upload_complete, })) diff --git a/src/http_api/upload/mod.rs b/src/http_api/upload/mod.rs index 77a6102..3b14964 100644 --- a/src/http_api/upload/mod.rs +++ b/src/http_api/upload/mod.rs @@ -1,12 +1,12 @@ use crate::http_api::Context; use crate::http_api::api_error::ApiError; use crate::http_api::auth::AppAuthorization; -use crate::http_api::headers::upload_headers; +use crate::http_api::headers::{CACHE_CONTROL_CACHE_FOREVER, CACHE_CONTROL_NEVER_CACHE, upload_headers}; use crate::http_api::upload::append_to_upload::append_to_upload; -use crate::upload_manager::UploadId; -use axum::extract::State; +use crate::upload_manager::{AnyStageUpload, UploadFailureReason, UploadId}; +use axum::extract::{Path, State}; use axum::http::HeaderValue; -use axum::response::{IntoResponseParts, ResponseParts}; +use axum::response::{IntoResponse, IntoResponseParts, Response, ResponseParts}; use axum::{Json, Router, routing}; use serde::{Deserialize, Serialize}; @@ -21,7 +21,7 @@ impl IntoResponseParts for UploadCompleteResponseHeader { fn into_response_parts(self, mut res: ResponseParts) -> Result { res.headers_mut() - .insert(&*upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static(if self.0 { "?1" } else { "?0" })); + .insert(upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static(if self.0 { "?1" } else { "?0" })); Ok(res) } } @@ -32,7 +32,7 @@ impl IntoResponseParts for UploadOffsetResponseHeader { type Error = (); fn into_response_parts(self, mut res: ResponseParts) -> Result { - res.headers_mut().insert(&*upload_headers::UPLOAD_OFFSET, self.0.into()); + res.headers_mut().insert(upload_headers::UPLOAD_OFFSET, self.0.into()); Ok(res) } } @@ -50,7 +50,7 @@ struct CreateUploadResponseBody { pub fn create_uploads_router() -> Router { Router::new() .route("/", routing::post(create_upload)) - .route("/{upload_id}", routing::post(append_to_upload)) + .route("/{upload_id}", routing::post(append_to_upload).get(get_upload_metadata)) } async fn create_upload( @@ -69,3 +69,100 @@ async fn create_upload( Ok(Json(CreateUploadResponseBody { upload_id: *upload.id() })) } + +#[derive(Debug, Deserialize)] +struct GetUploadMetadataPathParameters { + upload_id: UploadId, +} + +#[derive(Debug, Serialize)] +struct GetUploadMetadataResponse { + id: Box, + #[serde(flatten)] + state: GetUploadMetadataResponseState, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "snake_case", tag = "state")] +enum GetUploadMetadataResponseState { + Ongoing { current_size: u64, total_size: u64 }, + Complete { size: u64 }, + Finished { size: u64, hash: Box }, + Failed { reason: UploadFailureReason }, +} + +impl IntoResponse for GetUploadMetadataResponse { + fn into_response(self) -> Response { + match self.state { + GetUploadMetadataResponseState::Ongoing { current_size, total_size } => ( + [ + (upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static("?0")), + (upload_headers::UPLOAD_OFFSET, HeaderValue::from(current_size)), + (upload_headers::UPLOAD_LENGTH, HeaderValue::from(total_size)), + CACHE_CONTROL_NEVER_CACHE, + ], + Json(self), + ) + .into_response(), + GetUploadMetadataResponseState::Complete { size } => ( + [ + (upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static("?1")), + (upload_headers::UPLOAD_OFFSET, HeaderValue::from(size)), + (upload_headers::UPLOAD_LENGTH, HeaderValue::from(size)), + CACHE_CONTROL_NEVER_CACHE, + ], + Json(self), + ) + .into_response(), + GetUploadMetadataResponseState::Finished { size, .. } => ( + [ + (upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static("?1")), + (upload_headers::UPLOAD_OFFSET, HeaderValue::from(size)), + (upload_headers::UPLOAD_LENGTH, HeaderValue::from(size)), + CACHE_CONTROL_CACHE_FOREVER, + ], + Json(self), + ) + .into_response(), + GetUploadMetadataResponseState::Failed { .. } => ( + [(upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static("?0")), CACHE_CONTROL_CACHE_FOREVER], + Json(self), + ) + .into_response(), + } + } +} + +async fn get_upload_metadata( + State(context): State, + Path(GetUploadMetadataPathParameters { upload_id }): Path, +) -> Result { + let upload = if let Some(upload) = context.upload_manager.get_upload_by_id(&upload_id).await? { + upload + } else { + return Err(ApiError::UnknownResource { + resource_type: "upload".into(), + id: upload_id.to_string().into(), + }); + }; + + Ok(GetUploadMetadataResponse { + id: upload_id.to_string().into_boxed_str(), + state: match upload { + AnyStageUpload::Unfinished(upload) => { + let state = upload.state().read().await; + + if state.is_complete() { + GetUploadMetadataResponseState::Complete { size: upload.total_size() } + } else { + GetUploadMetadataResponseState::Ongoing { + current_size: state.current_size(), + total_size: upload.total_size(), + } + } + } + AnyStageUpload::Finished { size, hash } => GetUploadMetadataResponseState::Finished { size, hash }, + AnyStageUpload::Failed(reason) => GetUploadMetadataResponseState::Failed { reason }, + }, + }) +} diff --git a/src/upload_manager.rs b/src/upload_manager.rs index 0a0c81d..93793d9 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -6,6 +6,7 @@ use camino::Utf8PathBuf; use color_eyre::{Report, Result}; use dashmap::DashMap; use fstr::FStr; +use serde::Serialize; use sqlx::SqlitePool; use std::fmt::Debug; use std::str::FromStr; @@ -112,29 +113,35 @@ impl UploadManager { } pub async fn get_upload_by_id(&self, id: &str) -> Result, Report> { - if let Some(unfinished_uploads) = self.unfinished_uploads.get(id).map(|a| Arc::clone(a.value())) { - Ok(Some(AnyStageUpload::Unfinished(unfinished_uploads))) - } else { - Ok(sqlx::query!( - "SELECT reason FROM (SELECT id, '' AS reason FROM finished_uploads UNION SELECT id, reason FROM failed_uploads) WHERE id = ?", - id - ) - .map(|row| { - if row.reason.is_empty() { - AnyStageUpload::Finished - } else { - AnyStageUpload::Failed(UploadFailureReason::from_str(&row.reason).unwrap()) - } - }) + if let Some(upload) = self.unfinished_uploads.get(id).map(|a| Arc::clone(a.value())) { + Ok(Some(AnyStageUpload::Unfinished(upload))) + } else if let Some(upload) = sqlx::query!( + "SELECT id, finished_uploads.hash, objects.size FROM finished_uploads JOIN objects ON finished_uploads.hash = objects.hash WHERE id = ?", + id + ) + .map(|row| AnyStageUpload::Finished { + size: row.size as u64, + hash: row.hash.into_boxed_str(), + }) + .fetch_optional(&self.database) + .await? + { + Ok(Some(upload)) + } else if let Some(upload) = sqlx::query!("SELECT reason FROM failed_uploads WHERE id = ?", id) + .map(|row| AnyStageUpload::Failed(UploadFailureReason::from_str(&row.reason).unwrap())) .fetch_optional(&self.database) - .await?) + .await? + { + Ok(Some(upload)) + } else { + Ok(None) } } } pub enum AnyStageUpload { Unfinished(Arc), - Finished, + Finished { size: u64, hash: Box }, Failed(UploadFailureReason), } @@ -232,8 +239,9 @@ impl UnfinishedUploadState { } } -#[derive(Debug, EnumString, Display)] +#[derive(Debug, EnumString, Display, Serialize)] #[strum(serialize_all = "snake_case")] +#[serde(rename_all = "snake_case")] pub enum UploadFailureReason { CancelledByClient, Expired,