342 lines
11 KiB
Rust
342 lines
11 KiB
Rust
use crate::caddy::CaddyController;
|
|
use async_std::io::WriteExt;
|
|
use async_std::stream::StreamExt;
|
|
use async_std::sync::RwLock;
|
|
use async_std::task::JoinHandle;
|
|
use async_std::{fs, task};
|
|
use camino::{Utf8Path, Utf8PathBuf};
|
|
use color_eyre::Result;
|
|
use color_eyre::eyre::{WrapErr, eyre};
|
|
use log::{debug, error, info};
|
|
use serde::{Deserialize, Serialize};
|
|
use std::collections::HashMap;
|
|
use std::io::ErrorKind;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
/// Name of the file, located directly inside a site's directory, in which the site's state is persisted.
pub const SITE_STATE_FILE_NAME: &str = "state.toml";
|
|
/// Name of the optional configuration file looked up at the root of a site version's content directory.
pub const SITE_VERSION_CONFIG_FILE_NAME: &str = "site.toml";
|
|
|
|
#[derive(Deserialize, Default)]
|
|
pub struct SiteConfig {
|
|
#[serde(default)]
|
|
pub paths_mode: SiteConfigPathsMode,
|
|
#[serde(default)]
|
|
pub redirects: Vec<SiteConfigRedirect>,
|
|
#[serde(default)]
|
|
pub immutable_paths: Vec<String>,
|
|
}
|
|
|
|
#[derive(Deserialize, Default, Eq, PartialEq)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum SiteConfigPathsMode {
|
|
#[default]
|
|
Normal,
|
|
Spa,
|
|
}
|
|
|
|
#[derive(Deserialize)]
|
|
pub struct SiteConfigRedirect {
|
|
pub from: String,
|
|
pub to: String,
|
|
pub kind: SiteConfigRedirectKind,
|
|
}
|
|
|
|
#[derive(Deserialize, Eq, PartialEq)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum SiteConfigRedirectKind {
|
|
Temporary,
|
|
Permanent,
|
|
Rewrite,
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
pub struct SiteState {
|
|
active_version: Option<SiteStateVersion>,
|
|
current_version: Option<SiteStateVersion>,
|
|
}
|
|
|
|
impl SiteState {
|
|
async fn read_from_site_directory(site_directory_path: &Utf8Path) -> Result<Self> {
|
|
let string = fs::read_to_string(site_directory_path.join(SITE_STATE_FILE_NAME).into_std_path_buf()).await?;
|
|
Ok(toml::from_str::<SiteState>(&string)?)
|
|
}
|
|
|
|
async fn write_to_site_directory(&self, site_directory_path: &Utf8Path) -> Result<()> {
|
|
fs::create_dir_all(site_directory_path.as_std_path()).await?;
|
|
let mut file = fs::File::create(site_directory_path.join(SITE_STATE_FILE_NAME).into_std_path_buf()).await?;
|
|
file.write_all(toml::to_string(&self).unwrap().as_bytes()).await?;
|
|
Ok(())
|
|
}
|
|
|
|
fn get_current_version_if_outdated(&self) -> Option<&SiteStateVersion> {
|
|
if let Some(current_version) = &self.current_version {
|
|
if let Some(active_version) = &self.active_version {
|
|
if current_version.id != active_version.id {
|
|
return Some(current_version);
|
|
}
|
|
} else {
|
|
return Some(current_version);
|
|
}
|
|
}
|
|
|
|
None
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize, Clone)]
|
|
pub struct SiteStateVersion {
|
|
id: String,
|
|
download_url: String,
|
|
}
|
|
|
|
pub struct Site {
|
|
pub domain: String,
|
|
pub path: Utf8PathBuf,
|
|
pub state: RwLock<SiteState>,
|
|
}
|
|
|
|
/// Registry of all managed sites, keyed by domain.
type Sites = RwLock<HashMap<String, Arc<Site>>>;
|
|
|
|
pub struct SitesWorker {
|
|
sites: Arc<Sites>,
|
|
sites_directory_path: Utf8PathBuf,
|
|
download_tasks_sender: async_std::channel::Sender<String>,
|
|
}
|
|
|
|
impl SitesWorker {
|
|
pub async fn set_current_version(&self, domain: String, id: String, download_url: String) -> Result<()> {
|
|
let mut sites = self.sites.write().await;
|
|
let site = sites.entry(domain.clone()).or_insert_with(|| {
|
|
Arc::new(Site {
|
|
domain: domain.clone(),
|
|
path: self.sites_directory_path.join(&domain),
|
|
state: RwLock::new(SiteState {
|
|
current_version: None,
|
|
active_version: None,
|
|
}),
|
|
})
|
|
});
|
|
|
|
let mut state = site.state.write().await;
|
|
state.current_version = Some(SiteStateVersion { id, download_url });
|
|
state.write_to_site_directory(&site.path).await?;
|
|
self.download_tasks_sender.send(domain).await?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
pub async fn load_sites(sites_directory_path: &Utf8Path) -> Result<Sites> {
|
|
let mut sites: HashMap<String, Arc<Site>> = HashMap::new();
|
|
|
|
if fs::metadata(sites_directory_path.as_std_path()).await.map(|m| m.is_dir()).unwrap_or(false) {
|
|
let mut dir = fs::read_dir(sites_directory_path.as_std_path()).await?;
|
|
loop {
|
|
if let Some(entry) = dir.next().await {
|
|
let entry = entry?;
|
|
if entry.file_type().await?.is_dir() {
|
|
let path = Utf8PathBuf::from_path_buf(entry.path().into()).unwrap();
|
|
let domain = path.file_name().unwrap().to_string();
|
|
let state = SiteState::read_from_site_directory(&path).await?;
|
|
|
|
sites.insert(
|
|
domain.clone(),
|
|
Arc::new(Site {
|
|
path,
|
|
domain,
|
|
state: RwLock::new(state),
|
|
}),
|
|
);
|
|
}
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
} else {
|
|
fs::create_dir_all(sites_directory_path.as_std_path())
|
|
.await
|
|
.wrap_err("Failed to create the sites directory (and its parents)")?;
|
|
}
|
|
|
|
Ok(RwLock::new(sites))
|
|
}
|
|
|
|
async fn read_site_config(content_path: &Utf8Path) -> Result<Option<SiteConfig>> {
|
|
let path = content_path.join(SITE_VERSION_CONFIG_FILE_NAME).into_std_path_buf();
|
|
match fs::read_to_string(path).await {
|
|
Ok(config_string) => Ok(toml::from_str::<SiteConfig>(&config_string).ok()),
|
|
Err(e) if e.kind() == ErrorKind::NotFound => Ok(Some(SiteConfig::default())),
|
|
Err(e) => Err(e.into()),
|
|
}
|
|
}
|
|
|
|
async fn handle_download(ureq_agent: Arc<ureq::Agent>, sites: &Sites, caddy_controller: &mut CaddyController, domain: String) -> Result<()> {
|
|
let site = {
|
|
let sites = sites.read().await;
|
|
match sites.get(&domain) {
|
|
None => {
|
|
debug!("Skipping download for {domain} because it is no longer managed.");
|
|
return Ok(());
|
|
}
|
|
Some(a) => Arc::clone(a),
|
|
}
|
|
};
|
|
|
|
let mut state = site.state.write().await;
|
|
let current_version = match state.get_current_version_if_outdated() {
|
|
None => {
|
|
debug!("Skipping download for {domain} because it is not outdated.");
|
|
return Ok(());
|
|
}
|
|
Some(v) => v,
|
|
};
|
|
|
|
info!("Starting download for {domain} ({}).", current_version.id);
|
|
|
|
// Download
|
|
let archive_file_path = site.path.join(format!("{}.zip", current_version.id));
|
|
|
|
let response = task::spawn_blocking({
|
|
let url = current_version.download_url.clone();
|
|
move || ureq_agent.get(url).call()
|
|
})
|
|
.await?;
|
|
|
|
let status = response.status();
|
|
if !status.is_success() {
|
|
return Err(eyre!("Download request failed with status code {status}"));
|
|
}
|
|
|
|
let archive_file = task::spawn_blocking({
|
|
let archive_file_path = archive_file_path.to_owned().into_std_path_buf();
|
|
|
|
move || -> std::io::Result<std::fs::File> {
|
|
let mut writer = std::io::BufWriter::new(
|
|
std::fs::OpenOptions::new()
|
|
.truncate(true)
|
|
.create(true)
|
|
.write(true)
|
|
.read(true)
|
|
.open(archive_file_path)?,
|
|
);
|
|
|
|
std::io::copy(&mut response.into_body().into_reader(), &mut writer)?;
|
|
|
|
Ok(writer.into_inner()?)
|
|
}
|
|
})
|
|
.await?;
|
|
|
|
let extraction_directory_path = site.path.join(¤t_version.id);
|
|
let _ = fs::remove_dir_all(extraction_directory_path.as_std_path()).await;
|
|
fs::create_dir_all(extraction_directory_path.as_std_path()).await?;
|
|
|
|
debug!("Finished download for {domain} ({}), now unpacking…", current_version.id);
|
|
|
|
// Unpack to temp dir
|
|
task::spawn_blocking({
|
|
let extraction_directory_path = extraction_directory_path.clone();
|
|
|
|
move || -> Result<()> {
|
|
let mut archive = zip::ZipArchive::new(std::io::BufReader::new(archive_file))?;
|
|
archive.extract(&extraction_directory_path)?;
|
|
Ok(())
|
|
}
|
|
})
|
|
.await?;
|
|
|
|
// Load configuration
|
|
let site_config = if let Some(c) = read_site_config(&extraction_directory_path).await? {
|
|
c
|
|
} else {
|
|
error!(
|
|
"The configuration file of {domain} ({}) is invalid. The new version will be ignored.",
|
|
current_version.id
|
|
);
|
|
return Ok(());
|
|
};
|
|
|
|
// Update and write state
|
|
let current_version = current_version.clone();
|
|
let old_active_version = state.active_version.replace(current_version);
|
|
state.write_to_site_directory(&site.path).await?;
|
|
|
|
// Update Caddy configuration
|
|
info!("Download for {domain} successful, now updating proxy configuration…");
|
|
caddy_controller
|
|
.upsert_site_configuration(&domain, &extraction_directory_path, &site_config)
|
|
.await?;
|
|
|
|
// Cleanup
|
|
fs::remove_file(archive_file_path.as_std_path()).await?;
|
|
if let Some(old_active_version) = old_active_version {
|
|
let _ = fs::remove_dir_all(site.path.join(old_active_version.id).into_std_path_buf()).await;
|
|
}
|
|
|
|
info!("Cleanup finished for {domain}");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn sites_worker(download_tasks_receiver: async_std::channel::Receiver<String>, sites: Arc<Sites>, mut caddy_controller: CaddyController) -> Result<!> {
|
|
let ureq_agent = Arc::new(ureq::Agent::new_with_config(
|
|
ureq::Agent::config_builder()
|
|
.timeout_resolve(Some(Duration::from_secs(10)))
|
|
.timeout_connect(Some(Duration::from_secs(30)))
|
|
.timeout_global(Some(Duration::from_mins(15)))
|
|
.build(),
|
|
));
|
|
|
|
loop {
|
|
let domain = download_tasks_receiver.recv().await.unwrap();
|
|
handle_download(Arc::clone(&ureq_agent), &sites, &mut caddy_controller, domain).await?;
|
|
}
|
|
}
|
|
|
|
pub async fn start_sites_worker(sites_directory_path: Utf8PathBuf, mut caddy_controller: CaddyController) -> Result<(SitesWorker, JoinHandle<Result<!>>)> {
|
|
let (download_tasks_sender, download_tasks_receiver) = async_std::channel::unbounded();
|
|
|
|
info!("Discovering managed sites…");
|
|
let sites = Arc::new(load_sites(&sites_directory_path).await?);
|
|
let mut total_count = 0;
|
|
let mut outdated_count = 0;
|
|
|
|
for (_, site) in sites.read().await.iter() {
|
|
let state = site.state.read().await;
|
|
|
|
if let Some(current_version) = &state.current_version {
|
|
let mut is_outdated = true;
|
|
total_count += 1;
|
|
|
|
if let Some(active_version) = &state.active_version {
|
|
is_outdated = current_version.id != active_version.id;
|
|
|
|
let content_path = &site.path.join(&active_version.id);
|
|
if let Some(site_config) = read_site_config(&content_path).await? {
|
|
caddy_controller.upsert_site_configuration(&site.domain, content_path, &site_config).await?;
|
|
} else {
|
|
error!("The configuration file of {} ({}) is invalid.", site.domain, current_version.id);
|
|
};
|
|
}
|
|
|
|
if is_outdated {
|
|
outdated_count += 1;
|
|
download_tasks_sender.send(site.domain.clone()).await?;
|
|
}
|
|
}
|
|
}
|
|
|
|
info!("Discovered {total_count} site(s), {outdated_count} outdated.");
|
|
|
|
let join_handle = task::spawn(sites_worker(download_tasks_receiver, Arc::clone(&sites), caddy_controller));
|
|
|
|
Ok((
|
|
SitesWorker {
|
|
sites,
|
|
sites_directory_path,
|
|
download_tasks_sender,
|
|
},
|
|
join_handle,
|
|
))
|
|
}
|