ref files

migrate
prplV 2025-05-29 17:33:21 +03:00
parent b456c7aa18
commit 1c651c93ce
1 changed files with 9 additions and 118 deletions

View File

@ -1,16 +1,12 @@
use crate::options::structs::{CustomError, Files}; use crate::options::structs::CustomError;
use super::prcs::{is_active, is_frozen}; use inotify::{EventMask, Inotify, WatchMask};
use inotify::{EventMask, Inotify, WatchMask}; use std::path::Path;
use std::borrow::BorrowMut; use std::sync::Arc;
use std::path::Path; use tokio::sync::mpsc::Sender as Sender;
use std::sync::Arc; use crate::options::structs::Events;
use tokio::sync::mpsc; use async_trait::async_trait;
use tokio::sync::mpsc::Sender as Sender;
use tokio::time::Duration;
use crate::options::structs::Events;
use async_trait::async_trait;
pub mod v2 { pub mod v2 {
use log::{error, info, warn}; use log::{error, info, warn};
use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit}; use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
use super::*; use super::*;
@ -92,14 +88,10 @@
#[async_trait] #[async_trait]
impl ProcessUnit for FilesController { impl ProcessUnit for FilesController {
async fn process(&mut self) { async fn process(&mut self) {
// polling file check
// 1) existing check
// dbg!(&self);
if let Ok(_) = check_file(&self.name, &self.path).await { if let Ok(_) = check_file(&self.name, &self.path).await {
if let FileState::NotFound = self.state { if let FileState::NotFound = self.state {
info!("File {} ({}) was found in determined scope. Notifying ...", self.name, self.code_name); info!("File {} ({}) was found in determined scope. Notifying ...", self.name, self.code_name);
self.state = FileState::Ok; self.state = FileState::Ok;
// reseting negative outcome in prc
self.trigger_on(None).await; self.trigger_on(None).await;
} }
match &mut self.watcher { match &mut self.watcher {
@ -112,7 +104,6 @@
b || mask.mask == EventMask::MODIFY, b || mask.mask == EventMask::MODIFY,
) )
}); });
if let (recreate_watcher, true) = (need_to_recreate, was_modifired) { if let (recreate_watcher, true) = (need_to_recreate, was_modifired) {
warn!("File {} ({}) was changed", self.name, &self.path); warn!("File {} ({}) was changed", self.name, &self.path);
if recreate_watcher { if recreate_watcher {
@ -144,10 +135,9 @@
return; return;
} }
self.trigger_on(None).await; self.trigger_on(None).await;
// 2) change check
} }
} }
} }
/// # Fn `create_watcher` /// # Fn `create_watcher`
/// ## for creating watcher on file's delete | update events /// ## for creating watcher on file's delete | update events
@ -169,105 +159,6 @@
Ok(inotify) Ok(inotify)
} }
/// # Fn `create_watcher`
/// ## for managing processes by checking dep files' states
///
/// *input* : `&str`, `&[Files]`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check
///
/// *initiator* : fn `utils::running_handler`
///
/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc<mpsc::Sender<u8>>`, mut list of file watchers`Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *depends on* : Files
///
pub async fn file_handler(
name: &str,
files: &[Files],
tx: Arc<mpsc::Sender<u8>>,
watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) -> anyhow::Result<()> {
for (i, file) in files.iter().enumerate() {
// let src = format!("{}{}", file.src, file.filename);
if check_file(&file.filename, &file.src).await.is_err() {
if !is_active(name).await || is_frozen(name).await {
return Err(anyhow::Error::msg("Process is frozen or stopped"));
}
match file.triggers.on_delete.as_str() {
"stay" => {
tx.send(9).await.unwrap();
continue;
}
"stop" => {
if is_active(name).await {
tx.send(1).await.unwrap();
}
return Err(anyhow::Error::msg("Process was stopped"));
}
"hold" => {
if is_active(name).await {
tx.send(2).await.unwrap();
return Err(anyhow::Error::msg("Process was frozen"));
}
}
_ => {
tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(101).await.unwrap();
return Err(anyhow::Error::msg("Impermissible character or word in file trigger"));
}
}
} else if is_active(name).await && !is_frozen(name).await {
let watchers = watchers.clone();
// println!("mutex: {:?}", watchers);
let mut buffer = [0; 128];
let mut mutex_guard = watchers.lock().await;
if let Some(notify) = mutex_guard.get_mut(i) {
let events = notify.read_events(&mut buffer);
// println!("{:?}", events);
if events.is_ok() {
let events: Vec<EventMask> = events
.unwrap()
.map(|mask| mask.mask)
.filter(|mask| {
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
})
.collect();
for event in events {
if let EventMask::DELETE_SELF = event {
// ! warning (DELETE_SELF event) !
// println!("! warning (DELETE_SELF event) !");
// * watcher recreation after dealing with file recreation mechanism in text editors
let mutex = notify.borrow_mut();
// *mutex = create_watcher(&file.filename, &file.src).await.unwrap();
if let Ok(watcher) = create_watcher(&file.filename, &file.src) {
*mutex = watcher;
}
}
match file.triggers.on_change.as_str() {
"stop" => {
let _ = tx.send(7).await;
}
"restart" => {
let _ = tx.send(8).await;
}
"stay" => {
let _ = tx.send(9).await;
}
_ => {
let _ = tx.send(101).await;
}
}
}
}
}
}
}
tokio::task::yield_now().await;
Ok(())
}
/// # Fn `check_file` /// # Fn `check_file`
/// ## for checking existance of current file /// ## for checking existance of current file
/// ///