From cb995d2a90114664200cd07db42a39afdff48393 Mon Sep 17 00:00:00 2001 From: Moritz Ruth Date: Fri, 4 Apr 2025 00:35:08 +0200 Subject: [PATCH] WIP: v1.0.0 --- Cargo.lock | 385 ++++++++++++++++++++++-- Cargo.toml | 5 +- README.md | 1 - src/config.rs | 2 + src/http_api/upload/append_to_upload.rs | 6 +- src/http_api/upload/mod.rs | 2 +- src/main.rs | 2 +- src/processing_worker.rs | 91 +++++- src/upload_manager.rs | 19 +- src/util/file_reference.rs | 10 +- src/util/hash_to_hex_string.rs | 22 ++ src/util/mod.rs | 2 + src/util/temporal_formatting.rs | 24 ++ 13 files changed, 526 insertions(+), 45 deletions(-) create mode 100644 src/util/hash_to_hex_string.rs create mode 100644 src/util/temporal_formatting.rs diff --git a/Cargo.lock b/Cargo.lock index 0c690ed..212f12d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,6 +103,18 @@ version = "1.0.97" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f" +[[package]] +name = "arrayref" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb" + +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "async-trait" version = "0.1.88" @@ -247,6 +259,21 @@ dependencies = [ "serde", ] +[[package]] +name = "blake3" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389a099b34312839e16420d499a9cad9650541715937ffbdd40d36f49e77eeb3" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq 0.3.1", + "memmap2", + "rayon-core", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -280,6 +307,16 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +[[package]] +name = "calendrical_calculations" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e97f73e95d668625c9b28a3072e6326773785a0cf807de9f3d632778438f3d38" +dependencies = [ + "core_maths", + "displaydoc", +] + [[package]] name = "camino" version = "1.1.9" @@ -370,6 +407,12 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "constant_time_eq" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" + [[package]] name = "constant_time_eq" version = "0.4.2" @@ -392,6 +435,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core_maths" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77745e017f5edba1a9c1d854f6f3a52dac8a12dd5af5d2f54aecf61e43d80d30" +dependencies = [ + "libm", +] + [[package]] name = "cpufeatures" version = "0.2.17" @@ -416,6 +468,25 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] 
name = "crossbeam-queue" version = "0.3.12" @@ -631,6 +702,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "file_type" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2f511d3f93055b5e7d74761d035e9b530314b53224f4ef8a9ee25c672efe569" +dependencies = [ + "phf", +] + [[package]] name = "flume" version = "0.11.1" @@ -1012,6 +1092,32 @@ dependencies = [ "cc", ] +[[package]] +name = "icu_calendar" +version = "2.0.0-beta2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f664d19093224c9de27db5d1797b4105ae9545c0c540faf0d351884d1b24ca6" +dependencies = [ + "calendrical_calculations", + "displaydoc", + "icu_calendar_data", + "icu_locale_core", + "icu_provider 2.0.0-beta2", + "tinystr 0.8.1", + "writeable 0.6.1", + "zerovec 0.11.1", +] + +[[package]] +name = "icu_calendar_data" +version = "2.0.0-beta2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd70bb6c7a5d0d24c94fa18309118879bbde09052b18eec96fc75aa4c6dbf659" +dependencies = [ + "icu_locale", + "icu_provider_baked", +] + [[package]] name = "icu_collections" version = "1.5.0" @@ -1019,9 +1125,60 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" dependencies = [ "displaydoc", - "yoke", + "yoke 0.7.5", "zerofrom", - "zerovec", + "zerovec 0.10.4", +] + +[[package]] +name = "icu_collections" +version = "2.0.0-beta2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63df3227b8f369b3f7cc4003f0bdd9ca0083b871e2672811f699d69b473cc174" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke 0.8.0", + "zerofrom", + "zerovec 0.11.1", +] + +[[package]] +name = "icu_locale" +version = "2.0.0-beta2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afa4c80f106c1cf0f1b66e0ae9806f603f1c2c41d004229af1b0c6cebe84c74a" +dependencies = [ + "displaydoc", + 
"icu_collections 2.0.0-beta2", + "icu_locale_core", + "icu_locale_data", + "icu_provider 2.0.0-beta2", + "potential_utf", + "tinystr 0.8.1", + "zerovec 0.11.1", +] + +[[package]] +name = "icu_locale_core" +version = "2.0.0-beta2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b80161b66511e4eb415ef110c67ea8cab4400b749f9e30c8691fff1354934b6b" +dependencies = [ + "displaydoc", + "litemap", + "tinystr 0.8.1", + "writeable 0.6.1", + "zerovec 0.11.1", +] + +[[package]] +name = "icu_locale_data" +version = "2.0.0-beta2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c1adc94a0bde584f8751381c0427d763ef5068fd388d670fabf966569f01465" +dependencies = [ + "icu_provider_baked", ] [[package]] @@ -1032,9 +1189,9 @@ checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" dependencies = [ "displaydoc", "litemap", - "tinystr", - "writeable", - "zerovec", + "tinystr 0.7.6", + "writeable 0.5.5", + "zerovec 0.10.4", ] [[package]] @@ -1046,9 +1203,9 @@ dependencies = [ "displaydoc", "icu_locid", "icu_locid_transform_data", - "icu_provider", - "tinystr", - "zerovec", + "icu_provider 1.5.0", + "tinystr 0.7.6", + "zerovec 0.10.4", ] [[package]] @@ -1064,15 +1221,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" dependencies = [ "displaydoc", - "icu_collections", + "icu_collections 1.5.0", "icu_normalizer_data", "icu_properties", - "icu_provider", + "icu_provider 1.5.0", "smallvec", "utf16_iter", "utf8_iter", "write16", - "zerovec", + "zerovec 0.10.4", ] [[package]] @@ -1088,12 +1245,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5" dependencies = [ "displaydoc", - "icu_collections", + "icu_collections 1.5.0", "icu_locid_transform", "icu_properties_data", - "icu_provider", - "tinystr", - "zerovec", + 
"icu_provider 1.5.0", + "tinystr 0.7.6", + "zerovec 0.10.4", ] [[package]] @@ -1112,11 +1269,39 @@ dependencies = [ "icu_locid", "icu_provider_macros", "stable_deref_trait", - "tinystr", - "writeable", - "yoke", + "tinystr 0.7.6", + "writeable 0.5.5", + "yoke 0.7.5", "zerofrom", - "zerovec", + "zerovec 0.10.4", +] + +[[package]] +name = "icu_provider" +version = "2.0.0-beta2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0d462aad52985bb71e3140fcc44e54d816cf7f2c3f25cd9b090cc77a9798504" +dependencies = [ + "displaydoc", + "icu_locale_core", + "stable_deref_trait", + "tinystr 0.8.1", + "writeable 0.6.1", + "yoke 0.8.0", + "zerofrom", + "zerovec 0.11.1", +] + +[[package]] +name = "icu_provider_baked" +version = "2.0.0-beta2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2794f00ee1999495f4f1a1e35aee8f54fe7cfcbcf909ec05b60522377200aecb" +dependencies = [ + "icu_provider 2.0.0-beta2", + "writeable 0.6.1", + "zerotrie", + "zerovec 0.11.1", ] [[package]] @@ -1197,6 +1382,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "ixdtf" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3be3d801e2817c5311a3be4f1e1b2148dcd2b10baadb3a5eade0544a0521ac9" +dependencies = [ + "displaydoc", + "utf8_iter", +] + [[package]] name = "jiff" version = "0.2.4" @@ -1313,6 +1508,15 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "memmap2" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd3f7eed9d3848f8b98834af67102b720745c4ec028fcd0aa0239277e7de374f" +dependencies = [ + "libc", +] + [[package]] name = "mime" version = "0.3.17" @@ -1333,12 +1537,14 @@ name = 
"minna_caos" version = "0.1.0" dependencies = [ "axum", + "blake3", "camino", "color-eyre", - "constant_time_eq", + "constant_time_eq 0.4.2", "dashmap", "env_logger", "figment", + "file_type", "fstr", "futures", "log", @@ -1351,6 +1557,7 @@ dependencies = [ "serde_json", "sqlx", "strum", + "temporal_rs", "tokio", "tokio-util", "validator", @@ -1535,6 +1742,24 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "phf" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1589,6 +1814,16 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "potential_utf" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" +dependencies = [ + "serde", + "zerovec 0.11.1", +] + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1781,6 +2016,16 @@ dependencies = [ "getrandom 0.3.2", ] +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.5.10" @@ -2144,6 +2389,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + [[package]] name = "slab" version = "0.4.9" @@ -2476,6 +2727,21 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "temporal_rs" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d994d7580277c289d6e27ad6f54278338b82b1fd50a32991d8779a8810d4e8f" +dependencies = [ + "iana-time-zone", + "icu_calendar", + "ixdtf", + "num-traits", + "tinystr 0.8.1", + "web-time", + "writeable 0.6.1", +] + [[package]] name = "thiserror" version = "2.0.12" @@ -2513,7 +2779,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" dependencies = [ "displaydoc", - "zerovec", + "zerovec 0.10.4", +] + +[[package]] +name = "tinystr" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4f6d1145dcb577acf783d4e601bc1d76a13337bb54e6233add580b07344c8b" +dependencies = [ + "displaydoc", + "zerovec 0.11.1", ] [[package]] @@ -3288,6 +3564,12 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +[[package]] +name = "writeable" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" + [[package]] name = "yansi" version = "1.0.1" @@ -3302,7 +3584,19 @@ checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" dependencies = [ "serde", "stable_deref_trait", - "yoke-derive", + "yoke-derive 0.7.5", + "zerofrom", +] + +[[package]] +name = "yoke" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f41bb01b8226ef4bfd589436a297c53d118f65921786300e427be8d487695cc" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive 0.8.0", "zerofrom", ] @@ -3318,6 
+3612,18 @@ dependencies = [ "synstructure", ] +[[package]] +name = "yoke-derive" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.8.24" @@ -3365,15 +3671,35 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" +[[package]] +name = "zerotrie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b7a6cf4865aac8394f19ad46e37f60b929c1ba5eed798b96a32820aa9392929" +dependencies = [ + "displaydoc", +] + [[package]] name = "zerovec" version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079" dependencies = [ - "yoke", + "yoke 0.7.5", "zerofrom", - "zerovec-derive", + "zerovec-derive 0.10.3", +] + +[[package]] +name = "zerovec" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94e62113720e311984f461c56b00457ae9981c0bc7859d22306cc2ae2f95571c" +dependencies = [ + "yoke 0.8.0", + "zerofrom", + "zerovec-derive 0.11.1", ] [[package]] @@ -3386,3 +3712,14 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zerovec-derive" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index f045746..0aba898 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,4 +28,7 @@ tokio-util = "0.7.14" replace_with = "0.1.7" rand = "0.9.0" futures = "0.3.31" -strum = { version = "0.27.1", features = ["derive"] } \ No newline at end of file +strum = { version = 
"0.27.1", features = ["derive"] } +blake3 = { version = "1.8.1", features = ["rayon", "mmap"] } +file_type = "0.8.3" +temporal_rs = "0.0.6" \ No newline at end of file diff --git a/README.md b/README.md index 641f923..de0f5f6 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,6 @@ ## Roadmap -- basic uploading - media type detection - graceful shutdown - metadata endpoints diff --git a/src/config.rs b/src/config.rs index 0ca1e32..3e16b5a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -18,6 +18,8 @@ pub struct Config { pub api_secret: FStr<64>, pub database_file: Utf8PathBuf, pub staging_directory: Utf8PathBuf, + #[serde(default)] + pub enable_multithreaded_hashing: bool, #[validate(nested, custom(function = "validate_buckets"))] pub buckets: Vec, } diff --git a/src/http_api/upload/append_to_upload.rs b/src/http_api/upload/append_to_upload.rs index d38b04c..b9e34bb 100644 --- a/src/http_api/upload/append_to_upload.rs +++ b/src/http_api/upload/append_to_upload.rs @@ -228,12 +228,16 @@ async fn do_append( ) -> Result { let mut upload_state = parameters.upload.state().write().await; let release_request_token = file_acquisition.release_request_token(); - let mut file = file_acquisition.inner().get_or_open().await?; + let mut file = file_acquisition.inner().get_or_open(true).await?; let total_size = parameters.upload.total_size(); let current_offset = file.stream_position().await?; if current_offset < upload_state.current_size() { + log::error!( + "The upload ({}) failed because the file contains less data than expected.", + parameters.upload.id() + ); parameters.upload.fail(UploadFailureReason::MissingData).await?; return Ok(AppendToUploadOutcome::Failed(UploadFailureReason::MissingData)); } diff --git a/src/http_api/upload/mod.rs b/src/http_api/upload/mod.rs index d74e32d..77a6102 100644 --- a/src/http_api/upload/mod.rs +++ b/src/http_api/upload/mod.rs @@ -67,5 +67,5 @@ async fn create_upload( let upload = context.upload_manager.create_upload(payload.size).await?; 
-    Ok(Json(CreateUploadResponseBody { upload_id: upload.id() }))
+    Ok(Json(CreateUploadResponseBody { upload_id: *upload.id() }))
 }
