WIP: v1.0.0

This commit is contained in:
Moritz Ruth 2025-04-04 00:35:08 +02:00
parent 96a8ea72f1
commit cb995d2a90
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
13 changed files with 526 additions and 45 deletions

385
Cargo.lock generated
View file

@ -103,6 +103,18 @@ version = "1.0.97"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
[[package]]
name = "arrayref"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb"
[[package]]
name = "arrayvec"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "async-trait"
version = "0.1.88"
@ -247,6 +259,21 @@ dependencies = [
"serde",
]
[[package]]
name = "blake3"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389a099b34312839e16420d499a9cad9650541715937ffbdd40d36f49e77eeb3"
dependencies = [
"arrayref",
"arrayvec",
"cc",
"cfg-if",
"constant_time_eq 0.3.1",
"memmap2",
"rayon-core",
]
[[package]]
name = "block-buffer"
version = "0.10.4"
@ -280,6 +307,16 @@ version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
[[package]]
name = "calendrical_calculations"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e97f73e95d668625c9b28a3072e6326773785a0cf807de9f3d632778438f3d38"
dependencies = [
"core_maths",
"displaydoc",
]
[[package]]
name = "camino"
version = "1.1.9"
@ -370,6 +407,12 @@ version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
[[package]]
name = "constant_time_eq"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
[[package]]
name = "constant_time_eq"
version = "0.4.2"
@ -392,6 +435,15 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "core_maths"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77745e017f5edba1a9c1d854f6f3a52dac8a12dd5af5d2f54aecf61e43d80d30"
dependencies = [
"libm",
]
[[package]]
name = "cpufeatures"
version = "0.2.17"
@ -416,6 +468,25 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]]
name = "crossbeam-deque"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.12"
@ -631,6 +702,15 @@ dependencies = [
"version_check",
]
[[package]]
name = "file_type"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2f511d3f93055b5e7d74761d035e9b530314b53224f4ef8a9ee25c672efe569"
dependencies = [
"phf",
]
[[package]]
name = "flume"
version = "0.11.1"
@ -1012,6 +1092,32 @@ dependencies = [
"cc",
]
[[package]]
name = "icu_calendar"
version = "2.0.0-beta2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f664d19093224c9de27db5d1797b4105ae9545c0c540faf0d351884d1b24ca6"
dependencies = [
"calendrical_calculations",
"displaydoc",
"icu_calendar_data",
"icu_locale_core",
"icu_provider 2.0.0-beta2",
"tinystr 0.8.1",
"writeable 0.6.1",
"zerovec 0.11.1",
]
[[package]]
name = "icu_calendar_data"
version = "2.0.0-beta2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd70bb6c7a5d0d24c94fa18309118879bbde09052b18eec96fc75aa4c6dbf659"
dependencies = [
"icu_locale",
"icu_provider_baked",
]
[[package]]
name = "icu_collections"
version = "1.5.0"
@ -1019,9 +1125,60 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526"
dependencies = [
"displaydoc",
"yoke",
"yoke 0.7.5",
"zerofrom",
"zerovec",
"zerovec 0.10.4",
]
[[package]]
name = "icu_collections"
version = "2.0.0-beta2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63df3227b8f369b3f7cc4003f0bdd9ca0083b871e2672811f699d69b473cc174"
dependencies = [
"displaydoc",
"potential_utf",
"yoke 0.8.0",
"zerofrom",
"zerovec 0.11.1",
]
[[package]]
name = "icu_locale"
version = "2.0.0-beta2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afa4c80f106c1cf0f1b66e0ae9806f603f1c2c41d004229af1b0c6cebe84c74a"
dependencies = [
"displaydoc",
"icu_collections 2.0.0-beta2",
"icu_locale_core",
"icu_locale_data",
"icu_provider 2.0.0-beta2",
"potential_utf",
"tinystr 0.8.1",
"zerovec 0.11.1",
]
[[package]]
name = "icu_locale_core"
version = "2.0.0-beta2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b80161b66511e4eb415ef110c67ea8cab4400b749f9e30c8691fff1354934b6b"
dependencies = [
"displaydoc",
"litemap",
"tinystr 0.8.1",
"writeable 0.6.1",
"zerovec 0.11.1",
]
[[package]]
name = "icu_locale_data"
version = "2.0.0-beta2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c1adc94a0bde584f8751381c0427d763ef5068fd388d670fabf966569f01465"
dependencies = [
"icu_provider_baked",
]
[[package]]
@ -1032,9 +1189,9 @@ checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637"
dependencies = [
"displaydoc",
"litemap",
"tinystr",
"writeable",
"zerovec",
"tinystr 0.7.6",
"writeable 0.5.5",
"zerovec 0.10.4",
]
[[package]]
@ -1046,9 +1203,9 @@ dependencies = [
"displaydoc",
"icu_locid",
"icu_locid_transform_data",
"icu_provider",
"tinystr",
"zerovec",
"icu_provider 1.5.0",
"tinystr 0.7.6",
"zerovec 0.10.4",
]
[[package]]
@ -1064,15 +1221,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f"
dependencies = [
"displaydoc",
"icu_collections",
"icu_collections 1.5.0",
"icu_normalizer_data",
"icu_properties",
"icu_provider",
"icu_provider 1.5.0",
"smallvec",
"utf16_iter",
"utf8_iter",
"write16",
"zerovec",
"zerovec 0.10.4",
]
[[package]]
@ -1088,12 +1245,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5"
dependencies = [
"displaydoc",
"icu_collections",
"icu_collections 1.5.0",
"icu_locid_transform",
"icu_properties_data",
"icu_provider",
"tinystr",
"zerovec",
"icu_provider 1.5.0",
"tinystr 0.7.6",
"zerovec 0.10.4",
]
[[package]]
@ -1112,11 +1269,39 @@ dependencies = [
"icu_locid",
"icu_provider_macros",
"stable_deref_trait",
"tinystr",
"writeable",
"yoke",
"tinystr 0.7.6",
"writeable 0.5.5",
"yoke 0.7.5",
"zerofrom",
"zerovec",
"zerovec 0.10.4",
]
[[package]]
name = "icu_provider"
version = "2.0.0-beta2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0d462aad52985bb71e3140fcc44e54d816cf7f2c3f25cd9b090cc77a9798504"
dependencies = [
"displaydoc",
"icu_locale_core",
"stable_deref_trait",
"tinystr 0.8.1",
"writeable 0.6.1",
"yoke 0.8.0",
"zerofrom",
"zerovec 0.11.1",
]
[[package]]
name = "icu_provider_baked"
version = "2.0.0-beta2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2794f00ee1999495f4f1a1e35aee8f54fe7cfcbcf909ec05b60522377200aecb"
dependencies = [
"icu_provider 2.0.0-beta2",
"writeable 0.6.1",
"zerotrie",
"zerovec 0.11.1",
]
[[package]]
@ -1197,6 +1382,16 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "ixdtf"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3be3d801e2817c5311a3be4f1e1b2148dcd2b10baadb3a5eade0544a0521ac9"
dependencies = [
"displaydoc",
"utf8_iter",
]
[[package]]
name = "jiff"
version = "0.2.4"
@ -1313,6 +1508,15 @@ version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "memmap2"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd3f7eed9d3848f8b98834af67102b720745c4ec028fcd0aa0239277e7de374f"
dependencies = [
"libc",
]
[[package]]
name = "mime"
version = "0.3.17"
@ -1333,12 +1537,14 @@ name = "minna_caos"
version = "0.1.0"
dependencies = [
"axum",
"blake3",
"camino",
"color-eyre",
"constant_time_eq",
"constant_time_eq 0.4.2",
"dashmap",
"env_logger",
"figment",
"file_type",
"fstr",
"futures",
"log",
@ -1351,6 +1557,7 @@ dependencies = [
"serde_json",
"sqlx",
"strum",
"temporal_rs",
"tokio",
"tokio-util",
"validator",
@ -1535,6 +1742,24 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "phf"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_shared"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5"
dependencies = [
"siphasher",
]
[[package]]
name = "pin-project-lite"
version = "0.2.16"
@ -1589,6 +1814,16 @@ dependencies = [
"portable-atomic",
]
[[package]]
name = "potential_utf"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585"
dependencies = [
"serde",
"zerovec 0.11.1",
]
[[package]]
name = "ppv-lite86"
version = "0.2.21"
@ -1781,6 +2016,16 @@ dependencies = [
"getrandom 0.3.2",
]
[[package]]
name = "rayon-core"
version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]]
name = "redox_syscall"
version = "0.5.10"
@ -2144,6 +2389,12 @@ dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "siphasher"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
[[package]]
name = "slab"
version = "0.4.9"
@ -2476,6 +2727,21 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "temporal_rs"
version = "0.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d994d7580277c289d6e27ad6f54278338b82b1fd50a32991d8779a8810d4e8f"
dependencies = [
"iana-time-zone",
"icu_calendar",
"ixdtf",
"num-traits",
"tinystr 0.8.1",
"web-time",
"writeable 0.6.1",
]
[[package]]
name = "thiserror"
version = "2.0.12"
@ -2513,7 +2779,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f"
dependencies = [
"displaydoc",
"zerovec",
"zerovec 0.10.4",
]
[[package]]
name = "tinystr"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d4f6d1145dcb577acf783d4e601bc1d76a13337bb54e6233add580b07344c8b"
dependencies = [
"displaydoc",
"zerovec 0.11.1",
]
[[package]]
@ -3288,6 +3564,12 @@ version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51"
[[package]]
name = "writeable"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb"
[[package]]
name = "yansi"
version = "1.0.1"
@ -3302,7 +3584,19 @@ checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40"
dependencies = [
"serde",
"stable_deref_trait",
"yoke-derive",
"yoke-derive 0.7.5",
"zerofrom",
]
[[package]]
name = "yoke"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f41bb01b8226ef4bfd589436a297c53d118f65921786300e427be8d487695cc"
dependencies = [
"serde",
"stable_deref_trait",
"yoke-derive 0.8.0",
"zerofrom",
]
@ -3318,6 +3612,18 @@ dependencies = [
"synstructure",
]
[[package]]
name = "yoke-derive"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]
[[package]]
name = "zerocopy"
version = "0.8.24"
@ -3365,15 +3671,35 @@ version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde"
[[package]]
name = "zerotrie"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b7a6cf4865aac8394f19ad46e37f60b929c1ba5eed798b96a32820aa9392929"
dependencies = [
"displaydoc",
]
[[package]]
name = "zerovec"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079"
dependencies = [
"yoke",
"yoke 0.7.5",
"zerofrom",
"zerovec-derive",
"zerovec-derive 0.10.3",
]
[[package]]
name = "zerovec"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94e62113720e311984f461c56b00457ae9981c0bc7859d22306cc2ae2f95571c"
dependencies = [
"yoke 0.8.0",
"zerofrom",
"zerovec-derive 0.11.1",
]
[[package]]
@ -3386,3 +3712,14 @@ dependencies = [
"quote",
"syn",
]
[[package]]
name = "zerovec-derive"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

