From c3fd0dd09f20573092926eaf1b9cb1ca4a899172 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 22 Apr 2025 11:22:07 -0400 Subject: [PATCH] files logic fixed --- noxis-rs/src/utils/files.rs | 546 ++++++++++++++++++------------------ 1 file changed, 276 insertions(+), 270 deletions(-) diff --git a/noxis-rs/src/utils/files.rs b/noxis-rs/src/utils/files.rs index 8ba72f7..fad24f0 100644 --- a/noxis-rs/src/utils/files.rs +++ b/noxis-rs/src/utils/files.rs @@ -1,286 +1,292 @@ -use crate::options::structs::{CustomError, Files}; -use super::prcs::{is_active, is_frozen}; -use inotify::{EventMask, Inotify, WatchMask}; -use std::borrow::BorrowMut; -use std::path::Path; -use std::sync::Arc; -use tokio::sync::mpsc; -use tokio::sync::mpsc::Sender as MpscSender; -use tokio::time::Duration; -use crate::options::structs::Events; + use crate::options::structs::{CustomError, Files}; + use super::prcs::{is_active, is_frozen}; + use inotify::{EventMask, Inotify, WatchMask}; + use std::borrow::BorrowMut; + use std::path::Path; + use std::sync::Arc; + use tokio::sync::mpsc; + use tokio::sync::mpsc::Sender as Sender; + use tokio::time::Duration; + use crate::options::structs::Events; -pub mod v2 { - use log::{error, info, warn}; + pub mod v2 { + use log::{error, info, warn}; - // use std::collections::HashMap; - use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit}; - use super::*; - use std::{collections::HashMap, path::Path}; + // use std::collections::HashMap; + use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit}; + use super::*; + use std::{collections::HashMap, path::Path}; - // type EventHandlers<'a> = HashMap - type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender>)>; + type MpscSender<'a> = Arc>>; + // type EventHandlers<'a> = HashMap + type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>; - struct FilesController<'a> { - name: &'a str, - path: String, - watcher: Option, - // obj: Arc, - triggers: EventHandlers<'a>, - } + struct FilesController<'a> { + name : &'a str, + path : String, + watcher : Option, + // obj: Arc, + triggers : EventHandlers<'a>, + code_name : String, + } - impl<'a> FilesController<'a> { - pub fn new(name: &'a str, triggers: EventHandlers<'a>) -> FilesController<'a> { - Self { - name, - path : String::new(), - watcher: None, - triggers, + impl<'a> FilesController<'a> { + pub fn new(name: &'a str, triggers: EventHandlers<'a>) -> FilesController<'a> { + Self { + name, + path : String::new(), + watcher: None, + triggers, + code_name : name.to_string(), + } + } + pub async fn with_path(&mut self, path: impl AsRef) -> anyhow::Result<()> { + self.path = path.as_ref().to_string_lossy().into_owned(); + self.watcher = { + match create_watcher(self.name, &self.path).await { + Ok(val) => Some(val), + Err(er) => { + error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er); + return Err(er) + } + } + }; + self.code_name = format!("{}{}", &self.path, &self.code_name); + Ok(()) + } + async fn trigger_on(&'a mut self, trigger_type: Option) { + let _ = self.triggers.iter() + .map(|(prc_name, (triggers, channel))| async { + let _ = channel.send({ + match &trigger_type { + None => { + Events::Positive(&self.code_name) + }, + Some(event) => { + info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name); + event.event_from_file_trigger_controller(self.name, triggers) + }, + } + }).await; + }); } } - pub async fn with_path(&mut self, path: impl AsRef) -> anyhow::Result<()> { - self.path = path.as_ref().to_string_lossy().into_owned(); - self.watcher = { - match create_watcher(self.name, &self.path).await { - Ok(val) => Some(val), - Err(er) => { - error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er); - return Err(er) - } - } - }; - Ok(()) - } - async fn trigger_on(&mut self, trigger_type: Option) { - let _ = self.triggers.iter() - .map(|(prc_name, (triggers, channel))| async { - let _ = channel.send({ - match &trigger_type { - None => Events::Positive(self.name), - Some(event) => { - info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name); - event.event_from_file_trigger_controller(self.name, triggers) - }, - } - }).await; - }); - } - } - impl<'a> ProcessUnit<'a> for FilesController<'a> { - async fn process(&mut self) { - // polling file check - // 1) existing check - if let Ok(_) = check_file(self.name, &self.path).await { - match &mut self.watcher { - Some(notify) => { - let mut buffer = [0; 1024]; - if let Ok(mut notif_events) = notify.read_events(&mut buffer) { - if let (recreate_watcher, true) = ( - notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF), - notif_events.any(|mask| mask.mask == EventMask::MODIFY) - ) { - warn!("File {} ({}) was changed", self.name, &self.path); - if recreate_watcher { - self.watcher = match create_watcher(self.name, &self.path).await { - Ok(notifier) => Some(notifier), - Err(er) => { - error!("Failed to recreate watcher for {} ({}) due to {}", - self.name, - &self.path, - er - ); - None - }, + impl<'a> ProcessUnit<'a> for FilesController<'a> { + async fn process(&'a mut self) { + // polling file check + // 1) existing check + if let Ok(_) = check_file(self.name, &self.path).await { + match &mut self.watcher { + Some(notify) => { + let mut buffer = [0; 1024]; + if let Ok(mut notif_events) = notify.read_events(&mut buffer) { + if let (recreate_watcher, true) = ( + notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF), + notif_events.any(|mask| mask.mask == EventMask::MODIFY) + ) { + warn!("File {} ({}) was changed", self.name, &self.path); + if recreate_watcher { + self.watcher = match create_watcher(self.name, &self.path).await { + Ok(notifier) => Some(notifier), + Err(er) => { + error!("Failed to recreate watcher for {} ({}) due to {}", + self.name, + &self.path, + er + ); + None + }, + } } + self.trigger_on(Some(FileTriggerType::OnChange)).await; + return; + } + } + }, + None => { /* DEAD END */}, + } + } else { + warn!("File {} ({}) was not found in determined scope", self.name, &self.path); + self.trigger_on(Some(FileTriggerType::OnDelete)).await; + return; + } + self.trigger_on(None).await; + // 2) change check + } + } + } + + /// # Fn `create_watcher` + /// ## for creating watcher on file's delete | update events + /// + /// *input* : `&str`, `&str` + /// + /// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction + /// + /// *initiator* : fn `file_handler`, fn `utils::run_daemons` + /// + /// *managing* : current file's name: &str, path in local storage to current file: &str + /// + /// *depends on* : - + /// + pub async fn create_watcher(filename: &str, path: &str) -> anyhow::Result { + let src = format!("{}{}", path, filename); + let inotify: Inotify = Inotify::init()?; + inotify.watches().add(&src, WatchMask::ALL_EVENTS)?; + Ok(inotify) + } + + /// # Fn `create_watcher` + /// ## for managing processes by checking dep files' states + /// + /// *input* : `&str`, `&[Files]`, `Arc>`, `Arc>>` + /// + /// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check + /// + /// *initiator* : fn `utils::running_handler` + /// + /// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc>`, mut list of file watchers`Arc>>` + /// + /// *depends on* : Files + /// + pub async fn file_handler( + name: &str, + files: &[Files], + tx: Arc>, + watchers: Arc>>, + ) -> anyhow::Result<()> { + for (i, file) in files.iter().enumerate() { + // let src = format!("{}{}", file.src, file.filename); + if check_file(&file.filename, &file.src).await.is_err() { + if !is_active(name).await || is_frozen(name).await { + return Err(anyhow::Error::msg("Process is frozen or stopped")); + } + match file.triggers.on_delete.as_str() { + "stay" => { + tx.send(9).await.unwrap(); + continue; + } + "stop" => { + if is_active(name).await { + tx.send(1).await.unwrap(); + } + return Err(anyhow::Error::msg("Process was stopped")); + } + "hold" => { + if is_active(name).await { + tx.send(2).await.unwrap(); + return Err(anyhow::Error::msg("Process was frozen")); + } + } + _ => { + tokio::time::sleep(Duration::from_millis(50)).await; + tx.send(101).await.unwrap(); + return Err(anyhow::Error::msg("Impermissible character or word in file trigger")); + } + } + } else if is_active(name).await && !is_frozen(name).await { + let watchers = watchers.clone(); + // println!("mutex: {:?}", watchers); + let mut buffer = [0; 128]; + let mut mutex_guard = watchers.lock().await; + if let Some(notify) = mutex_guard.get_mut(i) { + let events = notify.read_events(&mut buffer); + // println!("{:?}", events); + if events.is_ok() { + let events: Vec = events + .unwrap() + .map(|mask| mask.mask) + .filter(|mask| { + *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF + }) + .collect(); + for event in events { + if let EventMask::DELETE_SELF = event { + // ! warning (DELETE_SELF event) ! + // println!("! warning (DELETE_SELF event) !"); + // * watcher recreation after dealing with file recreation mechanism in text editors + let mutex = notify.borrow_mut(); + + // *mutex = create_watcher(&file.filename, &file.src).await.unwrap(); + if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { + *mutex = watcher; + } + } + match file.triggers.on_change.as_str() { + "stop" => { + let _ = tx.send(7).await; + } + "restart" => { + let _ = tx.send(8).await; + } + "stay" => { + let _ = tx.send(9).await; + } + _ => { + let _ = tx.send(101).await; } - self.trigger_on(Some(FileTriggerType::OnChange)).await; - return; } } - }, - None => { /* DEAD END */}, + } } + } + } + tokio::task::yield_now().await; + Ok(()) + } + + /// # Fn `check_file` + /// ## for checking existance of current file + /// + /// *input* : `&str`, `&str` + /// + /// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error + /// + /// *initiator* : fn `file_handler` + /// + /// *managing* : current file's name: `&str` and current file's path in local storage: `&str` + /// + /// *depends on* : network activity + /// + pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { + let arc_name = Arc::new(filename.to_string()); + let arc_path = Arc::new(path.to_string()); + tokio::task::spawn_blocking(move || { + let file_concat = format!("{}{}", arc_path, arc_name); + let path = Path::new(&file_concat); + if path.exists() { + Ok(()) } else { - warn!("File {} ({}) was not found in determined scope", self.name, &self.path); - self.trigger_on(Some(FileTriggerType::OnDelete)).await; - return; + Err(CustomError::Fatal) } - self.trigger_on(None).await; - // 2) change check + }) + .await + .unwrap_or_else(|_| { + panic!("Corrupted while file check process"); + }) + } + + #[cfg(test)] + mod files_unittests { + use super::*; + #[tokio::test] + async fn try_to_create_watcher() { + let res = create_watcher("dep-file", "./tests/examples/").await; + assert!(res.is_ok()); + } + #[tokio::test] + async fn try_to_create_invalid_watcher() { + let res = create_watcher("invalid-file", "/path/to/the/no/dir").await; + assert!(res.is_err()); + } + #[tokio::test] + async fn check_existing_file() { + let res = check_file("dep-file", "./tests/examples/").await; + assert!(res.is_ok()); + } + #[tokio::test] + async fn check_non_existing_file() { + let res = check_file("invalid-file", "/path/to/the/no/dir").await; + assert!(res.is_err()); } } -} - -/// # Fn `create_watcher` -/// ## for creating watcher on file's delete | update events -/// -/// *input* : `&str`, `&str` -/// -/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction -/// -/// *initiator* : fn `file_handler`, fn `utils::run_daemons` -/// -/// *managing* : current file's name: &str, path in local storage to current file: &str -/// -/// *depends on* : - -/// -pub async fn create_watcher(filename: &str, path: &str) -> anyhow::Result { - let src = format!("{}{}", path, filename); - let inotify: Inotify = Inotify::init()?; - inotify.watches().add(&src, WatchMask::ALL_EVENTS)?; - Ok(inotify) -} - -/// # Fn `create_watcher` -/// ## for managing processes by checking dep files' states -/// -/// *input* : `&str`, `&[Files]`, `Arc>`, `Arc>>` -/// -/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check -/// -/// *initiator* : fn `utils::running_handler` -/// -/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc>`, mut list of file watchers`Arc>>` -/// -/// *depends on* : Files -/// -pub async fn file_handler( - name: &str, - files: &[Files], - tx: Arc>, - watchers: Arc>>, -) -> anyhow::Result<()> { - for (i, file) in files.iter().enumerate() { - // let src = format!("{}{}", file.src, file.filename); - if check_file(&file.filename, &file.src).await.is_err() { - if !is_active(name).await || is_frozen(name).await { - return Err(anyhow::Error::msg("Process is frozen or stopped")); - } - match file.triggers.on_delete.as_str() { - "stay" => { - tx.send(9).await.unwrap(); - continue; - } - "stop" => { - if is_active(name).await { - tx.send(1).await.unwrap(); - } - return Err(anyhow::Error::msg("Process was stopped")); - } - "hold" => { - if is_active(name).await { - tx.send(2).await.unwrap(); - return Err(anyhow::Error::msg("Process was frozen")); - } - } - _ => { - tokio::time::sleep(Duration::from_millis(50)).await; - tx.send(101).await.unwrap(); - return Err(anyhow::Error::msg("Impermissible character or word in file trigger")); - } - } - } else if is_active(name).await && !is_frozen(name).await { - let watchers = watchers.clone(); - // println!("mutex: {:?}", watchers); - let mut buffer = [0; 128]; - let mut mutex_guard = watchers.lock().await; - if let Some(notify) = mutex_guard.get_mut(i) { - let events = notify.read_events(&mut buffer); - // println!("{:?}", events); - if events.is_ok() { - let events: Vec = events - .unwrap() - .map(|mask| mask.mask) - .filter(|mask| { - *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF - }) - .collect(); - for event in events { - if let EventMask::DELETE_SELF = event { - // ! warning (DELETE_SELF event) ! - // println!("! warning (DELETE_SELF event) !"); - // * watcher recreation after dealing with file recreation mechanism in text editors - let mutex = notify.borrow_mut(); - - // *mutex = create_watcher(&file.filename, &file.src).await.unwrap(); - if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { - *mutex = watcher; - } - } - match file.triggers.on_change.as_str() { - "stop" => { - let _ = tx.send(7).await; - } - "restart" => { - let _ = tx.send(8).await; - } - "stay" => { - let _ = tx.send(9).await; - } - _ => { - let _ = tx.send(101).await; - } - } - } - } - } - } - } - tokio::task::yield_now().await; - Ok(()) -} - -/// # Fn `check_file` -/// ## for checking existance of current file -/// -/// *input* : `&str`, `&str` -/// -/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error -/// -/// *initiator* : fn `file_handler` -/// -/// *managing* : current file's name: `&str` and current file's path in local storage: `&str` -/// -/// *depends on* : network activity -/// -pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { - let arc_name = Arc::new(filename.to_string()); - let arc_path = Arc::new(path.to_string()); - tokio::task::spawn_blocking(move || { - let file_concat = format!("{}{}", arc_path, arc_name); - let path = Path::new(&file_concat); - if path.exists() { - Ok(()) - } else { - Err(CustomError::Fatal) - } - }) - .await - .unwrap_or_else(|_| { - panic!("Corrupted while file check process"); - }) -} - -#[cfg(test)] -mod files_unittests { - use super::*; - #[tokio::test] - async fn try_to_create_watcher() { - let res = create_watcher("dep-file", "./tests/examples/").await; - assert!(res.is_ok()); - } - #[tokio::test] - async fn try_to_create_invalid_watcher() { - let res = create_watcher("invalid-file", "/path/to/the/no/dir").await; - assert!(res.is_err()); - } - #[tokio::test] - async fn check_existing_file() { - let res = check_file("dep-file", "./tests/examples/").await; - assert!(res.is_ok()); - } - #[tokio::test] - async fn check_non_existing_file() { - let res = check_file("invalid-file", "/path/to/the/no/dir").await; - assert!(res.is_err()); - } -}