use crate::caddy::CaddyController; use async_std::io::{BufReader, BufWriter, WriteExt}; use async_std::net::TcpStream; use async_std::stream::StreamExt; use async_std::sync::RwLock; use async_std::task::JoinHandle; use async_std::{fs, task}; use camino::{Utf8Path, Utf8PathBuf}; use color_eyre::Result; use color_eyre::eyre::{WrapErr, eyre}; use log::{debug, info}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; #[derive(Serialize, Deserialize)] pub struct SiteState { active_version: Option, current_version: Option, } impl SiteState { async fn read_from_site_directory(site_directory_path: &Utf8Path) -> Result { let string = fs::read_to_string(site_directory_path.join("site.toml").into_std_path_buf()).await?; Ok(toml::from_str::(&string)?) } async fn write_to_site_directory(&self, site_directory_path: &Utf8Path) -> Result<()> { fs::create_dir_all(site_directory_path.as_std_path()).await?; let mut file = fs::File::create(site_directory_path.join("site.toml").into_std_path_buf()).await?; file.write_all(toml::to_string(&self).unwrap().as_bytes()).await?; Ok(()) } fn get_current_version_if_outdated(&self) -> Option<&SiteStateVersion> { if let Some(current_version) = &self.current_version { if let Some(active_version) = &self.active_version { if current_version.id != active_version.id { return Some(current_version); } } else { return Some(current_version); } } None } } #[derive(Serialize, Deserialize, Clone)] pub struct SiteStateVersion { id: String, download_url: String, } pub struct Site { pub domain: String, pub path: Utf8PathBuf, pub state: RwLock, } type Sites = RwLock>>; pub struct SitesWorker { sites: Arc, sites_directory_path: Utf8PathBuf, download_tasks_sender: async_std::channel::Sender, } impl SitesWorker { pub async fn set_current_version(&self, domain: String, id: String, download_url: String) -> Result<()> { let mut sites = self.sites.write().await; let site = sites.entry(domain.clone()).or_insert_with(|| { Arc::new(Site { domain: domain.clone(), path: self.sites_directory_path.join(&domain), state: RwLock::new(SiteState { current_version: None, active_version: None, }), }) }); let mut state = site.state.write().await; state.current_version = Some(SiteStateVersion { id, download_url }); state.write_to_site_directory(&site.path).await?; self.download_tasks_sender.send(domain).await?; Ok(()) } } pub async fn load_sites(sites_directory_path: &Utf8Path) -> Result { let mut sites: HashMap> = HashMap::new(); if fs::metadata(sites_directory_path.as_std_path()).await.map(|m| m.is_dir()).unwrap_or(false) { let mut dir = fs::read_dir(sites_directory_path.as_std_path()).await?; loop { if let Some(entry) = dir.next().await { let entry = entry?; if entry.file_type().await?.is_dir() { let path = Utf8PathBuf::from_path_buf(entry.path().into()).unwrap(); let domain = path.file_name().unwrap().to_string(); let state = SiteState::read_from_site_directory(&path).await?; sites.insert( domain.clone(), Arc::new(Site { path, domain, state: RwLock::new(state), }), ); } } else { break; } } } else { fs::create_dir_all(sites_directory_path.as_std_path()) .await .wrap_err("Failed to create the sites directory (and its parents)")?; } Ok(RwLock::new(sites)) } async fn handle_download(ureq_agent: Arc, sites: &Sites, caddy_controller: &mut CaddyController, domain: String) -> Result<()> { let site = { let sites = sites.read().await; match sites.get(&domain) { None => { debug!("Skipping download for {domain} because it is no longer managed."); return Ok(()); } Some(a) => Arc::clone(a), } }; let mut state = site.state.write().await; let current_version = match state.get_current_version_if_outdated() { None => { debug!("Skipping download for {domain} because it is not outdated."); return Ok(()); } Some(v) => v, }; info!("Starting download for {domain} ({}).", current_version.id); // Download let archive_file_path = site.path.join(format!("{}.zip", current_version.id)); let response = task::spawn_blocking({ let url = current_version.download_url.clone(); move || ureq_agent.get(url).call() }) .await?; let status = response.status(); if !status.is_success() { return Err(eyre!("Download request failed with status code {status}")); } let archive_file = task::spawn_blocking({ let archive_file_path = archive_file_path.to_owned().into_std_path_buf(); move || -> std::io::Result { let mut writer = std::io::BufWriter::new( std::fs::OpenOptions::new() .truncate(true) .create(true) .write(true) .read(true) .open(archive_file_path)?, ); std::io::copy(&mut response.into_body().into_reader(), &mut writer)?; Ok(writer.into_inner()?) } }) .await?; let extraction_directory_path = site.path.join(¤t_version.id); let _ = fs::remove_dir_all(extraction_directory_path.as_std_path()).await; fs::create_dir_all(extraction_directory_path.as_std_path()).await?; debug!("Finished download for {domain} ({}), now unpacking…", current_version.id); // Unpack to temp dir task::spawn_blocking({ let extraction_directory_path = extraction_directory_path.clone(); move || -> Result<()> { let mut archive = zip::ZipArchive::new(std::io::BufReader::new(archive_file))?; archive.extract(&extraction_directory_path)?; Ok(()) } }) .await?; // Update and write state let current_version = current_version.clone(); let old_active_version = state.active_version.replace(current_version); state.write_to_site_directory(&site.path).await?; // Update Caddy configuration info!("Download for {domain} successful, now updating proxy configuration…"); caddy_controller.upsert_site_configuration(&domain, &extraction_directory_path).await?; // Cleanup fs::remove_file(archive_file_path.as_std_path()).await?; if let Some(old_active_version) = old_active_version { let _ = fs::remove_dir_all(site.path.join(old_active_version.id).into_std_path_buf()).await; } info!("Cleanup finished for {domain}"); Ok(()) } async fn sites_worker( mut download_tasks_receiver: async_std::channel::Receiver, sites: Arc, mut caddy_controller: CaddyController, ) -> Result { let ureq_agent = Arc::new(ureq::Agent::new_with_config( ureq::Agent::config_builder() .timeout_resolve(Some(Duration::from_secs(10))) .timeout_connect(Some(Duration::from_secs(30))) .timeout_global(Some(Duration::from_mins(15))) .build(), )); loop { let domain = download_tasks_receiver.recv().await.unwrap(); handle_download(Arc::clone(&ureq_agent), &sites, &mut caddy_controller, domain).await?; } } pub async fn start_sites_worker(sites_directory_path: Utf8PathBuf, mut caddy_controller: CaddyController) -> Result<(SitesWorker, JoinHandle>)> { let (download_tasks_sender, download_tasks_receiver) = async_std::channel::unbounded(); info!("Discovering managed sites…"); let sites = Arc::new(load_sites(&sites_directory_path).await?); let mut total_count = 0; let mut outdated_count = 0; for (_, site) in sites.read().await.iter() { let state = site.state.read().await; if let Some(current_version) = &state.current_version { let mut is_outdated = true; total_count += 1; if let Some(active_version) = &state.active_version { is_outdated = current_version.id != active_version.id; caddy_controller .upsert_site_configuration(&site.domain, &site.path.join(&active_version.id)) .await?; } if is_outdated { outdated_count += 1; download_tasks_sender.send(site.domain.clone()).await?; } } } info!("Discovered {total_count} site(s), {outdated_count} outdated."); let join_handle = task::spawn(sites_worker(download_tasks_receiver, Arc::clone(&sites), caddy_controller)); Ok(( SitesWorker { sites, sites_directory_path, download_tasks_sender, }, join_handle, )) }