files logic fixed

feature/configv2
prplV 2025-04-22 11:22:07 -04:00
parent 502ea114a6
commit c3fd0dd09f
1 changed files with 276 additions and 270 deletions

View File

@ -1,286 +1,292 @@
use crate::options::structs::{CustomError, Files};
use super::prcs::{is_active, is_frozen};
use inotify::{EventMask, Inotify, WatchMask};
use std::borrow::BorrowMut;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender as MpscSender;
use tokio::time::Duration;
use crate::options::structs::Events;
use crate::options::structs::{CustomError, Files};
use super::prcs::{is_active, is_frozen};
use inotify::{EventMask, Inotify, WatchMask};
use std::borrow::BorrowMut;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender as Sender;
use tokio::time::Duration;
use crate::options::structs::Events;
pub mod v2 {
use log::{error, info, warn};
pub mod v2 {
use log::{error, info, warn};
// use std::collections::HashMap;
use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
use super::*;
use std::{collections::HashMap, path::Path};
// use std::collections::HashMap;
use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
use super::*;
use std::{collections::HashMap, path::Path};
// type EventHandlers<'a> = HashMap<service name, sender object>
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<Events<'a>>)>;
type MpscSender<'a> = Arc<Sender<Events<'a>>>;
// type EventHandlers<'a> = HashMap<service name, sender object>
type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>;
struct FilesController<'a> {
name: &'a str,
path: String,
watcher: Option<Inotify>,
// obj: Arc<Files>,
triggers: EventHandlers<'a>,
}
struct FilesController<'a> {
name : &'a str,
path : String,
watcher : Option<Inotify>,
// obj: Arc<Files>,
triggers : EventHandlers<'a>,
code_name : String,
}
impl<'a> FilesController<'a> {
pub fn new(name: &'a str, triggers: EventHandlers<'a>) -> FilesController<'a> {
Self {
name,
path : String::new(),
watcher: None,
triggers,
impl<'a> FilesController<'a> {
pub fn new(name: &'a str, triggers: EventHandlers<'a>) -> FilesController<'a> {
Self {
name,
path : String::new(),
watcher: None,
triggers,
code_name : name.to_string(),
}
}
pub async fn with_path(&mut self, path: impl AsRef<Path>) -> anyhow::Result<()> {
self.path = path.as_ref().to_string_lossy().into_owned();
self.watcher = {
match create_watcher(self.name, &self.path).await {
Ok(val) => Some(val),
Err(er) => {
error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
return Err(er)
}
}
};
self.code_name = format!("{}{}", &self.path, &self.code_name);
Ok(())
}
async fn trigger_on(&'a mut self, trigger_type: Option<FileTriggerType>) {
let _ = self.triggers.iter()
.map(|(prc_name, (triggers, channel))| async {
let _ = channel.send({
match &trigger_type {
None => {
Events::Positive(&self.code_name)
},
Some(event) => {
info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name);
event.event_from_file_trigger_controller(self.name, triggers)
},
}
}).await;
});
}
}
pub async fn with_path(&mut self, path: impl AsRef<Path>) -> anyhow::Result<()> {
self.path = path.as_ref().to_string_lossy().into_owned();
self.watcher = {
match create_watcher(self.name, &self.path).await {
Ok(val) => Some(val),
Err(er) => {
error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er);
return Err(er)
}
}
};
Ok(())
}
async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
let _ = self.triggers.iter()
.map(|(prc_name, (triggers, channel))| async {
let _ = channel.send({
match &trigger_type {
None => Events::Positive(self.name),
Some(event) => {
info!("Event on file {} ({}) : {}. Notifying `{}` ...", self.name, &self.path, event, *prc_name);
event.event_from_file_trigger_controller(self.name, triggers)
},
}
}).await;
});
}
}
impl<'a> ProcessUnit<'a> for FilesController<'a> {
async fn process(&mut self) {
// polling file check
// 1) existing check
if let Ok(_) = check_file(self.name, &self.path).await {
match &mut self.watcher {
Some(notify) => {
let mut buffer = [0; 1024];
if let Ok(mut notif_events) = notify.read_events(&mut buffer) {
if let (recreate_watcher, true) = (
notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF),
notif_events.any(|mask| mask.mask == EventMask::MODIFY)
) {
warn!("File {} ({}) was changed", self.name, &self.path);
if recreate_watcher {
self.watcher = match create_watcher(self.name, &self.path).await {
Ok(notifier) => Some(notifier),
Err(er) => {
error!("Failed to recreate watcher for {} ({}) due to {}",
self.name,
&self.path,
er
);
None
},
impl<'a> ProcessUnit<'a> for FilesController<'a> {
async fn process(&'a mut self) {
// polling file check
// 1) existing check
if let Ok(_) = check_file(self.name, &self.path).await {
match &mut self.watcher {
Some(notify) => {
let mut buffer = [0; 1024];
if let Ok(mut notif_events) = notify.read_events(&mut buffer) {
if let (recreate_watcher, true) = (
notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF),
notif_events.any(|mask| mask.mask == EventMask::MODIFY)
) {
warn!("File {} ({}) was changed", self.name, &self.path);
if recreate_watcher {
self.watcher = match create_watcher(self.name, &self.path).await {
Ok(notifier) => Some(notifier),
Err(er) => {
error!("Failed to recreate watcher for {} ({}) due to {}",
self.name,
&self.path,
er
);
None
},
}
}
self.trigger_on(Some(FileTriggerType::OnChange)).await;
return;
}
}
},
None => { /* DEAD END */},
}
} else {
warn!("File {} ({}) was not found in determined scope", self.name, &self.path);
self.trigger_on(Some(FileTriggerType::OnDelete)).await;
return;
}
self.trigger_on(None).await;
// 2) change check
}
}
}
/// # Fn `create_watcher`
/// ## for creating watcher on file's delete | update events
///
/// *input* : `&str`, `&str`
///
/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction
///
/// *initiator* : fn `file_handler`, fn `utils::run_daemons`
///
/// *managing* : current file's name: &str, path in local storage to current file: &str
///
/// *depends on* : -
///
pub async fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> {
let src = format!("{}{}", path, filename);
let inotify: Inotify = Inotify::init()?;
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
Ok(inotify)
}
/// # Fn `create_watcher`
/// ## for managing processes by checking dep files' states
///
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check
///
/// *initiator* : fn `utils::running_handler`
///
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *depends on* : Files
///
pub async fn file_handler(
name: &str,
files: &[Files],
tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) -> anyhow::Result<()> {
for (i, file) in files.iter().enumerate() {
// let src = format!("{}{}", file.src, file.filename);
if check_file(&file.filename, &file.src).await.is_err() {
if !is_active(name).await || is_frozen(name).await {
return Err(anyhow::Error::msg("Process is frozen or stopped"));
}
match file.triggers.on_delete.as_str() {
"stay" => {
tx.send(9).await.unwrap();
continue;
}
"stop" => {
if is_active(name).await {
tx.send(1).await.unwrap();
}
return Err(anyhow::Error::msg("Process was stopped"));
}
"hold" => {
if is_active(name).await {
tx.send(2).await.unwrap();
return Err(anyhow::Error::msg("Process was frozen"));
}
}
_ => {
tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(101).await.unwrap();
return Err(anyhow::Error::msg("Impermissible character or word in file trigger"));
}
}
} else if is_active(name).await && !is_frozen(name).await {
let watchers = watchers.clone();
// println!("mutex: {:?}", watchers);
let mut buffer = [0; 128];
let mut mutex_guard = watchers.lock().await;
if let Some(notify) = mutex_guard.get_mut(i) {
let events = notify.read_events(&mut buffer);
// println!("{:?}", events);
if events.is_ok() {
let events: Vec<EventMask> = events
.unwrap()
.map(|mask| mask.mask)
.filter(|mask| {
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
})
.collect();
for event in events {
if let EventMask::DELETE_SELF = event {
// ! warning (DELETE_SELF event) !
// println!("! warning (DELETE_SELF event) !");
// * watcher recreation after dealing with file recreation mechanism in text editors
let mutex = notify.borrow_mut();
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
*mutex = watcher;
}
}
match file.triggers.on_change.as_str() {
"stop" => {
let _ = tx.send(7).await;
}
"restart" => {
let _ = tx.send(8).await;
}
"stay" => {
let _ = tx.send(9).await;
}
_ => {
let _ = tx.send(101).await;
}
self.trigger_on(Some(FileTriggerType::OnChange)).await;
return;
}
}
},
None => { /* DEAD END */},
}
}
}
}
tokio::task::yield_now().await;
Ok(())
}
/// # Fn `check_file`
/// ## for checking existance of current file
///
/// *input* : `&str`, `&str`
///
/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error
///
/// *initiator* : fn `file_handler`
///
/// *managing* : current file's name: `&str` and current file's path in local storage: `&str`
///
/// *depends on* : network activity
///
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let arc_name = Arc::new(filename.to_string());
let arc_path = Arc::new(path.to_string());
tokio::task::spawn_blocking(move || {
let file_concat = format!("{}{}", arc_path, arc_name);
let path = Path::new(&file_concat);
if path.exists() {
Ok(())
} else {
warn!("File {} ({}) was not found in determined scope", self.name, &self.path);
self.trigger_on(Some(FileTriggerType::OnDelete)).await;
return;
Err(CustomError::Fatal)
}
self.trigger_on(None).await;
// 2) change check
})
.await
.unwrap_or_else(|_| {
panic!("Corrupted while file check process");
})
}
#[cfg(test)]
mod files_unittests {
use super::*;
#[tokio::test]
async fn try_to_create_watcher() {
let res = create_watcher("dep-file", "./tests/examples/").await;
assert!(res.is_ok());
}
#[tokio::test]
async fn try_to_create_invalid_watcher() {
let res = create_watcher("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err());
}
#[tokio::test]
async fn check_existing_file() {
let res = check_file("dep-file", "./tests/examples/").await;
assert!(res.is_ok());
}
#[tokio::test]
async fn check_non_existing_file() {
let res = check_file("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err());
}
}
}
/// # Fn `create_watcher`
/// ## for creating watcher on file's delete | update events
///
/// *input* : `&str`, `&str`
///
/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction
///
/// *initiator* : fn `file_handler`, fn `utils::run_daemons`
///
/// *managing* : current file's name: &str, path in local storage to current file: &str
///
/// *depends on* : -
///
pub async fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> {
let src = format!("{}{}", path, filename);
let inotify: Inotify = Inotify::init()?;
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
Ok(inotify)
}
/// # Fn `create_watcher`
/// ## for managing processes by checking dep files' states
///
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check
///
/// *initiator* : fn `utils::running_handler`
///
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *depends on* : Files
///
pub async fn file_handler(
name: &str,
files: &[Files],
tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) -> anyhow::Result<()> {
for (i, file) in files.iter().enumerate() {
// let src = format!("{}{}", file.src, file.filename);
if check_file(&file.filename, &file.src).await.is_err() {
if !is_active(name).await || is_frozen(name).await {
return Err(anyhow::Error::msg("Process is frozen or stopped"));
}
match file.triggers.on_delete.as_str() {
"stay" => {
tx.send(9).await.unwrap();
continue;
}
"stop" => {
if is_active(name).await {
tx.send(1).await.unwrap();
}
return Err(anyhow::Error::msg("Process was stopped"));
}
"hold" => {
if is_active(name).await {
tx.send(2).await.unwrap();
return Err(anyhow::Error::msg("Process was frozen"));
}
}
_ => {
tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(101).await.unwrap();
return Err(anyhow::Error::msg("Impermissible character or word in file trigger"));
}
}
} else if is_active(name).await && !is_frozen(name).await {
let watchers = watchers.clone();
// println!("mutex: {:?}", watchers);
let mut buffer = [0; 128];
let mut mutex_guard = watchers.lock().await;
if let Some(notify) = mutex_guard.get_mut(i) {
let events = notify.read_events(&mut buffer);
// println!("{:?}", events);
if events.is_ok() {
let events: Vec<EventMask> = events
.unwrap()
.map(|mask| mask.mask)
.filter(|mask| {
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
})
.collect();
for event in events {
if let EventMask::DELETE_SELF = event {
// ! warning (DELETE_SELF event) !
// println!("! warning (DELETE_SELF event) !");
// * watcher recreation after dealing with file recreation mechanism in text editors
let mutex = notify.borrow_mut();
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
*mutex = watcher;
}
}
match file.triggers.on_change.as_str() {
"stop" => {
let _ = tx.send(7).await;
}
"restart" => {
let _ = tx.send(8).await;
}
"stay" => {
let _ = tx.send(9).await;
}
_ => {
let _ = tx.send(101).await;
}
}
}
}
}
}
}
tokio::task::yield_now().await;
Ok(())
}
/// # Fn `check_file`
/// ## for checking existance of current file
///
/// *input* : `&str`, `&str`
///
/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error
///
/// *initiator* : fn `file_handler`
///
/// *managing* : current file's name: `&str` and current file's path in local storage: `&str`
///
/// *depends on* : network activity
///
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let arc_name = Arc::new(filename.to_string());
let arc_path = Arc::new(path.to_string());
tokio::task::spawn_blocking(move || {
let file_concat = format!("{}{}", arc_path, arc_name);
let path = Path::new(&file_concat);
if path.exists() {
Ok(())
} else {
Err(CustomError::Fatal)
}
})
.await
.unwrap_or_else(|_| {
panic!("Corrupted while file check process");
})
}
#[cfg(test)]
mod files_unittests {
use super::*;
#[tokio::test]
async fn try_to_create_watcher() {
let res = create_watcher("dep-file", "./tests/examples/").await;
assert!(res.is_ok());
}
#[tokio::test]
async fn try_to_create_invalid_watcher() {
let res = create_watcher("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err());
}
#[tokio::test]
async fn check_existing_file() {
let res = check_file("dep-file", "./tests/examples/").await;
assert!(res.is_ok());
}
#[tokio::test]
async fn check_non_existing_file() {
let res = check_file("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err());
}
}