From 8c54da772ff4e9460d6c99b10d245629563fcc9e Mon Sep 17 00:00:00 2001
From: Moritz Ruth
Date: Sat, 19 Apr 2025 22:44:44 +0200
Subject: [PATCH] WIP: v1.0.0

---
 Cargo.lock                  |  2 +-
 Cargo.toml                  | 40 ++++++++++-----------
 README.md                   | 70 ++++++++++++++++++++++++++++---------
 src/http_api/mod.rs         |  7 ++--
 src/http_api/uploads/mod.rs | 30 +++++++++++-----
 src/main.rs                 |  2 +-
 src/processing_worker.rs    |  1 +
 src/upload_manager.rs       | 28 +++++++++++----
 8 files changed, 123 insertions(+), 57 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index baa037d..a05ea00 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1437,7 +1437,7 @@ dependencies = [
 ]
 
 [[package]]
-name = "minna_caos"
+name = "minna-caos"
 version = "0.1.0"
 dependencies = [
  "axum",
diff --git a/Cargo.toml b/Cargo.toml
index 80a105f..983ccf8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,5 +1,5 @@
 [package]
-name = "minna_caos"
+name = "minna-caos"
 version = "0.1.0"
 edition = "2024"
 
@@ -7,27 +7,27 @@ edition = "2024"
 opt-level = 3
 
 [dependencies]
-sqlx = { version = "0.8.3", features = ["tls-rustls-ring-native-roots", "sqlite", "runtime-tokio"] }
 axum = { version = "0.8.3", default-features = false, features = ["json", "http1", "tokio", "macros"] }
-tokio = { version = "1.44.1", features = ["rt-multi-thread", "macros", "parking_lot"] }
+blake3 = { version = "1.8.1", features = ["rayon", "mmap"] }
+camino = { version = "1.1.9", features = ["serde1"] }
 color-eyre = "0.6.3"
-log = "0.4.26"
+constant_time_eq = "0.4.2"
+dashmap = "7.0.0-rc2"
 env_logger = "0.11.7"
 figment = { version = "0.10.19", features = ["env", "toml", "parking_lot"] }
-serde = { version = "1.0.219", features = ["derive"] }
-validator = { version = "0.20.0", features = ["derive"] }
-once_cell = "1.21.1"
-regex = "1.11.1"
-serde_json = "1.0.140"
-constant_time_eq = "0.4.2"
-fstr = { version = "0.2.13", features = ["serde"] }
-camino = { version = "1.1.9", features = ["serde1"] }
-dashmap = "7.0.0-rc2"
-tokio-util = { version = "0.7.14", features = ["io"] }
-replace_with = "0.1.7"
-rand = "0.9.0"
-futures = "0.3.31"
-strum = { version = "0.27.1", features = ["derive"] }
-blake3 = { version = "1.8.1", features = ["rayon", "mmap"] }
 file_type = { version = "0.8.3", default-features = false, features = ["std", "wikidata"] }
-temporal_rs = "0.0.6"
\ No newline at end of file
+fstr = { version = "0.2.13", features = ["serde"] }
+futures = "0.3.31"
+log = "0.4.26"
+once_cell = "1.21.1"
+rand = "0.9.0"
+regex = "1.11.1"
+replace_with = "0.1.7"
+serde = { version = "1.0.219", features = ["derive"] }
+serde_json = "1.0.140"
+sqlx = { version = "0.8.3", features = ["tls-rustls-ring-native-roots", "sqlite", "runtime-tokio"] }
+strum = { version = "0.27.1", features = ["derive"] }
+temporal_rs = "0.0.6"
+tokio = { version = "1.44.1", features = ["rt-multi-thread", "macros", "parking_lot"] }
+tokio-util = { version = "0.7.14", features = ["io"] }
+validator = { version = "0.20.0", features = ["derive"] }
\ No newline at end of file
diff --git a/README.md b/README.md
index 1e5c43b..330e0e6 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,10 @@
-# minna-caos
+# [WIP] minna-caos
 
 > Content-Addressed Object Storage server intended for usage with Minna.
 
 **minna-caos** was created because the developers found themselves writing similar storage backends in every project, again and again.
-It is intended to be a companion to a single application (“the app”) that has full authority. **You should imagine it as a library**, not a stand-alone
+
+It is intended as a companion to a single application (“the app”) that has full authority. **You should imagine it as a library**, not a stand-alone
 application.
@@ -22,32 +23,69 @@ application.
 
 - direct download from the underlying storage backend (if supported)
 
-- named storage buckets
-
-- object operations:
-  - upload
-  - set target buckets
-  - delete
+- named storage buckets (backed by a single storage backend each)
 
 - deletion tombstones
 
-- administration CLI
+- SQLite database
 
-## Upload steps
+- CLI for administration tasks
 
-- client to app: request to upload something, returns `{upload_id}`
-  - app to caos: `POST /uploads` returns `{upload_ìd}`
+## Notes
 
-- client to caos: `PATCH /uploads/{upload_id}` with upload data
-- app to caos: `GET /uploads/{upload_id}`, returns metadata (including `{hash}`) as soon as the upload is finished.
-- app to caos: `POST /uploads/{upload_id}/accept` with target bucket IDs
+### Example file upload flow
+
+1. The client notifies the app that it wants to upload something.
+2. The app sends a `POST /uploads/` request to minna-caos. It receives the following response:
+   ```json
+   {
+     "upload_id": "UPLOAD_ID"
+   }
+   ```
+
+3. The app sends the upload ID to the client.
+4. The client sends a `PATCH /uploads/UPLOAD_ID` request with the upload data to minna-caos.
+5. The client goes offline while uploading. When it is online again, it sends a `HEAD /uploads/UPLOAD_ID` request to minna-caos.
+   It receives the current upload offset (here, `123`) in the `Upload-Offset` response header.
+6. The client resumes the upload by sending another `PATCH /uploads/UPLOAD_ID` request with the remaining upload data to minna-caos.
+   This time, the `Upload-Offset` request header is set to `123` (see the client sketch after this list).
+7. Meanwhile, the app continuously polls the `GET /uploads/UPLOAD_ID` endpoint of minna-caos until it returns a response with the `state` field set to
+   `"finished"`:
+   ```json
+   {
+     "id": "UPLOAD_ID",
+     "state": "finished",
+     "hash": "OBJECT_HASH",
+     "size": 200,
+     "media_type": "image/png",
+     "bucket_ids": ["staging"]
+   }
+   ```
+
+8. Based on the returned metadata, the app decides to accept the upload and place it in the `local` bucket.
+   It sends a `POST /uploads/UPLOAD_ID/accept` request to minna-caos with the following payload:
+   ```json
+   {
+     "buckets": ["local"]
+   }
+   ```
+
+9. minna-caos starts copying/uploading the object from the `staging` bucket to the `local` bucket.
+
+   Even while this process is still running, the object data is already accessible at `/objects/OBJECT_HASH`.
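+
+For illustration, a client could implement steps 5 and 6 roughly as sketched below. The sketch is not normative: the host and port are made up, and the `reqwest` crate is assumed only for the example (it is not used by this repository).
+
+```rust
+use reqwest::Client;
+
+/// Resume an interrupted upload (steps 5 and 6 above).
+async fn resume_upload(client: &Client, upload_id: &str, data: &[u8]) -> reqwest::Result<()> {
+    // Hypothetical address; minna-caos reads its HTTP address and port from its config.
+    let url = format!("http://localhost:3000/uploads/{upload_id}");
+
+    // Step 5: ask minna-caos how many bytes it has already received.
+    let response = client.head(url.as_str()).send().await?;
+    let offset = response
+        .headers()
+        .get("Upload-Offset")
+        .and_then(|value| value.to_str().ok())
+        .and_then(|value| value.parse::<usize>().ok())
+        .unwrap_or(0)
+        .min(data.len());
+
+    // Step 6: resume the upload, sending only the bytes the server does not have yet.
+    client
+        .patch(url.as_str())
+        .header("Upload-Offset", offset.to_string())
+        .body(data[offset..].to_vec())
+        .send()
+        .await?
+        .error_for_status()?;
+
+    Ok(())
+}
+```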
 
 ## Roadmap
 
 - metadata endpoints
+- endpoint for settling an upload with the hash of an existing object
 - upload expiration
 - garbage-collect failed uploads
 - add code comments
 - graceful shutdown
-- more storage backends
\ No newline at end of file
+- more storage backends
+- CLI
+
+## See also
+
+- [**minna-pima**](https://git.moritzruth.de/minna/minna-pima) — integrates with minna-caos to provide picture management features
\ No newline at end of file
diff --git a/src/http_api/mod.rs b/src/http_api/mod.rs
index 4f55471..279a78a 100644
--- a/src/http_api/mod.rs
+++ b/src/http_api/mod.rs
@@ -11,7 +11,6 @@ use crate::upload_manager::UploadManager;
 use axum::Router;
 use color_eyre::Result;
 use sqlx::SqlitePool;
-use std::net::IpAddr;
 use std::sync::Arc;
 
 #[derive(Debug)]
@@ -23,7 +22,9 @@ struct ContextInner {
 
 type Context = Arc<ContextInner>;
 
-pub async fn start_http_api_server(upload_manager: Arc<UploadManager>, database: SqlitePool, config: Arc<Config>, address: IpAddr, port: u16) -> Result<()> {
+pub async fn start_http_api_server(upload_manager: Arc<UploadManager>, database: SqlitePool, config: Arc<Config>) -> Result<()> {
+    let listener = tokio::net::TcpListener::bind((config.http_address, config.http_port)).await?;
+
     let router = Router::new()
         .nest("/uploads", create_uploads_router())
         .nest("/objects", create_objects_router())
@@ -33,8 +34,6 @@ pub async fn start_http_api_server(upload_manager: Arc<UploadManager>, database:
             config,
         }));
 
-    let listener = tokio::net::TcpListener::bind((address, port)).await?;
     axum::serve(listener, router).await?;
-
     Ok(())
 }
diff --git a/src/http_api/uploads/mod.rs b/src/http_api/uploads/mod.rs
index 0685cf1..f17d1ec 100644
--- a/src/http_api/uploads/mod.rs
+++ b/src/http_api/uploads/mod.rs
@@ -66,10 +66,22 @@ struct GetUploadMetadataResponse {
 #[derive(Debug, Serialize)]
 #[serde(rename_all = "snake_case", tag = "state")]
 enum GetUploadMetadataResponseState {
-    Ongoing { current_size: u64, total_size: u64 },
-    Complete { size: u64 },
-    Finished { size: u64, hash: String },
-    Failed { reason: UploadFailureReason },
+    Ongoing {
+        current_size: u64,
+        total_size: u64,
+    },
+    Complete {
+        size: u64,
+    },
+    Finished {
+        hash: String,
+        size: u64,
+        media_type: String,
+        bucket_ids: Vec<String>,
+    },
+    Failed {
+        reason: UploadFailureReason,
+    },
 }
 
 impl IntoResponse for GetUploadMetadataResponse {
@@ -132,8 +144,10 @@ async fn get_upload_metadata(
             GetUploadMetadataResponseState::Failed { reason }
         } else if let Some(metadata) = context.upload_manager.get_finished_upload_metadata_by_id(&mut tx, &upload_id).await? {
             GetUploadMetadataResponseState::Finished {
-                size: metadata.size,
                 hash: metadata.hash,
+                size: metadata.size,
+                media_type: metadata.media_type,
+                bucket_ids: metadata.bucket_ids,
             }
         } else {
             return Err(ApiError::UnknownResource {
@@ -151,7 +165,7 @@ async fn get_upload_metadata(
 
 #[derive(Debug, Deserialize)]
 struct AcceptUploadPayload {
-    bucket_ids: Vec<String>,
+    buckets: Vec<String>,
 }
 
 async fn accept_upload(
@@ -174,7 +188,7 @@ async fn accept_upload(
         .into());
     };
 
-    for bucket_id in &payload.bucket_ids {
+    for bucket_id in &payload.buckets {
         if !context.config.buckets.contains_key(bucket_id) {
             return Err(ApiError::CaosUnknownBucket {
                 bucket_id: bucket_id.to_string().into(),
@@ -183,7 +197,7 @@ async fn accept_upload(
         }
     }
 
-    context.upload_manager.accept_finished_upload(&mut tx, upload_id, payload.bucket_ids).await?;
+    context.upload_manager.accept_finished_upload(&mut tx, upload_id, payload.buckets).await?;
 
     tx.commit().await.map_err(Into::<ApiError>::into)?;
     Ok(())
diff --git a/src/main.rs b/src/main.rs
index b7342da..c7fd182 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -38,7 +38,7 @@ async fn main() -> Result<()> {
 
     log::info!("Initialization successful.");
 
-    start_http_api_server(upload_manager, database, Arc::clone(&config), config.http_address, config.http_port).await?;
+    start_http_api_server(upload_manager, database, Arc::clone(&config)).await?;
 
     Ok(())
 }
diff --git a/src/processing_worker.rs b/src/processing_worker.rs
index 37a21be..513e3b2 100644
--- a/src/processing_worker.rs
+++ b/src/processing_worker.rs
@@ -27,6 +27,7 @@ pub async fn do_processing_work(
 
         sqlx::query!("DELETE FROM unfinished_uploads WHERE id = ?", id).execute(&mut *tx).await.unwrap();
 
+        // TODO: Handle the case of there already being an object with this hash
         sqlx::query!(
             "INSERT INTO objects (hash, size, media_type, creation_date) VALUES (?, ?, ?, ?)",
             hash,
diff --git a/src/upload_manager.rs b/src/upload_manager.rs
index 4869226..208653d 100644
--- a/src/upload_manager.rs
+++ b/src/upload_manager.rs
@@ -126,16 +126,28 @@ impl UploadManager {
     pub async fn get_finished_upload_metadata_by_id(&self, tx: &mut SqliteTransaction<'_>, id: &UploadId) -> Result<Option<FinishedUploadMetadata>> {
         let id = id.to_string();
 
-        Ok(sqlx::query!(
-            "SELECT objects.hash, objects.size FROM finished_uploads JOIN objects ON finished_uploads.hash = objects.hash WHERE id = ?",
+        let object = sqlx::query!(
+            "SELECT objects.hash, objects.size, objects.media_type FROM finished_uploads JOIN objects ON finished_uploads.hash = objects.hash WHERE id = ?",
             id
         )
         .fetch_optional(&mut **tx)
-        .await?
-        .map(|r| FinishedUploadMetadata {
-            size: r.size as u64,
-            hash: r.hash,
-        }))
+        .await?;
+
+        Ok(if let Some(object) = object {
+            let bucket_ids = sqlx::query!("SELECT bucket_id FROM object_replicas WHERE hash = ?", object.hash)
+                .map(|r| r.bucket_id)
+                .fetch_all(&mut **tx)
+                .await?;
+
+            Some(FinishedUploadMetadata {
+                hash: object.hash,
+                size: object.size as u64,
+                media_type: object.media_type,
+                bucket_ids,
+            })
+        } else {
+            None
+        })
     }
 
     pub async fn accept_finished_upload(&self, tx: &mut SqliteTransaction<'_>, id: UploadId, bucket_ids: Vec<String>) -> Result> {
@@ -158,6 +170,8 @@
 pub struct FinishedUploadMetadata {
     pub hash: String,
     pub size: u64,
+    pub media_type: String,
+    pub bucket_ids: Vec<String>,
 }
 
 #[derive(Debug)]
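
The TODO added in `src/processing_worker.rs` leaves open what should happen when an object with the inserted hash already exists. One possible shape is sketched below. It is an assumption, not part of the patch: it presumes `hash` is the primary key of the `objects` table, and the bind parameters after `hash` (truncated in the hunk above) are guesses mirroring the column names.

```rust
// Sketch for the duplicate-hash TODO; not part of this patch.
// Assumes `hash` is the primary key of `objects`: with SQLite,
// `INSERT OR IGNORE` then keeps the existing row untouched.
let result = sqlx::query!(
    "INSERT OR IGNORE INTO objects (hash, size, media_type, creation_date) VALUES (?, ?, ?, ?)",
    hash,
    size,
    media_type,
    creation_date
)
.execute(&mut *tx)
.await
.unwrap();

if result.rows_affected() == 0 {
    // The object already existed, so the freshly staged copy is redundant;
    // it could be deleted here or left to the planned garbage collection.
}
```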