WIP: v1.0.0
This commit is contained in:
parent
5cbab43a23
commit
cc84c6ac37
7 changed files with 369 additions and 306 deletions
493
Cargo.lock
generated
493
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -4,9 +4,8 @@ version = "0.1.0"
|
|||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
sqlx = { version = "0.7.4", features = ["runtime-tokio-rustls", "sqlite"] }
|
||||
sqlx = { version = "0.8.3", features = ["tls-rustls-ring-native-roots", "sqlite"] }
|
||||
rocket = { version = "0.5.1", default-features = false, features = ["http2", "json"] }
|
||||
rocket_db_pools = { version = "0.2.0", features = ["sqlx_sqlite"] }
|
||||
opendal = { version = "0.52.0", features = ["services-fs"] }
|
||||
tokio = { version = "1.44.1", features = ["rt-multi-thread", "macros", "parking_lot"] }
|
||||
color-eyre = "0.6.3"
|
||||
|
@ -17,3 +16,9 @@ serde = { version = "1.0.219", features = ["derive"] }
|
|||
validator = { version = "0.20.0", features = ["derive"] }
|
||||
once_cell = "1.21.1"
|
||||
regex = "1.11.1"
|
||||
bytesize = { version = "2.0.1", features = ["serde"] }
|
||||
serde_regex = "1.1.0"
|
||||
constant_time_eq = "0.4.2"
|
||||
fstr = { version = "0.2.13", features = ["serde"] }
|
||||
camino = { version = "1.1.9", features = ["serde1"] }
|
||||
nanoid = "0.4.0"
|
||||
|
|
11
README.md
11
README.md
|
@ -18,8 +18,6 @@
|
|||
|
||||
- direct download from the underlying storage backend (if supported)
|
||||
|
||||
- pre-signed download and upload URLs
|
||||
|
||||
- named storage buckets
|
||||
|
||||
- object operations:
|
||||
|
@ -28,3 +26,12 @@
|
|||
- delete
|
||||
|
||||
- outgoing webhooks
|
||||
|
||||
## Upload steps
|
||||
|
||||
- client to app: request to upload something, returns `{upload_id}`
|
||||
- app to caos: `POST /uploads` returns `{upload_ìd}`
|
||||
|
||||
- client to caos: `PATCH /uploads/{upload_id}` with upload data, optionally using tus
|
||||
- app to caos: `GET /staging-area/{upload_id}`, returns metadata (including `{hash}`) as soon as the upload is complete
|
||||
- app to caos: `POST /staging-area/{upload_id}/accept` with target bucket IDs
|
|
@ -1,8 +1,9 @@
|
|||
create table objects
|
||||
(
|
||||
hash text not null,
|
||||
media_type text not null,
|
||||
creation_date text not null, -- RFC 3339 format
|
||||
hash text not null,
|
||||
size integer not null, -- in bytes
|
||||
media_type text not null,
|
||||
creation_date text not null, -- RFC 3339 format
|
||||
primary key (hash)
|
||||
) without rowid, strict;
|
||||
|
||||
|
@ -14,3 +15,14 @@ create table object_replicas
|
|||
primary key (hash, bucket_id),
|
||||
foreign key (hash) references objects (hash) on delete restrict on update restrict
|
||||
) strict;
|
||||
|
||||
create table uploads
|
||||
(
|
||||
id text not null,
|
||||
current_size integer not null, -- in bytes
|
||||
total_size integer, -- in bytes, or null if the upload was not started yet
|
||||
|
||||
hash text,
|
||||
primary key (id),
|
||||
foreign key (hash) references objects (hash) on delete restrict on update restrict
|
||||
)
|
|
@ -1,13 +1,14 @@
|
|||
use camino::Utf8PathBuf;
|
||||
use color_eyre::Result;
|
||||
use color_eyre::eyre::WrapErr;
|
||||
use figment::Figment;
|
||||
use figment::providers::Format;
|
||||
use fstr::FStr;
|
||||
use once_cell::sync::Lazy;
|
||||
use regex::Regex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
use std::net::IpAddr;
|
||||
use std::path::PathBuf;
|
||||
use validator::{Validate, ValidationError};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Validate)]
|
||||
|
@ -16,6 +17,9 @@ pub struct Config {
|
|||
pub http_port: u16,
|
||||
#[serde(default)]
|
||||
pub trust_http_reverse_proxy: bool,
|
||||
pub api_secret: FStr<64>,
|
||||
pub database_file: Utf8PathBuf,
|
||||
pub staging_directory: Utf8PathBuf,
|
||||
#[validate(nested, custom(function = "validate_buckets"))]
|
||||
pub buckets: Vec<ConfigBucket>,
|
||||
}
|
||||
|
@ -46,6 +50,9 @@ pub struct ConfigBucket {
|
|||
pub id: String,
|
||||
#[validate(length(min = 1, max = 128))]
|
||||
pub display_name: String,
|
||||
pub size_limit: Option<bytesize::ByteSize>,
|
||||
#[serde(with = "serde_regex")]
|
||||
pub media_type_pattern: Option<Regex>,
|
||||
#[serde(flatten)]
|
||||
pub backend: ConfigBucketBackend,
|
||||
}
|
||||
|
@ -58,19 +65,25 @@ pub enum ConfigBucketBackend {
|
|||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ConfigBucketBackendFilesystem {
|
||||
pub path: PathBuf,
|
||||
pub path: Utf8PathBuf,
|
||||
}
|
||||
|
||||
pub fn load_config() -> Result<Config> {
|
||||
let figment = Figment::new()
|
||||
.merge(figment::providers::Toml::file("config.toml"))
|
||||
.merge(figment::providers::Env::raw().only(&["HTTP_ADDRESS", "HTTP_PORT"]));
|
||||
.merge(figment::providers::Env::prefixed("CAOS_").only(&[
|
||||
"HTTP_ADDRESS",
|
||||
"HTTP_PORT",
|
||||
"API_SECRET",
|
||||
]));
|
||||
|
||||
let config = figment
|
||||
.extract::<Config>()
|
||||
.wrap_err("Failed to load configuration.")?;
|
||||
|
||||
config
|
||||
.validate()
|
||||
.wrap_err("Failed to validate configuration.")?;
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
|
|
@ -1,7 +1,16 @@
|
|||
use crate::config::Config;
|
||||
use color_eyre::Result;
|
||||
use fstr::FStr;
|
||||
use nanoid::nanoid;
|
||||
use rocket::form::validate::Len;
|
||||
use rocket::http::Status;
|
||||
use rocket::outcome::Outcome::Success;
|
||||
use rocket::request::{FromRequest, Outcome};
|
||||
use rocket::{Request, State, post, routes};
|
||||
use serde::Deserialize;
|
||||
use sqlx::SqlitePool;
|
||||
|
||||
pub async fn start_http_api_server(config: &Config) -> Result<()> {
|
||||
pub async fn start_http_api_server(config: &Config, database: SqlitePool) -> Result<()> {
|
||||
let rocket_app = rocket::custom(rocket::config::Config {
|
||||
address: config.http_address,
|
||||
port: config.http_port,
|
||||
|
@ -20,6 +29,60 @@ pub async fn start_http_api_server(config: &Config) -> Result<()> {
|
|||
..rocket::Config::default()
|
||||
});
|
||||
|
||||
rocket_app.launch().await?;
|
||||
rocket_app
|
||||
.manage(CorrectApiSecret(config.api_secret.clone()))
|
||||
.manage(database)
|
||||
.mount("/", routes![create_upload])
|
||||
.launch()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct CreateUploadRequest {}
|
||||
|
||||
#[post("/uploads")]
|
||||
async fn create_upload(_accessor: AuthorizedApiAccessor, database: &State<SqlitePool>) {
|
||||
let total_size = 20;
|
||||
let id = nanoid!();
|
||||
|
||||
sqlx::query!(
|
||||
"INSERT INTO uploads (id, current_size, total_size, hash) VALUES(?, 0, ?, null)",
|
||||
id,
|
||||
total_size
|
||||
)
|
||||
.execute(database.inner())
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
struct CorrectApiSecret(FStr<64>);
|
||||
|
||||
struct AuthorizedApiAccessor();
|
||||
|
||||
#[rocket::async_trait]
|
||||
impl<'r> FromRequest<'r> for AuthorizedApiAccessor {
|
||||
type Error = ();
|
||||
|
||||
async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
|
||||
let provided_secret = request
|
||||
.headers()
|
||||
.get_one("Authorization")
|
||||
.map(|v| v.strip_prefix("Bearer "))
|
||||
.take_if(|v| v.len() == 64)
|
||||
.flatten();
|
||||
|
||||
let correct_secret = request.rocket().state::<CorrectApiSecret>().unwrap().0;
|
||||
if let Some(provided_secret) = provided_secret {
|
||||
if constant_time_eq::constant_time_eq(
|
||||
provided_secret.as_bytes(),
|
||||
correct_secret.as_bytes(),
|
||||
) {
|
||||
return Success(AuthorizedApiAccessor());
|
||||
}
|
||||
}
|
||||
|
||||
Outcome::Error((Status::Forbidden, ()))
|
||||
}
|
||||
}
|
||||
|
|
56
src/main.rs
56
src/main.rs
|
@ -3,6 +3,7 @@ mod http_api;
|
|||
|
||||
use crate::config::{ConfigBucket, ConfigBucketBackend, load_config};
|
||||
use crate::http_api::start_http_api_server;
|
||||
use camino::Utf8Path;
|
||||
use color_eyre::Result;
|
||||
use color_eyre::eyre::{WrapErr, eyre};
|
||||
use std::collections::HashSet;
|
||||
|
@ -17,15 +18,33 @@ async fn main() -> Result<()> {
|
|||
let config = load_config()?;
|
||||
|
||||
log::debug!("Loaded configuration: {:#?}", config);
|
||||
|
||||
initialize_staging_directory(&config.staging_directory).await?;
|
||||
initialize_buckets(&config.buckets).await?;
|
||||
|
||||
let database = sqlx::SqlitePool::connect(&format!("sqlite://{}", config.database_file))
|
||||
.await
|
||||
.wrap_err("Failed to open the database connection.")?;
|
||||
|
||||
log::info!("Initialization successful.");
|
||||
|
||||
start_http_api_server(&config).await?;
|
||||
start_http_api_server(&config, database).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn initialize_staging_directory(path: &Utf8Path) -> Result<()> {
|
||||
log::info!("Initializing staging directory…");
|
||||
fs::create_dir_all(&path)
|
||||
.await
|
||||
.wrap_err_with(|| format!("Could not create directory: {path}"))?;
|
||||
|
||||
check_directory_writable(path)
|
||||
.await
|
||||
.wrap_err("The writeable check for the staging directory failed.")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn initialize_buckets(bucket_configs: &Vec<ConfigBucket>) -> Result<()> {
|
||||
let mut filesystem_backend_paths = HashSet::new();
|
||||
|
||||
|
@ -34,7 +53,7 @@ async fn initialize_buckets(bucket_configs: &Vec<ConfigBucket>) -> Result<()> {
|
|||
|
||||
match &bucket_config.backend {
|
||||
ConfigBucketBackend::Filesystem(filesystem_backend_config) => {
|
||||
let path = match filesystem_backend_config.path.canonicalize() {
|
||||
let path = match filesystem_backend_config.path.canonicalize_utf8() {
|
||||
Ok(path) => path,
|
||||
Err(error) if error.kind() == ErrorKind::NotFound => {
|
||||
fs::create_dir_all(&filesystem_backend_config.path)
|
||||
|
@ -42,33 +61,27 @@ async fn initialize_buckets(bucket_configs: &Vec<ConfigBucket>) -> Result<()> {
|
|||
.wrap_err_with(|| {
|
||||
format!(
|
||||
"Could not create directory: {}",
|
||||
filesystem_backend_config.path.to_string_lossy()
|
||||
filesystem_backend_config.path
|
||||
)
|
||||
})?;
|
||||
|
||||
filesystem_backend_config.path.canonicalize()?
|
||||
filesystem_backend_config.path.canonicalize_utf8()?
|
||||
}
|
||||
Err(error) => return Err(error.into()),
|
||||
};
|
||||
|
||||
if filesystem_backend_paths.contains(&path) {
|
||||
return Err(eyre!(
|
||||
"More than one bucket using the filesystem backend is configured to use this path: {}",
|
||||
path.to_string_lossy()
|
||||
"More than one bucket using the filesystem backend is configured to use this path: {path}"
|
||||
));
|
||||
}
|
||||
|
||||
let write_test_file_path = path.join("./minna-caos-write-test");
|
||||
let _ = fs::File::create(&write_test_file_path)
|
||||
.await
|
||||
.wrap_err_with(|| {
|
||||
format!(
|
||||
"The write test file for the {} bucket failed.",
|
||||
&bucket_config.id
|
||||
)
|
||||
})?;
|
||||
|
||||
fs::remove_file(write_test_file_path).await?;
|
||||
check_directory_writable(&path).await.wrap_err_with(|| {
|
||||
format!(
|
||||
"The writable check for the {} bucket failed.",
|
||||
&bucket_config.id
|
||||
)
|
||||
})?;
|
||||
|
||||
filesystem_backend_paths.insert(path);
|
||||
}
|
||||
|
@ -77,3 +90,12 @@ async fn initialize_buckets(bucket_configs: &Vec<ConfigBucket>) -> Result<()> {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn check_directory_writable(directory_path: &Utf8Path) -> Result<()> {
|
||||
let path = directory_path.join("./minna-caos-write-check");
|
||||
let _ = fs::File::create(&path)
|
||||
.await
|
||||
.wrap_err("Writable check failed.")?;
|
||||
fs::remove_file(path).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue