WIP: v1.0.0

Moritz Ruth 2025-04-19 22:44:44 +02:00
parent 06e0ce3fcf
commit 8c54da772f
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
8 changed files with 123 additions and 57 deletions

Cargo.lock (generated)

@@ -1437,7 +1437,7 @@ dependencies = [
 ]

 [[package]]
-name = "minna_caos"
+name = "minna-caos"
 version = "0.1.0"
 dependencies = [
  "axum",

Cargo.toml

@@ -1,5 +1,5 @@
 [package]
-name = "minna_caos"
+name = "minna-caos"
 version = "0.1.0"
 edition = "2024"

@@ -7,27 +7,27 @@ edition = "2024"
 opt-level = 3

 [dependencies]
-sqlx = { version = "0.8.3", features = ["tls-rustls-ring-native-roots", "sqlite", "runtime-tokio"] }
 axum = { version = "0.8.3", default-features = false, features = ["json", "http1", "tokio", "macros"] }
-tokio = { version = "1.44.1", features = ["rt-multi-thread", "macros", "parking_lot"] }
+blake3 = { version = "1.8.1", features = ["rayon", "mmap"] }
+camino = { version = "1.1.9", features = ["serde1"] }
 color-eyre = "0.6.3"
-log = "0.4.26"
+constant_time_eq = "0.4.2"
+dashmap = "7.0.0-rc2"
 env_logger = "0.11.7"
 figment = { version = "0.10.19", features = ["env", "toml", "parking_lot"] }
-serde = { version = "1.0.219", features = ["derive"] }
-validator = { version = "0.20.0", features = ["derive"] }
-once_cell = "1.21.1"
-regex = "1.11.1"
-serde_json = "1.0.140"
-constant_time_eq = "0.4.2"
-fstr = { version = "0.2.13", features = ["serde"] }
-camino = { version = "1.1.9", features = ["serde1"] }
-dashmap = "7.0.0-rc2"
-tokio-util = { version = "0.7.14", features = ["io"] }
-replace_with = "0.1.7"
-rand = "0.9.0"
-futures = "0.3.31"
-strum = { version = "0.27.1", features = ["derive"] }
-blake3 = { version = "1.8.1", features = ["rayon", "mmap"] }
 file_type = { version = "0.8.3", default-features = false, features = ["std", "wikidata"] }
+fstr = { version = "0.2.13", features = ["serde"] }
+futures = "0.3.31"
+log = "0.4.26"
+once_cell = "1.21.1"
+rand = "0.9.0"
+regex = "1.11.1"
+replace_with = "0.1.7"
+serde = { version = "1.0.219", features = ["derive"] }
+serde_json = "1.0.140"
+sqlx = { version = "0.8.3", features = ["tls-rustls-ring-native-roots", "sqlite", "runtime-tokio"] }
+strum = { version = "0.27.1", features = ["derive"] }
 temporal_rs = "0.0.6"
+tokio = { version = "1.44.1", features = ["rt-multi-thread", "macros", "parking_lot"] }
+tokio-util = { version = "0.7.14", features = ["io"] }
+validator = { version = "0.20.0", features = ["derive"] }

README.md

@@ -1,9 +1,10 @@
-# minna-caos
+# [WIP] minna-caos

 > Content-Addressed Object Storage server intended for usage with Minna.

 **minna-caos** was created because the developers found themselves writing similar storage backends in every project, again and again.
-It is intended to be a companion to a single application (“the app”) that has full authority. **You should imagine it as a library**, not a stand-alone
+It is intended as a companion to a single application (“the app”) that has full authority. **You should imagine it as a library**, not a stand-alone
 application.
@@ -22,32 +23,69 @@ application.
 - direct download from the underlying storage backend (if supported)
-- named storage buckets
+- named storage buckets (backed by a single storage backend each)
+- object operations:
+  - upload
+  - set target buckets
+  - delete
 - deletion tombstones
-- administration CLI
+- SQLite database
+- CLI for administration tasks

-## Upload steps
+## Notes

-- client to app: request to upload something, returns `{upload_id}`
-- app to caos: `POST /uploads` returns `{upload_id}`
-- client to caos: `PATCH /uploads/{upload_id}` with upload data
-- app to caos: `GET /uploads/{upload_id}`, returns metadata (including `{hash}`) as soon as the upload is finished.
-- app to caos: `POST /uploads/{upload_id}/accept` with target bucket IDs
+### Example file upload flow
+
+1. The client notifies the app that it wants to upload something.
+2. The app sends a `POST /uploads/` request to minna-caos. It receives the following response:
+   ```json
+   {
+     "upload_id": "UPLOAD_ID"
+   }
+   ```
+3. The app sends the upload ID to the client.
+4. The client sends a `PATCH /uploads/UPLOAD_ID` request with the upload data to minna-caos.
+5. The client goes offline during uploading. When it is online again, it sends a `HEAD /uploads/UPLOAD_ID` request to minna-caos.
+   It receives the current upload offset `123` in the `Upload-Offset` header.
+6. The client resumes the upload by sending another `PATCH /uploads/UPLOAD_ID` request with the upload data to minna-caos.
+   The `Upload-Offset` request header is set to `123`.
+7. Meanwhile, the app continuously polls the `GET /uploads/UPLOAD_ID` endpoint of minna-caos until it returns a response with the `state` field set to
+   `"finished"`:
+   ```json
+   {
+     "id": "UPLOAD_ID",
+     "state": "finished",
+     "hash": "OBJECT_HASH",
+     "size": 200,
+     "media_type": "image/png",
+     "bucket_ids": ["staging"]
+   }
+   ```
+8. Based on the returned metadata, the app decides that it will accept the upload and place it in the `local` bucket.
+   It sends a `POST /uploads/UPLOAD_ID/accept` request to minna-caos with the following payload:
+   ```json
+   {
+     "buckets": ["local"]
+   }
+   ```
+9. minna-caos starts copying/uploading the object from the `staging` bucket to the `local` bucket.
+   Even while this process is still running, the object data is already accessible at `/objects/OBJECT_HASH`.

 ## Roadmap

 - metadata endpoints
+- endpoint for settling an upload with the hash of an existing object
 - upload expiration
 - garbage-collect failed uploads
 - add code comments
 - graceful shutdown
 - more storage backends
-- CLI
+
+## See also
+
+- [**minna-pima**](https://git.moritzruth.de/minna/minna-pima) — integrates with minna-caos to provide picture management features
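
The resumable-upload portion of the new README flow (steps 4–6) is compact enough to sketch in code. The following is a minimal, hypothetical client in Rust using reqwest; it is not part of this commit, and only the endpoint paths and the `Upload-Offset` header are taken from the README:

```rust
// Hypothetical client-side resume logic for steps 4-6 of the flow above,
// sketched with reqwest. The helper itself is illustrative and not part of
// the commit; paths and the `Upload-Offset` header follow the README.
use reqwest::Client;

async fn resume_upload(client: &Client, base_url: &str, upload_id: &str, data: &[u8]) -> Result<(), reqwest::Error> {
    // Step 5: ask minna-caos how many bytes it already has.
    let response = client
        .head(format!("{base_url}/uploads/{upload_id}"))
        .send()
        .await?
        .error_for_status()?;

    let offset: usize = response
        .headers()
        .get("Upload-Offset")
        .and_then(|value| value.to_str().ok())
        .and_then(|value| value.parse().ok())
        .unwrap_or(0)
        .min(data.len());

    // Step 6: resume by sending only the remaining bytes, telling the
    // server where they start.
    client
        .patch(format!("{base_url}/uploads/{upload_id}"))
        .header("Upload-Offset", offset.to_string())
        .body(data[offset..].to_vec())
        .send()
        .await?
        .error_for_status()?;

    Ok(())
}
```

Because the server is the source of truth for the offset, the client never has to persist how many bytes it already sent.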


@@ -11,7 +11,6 @@ use crate::upload_manager::UploadManager;
 use axum::Router;
 use color_eyre::Result;
 use sqlx::SqlitePool;
-use std::net::IpAddr;
 use std::sync::Arc;

 #[derive(Debug)]
@@ -23,7 +22,9 @@ struct ContextInner {

 type Context = Arc<ContextInner>;

-pub async fn start_http_api_server(upload_manager: Arc<UploadManager>, database: SqlitePool, config: Arc<Config>, address: IpAddr, port: u16) -> Result<()> {
+pub async fn start_http_api_server(upload_manager: Arc<UploadManager>, database: SqlitePool, config: Arc<Config>) -> Result<()> {
+    let listener = tokio::net::TcpListener::bind((config.http_address, config.http_port)).await?;
+
     let router = Router::new()
         .nest("/uploads", create_uploads_router())
         .nest("/objects", create_objects_router())
@@ -33,8 +34,6 @@ pub async fn start_http_api_server(upload_manager: Arc<UploadManager>, database:
             config,
         }));

-    let listener = tokio::net::TcpListener::bind((address, port)).await?;
-
     axum::serve(listener, router).await?;
     Ok(())
 }
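
This refactor moves the bind address and port out of the function signature and into configuration. For orientation, here is a sketch of the `Config` fields the code now assumes; the real struct is defined elsewhere in the crate, so the exact types are educated guesses:

```rust
// Assumed shape of the configuration fields this diff relies on. Only
// `http_address`, `http_port`, and `buckets` are evidenced by the commit;
// the key type of `buckets` (a `BucketId` in the real code) is simplified
// to `String` here, and `BucketConfig` is a placeholder.
use std::collections::HashMap;
use std::net::IpAddr;

#[derive(Debug)]
pub struct Config {
    pub http_address: IpAddr,                   // used by TcpListener::bind
    pub http_port: u16,                         // used by TcpListener::bind
    pub buckets: HashMap<String, BucketConfig>, // checked in accept_upload
}

#[derive(Debug)]
pub struct BucketConfig; // fields unknown from this diff
```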


@@ -66,10 +66,22 @@ struct GetUploadMetadataResponse {
 #[derive(Debug, Serialize)]
 #[serde(rename_all = "snake_case", tag = "state")]
 enum GetUploadMetadataResponseState {
-    Ongoing { current_size: u64, total_size: u64 },
-    Complete { size: u64 },
-    Finished { size: u64, hash: String },
-    Failed { reason: UploadFailureReason },
+    Ongoing {
+        current_size: u64,
+        total_size: u64,
+    },
+    Complete {
+        size: u64,
+    },
+    Finished {
+        hash: String,
+        size: u64,
+        media_type: String,
+        bucket_ids: Vec<BucketId>,
+    },
+    Failed {
+        reason: UploadFailureReason,
+    },
 }

 impl IntoResponse for GetUploadMetadataResponse {
@@ -132,8 +144,10 @@ async fn get_upload_metadata(
             GetUploadMetadataResponseState::Failed { reason }
         } else if let Some(metadata) = context.upload_manager.get_finished_upload_metadata_by_id(&mut tx, &upload_id).await? {
             GetUploadMetadataResponseState::Finished {
-                size: metadata.size,
                 hash: metadata.hash,
+                size: metadata.size,
+                media_type: metadata.media_type,
+                bucket_ids: metadata.bucket_ids,
             }
         } else {
             return Err(ApiError::UnknownResource {
@@ -151,7 +165,7 @@ async fn get_upload_metadata(
 #[derive(Debug, Deserialize)]
 struct AcceptUploadPayload {
-    bucket_ids: Vec<BucketId>,
+    buckets: Vec<BucketId>,
 }

 async fn accept_upload(
@@ -174,7 +188,7 @@ async fn accept_upload(
         .into());
     };

-    for bucket_id in &payload.bucket_ids {
+    for bucket_id in &payload.buckets {
         if !context.config.buckets.contains_key(bucket_id) {
             return Err(ApiError::CaosUnknownBucket {
                 bucket_id: bucket_id.to_string().into(),
@@ -183,7 +197,7 @@ async fn accept_upload(
         }
     }

-    context.upload_manager.accept_finished_upload(&mut tx, upload_id, payload.bucket_ids).await?;
+    context.upload_manager.accept_finished_upload(&mut tx, upload_id, payload.buckets).await?;
     tx.commit().await.map_err(Into::<Report>::into)?;

     Ok(())
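
The expanded `Finished` variant is what yields the JSON shown in step 7 of the README flow. Here is a self-contained sketch of how serde's internal tagging (`tag = "state"`) flattens the variant's fields next to the discriminator; `BucketId` is simplified to `String` for the example:

```rust
// Standalone illustration of the enum's wire format. Variant and field
// names mirror the diff; `BucketId` is simplified to `String` here.
use serde::Serialize;

#[derive(Serialize)]
#[serde(rename_all = "snake_case", tag = "state")]
enum State {
    Finished {
        hash: String,
        size: u64,
        media_type: String,
        bucket_ids: Vec<String>,
    },
}

fn main() {
    let state = State::Finished {
        hash: "OBJECT_HASH".into(),
        size: 200,
        media_type: "image/png".into(),
        bucket_ids: vec!["staging".into()],
    };
    // Prints:
    // {"state":"finished","hash":"OBJECT_HASH","size":200,"media_type":"image/png","bucket_ids":["staging"]}
    println!("{}", serde_json::to_string(&state).unwrap());
}
```

Internal tagging lets one enum model all four upload states while keeping `state` a plain string field for API consumers.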

src/main.rs

@@ -38,7 +38,7 @@ async fn main() -> Result<()> {
     log::info!("Initialization successful.");

-    start_http_api_server(upload_manager, database, Arc::clone(&config), config.http_address, config.http_port).await?;
+    start_http_api_server(upload_manager, database, Arc::clone(&config)).await?;

     Ok(())
 }


@@ -27,6 +27,7 @@ pub async fn do_processing_work(
     sqlx::query!("DELETE FROM unfinished_uploads WHERE id = ?", id).execute(&mut *tx).await.unwrap();

+    // TODO: Handle the case of there already being an object with this hash
     sqlx::query!(
         "INSERT INTO objects (hash, size, media_type, creation_date) VALUES (?, ?, ?, ?)",
         hash,
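
One way the new TODO could be resolved — an assumption on this editor's part, not the author's stated plan — is to make the insert idempotent with a SQLite upsert, so re-uploading already-stored content leaves the existing object row untouched:

```rust
use sqlx::{Sqlite, Transaction};

// Hypothetical resolution of the TODO above: `ON CONFLICT (hash) DO NOTHING`
// turns the insert into a no-op when an object with this hash already
// exists. Parameter types are illustrative; the real code uses the
// compile-time checked `query!` macro.
async fn insert_object_if_new(
    tx: &mut Transaction<'_, Sqlite>,
    hash: &str,
    size: i64,
    media_type: &str,
    creation_date: &str,
) -> sqlx::Result<()> {
    sqlx::query(
        "INSERT INTO objects (hash, size, media_type, creation_date) VALUES (?, ?, ?, ?) \
         ON CONFLICT (hash) DO NOTHING",
    )
    .bind(hash)
    .bind(size)
    .bind(media_type)
    .bind(creation_date)
    .execute(&mut **tx)
    .await?;
    Ok(())
}
```

Whether the existing row should also win for `media_type` and `creation_date` is a policy question the TODO leaves open.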


@@ -126,16 +126,28 @@ impl UploadManager {
     pub async fn get_finished_upload_metadata_by_id(&self, tx: &mut SqliteTransaction<'_>, id: &UploadId) -> Result<Option<FinishedUploadMetadata>> {
         let id = id.to_string();
-        Ok(sqlx::query!(
-            "SELECT objects.hash, objects.size FROM finished_uploads JOIN objects ON finished_uploads.hash = objects.hash WHERE id = ?",
+        let object = sqlx::query!(
+            "SELECT objects.hash, objects.size, objects.media_type FROM finished_uploads JOIN objects ON finished_uploads.hash = objects.hash WHERE id = ?",
             id
         )
         .fetch_optional(&mut **tx)
-        .await?
-        .map(|r| FinishedUploadMetadata {
-            size: r.size as u64,
-            hash: r.hash,
-        }))
+        .await?;
+
+        Ok(if let Some(object) = object {
+            let bucket_ids = sqlx::query!("SELECT bucket_id FROM object_replicas WHERE hash = ?", object.hash)
+                .map(|r| r.bucket_id)
+                .fetch_all(&mut **tx)
+                .await?;
+
+            Some(FinishedUploadMetadata {
+                hash: object.hash,
+                size: object.size as u64,
+                media_type: object.media_type,
+                bucket_ids,
+            })
+        } else {
+            None
+        })
     }

     pub async fn accept_finished_upload(&self, tx: &mut SqliteTransaction<'_>, id: UploadId, bucket_ids: Vec<BucketId>) -> Result<Option<()>> {
@@ -158,6 +170,8 @@ impl UploadManager {
 pub struct FinishedUploadMetadata {
     pub hash: String,
     pub size: u64,
+    pub media_type: String,
+    pub bucket_ids: Vec<BucketId>,
 }

 #[derive(Debug)]