View file

@ -28,4 +28,7 @@ tokio-util = "0.7.14"
replace_with = "0.1.7"
rand = "0.9.0"
futures = "0.3.31"
strum = { version = "0.27.1", features = ["derive"] }
strum = { version = "0.27.1", features = ["derive"] }
blake3 = { version = "1.8.1", features = ["rayon", "mmap"] }
file_type = "0.8.3"
temporal_rs = "0.0.6"

View file

@ -40,7 +40,6 @@
## Roadmap
- basic uploading
- media type detection
- graceful shutdown
- metadata endpoints

View file

@ -18,6 +18,8 @@ pub struct Config {
pub api_secret: FStr<64>,
pub database_file: Utf8PathBuf,
pub staging_directory: Utf8PathBuf,
#[serde(default)]
pub enable_multithreaded_hashing: bool,
#[validate(nested, custom(function = "validate_buckets"))]
pub buckets: Vec<ConfigBucket>,
}

View file

@ -228,12 +228,16 @@ async fn do_append(
) -> Result<AppendToUploadOutcome, Report> {
let mut upload_state = parameters.upload.state().write().await;
let release_request_token = file_acquisition.release_request_token();
let mut file = file_acquisition.inner().get_or_open().await?;
let mut file = file_acquisition.inner().get_or_open(true).await?;
let total_size = parameters.upload.total_size();
let current_offset = file.stream_position().await?;
if current_offset < upload_state.current_size() {
log::error!(
"The upload ({}) failed because the file contains less data than expected.",
parameters.upload.id()
);
parameters.upload.fail(UploadFailureReason::MissingData).await?;
return Ok(AppendToUploadOutcome::Failed(UploadFailureReason::MissingData));
}

View file

@ -67,5 +67,5 @@ async fn create_upload(
let upload = context.upload_manager.create_upload(payload.size).await?;
Ok(Json(CreateUploadResponseBody { upload_id: upload.id() }))
Ok(Json(CreateUploadResponseBody { upload_id: *upload.id() }))
}

View file

@ -32,7 +32,7 @@ async fn main() -> Result<()> {
.await
.wrap_err("Failed to open the database connection.")?;
let upload_manager = UploadManager::create(database.clone(), config.staging_directory).await?;
let upload_manager = UploadManager::create(database.clone(), config.staging_directory, config.enable_multithreaded_hashing).await?;
log::info!("Initialization successful.");

View file

@ -1,21 +1,102 @@
use crate::upload_manager::{FileReference, UnfinishedUpload, UploadId};
use crate::upload_manager::{FileReference, UnfinishedUpload};
use crate::util::acquirable::Acquisition;
use crate::util::hash_to_hex_string::HashExt;
use crate::util::temporal_formatting::TemporalFormatting;
use blake3::Hasher;
use file_type::FileType;
use fstr::FStr;
use sqlx::SqlitePool;
use std::io::SeekFrom;
use std::sync::Arc;
use temporal_rs::Now;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
pub async fn do_processing_work(mut tasks_receiver: tokio::sync::mpsc::UnboundedReceiver<Arc<UnfinishedUpload>>, database: SqlitePool) {
pub async fn do_processing_work(
mut tasks_receiver: tokio::sync::mpsc::UnboundedReceiver<Arc<UnfinishedUpload>>,
database: SqlitePool,
enable_multithreaded_hashing: bool,
) {
while let Some(upload) = tasks_receiver.recv().await {
let mut file_acquisition = upload
.acquire_file()
.await
.expect("When an upload is marked as complete, requests no longer acquire the file.");
process(upload.id(), &mut file_acquisition).await;
match process(enable_multithreaded_hashing, &mut file_acquisition).await {
Ok(outcome) => {
let mut tx = database.begin().await.unwrap();
let id = upload.id().as_str();
let hash = outcome.hash.as_str();
let size = upload.total_size() as i64;
let creation_date = Now::zoneddatetime_iso(None).unwrap().to_string_with_defaults().unwrap();
sqlx::query!("DELETE FROM unfinished_uploads WHERE id = ?", id).execute(&mut *tx).await.unwrap();
sqlx::query!(
"INSERT INTO objects (hash, size, media_type, creation_date) VALUES (?, ?, ?, ?)",
hash,
size,
outcome.media_type,
creation_date
)
.execute(&mut *tx)
.await
.unwrap();
sqlx::query!("INSERT INTO object_replicas (hash, is_present, bucket_id) VALUES (?, true, 'staging')", hash,)
.execute(&mut *tx)
.await
.unwrap();
sqlx::query!("INSERT INTO finished_uploads (id, hash) VALUES (?, ?)", id, hash)
.execute(&mut *tx)
.await
.unwrap();
tx.commit().await.unwrap();
log::debug!("Successfully processed upload ({}): {:?}", id, outcome);
}
Err(report) => {
log::error!("Error during upload processing ({}): {:#}", upload.id(), report);
}
}
file_acquisition.destroy().await;
}
}
pub async fn process(id: UploadId, file_acquisition: &mut Acquisition<FileReference>) {
let file = file_acquisition.inner().get_or_open().await.unwrap(); // TODO: Handle errors
#[derive(Debug)]
struct ProcessingOutcome {
hash: FStr<64>,
media_type: &'static str,
}
pub async fn process(enable_multithreaded_hashing: bool, file_acquisition: &mut Acquisition<FileReference>) -> Result<ProcessingOutcome, std::io::Error> {
let file_reference = file_acquisition.inner();
let path = file_reference.path().to_owned();
let file = file_reference.get_or_open(false).await?;
file.seek(SeekFrom::Start(0)).await?;
const HEAD_LENGTH: usize = 16 * 1024;
let mut file_content_head = Vec::with_capacity(HEAD_LENGTH);
file.take(HEAD_LENGTH as u64).read_to_end(&mut file_content_head).await?;
let mut hasher = Hasher::new();
let hash = tokio::task::spawn_blocking(move || -> Result<_, std::io::Error> {
if enable_multithreaded_hashing {
hasher.update_mmap_rayon(path)?;
} else {
hasher.update_mmap(path)?;
}
Ok(hasher.finalize().to_hex_fstr())
})
.await??;
let file_type = FileType::from_bytes(file_content_head);
let media_type = file_type.media_types().first().map(|s| *s).unwrap_or("application/octet-stream");
Ok(ProcessingOutcome { hash, media_type })
}

View file

@ -6,12 +6,11 @@ use camino::Utf8PathBuf;
use color_eyre::{Report, Result};
use dashmap::DashMap;
use fstr::FStr;
use sqlx::{Row, SqlitePool};
use sqlx::SqlitePool;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::{Arc, Weak};
use strum::{Display, EnumString, IntoStaticStr};
use tokio::fs::{File, OpenOptions};
use strum::{Display, EnumString};
use tokio::sync::RwLock;
const LARGE_FILE_SIZE_THRESHOLD: u64 = 1024;
@ -28,7 +27,7 @@ pub struct UploadManager {
}
impl UploadManager {
pub async fn create(database: SqlitePool, staging_directory_path: Utf8PathBuf) -> Result<Arc<Self>> {
pub async fn create(database: SqlitePool, staging_directory_path: Utf8PathBuf, enable_multithreaded_hashing: bool) -> Result<Arc<Self>> {
log::info!("Loading uploads…");
let (small_file_processing_tasks_sender, small_file_processing_tasks_receiver) = tokio::sync::mpsc::unbounded_channel();
@ -71,8 +70,12 @@ impl UploadManager {
}
log::info!("Starting upload processing…");
tokio::spawn(do_processing_work(small_file_processing_tasks_receiver, database.clone()));
tokio::spawn(do_processing_work(large_file_processing_tasks_receiver, database));
tokio::spawn(do_processing_work(
small_file_processing_tasks_receiver,
database.clone(),
enable_multithreaded_hashing,
));
tokio::spawn(do_processing_work(large_file_processing_tasks_receiver, database, enable_multithreaded_hashing));
Ok(manager)
}
@ -145,8 +148,8 @@ pub struct UnfinishedUpload {
}
impl UnfinishedUpload {
pub fn id(&self) -> UploadId {
self.id
pub fn id(&self) -> &UploadId {
&self.id
}
pub fn total_size(&self) -> u64 {

View file

@ -1,4 +1,4 @@
use camino::Utf8PathBuf;
use camino::{Utf8Path, Utf8PathBuf};
use tokio::fs::{File, OpenOptions};
#[derive(Debug)]
@ -12,16 +12,20 @@ impl FileReference {
FileReference { path, file: None }
}
pub async fn get_or_open(&mut self) -> color_eyre::Result<&mut File, std::io::Error> {
pub async fn get_or_open(&mut self, should_create: bool) -> color_eyre::Result<&mut File, std::io::Error> {
let file = &mut self.file;
if let Some(file) = file {
Ok(file)
} else {
*file = Some(OpenOptions::new().read(true).append(true).open(&self.path).await?);
*file = Some(OpenOptions::new().read(true).append(true).create(should_create).open(&self.path).await?);
Ok(unsafe { file.as_mut().unwrap_unchecked() })
}
}
pub fn path(&self) -> &Utf8Path {
&self.path
}
pub fn is_open(&self) -> bool {
self.file.is_some()
}

View file

@ -0,0 +1,22 @@
use blake3::{Hash, OUT_LEN};
use fstr::FStr;
use std::mem;
use std::mem::MaybeUninit;
pub trait HashExt {
fn to_hex_fstr(&self) -> FStr<{ 2 * OUT_LEN }>;
}
impl HashExt for Hash {
fn to_hex_fstr(&self) -> FStr<{ 2 * OUT_LEN }> {
static ALPHABET: &[u8; 16] = b"0123456789abcdef";
let mut result = [const { MaybeUninit::uninit() }; 2 * OUT_LEN];
for x in 0..OUT_LEN {
result[x * 2].write(ALPHABET[x >> 4]);
result[x * 2 + 1].write(ALPHABET[x & 0xf]);
}
unsafe { FStr::from_inner_unchecked(mem::transmute::<_, [u8; 2 * OUT_LEN]>(result)) }
}
}

View file

@ -1,3 +1,5 @@
pub mod acquirable;
pub mod file_reference;
pub mod hash_to_hex_string;
pub mod id;
pub mod temporal_formatting;

View file

@ -0,0 +1,24 @@
use temporal_rs::options::{DisplayCalendar, DisplayOffset, DisplayTimeZone, ToStringRoundingOptions};
use temporal_rs::parsers::Precision;
use temporal_rs::provider::NeverProvider;
use temporal_rs::{TemporalResult, ZonedDateTime};
pub trait TemporalFormatting {
fn to_string_with_defaults(&self) -> TemporalResult<String>;
}
impl TemporalFormatting for ZonedDateTime {
fn to_string_with_defaults(&self) -> TemporalResult<String> {
self.to_ixdtf_string_with_provider(
DisplayOffset::Auto,
DisplayTimeZone::Auto,
DisplayCalendar::Never,
ToStringRoundingOptions {
precision: Precision::Auto,
smallest_unit: None,
rounding_mode: None,
},
&NeverProvider,
)
}
}