monitor/noxis-rs/src/utils/files.rs

294 lines
11 KiB
Rust

use crate::options::structs::CustomError;
use crate::options::structs::Events;
use async_trait::async_trait;
use inotify::{EventMask, Inotify, WatchMask};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
pub mod v2 {
use super::*;
use crate::options::structs::{
FileTriggerType, FileTriggersForController as Triggers, ProcessUnit,
};
use log::{error, info, warn};
use serde::Serialize;
use std::{collections::HashMap, path::Path};
type MpscSender = Arc<Sender<Events>>;
type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
#[derive(Debug, Serialize, Clone, Copy)]
pub enum FileState {
Ok,
NotFound,
}
#[derive(Debug)]
pub struct FilesController {
name: Arc<str>,
path: String,
code_name: Arc<str>,
backup_file : String,
state: FileState,
watcher: Option<Inotify>,
triggers: EventHandlers,
}
impl PartialEq for FilesController {
fn eq(&self, other: &Self) -> bool {
self.code_name == other.code_name
}
}
impl FilesController {
#[inline(always)]
pub fn new(name: &str, triggers: EventHandlers) -> FilesController {
let name: Arc<str> = Arc::from(name);
Self {
name: name.clone(),
path: String::new(),
state: FileState::Ok,
watcher: None,
triggers,
code_name: name.clone(),
backup_file: String::new(),
}
}
#[inline(always)]
pub fn with_path(mut self, path: impl AsRef<Path>, backup : String) -> anyhow::Result<FilesController> {
self.path = path.as_ref().to_string_lossy().into_owned();
self.watcher = {
match create_watcher(&self.name, &self.path) {
Ok(val) => Some(val),
Err(er) => {
error!(
"Cannot create watcher for {} ({}) due to {}",
self.name, &self.path, er
);
return Err(er);
}
}
};
self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name));
self.backup_file = backup;
match create_backup(&self.code_name, &self.backup_file) {
Ok(_) => info!("Backup file for {} was created ({})", &self.code_name, &self.backup_file),
Err(er) => warn!("{}. Ignoring ...", er),
}
Ok(self)
}
pub fn add_event(&mut self, file_controller: FilesController) {
for (k, v) in file_controller.triggers {
self.triggers.entry(k).or_insert(v);
}
}
async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
for (prc_name, (triggers, channel)) in &self.triggers {
let msg = match &trigger_type {
None => Events::Positive(self.code_name.clone()),
Some(event) => {
info!(
"Event on file {} ({}) : {}. Notifying `{}` ...",
&self.name, &self.path, event, &prc_name
);
event.event_from_file_trigger_controller(self.code_name.clone(), &triggers)
}
};
let _ = channel.send(msg).await;
}
}
pub fn get_state(&self) -> FileState {
self.state
}
pub fn get_code_name(&self) -> Arc<str> {
self.code_name.clone()
}
pub fn get_backup_file(&self) -> String {
self.backup_file.to_string()
}
}
#[async_trait]
impl ProcessUnit for FilesController {
async fn process(&mut self) {
if let Ok(_) = check_file(&self.name, &self.path).await {
if let FileState::NotFound = self.state {
info!(
"File {} ({}) was found in determined scope. Notifying ...",
self.name, self.code_name
);
self.state = FileState::Ok;
self.trigger_on(None).await;
}
match &mut self.watcher {
Some(notify) => {
let mut buffer = [0; 128];
if let Ok(notif_events) = notify.read_events(&mut buffer) {
let (need_to_recreate, was_modifired) =
notif_events.fold((false, false), |(a, b), mask| {
(
a || mask.mask == EventMask::DELETE_SELF,
b || mask.mask == EventMask::MODIFY,
)
}
);
if self.backup_file.is_empty() {
} else {
}
if let (mut recreate_watcher, true) = (need_to_recreate, was_modifired) {
if self.backup_file.is_empty() {
warn!("File {} ({}) was changed", self.name, &self.path);
self.trigger_on(Some(FileTriggerType::OnChange)).await;
} else {
recreate_watcher = true;
match restore_file(&self.code_name, &self.backup_file).await {
Ok(_) => info!("File {} was successfully restored", &self.code_name),
Err(er) => error!("Cannot restore file {} : {}", &self.code_name, er),
}
}
if recreate_watcher {
self.watcher = match create_watcher(&self.name, &self.path) {
Ok(notifier) => Some(notifier),
Err(er) => {
error!(
"Failed to recreate watcher for {} ({}) due to {}",
self.name, &self.path, er
);
None
}
}
}
}
}
}
None => return,
}
} else {
if let FileState::Ok = self.state {
if self.backup_file.is_empty() {
warn!(
"File {} ({}) was not found in determined scope",
self.name, &self.path
);
self.state = FileState::NotFound;
self.trigger_on(Some(FileTriggerType::OnDelete)).await;
} else {
warn!(
"File {} ({}) was not found in determined scope. Restoring from backup-file ...",
self.name, &self.path
);
match restore_file(&self.code_name, &self.backup_file).await {
Err(er) => error!("Cannot restore file {} : {}", &self.code_name, er),
Ok(_) => {
info!("File {} was successfully restored", &self.code_name);
self.watcher = match create_watcher(&self.name, &self.path) {
Ok(notifier) => Some(notifier),
Err(er) => {
error!(
"Failed to recreate watcher for {} ({}) : {}",
self.name, &self.path, er
);
None
}
}
},
}
}
}
return;
}
self.trigger_on(None).await;
}
}
}
pub fn create_backup(target: &str, backup: &str) -> anyhow::Result<u64> {
return if !backup.is_empty() {
Ok(std::fs::copy(target, backup)?)
} else {
Err(anyhow::Error::msg(format!("No need to create backup-file for {}", target)))
}
}
pub async fn restore_file(target: &str, backup: &str) -> anyhow::Result<u64> {
Ok(tokio::fs::copy(backup, target).await?)
}
/// # Fn `create_watcher`
/// ## for creating watcher on file's delete | update events
///
/// *input* : `&str`, `&str`
///
/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction
///
/// *initiator* : fn `file_handler`, fn `utils::run_daemons`
///
/// *managing* : current file's name: &str, path in local storage to current file: &str
///
/// *depends on* : -
///
pub fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> {
let src = format!("{}{}", path, filename);
let inotify: Inotify = Inotify::init()?;
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
Ok(inotify)
}
/// # Fn `check_file`
/// ## for checking existance of current file
///
/// *input* : `&str`, `&str`
///
/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error
///
/// *initiator* : fn `file_handler`
///
/// *managing* : current file's name: `&str` and current file's path in local storage: `&str`
///
/// *depends on* : network activity
///
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let arc_name = Arc::new(filename.to_string());
let arc_path = Arc::new(path.to_string());
tokio::task::spawn_blocking(move || {
let file_concat = format!("{}{}", arc_path, arc_name);
let path = Path::new(&file_concat);
if path.exists() {
Ok(())
} else {
Err(CustomError::Fatal)
}
})
.await
.unwrap_or_else(|_| {
panic!("Corrupted while file check process");
})
}
#[cfg(test)]
mod files_unittests {
use super::*;
#[tokio::test]
async fn try_to_create_watcher() {
let res = create_watcher("dep-file", "./tests/examples/");
assert!(res.is_ok());
}
#[tokio::test]
async fn try_to_create_invalid_watcher() {
let res = create_watcher("invalid-file", "/path/to/the/no/dir");
assert!(res.is_err());
}
#[tokio::test]
async fn check_existing_file() {
let res = check_file("dep-file", "./tests/examples/").await;
assert!(res.is_ok());
}
#[tokio::test]
async fn check_non_existing_file() {
let res = check_file("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err());
}
}