WIP: v1.0.0

This commit is contained in:
Moritz Ruth 2025-04-20 00:15:05 +02:00
parent 8c54da772f
commit 0ed22f9bf6
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
10 changed files with 190 additions and 44 deletions

Cargo.lock generated
View file

@ -157,6 +157,7 @@ dependencies = [
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@ -176,6 +177,30 @@ dependencies = [
"sync_wrapper",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "axum-extra"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45bf463831f5131b7d3c756525b305d40f1185b688565648a92e1392ca35713d"
dependencies = [
"axum",
"axum-core",
"bytes",
"futures-util",
"headers",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"serde",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
@ -189,6 +214,21 @@ dependencies = [
"syn",
]
[[package]]
name = "axum-range"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8b09d24c2cfcf6596afc4b9d139ad62c53637c7e0f791ef8a25ce1cc431f73a"
dependencies = [
"axum",
"axum-extra",
"bytes",
"futures",
"http-body",
"pin-project",
"tokio",
]
[[package]]
name = "backtrace"
version = "0.3.71"
@ -204,6 +244,12 @@ dependencies = [
"rustc-demangle",
]
[[package]]
name = "base64"
version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]]
name = "base64"
version = "0.22.1"
@ -859,6 +905,30 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "headers"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9"
dependencies = [
"base64 0.21.7",
"bytes",
"headers-core",
"http",
"httpdate",
"mime",
"sha1",
]
[[package]]
name = "headers-core"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4"
dependencies = [
"http",
]
[[package]]
name = "heck"
version = "0.5.0"
@ -1441,6 +1511,8 @@ name = "minna-caos"
version = "0.1.0"
dependencies = [
"axum",
"axum-extra",
"axum-range",
"blake3",
"camino",
"color-eyre",
@ -1636,6 +1708,26 @@ dependencies = [
"siphasher",
]
[[package]]
name = "pin-project"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.16"
@ -2295,7 +2387,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4560278f0e00ce64938540546f59f590d60beee33fffbd3b9cd47851e5fff233"
dependencies = [
"atoi",
"base64",
"base64 0.22.1",
"bitflags",
"byteorder",
"bytes",
@ -2337,7 +2429,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5b98a57f363ed6764d5b3a12bfedf62f07aa16e1856a7ddc2a0bb190a959613"
dependencies = [
"atoi",
"base64",
"base64 0.22.1",
"bitflags",
"byteorder",
"crc",

View file

@ -8,6 +8,8 @@ opt-level = 3
[dependencies]
axum = { version = "0.8.3", default-features = false, features = ["json", "http1", "tokio", "macros"] }
axum-extra = { version = "0.10.1", features = ["typed-header"] }
axum-range = "0.5.0"
blake3 = { version = "1.8.1", features = ["rayon", "mmap"] }
camino = { version = "1.1.9", features = ["serde1"] }
color-eyre = "0.6.3"

View file

@ -31,6 +31,7 @@ application.
- CLI for administration tasks
## Notes
### Example file upload flow
@ -74,6 +75,14 @@ application.
Even while this process is still running, the object data is already accessible at `/objects/OBJECT_HASH`.
### Filesystem requirements
minna-caos uses the local filesystem as a staging area for uploads. (This is the special `staging` bucket seen in the example above.)
The filesystem containing the staging directory must…
- support hardlinks
- have enough free space to hold all pending uploads until they have been transferred to the actual buckets
## Roadmap
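
The staging area described above works through hard links: once an upload is fully hashed, the processing worker links the staged file under its content hash inside the staging directory, which is why the object is immediately servable at `/objects/OBJECT_HASH` without copying and why the filesystem must support hardlinks. A minimal sketch of that pattern, with an illustrative function name (the real logic lives in the processing worker changes further below):

```rust
use camino::Utf8Path;
use tokio::fs;

/// Illustrative sketch: publish a finished upload into the staging area by
/// creating a second hard link named after its BLAKE3 hash.
async fn publish_to_staging(upload_path: &Utf8Path, staging_dir: &Utf8Path, hash_hex: &str) -> std::io::Result<()> {
    // Both paths now refer to the same inode, so no data is copied.
    fs::hard_link(upload_path, staging_dir.join(hash_hex)).await?;

    // ... commit the database transaction that records the new object ...

    // Afterwards the per-upload link is no longer needed.
    fs::remove_file(upload_path).await?;
    Ok(())
}
```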

View file

@ -50,6 +50,7 @@ pub enum ApiError {
CaosUploadNotFinished,
CaosUnknownBucket { bucket_id: Cow<'static, str> },
CaosNoReplicaAvailable,
CaosStagingAreaFull,
}
impl From<Report> for ApiError {
@ -192,7 +193,7 @@ impl IntoResponse for ApiError {
.into_response(),
ApiError::CaosNoReplicaAvailable => (
// This error should never occur in normal operation, because deleted objects are not only removed from `object_replicas` but also from `objects`.
// Therefore, this status code is appropriate.
// That's why this status code is appropriate.
StatusCode::INTERNAL_SERVER_ERROR,
ProblemJson(json!({
"type": "https://minna.media/api-problems/caos/no-replica-available",
@ -200,6 +201,14 @@ impl IntoResponse for ApiError {
})),
)
.into_response(),
ApiError::CaosStagingAreaFull => (
StatusCode::INSUFFICIENT_STORAGE,
ProblemJson(json!({
"type": "https://minna.media/api-problems/caos/staging-area-full",
"title": "The upload staging area is temporarily full.",
})),
)
.into_response(),
}
}
}
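
For context: the new `CaosStagingAreaFull` arm answers with `507 Insufficient Storage` and a problem document (the `ProblemJson` wrapper presumably sets the `application/problem+json` content type, as for the other variants). The payload a client sees is essentially the value built in the handler:

```rust
// Illustrative only: the problem document carried by the 507 response.
let body = serde_json::json!({
    "type": "https://minna.media/api-problems/caos/staging-area-full",
    "title": "The upload staging area is temporarily full.",
});
```

Because the condition is temporary (the staging area drains as uploads are moved to the actual buckets), retrying the request later is a reasonable client reaction.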

View file

@ -1,15 +1,15 @@
use crate::config::{ConfigBucketBackend, ConfigBucketBackendFilesystem};
use crate::config::ConfigBucketBackend;
use crate::http_api::Context;
use crate::http_api::api_error::ApiError;
use axum::body::Body;
use axum::extract::{Path, State};
use axum::response::IntoResponse;
use axum::{Router, routing};
use axum_extra::{TypedHeader, headers};
use axum_range::{KnownSize, Ranged};
use color_eyre::eyre::eyre;
use color_eyre::{Report, Result};
use serde::Deserialize;
use tokio::fs::File;
use tokio_util::io::ReaderStream;
pub fn create_objects_router() -> Router<Context> {
Router::new().route("/{hash}", routing::get(get_object_content))
@ -23,12 +23,14 @@ struct ObjectPathParameters {
async fn get_object_content(
State(context): State<Context>,
Path(ObjectPathParameters { hash }): Path<ObjectPathParameters>,
range: Option<TypedHeader<headers::Range>>,
) -> Result<impl IntoResponse, ApiError> {
let mut tx = context.database.begin().await.map_err(Report::from)?;
let object_size = sqlx::query!("SELECT size FROM objects WHERE hash = ?", hash).map(|r| r.size).fetch_optional(&mut *tx).await?;
if let Some(object_size) = object_size {
let object_size = object_size as u64;
let bucket_ids = sqlx::query!("SELECT bucket_id FROM object_replicas WHERE hash = ? AND is_present = 1", hash)
.map(|r| r.bucket_id)
.fetch_all(&mut *tx)
@ -37,7 +39,10 @@ async fn get_object_content(
if let Some(bucket_id) = bucket_ids.first() {
if let Some(bucket_config) = context.config.buckets.get(bucket_id) {
match &bucket_config.backend {
ConfigBucketBackend::Filesystem(filesystem_backend_config) => Ok(get_filesystem_backend_response(&hash, filesystem_backend_config).await?),
ConfigBucketBackend::Filesystem(filesystem_backend_config) => {
let file = File::open(filesystem_backend_config.path.join(hash)).await?;
Ok(Ranged::new(range.map(|h| h.0), KnownSize::sized(file, object_size)))
}
}
} else {
Err(ApiError::Internal {
@ -54,9 +59,3 @@ async fn get_object_content(
})
}
}
async fn get_filesystem_backend_response(hash: &str, filesystem_backend_config: &ConfigBucketBackendFilesystem) -> Result<impl IntoResponse + use<>> {
let file = File::open(filesystem_backend_config.path.join(hash)).await?;
let stream = ReaderStream::new(file);
Ok(Body::from_stream(stream))
}
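
With this change the object content endpoint honors HTTP `Range` requests: axum-range turns the optional `Range` header into a `206 Partial Content` response and falls back to the full body when the header is absent, replacing the previous unconditional `ReaderStream` response. A self-contained sketch of the same pattern with an illustrative handler name and path; the crate also offers `KnownSize::file`, which stats the file, whereas the handler above uses `KnownSize::sized` because the object size is already known from the database:

```rust
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum_extra::{TypedHeader, headers};
use axum_range::{KnownSize, Ranged};
use tokio::fs::File;

// Illustrative standalone handler; the real route lives in create_objects_router.
async fn serve_file(range: Option<TypedHeader<headers::Range>>) -> Result<impl IntoResponse, StatusCode> {
    let file = File::open("example.bin").await.map_err(|_| StatusCode::NOT_FOUND)?;
    // KnownSize::file reads the length from filesystem metadata.
    let body = KnownSize::file(file).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    // Ranged answers with 206 and Content-Range when a valid Range header was sent.
    Ok(Ranged::new(range.map(|TypedHeader(r)| r), body))
}
```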

View file

@ -250,6 +250,7 @@ async fn do_append(
if let Some(outcome) = outcome {
match outcome {
StreamToFileOutcome::StorageFull => Err(ApiError::CaosStagingAreaFull.into()),
StreamToFileOutcome::StoppedUnexpectedly => Err(ApiError::RequestBodyTooShort.into()),
StreamToFileOutcome::TooMuchContent => Err(ApiError::RequestBodyTooLong.into()),
StreamToFileOutcome::Success => {
@ -267,6 +268,7 @@ async fn do_append(
#[derive(Debug)]
pub enum StreamToFileOutcome {
StorageFull,
StoppedUnexpectedly,
TooMuchContent,
Success,
@ -308,6 +310,7 @@ async fn stream_to_file(
ErrorKind::TimedOut => Ok(StreamToFileOutcome::StoppedUnexpectedly),
ErrorKind::BrokenPipe => Ok(StreamToFileOutcome::StoppedUnexpectedly),
ErrorKind::ConnectionReset => Ok(StreamToFileOutcome::StoppedUnexpectedly),
ErrorKind::StorageFull | ErrorKind::QuotaExceeded => Ok(StreamToFileOutcome::StorageFull),
_ => Err(error),
},
}

View file

@ -1,5 +1,3 @@
extern crate core;
mod config;
mod http_api;
mod processing_worker;
@ -34,7 +32,7 @@ async fn main() -> Result<()> {
.await
.wrap_err("Failed to open the database connection.")?;
let upload_manager = UploadManager::create(database.clone(), &config.staging_directory, config.enable_multithreaded_hashing).await?;
let upload_manager = UploadManager::create(database.clone(), config.staging_directory.clone(), config.enable_multithreaded_hashing).await?;
log::info!("Initialization successful.");

View file

@ -2,20 +2,23 @@ use crate::upload_manager::AcquiredUnfinishedUpload;
use crate::util::hash_to_hex_string::HashExt;
use crate::util::temporal_formatting::TemporalFormatting;
use blake3::Hasher;
use camino::{Utf8Path, Utf8PathBuf};
use file_type::FileType;
use fstr::FStr;
use sqlx::SqlitePool;
use std::io::SeekFrom;
use temporal_rs::Now;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
pub async fn do_processing_work(
mut tasks_receiver: tokio::sync::mpsc::UnboundedReceiver<AcquiredUnfinishedUpload>,
database: SqlitePool,
enable_multithreaded_hashing: bool,
staging_directory_path: Utf8PathBuf,
mut tasks_receiver: tokio::sync::mpsc::UnboundedReceiver<AcquiredUnfinishedUpload>,
) {
while let Some(mut acquired_upload) = tasks_receiver.recv().await {
match process(enable_multithreaded_hashing, &mut acquired_upload).await {
match process(enable_multithreaded_hashing, &staging_directory_path, &mut acquired_upload).await {
Ok(outcome) => {
let mut tx = database.begin().await.unwrap();
let upload = acquired_upload.upload();
@ -25,11 +28,16 @@ pub async fn do_processing_work(
let size = upload.total_size() as i64;
let creation_date = Now::zoneddatetime_iso(None).unwrap().to_string_with_defaults().unwrap();
// This is all in a transaction, so doing this first is fine.
sqlx::query!("INSERT INTO finished_uploads (id, hash) VALUES (?, ?)", id, hash)
.execute(&mut *tx)
.await
.unwrap();
sqlx::query!("DELETE FROM unfinished_uploads WHERE id = ?", id).execute(&mut *tx).await.unwrap();
// TODO: Handle the case of there already being an object with this hash
sqlx::query!(
"INSERT INTO objects (hash, size, media_type, creation_date) VALUES (?, ?, ?, ?)",
let affected_rows_count = sqlx::query!(
"INSERT OR IGNORE INTO objects (hash, size, media_type, creation_date) VALUES (?, ?, ?, ?)",
hash,
size,
outcome.media_type,
@ -37,21 +45,28 @@ pub async fn do_processing_work(
)
.execute(&mut *tx)
.await
.unwrap()
.rows_affected();
let _is_new = affected_rows_count == 1;
// If there is already a row for this object in the `staging` bucket, it is replaced.
// This effectively means that `is_present` is always set to `true` for this `(hash, bucket_id)` tuple.
sqlx::query!(
"INSERT OR REPLACE INTO object_replicas (hash, is_present, bucket_id) VALUES (?, true, 'staging')",
hash,
)
.execute(&mut *tx)
.await
.unwrap();
sqlx::query!("INSERT INTO object_replicas (hash, is_present, bucket_id) VALUES (?, true, 'staging')", hash,)
.execute(&mut *tx)
.await
.unwrap();
sqlx::query!("INSERT INTO finished_uploads (id, hash) VALUES (?, ?)", id, hash)
.execute(&mut *tx)
.await
.unwrap();
tx.commit().await.unwrap();
log::debug!("Successfully processed uploads ({}): {:?}", id, outcome);
// The same file is also hardlinked under the hash of the object.
// This just removes the old link under the upload ID.
fs::remove_file(acquired_upload.file().path()).await.unwrap();
log::info!("Successfully processed upload ({}): {:?}", id, outcome);
}
Err(report) => {
let upload = acquired_upload.upload();
@ -67,7 +82,11 @@ struct ProcessingOutcome {
media_type: &'static str,
}
async fn process(enable_multithreaded_hashing: bool, acquired_upload: &mut AcquiredUnfinishedUpload) -> Result<ProcessingOutcome, std::io::Error> {
async fn process(
enable_multithreaded_hashing: bool,
staging_directory_path: &Utf8Path,
acquired_upload: &mut AcquiredUnfinishedUpload,
) -> Result<ProcessingOutcome, std::io::Error> {
let file_reference = acquired_upload.file();
let path = file_reference.path().to_owned();
let file = file_reference.get_or_open(false).await?;
@ -77,20 +96,27 @@ async fn process(enable_multithreaded_hashing: bool, acquired_upload: &mut Acqui
let mut file_content_head = Vec::with_capacity(HEAD_LENGTH);
file.take(HEAD_LENGTH as u64).read_to_end(&mut file_content_head).await?;
let mut hasher = Hasher::new();
let hash = tokio::task::spawn_blocking(move || -> Result<_, std::io::Error> {
if enable_multithreaded_hashing {
hasher.update_mmap_rayon(path)?;
} else {
hasher.update_mmap(path)?;
}
let hash = tokio::task::spawn_blocking({
let path = path.clone();
Ok(hasher.finalize().to_hex_fstr())
move || -> Result<_, std::io::Error> {
let mut hasher = Hasher::new();
if enable_multithreaded_hashing {
hasher.update_mmap_rayon(path)?;
} else {
hasher.update_mmap(path)?;
}
Ok(hasher.finalize().to_hex_fstr())
}
})
.await??;
let file_type = FileType::from_bytes(file_content_head);
let media_type = file_type.media_types().first().map(|s| *s).unwrap_or("application/octet-stream");
// The original link is removed after the database transaction was successfully committed.
fs::hard_link(&path, staging_directory_path.join(hash.as_str())).await?;
Ok(ProcessingOutcome { hash, media_type })
}
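
The reworked hashing block clones `path` before moving it into the blocking task, so the original path stays available for the hard link created right after, and it constructs the `Hasher` inside the closure. Stripped of the surrounding upload bookkeeping, the pattern is roughly the following (illustrative function name; `update_mmap_rayon` needs the `mmap` and `rayon` features that the Cargo.toml already enables):

```rust
use blake3::Hasher;
use camino::Utf8PathBuf;

// Illustrative sketch: hash a file with BLAKE3 off the async runtime.
async fn hash_file(path: Utf8PathBuf, multithreaded: bool) -> std::io::Result<blake3::Hash> {
    tokio::task::spawn_blocking(move || -> std::io::Result<blake3::Hash> {
        let mut hasher = Hasher::new();
        if multithreaded {
            // Memory-maps the file and hashes it on the rayon thread pool.
            hasher.update_mmap_rayon(&path)?;
        } else {
            // Same, but single-threaded.
            hasher.update_mmap(&path)?;
        }
        Ok(hasher.finalize())
    })
    .await
    .expect("hashing task panicked")
}
```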

View file

@ -27,7 +27,7 @@ pub struct UploadManager {
}
impl UploadManager {
pub async fn create(database: SqlitePool, staging_directory_path: &Utf8PathBuf, enable_multithreaded_hashing: bool) -> Result<Arc<Self>> {
pub async fn create(database: SqlitePool, staging_directory_path: Utf8PathBuf, enable_multithreaded_hashing: bool) -> Result<Arc<Self>> {
log::info!("Loading uploads…");
let (small_file_processing_tasks_sender, small_file_processing_tasks_receiver) = tokio::sync::mpsc::unbounded_channel();
@ -71,11 +71,18 @@ impl UploadManager {
log::info!("Starting uploads processing…");
tokio::spawn(do_processing_work(
small_file_processing_tasks_receiver,
database.clone(),
enable_multithreaded_hashing,
staging_directory_path.clone(),
small_file_processing_tasks_receiver,
));
tokio::spawn(do_processing_work(
database,
enable_multithreaded_hashing,
staging_directory_path,
large_file_processing_tasks_receiver,
));
tokio::spawn(do_processing_work(large_file_processing_tasks_receiver, database, enable_multithreaded_hashing));
Ok(manager)
}

View file

@ -7,6 +7,7 @@ pub struct FileReference {
file: Option<File>,
}
#[allow(unused)]
impl FileReference {
pub fn new(path: Utf8PathBuf) -> FileReference {
FileReference { path, file: None }