use crate::http_api::Context; use crate::http_api::api_error::{ApiError, ProblemJson}; use crate::http_api::headers::{HeaderMapExt, HeaderValueExt, upload_headers}; use crate::http_api::upload::{PARTIAL_UPLOAD_MEDIA_TYPE, UploadCompleteResponseHeader, UploadOffsetResponseHeader}; use crate::upload_manager::{AnyStageUpload, FileReference, UnfinishedUpload, UploadFailureReason, UploadId, UploadManager}; use crate::util::acquirable::Acquisition; use axum::Json; use axum::body::{Body, BodyDataStream}; use axum::extract::{Path, State}; use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; use color_eyre::Report; use futures::TryStreamExt; use serde::Deserialize; use serde_json::json; use std::borrow::Cow; use std::io::ErrorKind; use std::sync::Arc; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio_util::io::StreamReader; #[derive(Debug, Deserialize)] pub(super) struct AppendToUploadPathParameters { upload_id: UploadId, } #[derive(Debug)] enum AppendToUploadOutcome { RequestSuperseded, UploadAlreadyComplete, Failed(UploadFailureReason), UploadOffsetMismatch { expected: u64, provided: u64 }, InconsistentUploadLength { expected: u64, detail: Cow<'static, str> }, ContentStreamStoppedUnexpectedly, TooMuchContent, UploadIncomplete { offset: u64 }, UploadComplete, } impl IntoResponse for AppendToUploadOutcome { fn into_response(self) -> Response { match self { AppendToUploadOutcome::RequestSuperseded => ( StatusCode::CONFLICT, UploadCompleteResponseHeader(false), ProblemJson(json!({ "type": "https://minna.media/api-problems/caos/request-superseded", "title": "Another request superseded the current request.", })), ) .into_response(), AppendToUploadOutcome::UploadAlreadyComplete => ( StatusCode::CONFLICT, ProblemJson(json!({ "type": "https://iana.org/assignments/http-problem-types#completed-upload", "title": "The upload is already complete.", })), ) .into_response(), AppendToUploadOutcome::Failed(reason) => ( StatusCode::GONE, ProblemJson(json!({ "type": "https://minna.media/api-problems/caos/request-superseded", "title": "The upload was cancelled or failed.", "reason": reason.to_string() })), ) .into_response(), AppendToUploadOutcome::UploadOffsetMismatch { expected, provided } => ( StatusCode::CONFLICT, UploadCompleteResponseHeader(false), UploadOffsetResponseHeader(expected), ProblemJson(json!({ "type": "https://iana.org/assignments/http-problem-types#mismatching-upload-offset", "title": "The upload offset provided in the request does not match the actual offset of the resource.", "expected-offset": expected, "provided-offset": provided, })), ) .into_response(), AppendToUploadOutcome::InconsistentUploadLength { expected, detail } => ( StatusCode::CONFLICT, UploadCompleteResponseHeader(false), UploadOffsetResponseHeader(expected), ProblemJson(json!({ "type": "https://iana.org/assignments/http-problem-types#inconsistent-upload-length", "title": "The provided upload lengths are inconsistent with one another or a previously established total length.", "detail": detail, })), ) .into_response(), AppendToUploadOutcome::ContentStreamStoppedUnexpectedly => ( StatusCode::BAD_REQUEST, UploadCompleteResponseHeader(false), ProblemJson(json!({ "type": "https://minna.media/api-problems/caos/content-stream-stopped-unexpectedly", "title": "The content stream stopped unexpectedly.", })), ) .into_response(), AppendToUploadOutcome::TooMuchContent => ( StatusCode::BAD_REQUEST, UploadCompleteResponseHeader(false), ProblemJson(json!({ "type": "https://minna.media/api-problems/caos/too-much-content", "title": "The request contained more content than it should.", })), ) .into_response(), AppendToUploadOutcome::UploadIncomplete { offset } => ( StatusCode::NO_CONTENT, UploadCompleteResponseHeader(false), UploadOffsetResponseHeader(offset), Body::empty(), ) .into_response(), AppendToUploadOutcome::UploadComplete => (StatusCode::NO_CONTENT, UploadCompleteResponseHeader(true), Body::empty()).into_response(), } } } pub(super) async fn append_to_upload( State(context): State, Path(AppendToUploadPathParameters { upload_id }): Path, headers: HeaderMap, request_body: Body, ) -> Result { let parameters = match parse_request_parameters(&context.upload_manager, upload_id, &headers) .await .map_err(|e| (UploadCompleteResponseHeader(false), e))? { Ok(p) => p, Err(o) => return Ok(o), }; { let state = parameters.upload.state().read().await; if state.is_complete() { return Ok(AppendToUploadOutcome::UploadAlreadyComplete); } } let mut file_acquisition = if let Some(a) = parameters.upload.acquire_file().await { a } else { return Ok(AppendToUploadOutcome::RequestSuperseded); }; let outcome = do_append(&mut file_acquisition, parameters, request_body.into_data_stream()) .await .map_err(|report| { ( UploadCompleteResponseHeader(false), ApiError::Internal { report: report.wrap_err(format!("Error during file upload ({upload_id})")), }, ) })?; file_acquisition.release().await; Ok(outcome) } struct RequestParameters { pub upload: Arc, pub supplied_content_length: Option, pub supplied_upload_length: Option, pub supplied_upload_offset: u64, pub supplied_upload_complete: bool, } async fn parse_request_parameters( upload_manager: &UploadManager, upload_id: UploadId, headers: &HeaderMap, ) -> Result, ApiError> { let upload = if let Some(upload) = upload_manager.get_upload_by_id(&upload_id).await? { match upload { AnyStageUpload::Unfinished(u) => u, AnyStageUpload::Finished { .. } => return Ok(Err(AppendToUploadOutcome::UploadAlreadyComplete)), AnyStageUpload::Failed(reason) => return Ok(Err(AppendToUploadOutcome::Failed(reason))), } } else { return Err(ApiError::UnknownResource { resource_type: "upload".into(), id: upload_id.to_string().into(), }); }; if !headers .get_exactly_once(&axum::http::header::CONTENT_TYPE)? .to_str() .ok() .map(|v| v == PARTIAL_UPLOAD_MEDIA_TYPE) .unwrap_or(false) { return Err(ApiError::InvalidRequestHeader { name: axum::http::header::CONTENT_TYPE, message: format!("must be {}", PARTIAL_UPLOAD_MEDIA_TYPE.to_string()).into(), }); } let supplied_content_length = headers .get_at_most_once(&axum::http::header::CONTENT_LENGTH)? .map(|v| v.get_unsigned_decimal_number(&axum::http::header::CONTENT_LENGTH)) .transpose()?; let supplied_upload_length = headers .get_at_most_once(&upload_headers::UPLOAD_OFFSET)? .map(|v| v.get_unsigned_decimal_number(&upload_headers::UPLOAD_OFFSET)) .transpose()?; let supplied_upload_offset = headers .get_exactly_once(&upload_headers::UPLOAD_OFFSET)? .get_unsigned_decimal_number(&upload_headers::UPLOAD_OFFSET)?; let supplied_upload_complete = headers .get_exactly_once(&upload_headers::UPLOAD_COMPLETE)? .get_boolean(&upload_headers::UPLOAD_COMPLETE)?; Ok(Ok(RequestParameters { upload, supplied_content_length, supplied_upload_length, supplied_upload_offset, supplied_upload_complete, })) } async fn do_append( file_acquisition: &mut Acquisition, parameters: RequestParameters, content_stream: BodyDataStream, ) -> Result { let mut upload_state = parameters.upload.state().write().await; let release_request_token = file_acquisition.release_request_token(); let mut file = file_acquisition.inner().get_or_open(true).await?; let total_size = parameters.upload.total_size(); let current_offset = file.stream_position().await?; if current_offset < upload_state.current_size() { log::error!( "The upload ({}) failed because the file contains less data than expected.", parameters.upload.id() ); parameters.upload.fail(UploadFailureReason::MissingData).await?; return Ok(AppendToUploadOutcome::Failed(UploadFailureReason::MissingData)); } let remaining_content_length = total_size - current_offset; if parameters.supplied_upload_offset != current_offset { return Ok(AppendToUploadOutcome::UploadOffsetMismatch { expected: current_offset, provided: parameters.supplied_upload_offset, }); } let payload_length_limit = if let Some(supplied_content_length) = parameters.supplied_content_length { if parameters.supplied_upload_complete { if remaining_content_length != supplied_content_length { return Ok(AppendToUploadOutcome::InconsistentUploadLength { expected: total_size, detail: "Upload-Complete is set to true, and Content-Length is set, \ but the value of Content-Length does not equal the length of the remaining content." .into(), }); } } else { if supplied_content_length >= remaining_content_length { return Ok(AppendToUploadOutcome::InconsistentUploadLength { expected: total_size, detail: "Upload-Complete is set to false, and Content-Length is set, \ but the value of Content-Length is not smaller than the length of the remaining content." .into(), }); } } supplied_content_length } else { remaining_content_length }; let outcome = tokio::select! { o = stream_to_file( content_stream, &mut file, remaining_content_length, parameters.supplied_content_length, parameters.supplied_upload_complete, payload_length_limit ) => Some(o?), _ = release_request_token.cancelled() => None }; file.sync_all().await?; let new_size = file.stream_position().await?; upload_state.set_current_size(new_size); let is_upload_complete = if let Some(StreamToFileOutcome::Success) = outcome { parameters.supplied_upload_complete } else { false }; if is_upload_complete { upload_state.set_complete(); parameters.upload.save_to_database(&upload_state).await?; parameters.upload.enqueue_for_processing(&upload_state).await; } else { parameters.upload.save_to_database(&upload_state).await?; } Ok(if let Some(outcome) = outcome { match outcome { StreamToFileOutcome::StoppedUnexpectedly => AppendToUploadOutcome::ContentStreamStoppedUnexpectedly, StreamToFileOutcome::TooMuchContent => AppendToUploadOutcome::TooMuchContent, StreamToFileOutcome::Success => { if is_upload_complete { AppendToUploadOutcome::UploadComplete } else { AppendToUploadOutcome::UploadIncomplete { offset: new_size } } } } } else { AppendToUploadOutcome::RequestSuperseded }) } #[derive(Debug)] pub enum StreamToFileOutcome { StoppedUnexpectedly, TooMuchContent, Success, } async fn stream_to_file( content_stream: BodyDataStream, file: &mut File, remaining_content_length: u64, supplied_content_length: Option, supplied_upload_complete: bool, payload_length_limit: u64, ) -> Result { let body_with_io_error = content_stream.into_stream().map_err(|err| std::io::Error::new(ErrorKind::Other, err)); let stream = StreamReader::new(body_with_io_error).take(payload_length_limit + 1); futures::pin_mut!(stream); match tokio::io::copy(&mut stream, file).await { Ok(n) => { if let Some(supplied_content_length) = supplied_content_length { if n < supplied_content_length { return Ok(StreamToFileOutcome::StoppedUnexpectedly); } } else { if supplied_upload_complete { if n < remaining_content_length { return Ok(StreamToFileOutcome::StoppedUnexpectedly); } } } if n > payload_length_limit { return Ok(StreamToFileOutcome::TooMuchContent); } Ok(StreamToFileOutcome::Success) } Err(error) => match error.kind() { ErrorKind::TimedOut => Ok(StreamToFileOutcome::StoppedUnexpectedly), ErrorKind::BrokenPipe => Ok(StreamToFileOutcome::StoppedUnexpectedly), ErrorKind::ConnectionReset => Ok(StreamToFileOutcome::StoppedUnexpectedly), _ => Err(error), }, } }