WIP: v1.0.0

Moritz Ruth 2025-04-18 21:24:07 +02:00
parent 12d38fb1b0
commit 486f30e4f3
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
12 changed files with 373 additions and 278 deletions

View file

@@ -4,8 +4,7 @@ api_secret = "Xt99Hp%wU%zf&vczQ%bJPbr2$owC#wuM#7fxEy%Uc%pp4Thdk7V$4kxMJFupvNKk"
database_file = "./database.sqlite"
staging_directory = "./data/staging"
[[buckets]]
id = "local"
[buckets.local]
display_name = "Local"
backend = "filesystem"
path = "./data"

View file

@@ -1,3 +1,4 @@
use crate::util::id::BucketId;
use camino::Utf8PathBuf;
use color_eyre::Result;
use color_eyre::eyre::WrapErr;
@@ -7,7 +8,7 @@ use fstr::FStr;
use once_cell::sync::Lazy;
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::collections::HashMap;
use std::net::IpAddr;
use validator::{Validate, ValidationError};
@@ -21,36 +22,28 @@ pub struct Config {
#[serde(default)]
pub enable_multithreaded_hashing: bool,
#[validate(nested, custom(function = "validate_buckets"))]
pub buckets: Vec<ConfigBucket>,
pub buckets: HashMap<BucketId, ConfigBucket>,
}
fn validate_buckets(buckets: &Vec<ConfigBucket>) -> Result<(), ValidationError> {
let mut ids = HashSet::new();
for bucket_config in buckets {
if !ids.insert(&bucket_config.id) {
return Err(ValidationError::new("duplicate_id").with_message(format!("There is more than one bucket with this ID: {}", bucket_config.id).into()));
};
}
Ok(())
}
static BUCKET_ID_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[a-zA-z0-9_]+$").unwrap());
static BUCKET_ID_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[a-zA-z0-9_]{1,32}$").unwrap());
#[derive(Debug, Serialize, Deserialize, Validate)]
pub struct ConfigBucket {
#[validate(length(min = 1, max = 32), regex(path = *BUCKET_ID_PATTERN), custom(function = "validate_config_bucket_id"))]
pub id: String,
#[validate(length(min = 1, max = 128))]
pub display_name: String,
#[serde(flatten)]
pub backend: ConfigBucketBackend,
}
fn validate_config_bucket_id(value: &str) -> Result<(), ValidationError> {
if value == "staging" {
return Err(ValidationError::new("reserved_bucket_id").with_message("Reserved bucket ID: staging".into()));
fn validate_buckets(value: &HashMap<BucketId, ConfigBucket>) -> Result<(), ValidationError> {
for bucket_id in value.keys() {
if bucket_id.as_ref() == "staging" {
return Err(ValidationError::new("reserved_bucket_id").with_message("Reserved bucket ID: staging".into()));
}
if !BUCKET_ID_PATTERN.is_match(bucket_id) {
return Err(ValidationError::new("illegal_bucket_id").with_message("Bucket ID must match ^[a-zA-z0-9_]{1,32}$".into()));
}
}
Ok(())
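
The character class is worth a second look: the earlier pattern's range A-z (lowercase z) also covers the ASCII characters between Z and a, namely [, \, ], ^, _, and the backtick, so it must be A-Z. A quick check, assuming the regex crate:

use regex::Regex;

fn main() {
    let buggy = Regex::new(r"^[a-zA-z0-9_]{1,32}$").unwrap();
    let fixed = Regex::new(r"^[a-zA-Z0-9_]{1,32}$").unwrap();
    // '^' (0x5E) lies between 'Z' (0x5A) and 'a' (0x61), so A-z matches it.
    assert!(buggy.is_match("bucket^id"));
    assert!(!fixed.is_match("bucket^id"));
}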

View file

@@ -1,3 +1,5 @@
use crate::http_api::upload::headers::UploadOffsetResponseHeader;
use crate::upload_manager::UploadFailureReason;
use axum::http::{HeaderName, HeaderValue, StatusCode, header};
use axum::response::{IntoResponse, Response};
use color_eyre::Report;
@@ -36,6 +38,17 @@ pub enum ApiError {
InvalidRequestHeader { name: HeaderName, message: Cow<'static, str> },
InvalidRequestContent { path: Cow<'static, str>, message: Cow<'static, str> },
UnknownResource { resource_type: Cow<'static, str>, id: Cow<'static, str> },
RequestBodyTooLong,
RequestBodyTooShort,
IanaUploadAlreadyComplete,
CaosUploadRequestSuperseded,
CaosUploadFailed { reason: UploadFailureReason },
CaosUploadOffsetMismatch { expected: u64, provided: u64 },
CaosInconsistentUploadLength { expected: u64, detail: Cow<'static, str> },
CaosUploadNotFinished,
CaosUnknownBucket { bucket_id: Cow<'static, str> },
}
impl From<Report> for ApiError {
@@ -88,6 +101,88 @@ impl IntoResponse for ApiError {
})),
)
.into_response(),
ApiError::RequestBodyTooLong => (
StatusCode::BAD_REQUEST,
ProblemJson(json!({
"type": "https://minna.media/api-problems/general/request-body-too-long",
"title": "The received request body is longer than expected.",
})),
)
.into_response(),
ApiError::RequestBodyTooShort => (
StatusCode::BAD_REQUEST,
ProblemJson(json!({
"type": "https://minna.media/api-problems/general/request-body-too-short",
"title": "The received request body is shorter than expected.",
})),
)
.into_response(),
ApiError::IanaUploadAlreadyComplete => (
StatusCode::CONFLICT,
// According to https://www.ietf.org/archive/id/draft-ietf-httpbis-resumable-upload-08.html#section-4.4.2-7,
// if the request did not (itself) complete the upload, the response must contain the `Upload-Complete: ?0` header.
// When the upload is *already* complete, the current request cannot possibly be the one that completes it.
ProblemJson(json!({
"type": "https://iana.org/assignments/http-problem-types#completed-upload",
"title": "The upload is already complete.",
})),
)
.into_response(),
ApiError::CaosUploadRequestSuperseded => (
StatusCode::CONFLICT,
ProblemJson(json!({
"type": "https://minna.media/api-problems/caos/request-superseded",
"title": "Another request superseded the current request.",
})),
)
.into_response(),
ApiError::CaosUploadFailed { reason } => (
StatusCode::GONE,
ProblemJson(json!({
"type": "https://minna.media/api-problems/caos/request-superseded",
"title": "The upload was cancelled or failed.",
"reason": reason.to_string()
})),
)
.into_response(),
ApiError::CaosUploadOffsetMismatch { expected, provided } => (
StatusCode::CONFLICT,
UploadOffsetResponseHeader(expected),
ProblemJson(json!({
"type": "https://iana.org/assignments/http-problem-types#mismatching-upload-offset",
"title": "The upload offset provided in the request does not match the actual offset of the resource.",
"expected-offset": expected,
"provided-offset": provided,
})),
)
.into_response(),
ApiError::CaosInconsistentUploadLength { expected, detail } => (
StatusCode::CONFLICT,
UploadOffsetResponseHeader(expected),
ProblemJson(json!({
"type": "https://iana.org/assignments/http-problem-types#inconsistent-upload-length",
"title": "The provided upload lengths are inconsistent with one another or a previously established total length.",
"detail": detail,
})),
)
.into_response(),
ApiError::CaosUploadNotFinished => (
StatusCode::CONFLICT,
ProblemJson(json!({
"type": "https://minna.media/api-problems/caos/upload-not-finished",
"title": "The upload is not finished yet."
})),
)
.into_response(),
ApiError::CaosUnknownBucket { bucket_id } => (
StatusCode::CONFLICT,
ProblemJson(json!({
"type": "https://minna.media/api-problems/caos/unknown-bucket",
"title": "There is no bucket with the specified ID.",
"bucketId": bucket_id
})),
)
.into_response(),
}
}
}
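
ProblemJson is defined earlier in api_error.rs, outside this hunk. A plausible minimal version, assuming it only wraps a serde_json::Value and sets the RFC 9457 application/problem+json media type:

use axum::http::header;
use axum::response::{IntoResponse, Response};
use serde_json::Value;

pub struct ProblemJson(pub Value);

impl IntoResponse for ProblemJson {
    fn into_response(self) -> Response {
        // Serialize the problem document and mark it with the RFC 9457 media type.
        (
            [(header::CONTENT_TYPE, "application/problem+json")],
            self.0.to_string(),
        )
            .into_response()
    }
}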

View file

@@ -9,7 +9,7 @@ pub struct AppAuthorization;
impl FromRequestParts<Context> for AppAuthorization {
type Rejection = ApiError;
async fn from_request_parts(parts: &mut Parts, state: &Context) -> Result<Self, Self::Rejection> {
async fn from_request_parts(parts: &mut Parts, context: &Context) -> Result<Self, Self::Rejection> {
let provided_secret = parts
.headers
.get_at_most_once(&axum::http::header::AUTHORIZATION)?
@@ -19,7 +19,7 @@ impl FromRequestParts<Context> for AppAuthorization {
.flatten()
.take_if(|v| v.len() == 64);
let correct_secret = state.api_secret;
let correct_secret = context.config.api_secret;
if let Some(provided_secret) = provided_secret {
if constant_time_eq::constant_time_eq(provided_secret.as_bytes(), correct_secret.as_bytes()) {
return Ok(AppAuthorization);
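
The take_if(|v| v.len() == 64) above matters for timing safety: constant_time_eq returns early on a length mismatch, so only candidates of the correct length should reach the comparison. A minimal sketch of the same pattern, assuming the constant_time_eq crate:

fn is_authorized(provided: Option<&str>, correct: &str) -> bool {
    match provided {
        // Compare only equal-length candidates so the comparison itself
        // runs in constant time; everything else is rejected outright.
        Some(p) if p.len() == correct.len() => constant_time_eq::constant_time_eq(p.as_bytes(), correct.as_bytes()),
        _ => false,
    }
}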

View file

@@ -3,26 +3,30 @@ mod auth;
mod headers;
mod upload;
use crate::config::Config;
use crate::http_api::upload::create_uploads_router;
use crate::upload_manager::UploadManager;
use axum::Router;
use color_eyre::Result;
use fstr::FStr;
use sqlx::SqlitePool;
use std::net::IpAddr;
use std::sync::Arc;
#[derive(Debug)]
struct ContextInner {
pub upload_manager: Arc<UploadManager>,
pub api_secret: FStr<64>,
pub database: SqlitePool,
pub config: Arc<Config>,
}
type Context = Arc<ContextInner>;
pub async fn start_http_api_server(upload_manager: Arc<UploadManager>, address: IpAddr, port: u16, api_secret: FStr<64>) -> Result<()> {
let router = Router::new()
.nest("/uploads", create_uploads_router())
.with_state(Arc::new(ContextInner { upload_manager, api_secret }));
pub async fn start_http_api_server(upload_manager: Arc<UploadManager>, database: SqlitePool, config: Arc<Config>, address: IpAddr, port: u16) -> Result<()> {
let router = Router::new().nest("/uploads", create_uploads_router()).with_state(Arc::new(ContextInner {
upload_manager,
database,
config,
}));
let listener = tokio::net::TcpListener::bind((address, port)).await?;
axum::serve(listener, router).await?;

View file

@@ -1,8 +1,9 @@
use crate::http_api::Context;
use crate::http_api::api_error::{ApiError, ProblemJson};
use crate::http_api::api_error::ApiError;
use crate::http_api::headers::{HeaderMapExt, HeaderValueExt, upload_headers};
use crate::http_api::upload::{PARTIAL_UPLOAD_MEDIA_TYPE, UploadCompleteResponseHeader, UploadOffsetResponseHeader};
use crate::upload_manager::{AcquiredUnfinishedUpload, AnyStageUpload, FileReference, UnfinishedUpload, UploadFailureReason, UploadId, UploadManager};
use crate::http_api::upload::PARTIAL_UPLOAD_MEDIA_TYPE;
use crate::http_api::upload::headers::{UploadCompleteResponseHeader, UploadOffsetResponseHeader};
use crate::upload_manager::{AcquiredUnfinishedUpload, UploadFailureReason, UploadId};
use crate::util::acquirable::Acquisition;
use axum::body::{Body, BodyDataStream};
use axum::extract::{Path, State};
@@ -11,10 +12,7 @@ use axum::response::{IntoResponse, Response};
use color_eyre::Report;
use futures::TryStreamExt;
use serde::Deserialize;
use serde_json::json;
use std::borrow::Cow;
use std::io::ErrorKind;
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio_util::io::StreamReader;
@@ -25,96 +23,54 @@ pub(super) struct AppendToUploadPathParameters {
}
#[derive(Debug)]
enum AppendToUploadOutcome {
RequestSuperseded,
UploadAlreadyComplete,
Failed(UploadFailureReason),
UploadOffsetMismatch { expected: u64, provided: u64 },
InconsistentUploadLength { expected: u64, detail: Cow<'static, str> },
ContentStreamStoppedUnexpectedly,
TooMuchContent,
pub(super) enum AppendToUploadResponse {
UploadIncomplete { offset: u64 },
UploadFailed { reason: UploadFailureReason },
UploadComplete,
}
impl IntoResponse for AppendToUploadOutcome {
impl IntoResponse for AppendToUploadResponse {
fn into_response(self) -> Response {
match self {
AppendToUploadOutcome::RequestSuperseded => (
StatusCode::CONFLICT,
UploadCompleteResponseHeader(false),
ProblemJson(json!({
"type": "https://minna.media/api-problems/caos/request-superseded",
"title": "Another request superseded the current request.",
})),
)
.into_response(),
AppendToUploadOutcome::UploadAlreadyComplete => (
StatusCode::CONFLICT,
ProblemJson(json!({
"type": "https://iana.org/assignments/http-problem-types#completed-upload",
"title": "The upload is already complete.",
})),
)
.into_response(),
AppendToUploadOutcome::Failed(reason) => (
StatusCode::GONE,
ProblemJson(json!({
"type": "https://minna.media/api-problems/caos/request-superseded",
"title": "The upload was cancelled or failed.",
"reason": reason.to_string()
})),
)
.into_response(),
AppendToUploadOutcome::UploadOffsetMismatch { expected, provided } => (
StatusCode::CONFLICT,
UploadCompleteResponseHeader(false),
UploadOffsetResponseHeader(expected),
ProblemJson(json!({
"type": "https://iana.org/assignments/http-problem-types#mismatching-upload-offset",
"title": "The upload offset provided in the request does not match the actual offset of the resource.",
"expected-offset": expected,
"provided-offset": provided,
})),
)
.into_response(),
AppendToUploadOutcome::InconsistentUploadLength { expected, detail } => (
StatusCode::CONFLICT,
UploadCompleteResponseHeader(false),
UploadOffsetResponseHeader(expected),
ProblemJson(json!({
"type": "https://iana.org/assignments/http-problem-types#inconsistent-upload-length",
"title": "The provided upload lengths are inconsistent with one another or a previously established total length.",
"detail": detail,
})),
)
.into_response(),
AppendToUploadOutcome::ContentStreamStoppedUnexpectedly => (
StatusCode::BAD_REQUEST,
UploadCompleteResponseHeader(false),
ProblemJson(json!({
"type": "https://minna.media/api-problems/caos/content-stream-stopped-unexpectedly",
"title": "The content stream stopped unexpectedly.",
})),
)
.into_response(),
AppendToUploadOutcome::TooMuchContent => (
StatusCode::BAD_REQUEST,
UploadCompleteResponseHeader(false),
ProblemJson(json!({
"type": "https://minna.media/api-problems/caos/too-much-content",
"title": "The request contained more content than it should.",
})),
)
.into_response(),
AppendToUploadOutcome::UploadIncomplete { offset } => (
AppendToUploadResponse::UploadIncomplete { offset } => (
StatusCode::NO_CONTENT,
UploadCompleteResponseHeader(false),
UploadOffsetResponseHeader(offset),
Body::empty(),
)
.into_response(),
AppendToUploadOutcome::UploadComplete => (StatusCode::NO_CONTENT, UploadCompleteResponseHeader(true), Body::empty()).into_response(),
AppendToUploadResponse::UploadFailed { reason } => (UploadCompleteResponseHeader(false), ApiError::CaosUploadFailed { reason }).into_response(),
AppendToUploadResponse::UploadComplete => (StatusCode::NO_CONTENT, UploadCompleteResponseHeader(true), Body::empty()).into_response(),
}
}
}
pub(super) struct AppendToUploadError {
inner: ApiError,
}
impl IntoResponse for AppendToUploadError {
fn into_response(self) -> Response {
(UploadCompleteResponseHeader(false), self.inner).into_response()
}
}
impl From<ApiError> for AppendToUploadError {
fn from(value: ApiError) -> Self {
AppendToUploadError { inner: value }
}
}
impl From<Report> for AppendToUploadError {
fn from(value: Report) -> Self {
AppendToUploadError { inner: value.into() }
}
}
impl From<std::io::Error> for AppendToUploadError {
fn from(error: std::io::Error) -> Self {
AppendToUploadError {
inner: Report::new(error).into(),
}
}
}
@@ -124,93 +80,65 @@ pub(super) async fn append_to_upload(
Path(AppendToUploadPathParameters { upload_id }): Path<AppendToUploadPathParameters>,
headers: HeaderMap,
request_body: Body,
) -> Result<impl IntoResponse, (UploadCompleteResponseHeader, ApiError)> {
let parameters = match parse_request_parameters(&context.upload_manager, upload_id, &headers)
.await
.map_err(|e| (UploadCompleteResponseHeader(false), e))?
{
Ok(p) => p,
Err(o) => return Ok(o),
) -> Result<AppendToUploadResponse, AppendToUploadError> {
let mut tx = context.database.begin().await.map_err(Into::<Report>::into)?;
let upload = if let Some(upload) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await {
upload
} else if let Some(reason) = context.upload_manager.get_upload_failure_reason_by_id(&mut tx, &upload_id).await? {
return Ok(AppendToUploadResponse::UploadFailed { reason }.into());
} else if let Some(_) = context.upload_manager.get_finished_upload_metadata_by_id(&mut tx, &upload_id).await? {
return Err(ApiError::IanaUploadAlreadyComplete.into());
} else {
return Err(ApiError::UnknownResource {
resource_type: "upload".into(),
id: upload_id.to_string().into(),
}
.into());
};
let mut upload_acquisition = if let Some(a) = parameters.upload.acquire().await {
let parameters = parse_request_parameters(&headers).await?;
let mut upload_acquisition = if let Some(a) = upload.acquire().await {
a
} else {
return Ok(AppendToUploadOutcome::RequestSuperseded);
return Err(ApiError::CaosUploadRequestSuperseded.into());
};
if let Some(supplied_upload_length) = parameters.supplied_upload_length {
let expected = upload_acquisition.inner().upload().total_size();
if supplied_upload_length != expected {
return Ok(AppendToUploadOutcome::InconsistentUploadLength {
return Err(ApiError::CaosInconsistentUploadLength {
expected,
detail: format!("Upload-Length is set to {supplied_upload_length}, but the actual length is {expected}.").into(),
});
}
.into());
}
}
if parameters.upload.is_complete() {
return Ok(AppendToUploadOutcome::UploadAlreadyComplete);
if upload.is_complete() {
return Err(ApiError::IanaUploadAlreadyComplete.into());
}
let outcome = do_append(&mut upload_acquisition, parameters, request_body.into_data_stream())
.await
.map_err(|report| {
(
UploadCompleteResponseHeader(false),
ApiError::Internal {
report: report.wrap_err(format!("Error during file upload ({upload_id})")),
},
)
})?;
let response = do_append(&mut upload_acquisition, parameters, request_body.into_data_stream()).await?;
match &outcome {
AppendToUploadOutcome::UploadComplete => {
upload_acquisition
.complete()
.await
.map_err(|e| (UploadCompleteResponseHeader(false), e.into()))?;
}
AppendToUploadOutcome::Failed(reason) => {
upload_acquisition
.fail(*reason)
.await
.map_err(|e| (UploadCompleteResponseHeader(false), e.into()))?;
}
_ => {
upload_acquisition.release().await;
}
match &response {
AppendToUploadResponse::UploadIncomplete { .. } => upload_acquisition.release().await,
AppendToUploadResponse::UploadComplete => upload_acquisition.complete().await?,
AppendToUploadResponse::UploadFailed { reason } => upload_acquisition.fail(*reason).await?,
}
Ok(outcome)
Ok(response)
}
struct RequestParameters {
pub upload: Arc<UnfinishedUpload>,
pub supplied_content_length: Option<u64>,
pub supplied_upload_length: Option<u64>,
pub supplied_upload_offset: u64,
pub supplied_upload_complete: bool,
}
async fn parse_request_parameters(
upload_manager: &UploadManager,
upload_id: UploadId,
headers: &HeaderMap,
) -> Result<Result<RequestParameters, AppendToUploadOutcome>, ApiError> {
let upload = if let Some(upload) = upload_manager.get_upload_by_id(&upload_id).await? {
match upload {
AnyStageUpload::Unfinished(u) => u,
AnyStageUpload::Finished { .. } => return Ok(Err(AppendToUploadOutcome::UploadAlreadyComplete)),
AnyStageUpload::Failed(reason) => return Ok(Err(AppendToUploadOutcome::Failed(reason))),
}
} else {
return Err(ApiError::UnknownResource {
resource_type: "upload".into(),
id: upload_id.to_string().into(),
});
};
async fn parse_request_parameters(headers: &HeaderMap) -> Result<RequestParameters, ApiError> {
if !headers
.get_exactly_once(&axum::http::header::CONTENT_TYPE)?
.to_str()
@@ -242,63 +170,65 @@ async fn parse_request_parameters(
.get_exactly_once(&upload_headers::UPLOAD_COMPLETE)?
.get_boolean(&upload_headers::UPLOAD_COMPLETE)?;
Ok(Ok(RequestParameters {
upload,
Ok(RequestParameters {
supplied_content_length,
supplied_upload_length,
supplied_upload_offset,
supplied_upload_complete,
}))
})
}
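
get_exactly_once and get_boolean come from the HeaderMapExt/HeaderValueExt helpers in headers.rs, which this diff does not show. Upload-Complete is an RFC 8941 structured-field boolean, so the boolean parsing presumably amounts to something like this hypothetical helper:

use axum::http::HeaderValue;

// Hypothetical sketch: RFC 8941 booleans are serialized as "?1" / "?0".
fn parse_structured_bool(value: &HeaderValue) -> Option<bool> {
    match value.to_str().ok()? {
        "?1" => Some(true),
        "?0" => Some(false),
        _ => None,
    }
}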
async fn do_append(
upload_acquisition: &mut Acquisition<AcquiredUnfinishedUpload>,
parameters: RequestParameters,
content_stream: BodyDataStream,
) -> Result<AppendToUploadOutcome, Report> {
) -> Result<AppendToUploadResponse, AppendToUploadError> {
let release_request_token = upload_acquisition.release_request_token();
let acquired_upload = upload_acquisition.inner();
let upload = acquired_upload.upload();
let mut file = acquired_upload.file().get_or_open(true).await?;
let total_size = parameters.upload.total_size();
let total_size = upload.total_size();
let current_offset = file.stream_position().await?;
if current_offset < parameters.upload.current_size() {
log::error!(
"The upload ({}) failed because the file contains less data than expected.",
parameters.upload.id()
);
return Ok(AppendToUploadOutcome::Failed(UploadFailureReason::MissingData));
if current_offset < upload.current_size() {
log::error!("The upload ({}) failed because the file contains less data than expected.", upload.id());
return Ok(AppendToUploadResponse::UploadFailed {
reason: UploadFailureReason::MissingData,
}
.into());
}
let remaining_content_length = total_size - current_offset;
if parameters.supplied_upload_offset != current_offset {
return Ok(AppendToUploadOutcome::UploadOffsetMismatch {
return Err(ApiError::CaosUploadOffsetMismatch {
expected: current_offset,
provided: parameters.supplied_upload_offset,
});
}
.into());
}
let payload_length_limit = if let Some(supplied_content_length) = parameters.supplied_content_length {
if parameters.supplied_upload_complete {
if remaining_content_length != supplied_content_length {
return Ok(AppendToUploadOutcome::InconsistentUploadLength {
return Err(ApiError::CaosInconsistentUploadLength {
expected: total_size,
detail: "Upload-Complete is set to true, and Content-Length is set, \
but the value of Content-Length does not equal the length of the remaining content."
.into(),
});
}
.into());
}
} else {
if supplied_content_length >= remaining_content_length {
return Ok(AppendToUploadOutcome::InconsistentUploadLength {
return Err(ApiError::CaosInconsistentUploadLength {
expected: total_size,
detail: "Upload-Complete is set to false, and Content-Length is set, \
but the value of Content-Length is not smaller than the length of the remaining content."
.into(),
});
}
.into());
}
}
@@ -330,21 +260,21 @@ async fn do_append(
false
};
Ok(if let Some(outcome) = outcome {
if let Some(outcome) = outcome {
match outcome {
StreamToFileOutcome::StoppedUnexpectedly => AppendToUploadOutcome::ContentStreamStoppedUnexpectedly,
StreamToFileOutcome::TooMuchContent => AppendToUploadOutcome::TooMuchContent,
StreamToFileOutcome::StoppedUnexpectedly => Err(ApiError::RequestBodyTooShort.into()),
StreamToFileOutcome::TooMuchContent => Err(ApiError::RequestBodyTooLong.into()),
StreamToFileOutcome::Success => {
if is_upload_complete {
AppendToUploadOutcome::UploadComplete
Ok(AppendToUploadResponse::UploadComplete)
} else {
AppendToUploadOutcome::UploadIncomplete { offset: new_size }
Ok(AppendToUploadResponse::UploadIncomplete { offset: new_size })
}
}
}
} else {
AppendToUploadOutcome::RequestSuperseded
})
Err(ApiError::CaosUploadRequestSuperseded.into())
}
}
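
On the client side, the offset-mismatch problem is recoverable: the 409 response carries the server's actual Upload-Offset, so an uploader can rewind and retry. A hypothetical sketch, assuming the reqwest crate; the endpoint and header names are taken from the router and upload_headers in this diff, everything else (append_chunk, base, the retry policy) is assumed:

use reqwest::{Client, StatusCode};

// Append one chunk; returns Ok(false) if the caller should retry from the
// corrected offset, Ok(true) on success.
async fn append_chunk(client: &Client, base: &str, upload_id: &str, offset: &mut u64, chunk: Vec<u8>, last: bool) -> reqwest::Result<bool> {
    let response = client
        .post(format!("{base}/uploads/{upload_id}"))
        .header("Content-Type", "application/partial-upload")
        .header("Upload-Offset", offset.to_string())
        .header("Upload-Complete", if last { "?1" } else { "?0" })
        .body(chunk)
        .send()
        .await?;
    if response.status() == StatusCode::CONFLICT {
        // A mismatching-upload-offset problem echoes the server's actual offset
        // in the Upload-Offset response header; rewind and let the caller retry.
        if let Some(value) = response.headers().get("Upload-Offset") {
            if let Ok(server_offset) = value.to_str().unwrap_or("").parse() {
                *offset = server_offset;
                return Ok(false);
            }
        }
    }
    Ok(response.status().is_success())
}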
#[derive(Debug)]

View file

@@ -0,0 +1,26 @@
use crate::http_api::headers::upload_headers;
use axum::http::HeaderValue;
use axum::response::{IntoResponseParts, ResponseParts};
pub struct UploadCompleteResponseHeader(pub bool);
impl IntoResponseParts for UploadCompleteResponseHeader {
type Error = ();
fn into_response_parts(self, mut res: ResponseParts) -> Result<ResponseParts, Self::Error> {
res.headers_mut()
.insert(upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static(if self.0 { "?1" } else { "?0" }));
Ok(res)
}
}
pub struct UploadOffsetResponseHeader(pub u64);
impl IntoResponseParts for UploadOffsetResponseHeader {
type Error = ();
fn into_response_parts(self, mut res: ResponseParts) -> Result<ResponseParts, Self::Error> {
res.headers_mut().insert(upload_headers::UPLOAD_OFFSET, self.0.into());
Ok(res)
}
}

View file

@@ -3,40 +3,20 @@ use crate::http_api::api_error::ApiError;
use crate::http_api::auth::AppAuthorization;
use crate::http_api::headers::{CACHE_CONTROL_CACHE_FOREVER, CACHE_CONTROL_NEVER_CACHE, upload_headers};
use crate::http_api::upload::append_to_upload::append_to_upload;
use crate::upload_manager::{AnyStageUpload, UploadFailureReason, UploadId};
use crate::upload_manager::{UploadFailureReason, UploadId};
use crate::util::id::BucketId;
use axum::extract::{Path, State};
use axum::http::HeaderValue;
use axum::response::{IntoResponse, IntoResponseParts, Response, ResponseParts};
use axum::response::{IntoResponse, Response};
use axum::{Json, Router, routing};
use color_eyre::Report;
use serde::{Deserialize, Serialize};
pub mod append_to_upload;
pub mod headers;
const PARTIAL_UPLOAD_MEDIA_TYPE: &'static str = "application/partial-upload";
struct UploadCompleteResponseHeader(bool);
impl IntoResponseParts for UploadCompleteResponseHeader {
type Error = ();
fn into_response_parts(self, mut res: ResponseParts) -> Result<ResponseParts, Self::Error> {
res.headers_mut()
.insert(upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static(if self.0 { "?1" } else { "?0" }));
Ok(res)
}
}
struct UploadOffsetResponseHeader(u64);
impl IntoResponseParts for UploadOffsetResponseHeader {
type Error = ();
fn into_response_parts(self, mut res: ResponseParts) -> Result<ResponseParts, Self::Error> {
res.headers_mut().insert(upload_headers::UPLOAD_OFFSET, self.0.into());
Ok(res)
}
}
#[derive(Debug, Deserialize)]
struct CreateUploadPayload {
size: u64,
@@ -51,6 +31,7 @@ pub fn create_uploads_router() -> Router<Context> {
Router::new()
.route("/", routing::post(create_upload))
.route("/{upload_id}", routing::get(get_upload_metadata).post(append_to_upload))
.route("/{upload_id}/accept", routing::post(accept_upload))
}
async fn create_upload(
@@ -137,30 +118,78 @@ async fn get_upload_metadata(
State(context): State<Context>,
Path(GetUploadMetadataPathParameters { upload_id }): Path<GetUploadMetadataPathParameters>,
) -> Result<impl IntoResponse, ApiError> {
let upload = if let Some(upload) = context.upload_manager.get_upload_by_id(&upload_id).await? {
upload
let mut tx = context.database.begin().await.map_err(Into::<Report>::into)?;
let state: GetUploadMetadataResponseState = if let Some(upload) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await {
if upload.is_complete() {
GetUploadMetadataResponseState::Complete { size: upload.total_size() }
} else {
GetUploadMetadataResponseState::Ongoing {
current_size: upload.current_size(),
total_size: upload.total_size(),
}
}
} else if let Some(reason) = context.upload_manager.get_upload_failure_reason_by_id(&mut tx, &upload_id).await? {
GetUploadMetadataResponseState::Failed { reason }
} else if let Some(metadata) = context.upload_manager.get_finished_upload_metadata_by_id(&mut tx, &upload_id).await? {
GetUploadMetadataResponseState::Finished {
size: metadata.size,
hash: metadata.hash,
}
} else {
return Err(ApiError::UnknownResource {
resource_type: "upload".into(),
id: upload_id.to_string().into(),
});
}
.into());
};
Ok(GetUploadMetadataResponse {
id: upload_id.to_string().into_boxed_str(),
state: match upload {
AnyStageUpload::Unfinished(upload) => {
if upload.is_complete() {
GetUploadMetadataResponseState::Complete { size: upload.total_size() }
} else {
GetUploadMetadataResponseState::Ongoing {
current_size: upload.current_size(),
total_size: upload.total_size(),
}
}
}
AnyStageUpload::Finished { size, hash } => GetUploadMetadataResponseState::Finished { size, hash },
AnyStageUpload::Failed(reason) => GetUploadMetadataResponseState::Failed { reason },
},
state,
})
}
#[derive(Debug, Deserialize)]
struct AcceptUploadPathParameters {
upload_id: UploadId,
}
#[derive(Debug, Deserialize)]
struct AcceptUploadRequestBody {
bucket_ids: Vec<BucketId>,
}
async fn accept_upload(
State(context): State<Context>,
Path(AcceptUploadPathParameters { upload_id }): Path<AcceptUploadPathParameters>,
Json(body): Json<AcceptUploadRequestBody>,
) -> Result<impl IntoResponse, ApiError> {
let mut tx = context.database.begin().await.map_err(Into::<Report>::into)?;
let _hash = if let Some(_) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await {
return Err(ApiError::CaosUploadNotFinished);
} else if let Some(reason) = context.upload_manager.get_upload_failure_reason_by_id(&mut tx, &upload_id).await? {
return Err(ApiError::CaosUploadFailed { reason });
} else if let Some(metadata) = context.upload_manager.get_finished_upload_metadata_by_id(&mut tx, &upload_id).await? {
metadata.hash
} else {
return Err(ApiError::UnknownResource {
resource_type: "upload".into(),
id: upload_id.to_string().into(),
}
.into());
};
for bucket_id in &body.bucket_ids {
if !context.config.buckets.contains_key(bucket_id) {
return Err(ApiError::CaosUnknownBucket {
bucket_id: bucket_id.to_string().into(),
}
.into());
}
}
context.upload_manager.accept_finished_upload(&mut tx, upload_id, body.bucket_ids).await?;
tx.commit().await.map_err(Into::<Report>::into)?;
Ok(())
}
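
Acceptance is a separate step after the upload finishes: the client names the target buckets and the server records pending replicas. A hypothetical call, assuming reqwest with its json feature; only the path shape and the bucket_ids field name come from the handler above:

async fn accept_upload_into(client: &reqwest::Client, base: &str, upload_id: &str, buckets: &[&str]) -> reqwest::Result<reqwest::Response> {
    // POST /uploads/{upload_id}/accept with {"bucket_ids": [...]}.
    client
        .post(format!("{base}/uploads/{upload_id}/accept"))
        .json(&serde_json::json!({ "bucket_ids": buckets }))
        .send()
        .await
}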

View file

@@ -9,11 +9,13 @@ mod util;
use crate::config::{ConfigBucket, ConfigBucketBackend, load_config};
use crate::http_api::start_http_api_server;
use crate::upload_manager::UploadManager;
use crate::util::id::BucketId;
use camino::Utf8Path;
use color_eyre::Result;
use color_eyre::eyre::{WrapErr, eyre};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::io::ErrorKind;
use std::sync::Arc;
use tokio::fs;
#[tokio::main]
@@ -21,7 +23,7 @@ async fn main() -> Result<()> {
color_eyre::install().unwrap();
env_logger::init();
let config = load_config()?;
let config = Arc::new(load_config()?);
log::debug!("Loaded configuration: {:#?}", config);
@@ -32,11 +34,11 @@ async fn main() -> Result<()> {
.await
.wrap_err("Failed to open the database connection.")?;
let upload_manager = UploadManager::create(database.clone(), config.staging_directory, config.enable_multithreaded_hashing).await?;
let upload_manager = UploadManager::create(database.clone(), &config.staging_directory, config.enable_multithreaded_hashing).await?;
log::info!("Initialization successful.");
start_http_api_server(upload_manager, config.http_address, config.http_port, config.api_secret).await?;
start_http_api_server(upload_manager, database, Arc::clone(&config), config.http_address, config.http_port).await?;
Ok(())
}
@@ -53,11 +55,11 @@ async fn initialize_staging_directory(path: &Utf8Path) -> Result<()> {
Ok(())
}
async fn initialize_buckets(bucket_configs: &Vec<ConfigBucket>) -> Result<()> {
async fn initialize_buckets(bucket_configs: &HashMap<BucketId, ConfigBucket>) -> Result<()> {
let mut filesystem_backend_paths = HashSet::new();
for bucket_config in bucket_configs {
log::info!("Initializing bucket: {}", bucket_config.id);
for (bucket_id, bucket_config) in bucket_configs {
log::info!("Initializing bucket: {}", bucket_id);
match &bucket_config.backend {
ConfigBucketBackend::Filesystem(filesystem_backend_config) => {
@@ -81,7 +83,7 @@ async fn initialize_buckets(bucket_configs: &Vec<ConfigBucket>) -> Result<()> {
check_directory_writable(&path)
.await
.wrap_err_with(|| format!("The writable check for the {} bucket failed.", &bucket_config.id))?;
.wrap_err_with(|| format!("The writable check for the {} bucket failed.", bucket_id))?;
filesystem_backend_paths.insert(path);
}

View file

@@ -1,4 +1,4 @@
use crate::upload_manager::{AcquiredUnfinishedUpload, UnfinishedUpload};
use crate::upload_manager::AcquiredUnfinishedUpload;
use crate::util::hash_to_hex_string::HashExt;
use crate::util::temporal_formatting::TemporalFormatting;
use blake3::Hasher;

View file

@@ -1,19 +1,17 @@
use crate::processing_worker::do_processing_work;
use crate::util::acquirable::{Acquirable, Acquisition};
pub(crate) use crate::util::file_reference::FileReference;
use crate::util::id::generate_id;
use crate::util::id::{BucketId, generate_id};
use camino::Utf8PathBuf;
use color_eyre::{Report, Result};
use dashmap::DashMap;
use fstr::FStr;
use serde::Serialize;
use sqlx::SqlitePool;
use sqlx::{SqlitePool, SqliteTransaction};
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use strum::{Display, EnumString};
use tokio::sync::RwLock;
const LARGE_FILE_SIZE_THRESHOLD: u64 = 1024;
pub const UPLOAD_ID_LENGTH: usize = 16;
@@ -29,7 +27,7 @@ pub struct UploadManager {
}
impl UploadManager {
pub async fn create(database: SqlitePool, staging_directory_path: Utf8PathBuf, enable_multithreaded_hashing: bool) -> Result<Arc<Self>> {
pub async fn create(database: SqlitePool, staging_directory_path: &Utf8PathBuf, enable_multithreaded_hashing: bool) -> Result<Arc<Self>> {
log::info!("Loading uploads…");
let (small_file_processing_tasks_sender, small_file_processing_tasks_receiver) = tokio::sync::mpsc::unbounded_channel();
@@ -65,7 +63,7 @@ impl UploadManager {
});
if is_complete {
upload.acquire().await.unwrap().complete().await;
upload.acquire().await.unwrap().complete().await?;
}
manager.unfinished_uploads.insert(id, upload);
@@ -114,37 +112,52 @@ impl UploadManager {
Ok(upload)
}
pub async fn get_upload_by_id(&self, id: &str) -> Result<Option<AnyStageUpload>, Report> {
if let Some(upload) = self.unfinished_uploads.get(id).map(|a| Arc::clone(a.value())) {
Ok(Some(AnyStageUpload::Unfinished(upload)))
} else if let Some(upload) = sqlx::query!(
"SELECT id, finished_uploads.hash, objects.size FROM finished_uploads JOIN objects ON finished_uploads.hash = objects.hash WHERE id = ?",
pub async fn get_unfinished_upload_by_id(&self, id: &UploadId) -> Option<Arc<UnfinishedUpload>> {
self.unfinished_uploads.get(id).map(|a| Arc::clone(a.value()))
}
pub async fn get_upload_failure_reason_by_id(&self, tx: &mut SqliteTransaction<'_>, id: &UploadId) -> Result<Option<UploadFailureReason>> {
let id = id.to_string();
Ok(sqlx::query!("SELECT reason FROM failed_uploads WHERE id = ?", id)
.fetch_optional(&mut **tx)
.await?
.map(|r| r.reason.parse().unwrap()))
}
pub async fn get_finished_upload_metadata_by_id(&self, tx: &mut SqliteTransaction<'_>, id: &UploadId) -> Result<Option<FinishedUploadMetadata>> {
let id = id.to_string();
Ok(sqlx::query!(
"SELECT objects.hash, objects.size FROM finished_uploads JOIN objects ON finished_uploads.hash = objects.hash WHERE id = ?",
id
)
.map(|row| AnyStageUpload::Finished {
size: row.size as u64,
hash: row.hash.into_boxed_str(),
})
.fetch_optional(&self.database)
.fetch_optional(&mut **tx)
.await?
{
Ok(Some(upload))
} else if let Some(upload) = sqlx::query!("SELECT reason FROM failed_uploads WHERE id = ?", id)
.map(|row| AnyStageUpload::Failed(UploadFailureReason::from_str(&row.reason).unwrap()))
.fetch_optional(&self.database)
.map(|r| FinishedUploadMetadata {
size: r.size as u64,
hash: r.hash.into_boxed_str(),
}))
}
pub async fn accept_finished_upload(&self, tx: &mut SqliteTransaction<'_>, id: UploadId, bucket_ids: Vec<BucketId>) -> Result<Option<()>> {
let id = id.to_string();
let hash = sqlx::query!("DELETE FROM finished_uploads WHERE id = ? RETURNING hash", id)
.fetch_one(&mut **tx)
.await?
{
Ok(Some(upload))
} else {
Ok(None)
.hash;
for bucket_id in bucket_ids {
sqlx::query!("INSERT INTO object_replicas (hash, bucket_id, is_present) VALUES (?, ?, 0)", hash, bucket_id)
.execute(&mut **tx)
.await?;
}
Ok(Some(()))
}
}
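
accept_finished_upload runs inside a caller-supplied SqliteTransaction, so the DELETE ... RETURNING and the object_replicas inserts commit or roll back as one unit. The calling pattern, as used by the accept_upload handler above:

// Sketch of the transactional call, signatures as above.
let mut tx = database.begin().await?;
manager.accept_finished_upload(&mut tx, upload_id, bucket_ids).await?;
tx.commit().await?; // an uncommitted transaction rolls back on drop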
pub enum AnyStageUpload {
Unfinished(Arc<UnfinishedUpload>),
Finished { size: u64, hash: Box<str> },
Failed(UploadFailureReason),
pub struct FinishedUploadMetadata {
pub hash: Box<str>,
pub size: u64,
}
#[derive(Debug)]
@@ -197,7 +210,9 @@ impl AcquiredUnfinishedUpload {
pub async fn set_current_size(&self, current_size: u64) -> Result<(), Report> {
let upload = self.upload.upgrade().unwrap();
let manager = upload.manager.upgrade().unwrap();
upload.current_size.store(current_size, Ordering::Relaxed);
let id = upload.id.to_string();
let current_size = current_size as i64;

View file

@@ -6,3 +6,5 @@ pub fn generate_id<const N: usize>() -> FStr<N> {
let bytes: [u8; N] = std::array::from_fn(|_| rand::rng().sample(&Alphanumeric));
unsafe { FStr::from_inner_unchecked(bytes) }
}
pub type BucketId = Box<str>;
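
For scale: upload IDs use this generator with UPLOAD_ID_LENGTH = 16 from upload_manager.rs. An illustrative call:

// Alphanumeric output, so the IDs are URL-safe path segments.
let upload_id: FStr<16> = generate_id::<16>();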