diff --git a/src/main.rs b/src/main.rs
index edc40cd..3f5f6e0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -32,7 +32,7 @@ async fn main() -> Result<()> {
         .await
         .wrap_err("Failed to open the database connection.")?;
 
-    let upload_manager = UploadManager::create(database.clone(), config.staging_directory).await?;
+    let upload_manager = UploadManager::create(database.clone(), config.staging_directory, config.enable_multithreaded_hashing).await?;
 
     log::info!("Initialization successful.");
 
diff --git a/src/processing_worker.rs b/src/processing_worker.rs
index 506604e..5661676 100644
--- a/src/processing_worker.rs
+++ b/src/processing_worker.rs
@@ -1,21 +1,102 @@
-use crate::upload_manager::{FileReference, UnfinishedUpload, UploadId};
+use crate::upload_manager::{FileReference, UnfinishedUpload};
 use crate::util::acquirable::Acquisition;
+use crate::util::hash_to_hex_string::HashExt;
+use crate::util::temporal_formatting::TemporalFormatting;
+use blake3::Hasher;
+use file_type::FileType;
+use fstr::FStr;
 use sqlx::SqlitePool;
+use std::io::SeekFrom;
 use std::sync::Arc;
+use temporal_rs::Now;
+use tokio::io::{AsyncReadExt, AsyncSeekExt};
 
-pub async fn do_processing_work(mut tasks_receiver: tokio::sync::mpsc::UnboundedReceiver<Arc<UnfinishedUpload>>, database: SqlitePool) {
+pub async fn do_processing_work( // Processes queued uploads one at a time until the channel is closed.
+    mut tasks_receiver: tokio::sync::mpsc::UnboundedReceiver<Arc<UnfinishedUpload>>,
+    database: SqlitePool,
+    enable_multithreaded_hashing: bool,
+) {
     while let Some(upload) = tasks_receiver.recv().await {
         let mut file_acquisition = upload
             .acquire_file()
             .await
             .expect("When an upload is marked as complete, requests no longer acquire the file.");
 
-        process(upload.id(), &mut file_acquisition).await;
+        match process(enable_multithreaded_hashing, &mut file_acquisition).await {
+            Ok(outcome) => {
+                let mut tx = database.begin().await.unwrap(); // NOTE(review): every unwrap in this arm panics the worker task on a database error; consider propagating instead.
+
+                let id = upload.id().as_str();
+                let hash = outcome.hash.as_str();
+                let size = upload.total_size() as i64;
+                let creation_date = Now::zoneddatetime_iso(None).unwrap().to_string_with_defaults().unwrap();
+
+                sqlx::query!("DELETE FROM unfinished_uploads WHERE id = ?", id).execute(&mut *tx).await.unwrap(); // The upload moves from the unfinished to the finished table within one transaction.
+
+                sqlx::query!(
+                    "INSERT INTO objects (hash, size, media_type, creation_date) VALUES (?, ?, ?, ?)",
+                    hash,
+                    size,
+                    outcome.media_type,
+                    creation_date
+                )
+                .execute(&mut *tx)
+                .await
+                .unwrap();
+
+                sqlx::query!("INSERT INTO object_replicas (hash, is_present, bucket_id) VALUES (?, true, 'staging')", hash,)
+                    .execute(&mut *tx)
+                    .await
+                    .unwrap();
+
+                sqlx::query!("INSERT INTO finished_uploads (id, hash) VALUES (?, ?)", id, hash)
+                    .execute(&mut *tx)
+                    .await
+                    .unwrap();
+
+                tx.commit().await.unwrap();
+
+                log::debug!("Successfully processed upload ({}): {:?}", id, outcome);
+            }
+            Err(report) => { // Processing failed: the error is only logged, no database rows are touched.
+                log::error!("Error during upload processing ({}): {:#}", upload.id(), report);
+            }
+        }
 
         file_acquisition.destroy().await;
     }
 }
 
-pub async fn process(id: UploadId, file_acquisition: &mut Acquisition<FileReference>) {
-    let file = file_acquisition.inner().get_or_open().await.unwrap(); // TODO: Handle errors
+#[derive(Debug)]
+struct ProcessingOutcome { // What `process` learned about an upload's file.
+    hash: FStr<64>, // Hex string produced by `to_hex_fstr` from the BLAKE3 digest.
+    media_type: &'static str, // First media type reported by `file_type`, or "application/octet-stream".
+}
+
+pub async fn process(enable_multithreaded_hashing: bool, file_acquisition: &mut Acquisition<FileReference>) -> Result<ProcessingOutcome, color_eyre::Report> { // Hashes the file and detects its media type.
+    let file_reference = file_acquisition.inner();
+    let path = file_reference.path().to_owned(); // Owned copy so the path can move into the blocking hashing task.
+    let file = file_reference.get_or_open(false).await?; // `false`: do not create the file if it is missing.
+    file.seek(SeekFrom::Start(0)).await?; // Rewind so the head is read from the start of the file.
+
+    const HEAD_LENGTH: usize = 16 * 1024; // Only the first 16 KiB are inspected for media type detection.
+    let mut file_content_head = Vec::with_capacity(HEAD_LENGTH);
+    file.take(HEAD_LENGTH as u64).read_to_end(&mut file_content_head).await?;
+
+    let mut hasher = Hasher::new();
+    let hash = tokio::task::spawn_blocking(move || -> Result<_, std::io::Error> { // Hashing is synchronous work, so it runs on the blocking thread pool.
+        if enable_multithreaded_hashing {
+            hasher.update_mmap_rayon(path)?;
+        } else {
+            hasher.update_mmap(path)?;
+        }
+
+        Ok(hasher.finalize().to_hex_fstr())
+    })
+    .await??; // First `?` handles the join error, second the I/O error from hashing.
+
+    let file_type = FileType::from_bytes(file_content_head);
+    let media_type = file_type.media_types().first().copied().unwrap_or("application/octet-stream");
+
+    Ok(ProcessingOutcome { hash, media_type })
 }
diff --git a/src/upload_manager.rs b/src/upload_manager.rs
index e528767..0a0c81d 100644
--- a/src/upload_manager.rs
+++ b/src/upload_manager.rs
@@ -6,12 +6,11 @@ use camino::Utf8PathBuf;
 use color_eyre::{Report, Result};
 use dashmap::DashMap;
 use fstr::FStr;
-use sqlx::{Row, SqlitePool};
+use sqlx::SqlitePool;
 use std::fmt::Debug;
 use std::str::FromStr;
 use std::sync::{Arc, Weak};
-use strum::{Display, EnumString, IntoStaticStr};
-use tokio::fs::{File, OpenOptions};
+use strum::{Display, EnumString};
 use tokio::sync::RwLock;
 
 const LARGE_FILE_SIZE_THRESHOLD: u64 = 1024;
@@ -28,7 +27,7 @@ pub struct UploadManager {
 }
 
 impl UploadManager {
-    pub async fn create(database: SqlitePool, staging_directory_path: Utf8PathBuf) -> Result> {
+    pub async fn create(database: SqlitePool, staging_directory_path: Utf8PathBuf, enable_multithreaded_hashing: bool) -> Result> {
         log::info!("Loading uploads…");
 
         let (small_file_processing_tasks_sender, small_file_processing_tasks_receiver) = tokio::sync::mpsc::unbounded_channel();
@@ -71,8 +70,12 @@ impl UploadManager {
         }
 
         log::info!("Starting upload processing…");
-        tokio::spawn(do_processing_work(small_file_processing_tasks_receiver, database.clone()));
-        tokio::spawn(do_processing_work(large_file_processing_tasks_receiver, database));
+        tokio::spawn(do_processing_work(
+            small_file_processing_tasks_receiver,
+            database.clone(),
+            enable_multithreaded_hashing,
+        ));
+        tokio::spawn(do_processing_work(large_file_processing_tasks_receiver, database, enable_multithreaded_hashing));
 
         Ok(manager)
     }
@@ -145,8 +148,8 @@ pub struct UnfinishedUpload {
 }
 
 impl UnfinishedUpload {
-    pub fn id(&self) -> UploadId {
-        self.id
+    pub fn id(&self) -> &UploadId {
+        &self.id
     }
 
     pub fn total_size(&self)
-> u64 {
diff --git a/src/util/file_reference.rs b/src/util/file_reference.rs
index 8762c2a..3bb652e 100644
--- a/src/util/file_reference.rs
+++ b/src/util/file_reference.rs
@@ -1,4 +1,4 @@
-use camino::Utf8PathBuf;
+use camino::{Utf8Path, Utf8PathBuf};
 use tokio::fs::{File, OpenOptions};
 
 #[derive(Debug)]
@@ -12,16 +12,20 @@ impl FileReference {
         FileReference { path, file: None }
     }
 
-    pub async fn get_or_open(&mut self) -> color_eyre::Result<&mut File, std::io::Error> {
+    pub async fn get_or_open(&mut self, should_create: bool) -> color_eyre::Result<&mut File, std::io::Error> {
         let file = &mut self.file;
         if let Some(file) = file {
             Ok(file)
         } else {
-            *file = Some(OpenOptions::new().read(true).append(true).open(&self.path).await?);
+            *file = Some(OpenOptions::new().read(true).append(true).create(should_create).open(&self.path).await?);
             Ok(unsafe { file.as_mut().unwrap_unchecked() })
         }
     }
 
+    pub fn path(&self) -> &Utf8Path { // Borrowed path of the referenced file.
+        &self.path
+    }
+
     pub fn is_open(&self) -> bool {
         self.file.is_some()
     }
diff --git a/src/util/hash_to_hex_string.rs b/src/util/hash_to_hex_string.rs
new file mode 100644
index 0000000..65752bd
--- /dev/null
+++ b/src/util/hash_to_hex_string.rs
@@ -0,0 +1,22 @@
+use blake3::{Hash, OUT_LEN};
+use fstr::FStr;
+use std::mem;
+use std::mem::MaybeUninit;
+
+pub trait HashExt { // Extension trait: renders a hash as a fixed-size lowercase hex string.
+    fn to_hex_fstr(&self) -> FStr<{ 2 * OUT_LEN }>; // Two hex digits per hash byte.
+}
+
+impl HashExt for Hash {
+    fn to_hex_fstr(&self) -> FStr<{ 2 * OUT_LEN }> {
+        static ALPHABET: &[u8; 16] = b"0123456789abcdef";
+        let mut result = [const { MaybeUninit::uninit() }; 2 * OUT_LEN];
+
+        for (x, &byte) in self.as_bytes().iter().enumerate() { // Encode the hash bytes themselves, not the loop index.
+            result[x * 2].write(ALPHABET[usize::from(byte >> 4)]); // High nibble first.
+            result[x * 2 + 1].write(ALPHABET[usize::from(byte & 0xf)]); // Then the low nibble.
+        }
+
+        unsafe { FStr::from_inner_unchecked(mem::transmute::<_, [u8; 2 * OUT_LEN]>(result)) } // SAFETY: the loop wrote an ASCII hex digit to each of the 2 * OUT_LEN slots, so the array is fully initialized and valid UTF-8.
+    }
+}
diff --git a/src/util/mod.rs b/src/util/mod.rs
index 23f7370..9566b36 100644
--- a/src/util/mod.rs
+++ b/src/util/mod.rs
@@ -1,3 +1,5 @@
 pub mod acquirable;
 pub mod file_reference;
+pub mod hash_to_hex_string;
 pub mod id;
+pub
mod temporal_formatting;
diff --git a/src/util/temporal_formatting.rs b/src/util/temporal_formatting.rs
new file mode 100644
index 0000000..d00583d
--- /dev/null
+++ b/src/util/temporal_formatting.rs
@@ -0,0 +1,24 @@
+use temporal_rs::options::{DisplayCalendar, DisplayOffset, DisplayTimeZone, ToStringRoundingOptions};
+use temporal_rs::parsers::Precision;
+use temporal_rs::provider::NeverProvider;
+use temporal_rs::{TemporalResult, ZonedDateTime};
+
+pub trait TemporalFormatting { // Extension trait: formats a temporal value with this crate's fixed display options.
+    fn to_string_with_defaults(&self) -> TemporalResult<String>; // Returns the formatted string, or the formatting error.
+}
+
+impl TemporalFormatting for ZonedDateTime {
+    fn to_string_with_defaults(&self) -> TemporalResult<String> {
+        self.to_ixdtf_string_with_provider(
+            DisplayOffset::Auto,
+            DisplayTimeZone::Auto,
+            DisplayCalendar::Never, // The calendar annotation is never emitted.
+            ToStringRoundingOptions {
+                precision: Precision::Auto,
+                smallest_unit: None,
+                rounding_mode: None,
+            },
+            &NeverProvider, // NOTE(review): presumably no time zone data lookup is needed for formatting — confirm.
+        )
+    }
+}