WIP: v1.0.0

Moritz Ruth 2025-04-20 23:30:12 +02:00
parent 0ed22f9bf6
commit 3709f6efc4
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
10 changed files with 422 additions and 223 deletions

1
.gitignore vendored
View file

@@ -1,4 +1,5 @@
/target
.idea/
.sqlx/
/run/
*.env

View file

@@ -75,6 +75,7 @@ application.
Even while this process is still running, the object data is already accessible at `/objects/OBJECT_HASH`.
### Filesystem requirements
minna-caos uses the local filesystem as a staging area for uploads. (This is the special `staging` bucket seen in the example above.)
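For orientation, here is a minimal client-side sketch of a single append request against the upload routes introduced in this commit. The base URL, the `/uploads` mount point, and the `application/partial-upload` media type value are assumptions (the real value lives in `PARTIAL_UPLOAD_MEDIA_TYPE`); the `Upload-Offset` and `Upload-Complete` headers and the `?1`/`?0` boolean encoding match the handlers below.

use reqwest::Client;

// Hypothetical client for the append endpoint (POST /uploads/{upload_id}).
// `?1`/`?0` are the structured-field booleans accepted by `get_boolean` below.
async fn append_chunk(
    client: &Client,
    upload_id: &str,
    offset: u64,
    chunk: Vec<u8>,
    is_last: bool,
) -> reqwest::Result<reqwest::Response> {
    client
        .post(format!("http://localhost:8080/uploads/{upload_id}"))
        .header("Content-Type", "application/partial-upload") // assumed value of PARTIAL_UPLOAD_MEDIA_TYPE
        .header("Upload-Offset", offset.to_string())
        .header("Upload-Complete", if is_last { "?1" } else { "?0" })
        .body(chunk)
        .send()
        .await
}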
@@ -87,7 +88,9 @@ The filesystem containing the staging directory must…
## Roadmap
- metadata endpoints
- endpoint for settling an upload with the hash of an existing object
- support non-resumable clients
- send the draft version header
- support upload cancellation
- upload expiration
- garbage-collect failed uploads
- add code comments

View file

@@ -20,7 +20,8 @@ create table unfinished_uploads
(
id text not null,
current_size integer not null, -- in bytes
total_size integer not null, -- in bytes
total_size integer, -- in bytes
max_size integer, -- in bytes
is_complete integer not null, -- boolean
primary key (id)
) without rowid, strict;
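For reference, the nullable columns map onto the upload lifecycle used throughout this commit. A sketch mirroring the `UnfinishedUploadStage` derivation introduced in `upload_manager`:

// Sketch: deriving the lifecycle stage from an unfinished_uploads row.
// Mirrors UnfinishedUploadMetadata::stage() introduced in this commit.
enum Stage {
    Created,  // no bytes appended yet
    Ongoing,  // some bytes appended, not yet complete
    Complete, // all data received, awaiting processing
}

fn stage(current_size: u64, is_complete: bool) -> Stage {
    if is_complete {
        Stage::Complete
    } else if current_size == 0 {
        Stage::Created
    } else {
        Stage::Ongoing
    }
}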

View file

@@ -46,11 +46,12 @@ pub enum ApiError {
CaosUploadRequestSuperseded,
CaosUploadFailed { reason: UploadFailureReason },
CaosUploadOffsetMismatch { expected: u64, provided: u64 },
CaosInconsistentUploadLength { expected: u64, detail: Cow<'static, str> },
CaosUploadNotFinished,
CaosInconsistentUploadLength { detail: Cow<'static, str> },
CaosWrongUploadStage { expected: Cow<'static, str>, actual: Cow<'static, str> },
CaosUnknownBucket { bucket_id: Cow<'static, str> },
CaosNoReplicaAvailable,
CaosStagingAreaFull,
CaosUploadSizeLimitExceeded { limit: u64, provided: u64 },
}
impl From<Report> for ApiError {
@@ -164,21 +165,22 @@ impl IntoResponse for ApiError {
})),
)
.into_response(),
ApiError::CaosInconsistentUploadLength { expected, detail } => (
ApiError::CaosInconsistentUploadLength { detail } => (
StatusCode::CONFLICT,
UploadOffsetResponseHeader(expected),
ProblemJson(json!({
"type": "https://iana.org/assignments/http-problem-types#inconsistent-uploads-length",
"title": "The provided uploads lengths are inconsistent with one another or a previously established total length.",
"title": "The provided upload lengths are inconsistent with one another or a previously established total length.",
"detail": detail,
})),
)
.into_response(),
ApiError::CaosUploadNotFinished => (
ApiError::CaosWrongUploadStage { expected, actual } => (
StatusCode::CONFLICT,
ProblemJson(json!({
"type": "https://minna.media/api-problems/caos/uploads-not-finished",
"title": "The uploads is not finished yet."
"type": "https://minna.media/api-problems/caos/wrong-upload-stage",
"title": "The upload is not in the expected stage.",
"expected": expected,
"actual": actual,
})),
)
.into_response(),
@@ -209,6 +211,16 @@ impl IntoResponse for ApiError {
})),
)
.into_response(),
ApiError::CaosUploadSizeLimitExceeded { limit, provided } => (
StatusCode::BAD_REQUEST,
ProblemJson(json!({
"type": "https://minna.media/api-problems/caos/upload-size-limit-exceeded",
"title": "The data is longer than allowed for this upload.",
"limit": limit,
"provided": provided,
})),
)
.into_response(),
}
}
}
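For reference, the new `CaosUploadSizeLimitExceeded` variant produces an RFC 9457 problem document. A sketch of the resulting `application/problem+json` body, with made-up numbers:

use serde_json::json;

fn main() {
    // Illustrative body for the 400 response produced above.
    let body = json!({
        "type": "https://minna.media/api-problems/caos/upload-size-limit-exceeded",
        "title": "The data is longer than allowed for this upload.",
        "limit": 1_048_576,   // example: max_size of the upload, in bytes
        "provided": 2_097_152 // example: length of the provided data
    });
    println!("{body}");
}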

View file

@@ -1,5 +1,6 @@
use crate::http_api::api_error::ApiError;
use axum::http::{HeaderMap, HeaderName, HeaderValue, header};
use std::num::NonZeroU64;
pub mod upload_headers {
use axum::http::HeaderName;
@@ -50,6 +51,7 @@ impl HeaderMapExt for HeaderMap {
pub trait HeaderValueExt {
fn get_unsigned_decimal_number(&self, header_name_for_error: &HeaderName) -> Result<u64, ApiError>;
fn get_positive_decimal_number(&self, header_name_for_error: &HeaderName) -> Result<NonZeroU64, ApiError>;
fn get_boolean(&self, header_name_for_error: &HeaderName) -> Result<bool, ApiError>;
}
@@ -65,6 +67,17 @@ impl HeaderValueExt for HeaderValue {
})
}
fn get_positive_decimal_number(&self, header_name_for_error: &HeaderName) -> Result<NonZeroU64, ApiError> {
self.to_str()
.ok()
.and_then(|v| v.parse::<NonZeroU64>().ok())
.ok_or(ApiError::InvalidRequestHeader {
name: header_name_for_error.to_owned(),
message: "must be a positive 64-bit decimal number".into(),
})
}
fn get_boolean(&self, header_name_for_error: &HeaderName) -> Result<bool, ApiError> {
if let Ok(value) = self.to_str() {
if value == "?1" {

View file

@@ -3,7 +3,7 @@ use crate::http_api::api_error::ApiError;
use crate::http_api::headers::{HeaderMapExt, HeaderValueExt, upload_headers};
use crate::http_api::uploads::headers::{UploadCompleteResponseHeader, UploadOffsetResponseHeader};
use crate::http_api::uploads::{PARTIAL_UPLOAD_MEDIA_TYPE, UploadPathParameters};
use crate::upload_manager::{AcquiredUnfinishedUpload, UploadFailureReason};
use crate::upload_manager::{AcquiredUnfinishedUpload, UnfinishedUploadMetadata, UnfinishedUploadStage, UploadFailureReason};
use crate::util::acquirable::Acquisition;
use axum::body::{Body, BodyDataStream};
use axum::extract::{Path, State};
@@ -12,6 +12,7 @@ use axum::response::{IntoResponse, NoContent, Response};
use color_eyre::Report;
use futures::TryStreamExt;
use std::io::ErrorKind;
use std::num::NonZeroU64;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio_util::io::StreamReader;
@@ -69,7 +70,7 @@ pub(super) async fn append_to_upload(
headers: HeaderMap,
request_body: Body,
) -> Result<AppendToUploadResponse, AppendToUploadError> {
let mut tx = context.database.begin().await.map_err(Into::<Report>::into)?;
let mut tx = context.database.begin().await.map_err(Report::from)?;
let upload = if let Some(upload) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await {
upload
@@ -85,7 +86,8 @@ pub(super) async fn append_to_upload(
.into());
};
let parameters = parse_request_parameters(&headers).await?;
let metadata = upload.metadata(&mut tx).await.map_err(Report::from)?;
let parameters = parse_request_parameters(&headers)?;
let mut upload_acquisition = if let Some(a) = upload.acquire().await {
a
@@ -93,40 +95,136 @@
return Err(ApiError::CaosUploadRequestSuperseded.into());
};
if let Some(supplied_upload_length) = parameters.supplied_upload_length {
let expected = upload_acquisition.inner().upload().total_size();
if supplied_upload_length != expected {
return Err(ApiError::CaosInconsistentUploadLength {
expected,
detail: format!("Upload-Length is set to {supplied_upload_length}, but the actual length is {expected}.").into(),
let new_total_size = if let Some(supplied_upload_length) = parameters.supplied_upload_length {
if let Some(expected) = metadata.total_size {
if supplied_upload_length != expected {
return Err(ApiError::CaosInconsistentUploadLength {
detail: format!("Upload-Length is set to {supplied_upload_length}, but the previously established length is {expected}.").into(),
}
.into());
}
.into());
None
} else {
Some(supplied_upload_length)
}
} else {
None
};
if let Some(new_total_size) = new_total_size {
upload.set_total_size(&mut tx, new_total_size).await?;
}
if upload.is_complete() {
return Err(ApiError::IanaUploadAlreadyComplete.into());
match metadata.stage() {
UnfinishedUploadStage::Created => {}
UnfinishedUploadStage::Ongoing => {}
UnfinishedUploadStage::Complete => return Err(ApiError::IanaUploadAlreadyComplete.into()),
}
let response = do_append(&mut upload_acquisition, parameters, request_body.into_data_stream()).await?;
let release_request_token = upload_acquisition.release_request_token();
let acquired_upload = upload_acquisition.inner();
let upload = acquired_upload.upload();
let mut file = acquired_upload.file().get_or_open(true).await?;
match &response {
AppendToUploadResponse::UploadIncomplete { .. } => upload_acquisition.release().await,
AppendToUploadResponse::UploadComplete => upload_acquisition.complete().await?,
AppendToUploadResponse::UploadFailed { reason } => upload_acquisition.fail(*reason).await?,
let current_offset = file.stream_position().await?;
if current_offset < metadata.current_size {
log::error!("The upload ({}) failed because the file contains less data than expected.", upload.id());
return Ok(AppendToUploadResponse::UploadFailed {
reason: UploadFailureReason::MissingData,
}
.into());
}
Ok(response)
let remaining_length = metadata.total_size.map(|s| s.get() - current_offset);
if parameters.supplied_upload_offset != current_offset {
return Err(ApiError::CaosUploadOffsetMismatch {
expected: current_offset,
provided: parameters.supplied_upload_offset,
}
.into());
}
let payload_length_limit = if let Some(supplied_content_length) = parameters.supplied_content_length {
if let Some(remaining_length) = remaining_length {
if parameters.supplied_upload_complete {
if remaining_length != supplied_content_length {
return Err(ApiError::CaosInconsistentUploadLength {
detail: "Upload-Complete is set to true, and Content-Length is set, \
but the value of Content-Length does not equal the length of the remaining content."
.into(),
}
.into());
}
} else {
if supplied_content_length >= remaining_length {
return Err(ApiError::CaosInconsistentUploadLength {
detail: "Upload-Complete is set to false, and Content-Length is set, \
but the value of Content-Length is not smaller than the length of the remaining content."
.into(),
}
.into());
}
}
}
supplied_content_length
} else {
remaining_length.unwrap_or(u64::MAX)
};
let outcome = tokio::select! {
o = stream_to_file(
request_body.into_data_stream(),
&mut file,
remaining_length,
parameters.supplied_content_length,
parameters.supplied_upload_complete,
payload_length_limit
) => Some(o?),
_ = release_request_token.cancelled() => None
};
file.sync_all().await?;
let new_size = file.stream_position().await?;
acquired_upload.set_current_size(&mut tx, new_size).await?;
// Persist the size and total-length updates before completion work reads them from the database.
tx.commit().await.map_err(Report::from)?;
let is_upload_complete = if let Some(StreamToFileOutcome::Success) = outcome {
parameters.supplied_upload_complete
} else {
false
};
if let Some(outcome) = outcome {
match outcome {
StreamToFileOutcome::StorageFull => Err(ApiError::CaosStagingAreaFull.into()),
StreamToFileOutcome::StoppedUnexpectedly => Err(ApiError::RequestBodyTooShort.into()),
StreamToFileOutcome::TooMuchContent => Err(ApiError::RequestBodyTooLong.into()),
StreamToFileOutcome::Success => {
if is_upload_complete {
upload_acquisition.complete().await?;
Ok(AppendToUploadResponse::UploadComplete)
} else {
Ok(AppendToUploadResponse::UploadIncomplete { offset: new_size })
}
}
}
} else {
Err(ApiError::CaosUploadRequestSuperseded.into())
}
}
#[derive(Debug)]
struct RequestParameters {
pub supplied_content_length: Option<u64>,
pub supplied_upload_length: Option<u64>,
pub supplied_upload_length: Option<NonZeroU64>,
pub supplied_upload_offset: u64,
pub supplied_upload_complete: bool,
}
async fn parse_request_parameters(headers: &HeaderMap) -> Result<RequestParameters, ApiError> {
fn parse_request_parameters(headers: &HeaderMap) -> Result<RequestParameters, ApiError> {
if !headers
.get_exactly_once(&axum::http::header::CONTENT_TYPE)?
.to_str()
@@ -147,7 +245,7 @@ async fn parse_request_parameters(headers: &HeaderMap) -> Result<RequestParamete
let supplied_upload_length = headers
.get_at_most_once(&upload_headers::UPLOAD_LENGTH)?
.map(|v| v.get_unsigned_decimal_number(&upload_headers::UPLOAD_LENGTH))
.map(|v| v.get_positive_decimal_number(&upload_headers::UPLOAD_LENGTH))
.transpose()?;
let supplied_upload_offset = headers
@@ -166,106 +264,6 @@ async fn parse_request_parameters(headers: &HeaderMap) -> Result<RequestParamete
})
}
async fn do_append(
upload_acquisition: &mut Acquisition<AcquiredUnfinishedUpload>,
parameters: RequestParameters,
content_stream: BodyDataStream,
) -> Result<AppendToUploadResponse, AppendToUploadError> {
let release_request_token = upload_acquisition.release_request_token();
let acquired_upload = upload_acquisition.inner();
let upload = acquired_upload.upload();
let mut file = acquired_upload.file().get_or_open(true).await?;
let total_size = upload.total_size();
let current_offset = file.stream_position().await?;
if current_offset < upload.current_size() {
log::error!("The uploads ({}) failed because the file contains less data than expected.", upload.id());
return Ok(AppendToUploadResponse::UploadFailed {
reason: UploadFailureReason::MissingData,
}
.into());
}
let remaining_content_length = total_size - current_offset;
if parameters.supplied_upload_offset != current_offset {
return Err(ApiError::CaosUploadOffsetMismatch {
expected: current_offset,
provided: parameters.supplied_upload_offset,
}
.into());
}
let payload_length_limit = if let Some(supplied_content_length) = parameters.supplied_content_length {
if parameters.supplied_upload_complete {
if remaining_content_length != supplied_content_length {
return Err(ApiError::CaosInconsistentUploadLength {
expected: total_size,
detail: "Upload-Complete is set to true, and Content-Length is set, \
but the value of Content-Length does not equal the length of the remaining content."
.into(),
}
.into());
}
} else {
if supplied_content_length >= remaining_content_length {
return Err(ApiError::CaosInconsistentUploadLength {
expected: total_size,
detail: "Upload-Complete is set to false, and Content-Length is set, \
but the value of Content-Length is not smaller than the length of the remaining content."
.into(),
}
.into());
}
}
supplied_content_length
} else {
remaining_content_length
};
let outcome = tokio::select! {
o = stream_to_file(
content_stream,
&mut file,
remaining_content_length,
parameters.supplied_content_length,
parameters.supplied_upload_complete,
payload_length_limit
) => Some(o?),
_ = release_request_token.cancelled() => None
};
file.sync_all().await?;
let new_size = file.stream_position().await?;
acquired_upload.set_current_size(new_size).await?;
let is_upload_complete = if let Some(StreamToFileOutcome::Success) = outcome {
parameters.supplied_upload_complete
} else {
false
};
if let Some(outcome) = outcome {
match outcome {
StreamToFileOutcome::StorageFull => Err(ApiError::CaosStagingAreaFull.into()),
StreamToFileOutcome::StoppedUnexpectedly => Err(ApiError::RequestBodyTooShort.into()),
StreamToFileOutcome::TooMuchContent => Err(ApiError::RequestBodyTooLong.into()),
StreamToFileOutcome::Success => {
if is_upload_complete {
Ok(AppendToUploadResponse::UploadComplete)
} else {
Ok(AppendToUploadResponse::UploadIncomplete { offset: new_size })
}
}
}
} else {
Err(ApiError::CaosUploadRequestSuperseded.into())
}
}
#[derive(Debug)]
pub enum StreamToFileOutcome {
StorageFull,
@@ -277,7 +275,7 @@ pub enum StreamToFileOutcome {
async fn stream_to_file(
content_stream: BodyDataStream,
file: &mut File,
remaining_content_length: u64,
remaining_length: Option<u64>,
supplied_content_length: Option<u64>,
supplied_upload_complete: bool,
payload_length_limit: u64,
@@ -294,7 +292,7 @@ async fn stream_to_file(
}
} else {
if supplied_upload_complete {
if n < remaining_content_length {
if n < remaining_length.unwrap_or(0) {
return Ok(StreamToFileOutcome::StoppedUnexpectedly);
}
}
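Taken together, the length checks in `append_to_upload` reduce to one small rule. A condensed, non-normative restatement (where `remaining` is `total_size - offset` when a total is known):

// Sketch of the Content-Length / Upload-Complete consistency rule above.
// Returns the number of body bytes the server is willing to read.
fn payload_limit(
    remaining: Option<u64>,      // known remaining length, if a total was established
    content_length: Option<u64>, // Content-Length header, if present
    upload_complete: bool,       // Upload-Complete: ?1 (true) or ?0 (false)
) -> Result<u64, &'static str> {
    if let (Some(len), Some(rem)) = (content_length, remaining) {
        if upload_complete && len != rem {
            return Err("Content-Length must equal the remaining length");
        }
        if !upload_complete && len >= rem {
            return Err("Content-Length must be smaller than the remaining length");
        }
    }
    Ok(content_length.or(remaining).unwrap_or(u64::MAX))
}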

View file

@@ -3,14 +3,15 @@ use crate::http_api::api_error::ApiError;
use crate::http_api::auth::AppAuthorization;
use crate::http_api::headers::{CACHE_CONTROL_CACHE_FOREVER, CACHE_CONTROL_NEVER_CACHE, upload_headers};
use crate::http_api::uploads::append_to_upload::append_to_upload;
use crate::upload_manager::{UploadFailureReason, UploadId};
use crate::upload_manager::{UnfinishedUploadStage, UploadFailureReason, UploadId};
use crate::util::id::BucketId;
use axum::extract::{Path, State};
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::http::{HeaderMap, HeaderValue};
use axum::response::{IntoResponse, NoContent, Response};
use axum::{Json, Router, routing};
use color_eyre::Report;
use serde::{Deserialize, Serialize};
use std::num::NonZeroU64;
pub mod append_to_upload;
pub mod headers;
@@ -32,6 +33,7 @@ pub fn create_uploads_router() -> Router<Context> {
.route("/", routing::post(create_upload))
.route("/{upload_id}", routing::get(get_upload_metadata).post(append_to_upload))
.route("/{upload_id}/accept", routing::post(accept_upload))
.route("/{upload_id}/complete", routing::post(complete_upload_directly))
}
async fn create_upload(
@@ -66,13 +68,18 @@ struct GetUploadMetadataResponse {
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case", tag = "state")]
enum GetUploadMetadataResponseState {
// These three are unfinished
Created {
size_limit: u64,
},
Ongoing {
current_size: u64,
total_size: u64,
total_size: Option<NonZeroU64>,
},
Complete {
size: u64,
},
Finished {
hash: String,
size: u64,
@@ -87,16 +94,26 @@ enum GetUploadMetadataResponseState {
impl IntoResponse for GetUploadMetadataResponse {
fn into_response(self) -> Response {
match self.state {
GetUploadMetadataResponseState::Ongoing { current_size, total_size } => (
GetUploadMetadataResponseState::Created { .. } => (
[
(upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static("?0")),
(upload_headers::UPLOAD_OFFSET, HeaderValue::from(current_size)),
(upload_headers::UPLOAD_LENGTH, HeaderValue::from(total_size)),
(upload_headers::UPLOAD_OFFSET, HeaderValue::from(0)),
CACHE_CONTROL_NEVER_CACHE,
],
Json(self),
)
.into_response(),
GetUploadMetadataResponseState::Ongoing { current_size, total_size } => {
let mut headers = HeaderMap::new();
headers.insert(upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static("?0"));
headers.insert(upload_headers::UPLOAD_OFFSET, HeaderValue::from(current_size));
if let Some(total_size) = total_size {
headers.insert(upload_headers::UPLOAD_LENGTH, HeaderValue::from(total_size.get()));
}
(headers, [CACHE_CONTROL_NEVER_CACHE], Json(self)).into_response()
}
GetUploadMetadataResponseState::Complete { size } => (
[
(upload_headers::UPLOAD_COMPLETE, HeaderValue::from_static("?1")),
@@ -129,15 +146,16 @@ impl IntoResponse for GetUploadMetadataResponse {
async fn get_upload_metadata(
State(context): State<Context>,
Path(UploadPathParameters { upload_id }): Path<UploadPathParameters>,
) -> Result<impl IntoResponse, ApiError> {
) -> Result<GetUploadMetadataResponse, ApiError> {
let mut tx = context.database.begin().await.map_err(Into::<Report>::into)?;
let state: GetUploadMetadataResponseState = if let Some(upload) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await {
if upload.is_complete() {
GetUploadMetadataResponseState::Complete { size: upload.total_size() }
let metadata = upload.metadata(&mut tx).await?;
if metadata.is_complete {
GetUploadMetadataResponseState::Complete { size: metadata.current_size }
} else {
GetUploadMetadataResponseState::Ongoing {
current_size: upload.current_size(),
total_size: upload.total_size(),
current_size: metadata.current_size,
total_size: metadata.total_size,
}
}
} else if let Some(reason) = context.upload_manager.get_upload_failure_reason_by_id(&mut tx, &upload_id).await? {
@@ -172,10 +190,13 @@ async fn accept_upload(
State(context): State<Context>,
Path(UploadPathParameters { upload_id }): Path<UploadPathParameters>,
Json(payload): Json<AcceptUploadPayload>,
) -> Result<impl IntoResponse, ApiError> {
let mut tx = context.database.begin().await.map_err(Into::<Report>::into)?;
) -> Result<NoContent, ApiError> {
let mut tx = context.database.begin().await.map_err(Report::from)?;
let _hash = if let Some(_) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await {
return Err(ApiError::CaosUploadNotFinished);
return Err(ApiError::CaosWrongUploadStage {
expected: "finished".into(),
actual: "unfinished".into(),
});
} else if let Some(reason) = context.upload_manager.get_upload_failure_reason_by_id(&mut tx, &upload_id).await? {
return Err(ApiError::CaosUploadFailed { reason });
} else if let Some(metadata) = context.upload_manager.get_finished_upload_metadata_by_id(&mut tx, &upload_id).await? {
@@ -198,7 +219,78 @@ async fn accept_upload(
}
context.upload_manager.accept_finished_upload(&mut tx, upload_id, payload.buckets).await?;
tx.commit().await.map_err(Into::<Report>::into)?;
tx.commit().await.map_err(Report::from)?;
Ok(())
Ok(NoContent)
}
#[derive(Debug, Deserialize)]
struct CompleteUploadDirectlyPayload {
existing_hash: String,
}
async fn complete_upload_directly(
State(context): State<Context>,
Path(UploadPathParameters { upload_id }): Path<UploadPathParameters>,
Json(payload): Json<CompleteUploadDirectlyPayload>,
) -> Result<impl IntoResponse, ApiError> {
let mut tx = context.database.begin().await.map_err(Report::from)?;
if let Some(upload) = context.upload_manager.get_unfinished_upload_by_id(&upload_id).await {
let upload_acquisition = if let Some(a) = upload.acquire().await {
a
} else {
return Err(ApiError::CaosUploadRequestSuperseded);
};
let metadata = upload.metadata(&mut tx).await?;
match metadata.stage() {
UnfinishedUploadStage::Created => {
let object_size = sqlx::query!("SELECT size FROM objects WHERE hash = ?", payload.existing_hash)
.map(|r| r.size as u64)
.fetch_optional(&mut *tx)
.await?;
if let Some(object_size) = object_size {
// The size check only applies when a max_size was set for this upload.
if let Some(max_size) = metadata.max_size {
if object_size > max_size.get() {
return Err(ApiError::CaosUploadSizeLimitExceeded {
limit: max_size.get(),
provided: object_size,
});
}
}
upload_acquisition.complete_directly(&mut tx, &payload.existing_hash).await?;
tx.commit().await.map_err(Report::from)?;
Ok(NoContent)
} else {
Err(ApiError::UnknownResource {
resource_type: "objects".into(),
id: payload.existing_hash.into(),
}
.into())
}
}
stage => Err(ApiError::CaosWrongUploadStage {
expected: "created".into(),
actual: stage.to_string().into(),
}),
}
} else if let Some(reason) = context.upload_manager.get_upload_failure_reason_by_id(&mut tx, &upload_id).await? {
Err(ApiError::CaosUploadFailed { reason })
} else if let Some(_) = context.upload_manager.get_finished_upload_metadata_by_id(&mut tx, &upload_id).await? {
Err(ApiError::CaosWrongUploadStage {
expected: "created".into(),
actual: "finished".into(),
})
} else {
Err(ApiError::UnknownResource {
resource_type: "uploads".into(),
id: upload_id.to_string().into(),
}
.into())
}
}
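A hypothetical caller of the new `/complete` route; the base URL is a placeholder, and `existing_hash` must reference a row in `objects`:

use reqwest::Client;
use serde_json::json;

// Sketch: complete an upload directly from an already-stored object.
// Only valid while the upload is still in the `created` stage.
async fn complete_directly(
    client: &Client,
    upload_id: &str,
    hash: &str,
) -> reqwest::Result<reqwest::Response> {
    client
        .post(format!("http://localhost:8080/uploads/{upload_id}/complete"))
        .json(&json!({ "existing_hash": hash })) // requires reqwest's `json` feature
        .send()
        .await
}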

View file

@@ -1,4 +1,4 @@
use crate::upload_manager::AcquiredUnfinishedUpload;
use crate::upload_manager::{AcquiredUnfinishedUpload, UploadId};
use crate::util::hash_to_hex_string::HashExt;
use crate::util::temporal_formatting::TemporalFormatting;
use blake3::Hasher;
@@ -10,22 +10,25 @@ use std::io::SeekFrom;
use temporal_rs::Now;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub async fn do_processing_work(
database: SqlitePool,
enable_multithreaded_hashing: bool,
staging_directory_path: Utf8PathBuf,
mut tasks_receiver: tokio::sync::mpsc::UnboundedReceiver<AcquiredUnfinishedUpload>,
mut tasks_receiver: UnboundedReceiver<AcquiredUnfinishedUpload>,
finished_upload_ids_sender: UnboundedSender<UploadId>,
) {
while let Some(mut acquired_upload) = tasks_receiver.recv().await {
match process(enable_multithreaded_hashing, &staging_directory_path, &mut acquired_upload).await {
Ok(outcome) => {
let mut tx = database.begin().await.unwrap();
let upload = acquired_upload.upload();
let metadata = upload.metadata(&mut tx).await.unwrap();
let id = upload.id().as_str();
let hash = outcome.hash.as_str();
let size = upload.total_size() as i64;
let size = metadata.current_size as i64;
let creation_date = Now::zoneddatetime_iso(None).unwrap().to_string_with_defaults().unwrap();
// This is all in a transaction, so doing this first is fine.
@@ -66,6 +69,7 @@ pub async fn do_processing_work(
// This just removes the old link under the upload ID.
fs::remove_file(acquired_upload.file().path()).await.unwrap();
finished_upload_ids_sender.send(*upload.id()).unwrap();
log::info!("Successfully processed upload ({}): {:?}", id, outcome);
}
Err(report) => {

View file

@@ -9,6 +9,7 @@ use fstr::FStr;
use serde::Serialize;
use sqlx::{SqlitePool, SqliteTransaction};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use strum::{Display, EnumString};
@@ -30,6 +31,7 @@ impl UploadManager {
pub async fn create(database: SqlitePool, staging_directory_path: Utf8PathBuf, enable_multithreaded_hashing: bool) -> Result<Arc<Self>> {
log::info!("Loading uploads…");
let (finished_upload_ids_sender, mut finished_upload_ids_receiver) = tokio::sync::mpsc::unbounded_channel();
let (small_file_processing_tasks_sender, small_file_processing_tasks_receiver) = tokio::sync::mpsc::unbounded_channel();
let (large_file_processing_tasks_sender, large_file_processing_tasks_receiver) = tokio::sync::mpsc::unbounded_channel();
@@ -53,9 +55,6 @@ impl UploadManager {
let upload = Arc::new_cyclic(|upload| UnfinishedUpload {
manager: Arc::downgrade(&manager),
id,
total_size: row.total_size as u64,
current_size: AtomicU64::new(row.current_size as u64),
is_complete: AtomicBool::new(false),
acquirable: Acquirable::new(AcquiredUnfinishedUpload {
upload: upload.to_owned(),
file: FileReference::new(staging_file_path),
@@ -75,6 +74,7 @@ impl UploadManager {
enable_multithreaded_hashing,
staging_directory_path.clone(),
small_file_processing_tasks_receiver,
finished_upload_ids_sender.clone(),
));
tokio::spawn(do_processing_work(
@@ -82,8 +82,19 @@ impl UploadManager {
enable_multithreaded_hashing,
staging_directory_path,
large_file_processing_tasks_receiver,
finished_upload_ids_sender,
));
tokio::spawn({
let manager = Arc::clone(&manager);
async move {
while let Some(id) = finished_upload_ids_receiver.recv().await {
manager.unfinished_uploads.remove(&id);
}
}
});
Ok(manager)
}
@@ -105,9 +116,6 @@ impl UploadManager {
let upload = Arc::new_cyclic(|upload| UnfinishedUpload {
manager: Arc::downgrade(&self),
id,
total_size,
current_size: AtomicU64::new(0),
is_complete: AtomicBool::new(false),
acquirable: Acquirable::new(AcquiredUnfinishedUpload {
upload: upload.to_owned(),
file: FileReference::new(self.staging_directory_path.join(id.as_str())),
@@ -174,6 +182,7 @@ impl UploadManager {
}
}
#[derive(Debug)]
pub struct FinishedUploadMetadata {
pub hash: String,
pub size: u64,
@@ -185,38 +194,70 @@ pub struct FinishedUploadMetadata {
pub struct UnfinishedUpload {
manager: Weak<UploadManager>,
id: UploadId,
total_size: u64,
current_size: AtomicU64,
is_complete: AtomicBool,
acquirable: Acquirable<AcquiredUnfinishedUpload>,
}
#[derive(Debug)]
pub struct AcquiredUnfinishedUpload {
upload: Weak<UnfinishedUpload>,
file: FileReference,
}
impl UnfinishedUpload {
pub fn id(&self) -> &UploadId {
&self.id
}
pub fn total_size(&self) -> u64 {
self.total_size
}
pub fn current_size(&self) -> u64 {
self.current_size.load(Ordering::Relaxed)
}
pub fn is_complete(&self) -> bool {
self.is_complete.load(Ordering::Relaxed)
pub async fn metadata(&self, tx: &mut SqliteTransaction<'_>) -> Result<UnfinishedUploadMetadata> {
let id = self.id.as_str();
Ok(sqlx::query!(
"SELECT current_size, total_size, max_size, is_complete FROM unfinished_uploads WHERE id = ?",
id
)
.map(|r| UnfinishedUploadMetadata {
current_size: r.current_size as u64,
total_size: r.total_size.map(|s| NonZeroU64::new(s as u64).unwrap()),
max_size: r.max_size.map(|s| NonZeroU64::new(s as u64).unwrap()),
is_complete: r.is_complete == 1,
})
.fetch_one(&mut **tx)
.await?)
}
pub async fn acquire(&self) -> Option<Acquisition<AcquiredUnfinishedUpload>> {
self.acquirable.acquire().await
}
pub async fn set_total_size(&self, tx: &mut SqliteTransaction<'_>, size: NonZeroU64) -> Result<()> {
let size = size.get() as i64;
let id = self.id.as_str();
sqlx::query!("UPDATE unfinished_uploads SET total_size = ? WHERE id = ?", size, id)
.execute(&mut **tx)
.await?;
Ok(())
}
}
#[derive(Debug)]
pub struct UnfinishedUploadMetadata {
pub current_size: u64,
pub total_size: Option<NonZeroU64>,
pub max_size: Option<NonZeroU64>,
pub is_complete: bool,
}
impl UnfinishedUploadMetadata {
pub fn stage(&self) -> UnfinishedUploadStage {
if self.is_complete {
UnfinishedUploadStage::Complete
} else if self.current_size == 0 {
UnfinishedUploadStage::Created
} else {
UnfinishedUploadStage::Ongoing
}
}
}
#[derive(Debug)]
pub struct AcquiredUnfinishedUpload {
upload: Weak<UnfinishedUpload>,
file: FileReference,
}
impl AcquiredUnfinishedUpload {
@@ -228,17 +269,14 @@ impl AcquiredUnfinishedUpload {
&mut self.file
}
pub async fn set_current_size(&self, current_size: u64) -> Result<(), Report> {
pub async fn set_current_size(&self, tx: &mut SqliteTransaction<'_>, current_size: u64) -> Result<(), Report> {
let upload = self.upload.upgrade().unwrap();
let manager = upload.manager.upgrade().unwrap();
upload.current_size.store(current_size, Ordering::Relaxed);
let id = upload.id.to_string();
let current_size = current_size as i64;
sqlx::query!("UPDATE unfinished_uploads SET current_size = ? WHERE id = ?", current_size, id)
.execute(&manager.database)
.execute(&mut **tx)
.await?;
Ok(())
@@ -246,21 +284,40 @@ impl AcquiredUnfinishedUpload {
}
impl Acquisition<AcquiredUnfinishedUpload> {
pub async fn complete(self) -> Result<(), Report> {
async fn consume(self) -> (AcquiredUnfinishedUpload, Arc<UnfinishedUpload>, Arc<UploadManager>) {
let inner = self.destroy().await;
let upload = inner.upload.upgrade().unwrap();
let manager = upload.manager.upgrade().unwrap();
upload.is_complete.store(true, Ordering::Relaxed);
manager.unfinished_uploads.remove(upload.id());
(inner, upload, manager)
}
let id = upload.id.to_string();
sqlx::query!("UPDATE unfinished_uploads SET is_complete = 1 WHERE id = ?", id)
.execute(&manager.database)
pub async fn complete_directly(self, tx: &mut SqliteTransaction<'_>, existing_hash: &str) -> Result<(), Report> {
let (_, upload, _) = self.consume().await;
let id = upload.id().to_string();
sqlx::query!("DELETE FROM unfinished_uploads WHERE id = ?", id).execute(&mut **tx).await?;
sqlx::query!("INSERT INTO finished_uploads (id, hash) VALUES (?, ?)", id, existing_hash)
.execute(&mut **tx)
.await?;
if upload.total_size <= LARGE_FILE_SIZE_THRESHOLD {
manager.small_file_processing_tasks_sender.send(inner).unwrap()
Ok(())
}
pub async fn complete(self) -> Result<(), Report> {
let (acquired_upload, upload, manager) = self.consume().await;
let id = upload.id.to_string();
let size = sqlx::query!("UPDATE unfinished_uploads SET is_complete = 1 WHERE id = ? RETURNING current_size", id)
.map(|r| r.current_size as u64)
.fetch_one(&manager.database)
.await?;
if size <= LARGE_FILE_SIZE_THRESHOLD {
manager.small_file_processing_tasks_sender.send(acquired_upload).unwrap()
} else {
manager.large_file_processing_tasks_sender.send(inner).unwrap()
manager.large_file_processing_tasks_sender.send(acquired_upload).unwrap()
}
Ok(())
@@ -289,6 +346,15 @@ impl Acquisition<AcquiredUnfinishedUpload> {
}
}
#[derive(Debug, Clone, Copy, Display, Serialize)]
#[strum(serialize_all = "snake_case")]
#[serde(rename_all = "snake_case")]
pub enum UnfinishedUploadStage {
Created,
Ongoing,
Complete,
}
#[derive(Debug, Clone, Copy, EnumString, Display, Serialize)]
#[strum(serialize_all = "snake_case")]
#[serde(rename_all = "snake_case")]

View file

@@ -34,7 +34,7 @@ impl<T: 'static + Sync + Send> Acquirable<T> {
(
Outcome::Acquired(Acquisition {
inner,
inner: Some(inner),
acquirable_state: Arc::clone(&self.state),
release_request_token: release_request_token.clone(),
}),
@@ -68,7 +68,7 @@ impl<T: 'static + Sync + Send> Acquirable<T> {
match data {
Ok((data, release_request_token)) => Some(Acquisition {
inner: data,
inner: Some(data),
acquirable_state: Arc::clone(&self.state),
release_request_token,
}),
@@ -94,51 +94,60 @@ pub enum AcquirableState<T: 'static + Sync + Send> {
#[must_use]
pub struct Acquisition<T: 'static + Sync + Send> {
inner: T,
inner: Option<T>, // Only set to None when dropped or destroyed
acquirable_state: Arc<Mutex<AcquirableState<T>>>,
release_request_token: CancellationToken,
}
impl<T: 'static + Sync + Send> Acquisition<T> {
pub fn inner(&mut self) -> &mut T {
&mut self.inner
// SAFETY: inner only becomes None in destroy() or drop(), after which this method can no longer be called
unsafe { self.inner.as_mut().unwrap_unchecked() }
}
pub fn release_request_token(&self) -> CancellationToken {
self.release_request_token.clone()
}
pub async fn release(self) {
let mut state = self.acquirable_state.lock().await;
replace_with_or_abort(&mut *state, |state| match state {
AcquirableState::Acquired {
data_return_channel_sender, ..
} => {
let release_request_token = CancellationToken::new();
match data_return_channel_sender.send((self.inner, release_request_token.clone())) {
Ok(_) => {
let (data_return_channel_sender, data_return_channel_receiver) = tokio::sync::oneshot::channel();
drop(data_return_channel_receiver);
AcquirableState::Acquired {
release_request_token,
data_return_channel_sender,
}
}
Err((data, _)) => AcquirableState::Available { inner: data },
}
}
_ => unreachable!(),
});
}
/// Consume the acquisition without releasing it. The corresponding Acquirable will forever stay in the acquired state.
///
/// All outstanding calls to Acquirable::acquire will return None.
pub async fn destroy(self) -> T {
pub async fn destroy(mut self) -> T {
let mut state = self.acquirable_state.lock().await;
*state = AcquirableState::Destroyed;
self.inner
unsafe { self.inner.take().unwrap_unchecked() }
}
}
impl<T: 'static + Sync + Send> Drop for Acquisition<T> {
fn drop(&mut self) {
let state = Arc::clone(&self.acquirable_state);
if let Some(inner) = self.inner.take() {
tokio::spawn(async move {
let mut state = state.lock().await;
replace_with_or_abort(&mut *state, |state| match state {
AcquirableState::Acquired {
data_return_channel_sender, ..
} => {
let release_request_token = CancellationToken::new();
match data_return_channel_sender.send((inner, release_request_token.clone())) {
Ok(_) => {
let (data_return_channel_sender, data_return_channel_receiver) = tokio::sync::oneshot::channel();
drop(data_return_channel_receiver);
AcquirableState::Acquired {
release_request_token,
data_return_channel_sender,
}
}
Err((data, _)) => AcquirableState::Available { inner: data },
}
}
_ => unreachable!(),
});
});
}
}
}
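
With `inner` now an `Option`, dropping an `Acquisition` hands the value back to the `Acquirable` (via the spawned task above), while `destroy` retires it permanently. A minimal usage sketch of the intended semantics, assuming the API shown in this file:

// Sketch: the three ways an Acquisition can end.
async fn demo(acquirable: Acquirable<String>) {
    // 1. Explicit release: the value goes back, so the next acquire() succeeds.
    if let Some(mut acquisition) = acquirable.acquire().await {
        acquisition.inner().push_str(" edited");
        acquisition.release().await;
    }

    // 2. Implicit release: with this change, dropping behaves like release().
    if let Some(acquisition) = acquirable.acquire().await {
        drop(acquisition);
    }

    // 3. Destroy: consume the value; the Acquirable stays acquired forever,
    //    and further acquire() calls return None.
    if let Some(acquisition) = acquirable.acquire().await {
        let value = acquisition.destroy().await;
        println!("{value}");
    }
}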