use crate::options::structs::{CustomError, Files}; use super::prcs::{is_active, is_frozen}; use inotify::{EventMask, Inotify, WatchMask}; use std::borrow::BorrowMut; use std::path::Path; use std::sync::Arc; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender as Sender; use tokio::time::Duration; use crate::options::structs::Events; use async_trait::async_trait; pub mod v2 { use log::{error, info, warn}; use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit}; use super::*; use std::{collections::HashMap, path::Path}; type MpscSender = Arc>; type EventHandlers = HashMap, (Triggers, MpscSender)>; #[derive(Debug)] enum FileState { Ok, NotFound, } #[derive(Debug)] pub struct FilesController { name : Arc, path : String, code_name : Arc, state : FileState, watcher : Option, triggers : EventHandlers, } impl PartialEq for FilesController { fn eq(&self, other: &Self) -> bool { self.code_name == other.code_name } } impl FilesController { #[inline(always)] pub fn new(name: &str, triggers: EventHandlers) -> FilesController { let name: Arc = Arc::from(name); Self { name : name.clone(), path : String::new(), state : FileState::Ok, watcher : None, triggers, code_name : name.clone(), } } #[inline(always)] pub fn with_path(mut self, path: impl AsRef) -> anyhow::Result { self.path = path.as_ref().to_string_lossy().into_owned(); self.watcher = { match create_watcher(&self.name, &self.path) { Ok(val) => Some(val), Err(er) => { error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er); return Err(er) } } }; self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name)); Ok(self) } pub fn add_event(&mut self, file_controller : FilesController) { for (k, v) in file_controller.triggers { self.triggers.entry(k).or_insert(v); } } async fn trigger_on(&mut self, trigger_type: Option) { for (prc_name, (triggers, channel)) in &self.triggers { let msg = match &trigger_type { None => { Events::Positive(self.code_name.clone()) }, Some(event) => { info!("Event on file {} ({}) : {}. Notifying `{}` ...", &self.name, &self.path, event, &prc_name); event.event_from_file_trigger_controller(self.code_name.clone(), &triggers) }, }; let _ = channel.send(msg).await; } } } #[async_trait] impl ProcessUnit for FilesController { async fn process(&mut self) { // polling file check // 1) existing check // dbg!(&self); if let Ok(_) = check_file(&self.name, &self.path).await { if let FileState::NotFound = self.state { info!("File {} ({}) was found in determined scope. Notifying ...", self.name, self.code_name); self.state = FileState::Ok; // reseting negative outcome in prc self.trigger_on(None).await; } match &mut self.watcher { Some(notify) => { let mut buffer = [0; 128]; if let Ok(notif_events) = notify.read_events(&mut buffer) { let (need_to_recreate, was_modifired) = notif_events.fold((false, false), |(a, b), mask| { ( a || mask.mask == EventMask::DELETE_SELF, b || mask.mask == EventMask::MODIFY, ) }); if let (recreate_watcher, true) = (need_to_recreate, was_modifired) { warn!("File {} ({}) was changed", self.name, &self.path); if recreate_watcher { self.watcher = match create_watcher(&self.name, &self.path) { Ok(notifier) => Some(notifier), Err(er) => { error!("Failed to recreate watcher for {} ({}) due to {}", self.name, &self.path, er ); None }, } } self.trigger_on(Some(FileTriggerType::OnChange)).await; return; } } }, None => { /* DEAD END */}, } } else { if let FileState::Ok = self.state { warn!("File {} ({}) was not found in determined scope", self.name, &self.path); self.state = FileState::NotFound; self.trigger_on(Some(FileTriggerType::OnDelete)).await; } return; } self.trigger_on(None).await; // 2) change check } } } /// # Fn `create_watcher` /// ## for creating watcher on file's delete | update events /// /// *input* : `&str`, `&str` /// /// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction /// /// *initiator* : fn `file_handler`, fn `utils::run_daemons` /// /// *managing* : current file's name: &str, path in local storage to current file: &str /// /// *depends on* : - /// pub fn create_watcher(filename: &str, path: &str) -> anyhow::Result { let src = format!("{}{}", path, filename); let inotify: Inotify = Inotify::init()?; inotify.watches().add(&src, WatchMask::ALL_EVENTS)?; Ok(inotify) } /// # Fn `create_watcher` /// ## for managing processes by checking dep files' states /// /// *input* : `&str`, `&[Files]`, `Arc>`, `Arc>>` /// /// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check /// /// *initiator* : fn `utils::running_handler` /// /// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc>`, mut list of file watchers`Arc>>` /// /// *depends on* : Files /// pub async fn file_handler( name: &str, files: &[Files], tx: Arc>, watchers: Arc>>, ) -> anyhow::Result<()> { for (i, file) in files.iter().enumerate() { // let src = format!("{}{}", file.src, file.filename); if check_file(&file.filename, &file.src).await.is_err() { if !is_active(name).await || is_frozen(name).await { return Err(anyhow::Error::msg("Process is frozen or stopped")); } match file.triggers.on_delete.as_str() { "stay" => { tx.send(9).await.unwrap(); continue; } "stop" => { if is_active(name).await { tx.send(1).await.unwrap(); } return Err(anyhow::Error::msg("Process was stopped")); } "hold" => { if is_active(name).await { tx.send(2).await.unwrap(); return Err(anyhow::Error::msg("Process was frozen")); } } _ => { tokio::time::sleep(Duration::from_millis(50)).await; tx.send(101).await.unwrap(); return Err(anyhow::Error::msg("Impermissible character or word in file trigger")); } } } else if is_active(name).await && !is_frozen(name).await { let watchers = watchers.clone(); // println!("mutex: {:?}", watchers); let mut buffer = [0; 128]; let mut mutex_guard = watchers.lock().await; if let Some(notify) = mutex_guard.get_mut(i) { let events = notify.read_events(&mut buffer); // println!("{:?}", events); if events.is_ok() { let events: Vec = events .unwrap() .map(|mask| mask.mask) .filter(|mask| { *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF }) .collect(); for event in events { if let EventMask::DELETE_SELF = event { // ! warning (DELETE_SELF event) ! // println!("! warning (DELETE_SELF event) !"); // * watcher recreation after dealing with file recreation mechanism in text editors let mutex = notify.borrow_mut(); // *mutex = create_watcher(&file.filename, &file.src).await.unwrap(); if let Ok(watcher) = create_watcher(&file.filename, &file.src) { *mutex = watcher; } } match file.triggers.on_change.as_str() { "stop" => { let _ = tx.send(7).await; } "restart" => { let _ = tx.send(8).await; } "stay" => { let _ = tx.send(9).await; } _ => { let _ = tx.send(101).await; } } } } } } } tokio::task::yield_now().await; Ok(()) } /// # Fn `check_file` /// ## for checking existance of current file /// /// *input* : `&str`, `&str` /// /// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error /// /// *initiator* : fn `file_handler` /// /// *managing* : current file's name: `&str` and current file's path in local storage: `&str` /// /// *depends on* : network activity /// pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { let arc_name = Arc::new(filename.to_string()); let arc_path = Arc::new(path.to_string()); tokio::task::spawn_blocking(move || { let file_concat = format!("{}{}", arc_path, arc_name); let path = Path::new(&file_concat); if path.exists() { Ok(()) } else { Err(CustomError::Fatal) } }) .await .unwrap_or_else(|_| { panic!("Corrupted while file check process"); }) } #[cfg(test)] mod files_unittests { use super::*; #[tokio::test] async fn try_to_create_watcher() { let res = create_watcher("dep-file", "./tests/examples/"); assert!(res.is_ok()); } #[tokio::test] async fn try_to_create_invalid_watcher() { let res = create_watcher("invalid-file", "/path/to/the/no/dir"); assert!(res.is_err()); } #[tokio::test] async fn check_existing_file() { let res = check_file("dep-file", "./tests/examples/").await; assert!(res.is_ok()); } #[tokio::test] async fn check_non_existing_file() { let res = check_file("invalid-file", "/path/to/the/no/dir").await; assert!(res.is_err()); } }