diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index 282a13d..ef098c7 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -7,11 +7,61 @@ pub enum DependencyType { File, Service, } + +pub struct ServiceWaitConfig { wait: u32, delay: u32} + +impl Default for ServiceWaitConfig { + fn default() -> Self { + Self { wait: 0, delay: 5 } + } +} + +pub enum FileTriggerType { + OnChange, + OnDelete, +} + +impl std::fmt::Display for FileTriggerType { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + return match self { + FileTriggerType::OnChange => write!(f, "File was changed"), + FileTriggerType::OnDelete => write!(f, "File was moved or deleted"), + } + } +} + +impl<'a> FileTriggerType { + pub fn event(&self, file_name: &'a str, trigger: &'a str) -> Events<'a> { + return match self { + FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)), + FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)), + } + } + pub fn event_from_file_trigger_controller(&self, file_name: &'a str, trigger: &FileTriggersForController<'a>) -> Events<'a> { + return match self { + FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change)), + FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_delete)), + } + } +} + pub enum Triggers<'a> { File{ on_change: &'a str, on_delete: &'a str }, Service(&'a str), } +impl<'a> Triggers<'a> { + pub fn new_file(on_change: &'a str, on_delete: &'a str) -> Triggers<'a> { + Triggers::File { on_change, on_delete } + } + pub fn new_service(on_lost: &'a str) -> Triggers<'a> { + Triggers::Service(on_lost) + } +} + +pub struct FileTriggersForController<'a> { pub on_change: &'a str, pub on_delete: &'a str } +pub struct ServiceTriggersForController<'a>(&'a str); + impl std::fmt::Display for DependencyType { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { return match self { @@ -20,6 +70,7 @@ impl std::fmt::Display for DependencyType { } } } + pub enum ProcessState { Pending, Holding, diff --git a/noxis-rs/src/utils/files.rs b/noxis-rs/src/utils/files.rs index 678b084..22060fb 100644 --- a/noxis-rs/src/utils/files.rs +++ b/noxis-rs/src/utils/files.rs @@ -10,49 +10,103 @@ use tokio::time::Duration; use crate::options::structs::Events; pub mod v2 { - // use std::collections::HashMap; - use crate::options::structs::{Triggers, ProcessUnit}; - use super::*; - use std::path::Path; + use log::{error, info, warn}; - type EventHandlers<'a> = Vec>>; + // use std::collections::HashMap; + use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit}; + use super::*; + use std::{collections::HashMap, path::Path}; + + // type EventHandlers<'a> = HashMap + type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender>)>; + // + type FileTriggersWrapper<'a> = HashMap<&'a str, Triggers<'a>>; struct FilesController<'a> { name: &'a str, path: String, watcher: Option, // obj: Arc, - triggers: Triggers<'a>, - event_registrator: EventHandlers<'a>, + triggers: EventHandlers<'a>, } impl<'a> FilesController<'a> { - pub fn new(name: &'a str, triggers: Triggers<'a>, event_registrator: EventHandlers<'a>) -> FilesController<'a> { + pub fn new(name: &'a str, triggers: EventHandlers<'a>) -> FilesController<'a> { Self { name, path : String::new(), watcher: None, triggers, - event_registrator, } } pub async fn with_path(&mut self, path: impl AsRef) -> anyhow::Result<()> { self.path = path.as_ref().to_string_lossy().into_owned(); - self.watcher = Some({ + self.watcher = { match create_watcher(self.name, &self.path).await { - Ok(val) => val, - Err(er) => return Err(er) + Ok(val) => Some(val), + Err(er) => { + error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er); + return Err(er) + } } - }); + }; Ok(()) } - pub async fn trigger_on(&mut self) { - // trigger handler + async fn trigger_on(&mut self, trigger_type: Option) { + let _ = self.triggers.iter() + .map(|(_, (triggers, channel))| async { + let _ = channel.send({ + match &trigger_type { + None => Events::Positive(self.name), + Some(event) => { + info!("Event on {} ({}) : {}", self.name, &self.path, event); + event.event_from_file_trigger_controller(self.name, triggers) + }, + } + }).await; + }); } } impl<'a> ProcessUnit<'a> for FilesController<'a> { async fn process(&mut self) { // polling file check + // 1) existing check + if let Ok(_) = check_file(self.name, &self.path).await { + match &mut self.watcher { + Some(notify) => { + let mut buffer = [0; 1024]; + if let Ok(mut notif_events) = notify.read_events(&mut buffer) { + if let (recreate_watcher, true) = ( + notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF), + notif_events.any(|mask| mask.mask == EventMask::MODIFY) + ) { + warn!("File {} ({}) was changed", self.name, &self.path); + if recreate_watcher { + self.watcher = match create_watcher(self.name, &self.path).await { + Ok(notifier) => Some(notifier), + Err(er) => { + error!("Failed to recreate watcher for {} ({}) due to {}", + self.name, + &self.path, + er + ); + None + }, + } + } + self.trigger_on(Some(FileTriggerType::OnChange)).await; + } + } + }, + None => { /* DEAD END */}, + } + } else { + warn!("File {} ({}) was not found in determined scope", self.name, &self.path); + self.trigger_on(Some(FileTriggerType::OnDelete)).await; + return; + } + self.trigger_on(None).await; + // 2) change check } } } diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index ef88228..9d9c6c5 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -1,4 +1,3 @@ -use crate::options::structs::CustomError; use log::{error, warn}; use std::process::{Command, Output}; use std::sync::Arc; @@ -52,7 +51,7 @@ pub mod v2 { impl<'a> ProcessUnit<'a> for ProcessController<'a> { async fn process(&mut self) { - if let Ok(event) = self.event_reader.try_recv() { + while let Ok(event) = self.event_reader.try_recv() { match event { Events::Positive(target) => { if self.negative_events.contains(target) { @@ -82,12 +81,18 @@ pub mod v2 { match self.state { ProcessState::Holding => { info!("No negative dependecies events on {} process. Unfreezing ...", self.name); - unfreeze_process(self.name).await; + if let Err(er) = unfreeze_process(self.name).await { + error!("Cannot unfreeze process {} due to {}", self.name, er); + } else { + self.state = ProcessState::Pending; + } }, ProcessState::Stopped => { info!("No negative dependecies events on {} process. Starting ...", self.name); if let Err(_) = start_process(self.name, &self.obj.path).await { error!("Cannot start process {} due to {}", self.name, "system unrecognized error"); + } else { + self.state = ProcessState::Pending; } }, _ => {}, @@ -255,14 +260,11 @@ pub async fn freeze_process(name: &str) { /// /// *depends on* : - /// -pub async fn unfreeze_process(name: &str) { +pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> { let _ = Command::new("pkill") .args(["-CONT", name]) - .output() - .unwrap_or_else(|_| { - error!("Failed to unfreeze process"); - std::process::exit(101); - }); + .output()?; + Ok(()) } /// # Fn `restart_process` @@ -278,7 +280,7 @@ pub async fn unfreeze_process(name: &str) { /// /// *depends on* : fn `start_process`, fn `terminate_process` /// -pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> { +pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> { terminate_process(name).await; tokio::time::sleep(Duration::from_millis(100)).await; start_process(name, path).await @@ -297,7 +299,7 @@ pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> /// /// *depends on* : - /// -pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { +pub async fn start_process(name: &str, path: &str) -> anyhow::Result<()> { // let runsh = format!("{} {}", "exec", path); let mut command = Command::new(path); // command.arg(path); @@ -308,8 +310,7 @@ pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { Ok(()) } Err(er) => { - println!("{:?}", er); - Err(CustomError::Fatal) + Err(anyhow::Error::msg(format!("Cannot start process {} due to {}", name, er))) } } } diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index b90007d..2477c6d 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -6,23 +6,38 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; use tokio::sync::mpsc::Sender as MpscSender; -use crate::options::structs::Events; pub mod v2 { - use crate::options::structs::{Triggers, ProcessUnit}; + use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceWaitConfig}; use super::*; use std::collections::HashMap; - type EventHandlers<'a> = Vec>>; + // type EventHandlers<'a> = Vec>>; + type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender>)>; struct ServicesController<'a> { - name: &'a str, - // obj: Arc, - triggers: Triggers<'a>, - event_registrator: EventHandlers<'a>, + name : &'a str, + access_url : String, + config: ServiceWaitConfig, + event_registrator : EventHandlers<'a>, + } + impl<'a> ServicesController<'a> { + pub fn new() -> ServicesController<'a> { + ServicesController { + name : "", + access_url : String::new(), + config: ServiceWaitConfig::default(), + event_registrator : EventHandlers::new(), + } + } + pub async fn with_params(&mut self, hostname: &'a str, port: Option<&'a str>, event_registrator: EventHandlers<'a>) -> anyhow::Result<()> { + self.name = hostname; + self.access_url = format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))); + self.event_registrator = event_registrator; + Ok(()) + } } - // self impl impl<'a> ProcessUnit<'a> for ServicesController<'a> { async fn process(&mut self) {