From 2b82fb7aac5320e4d53bda347bf3be39c00734d6 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 10 Apr 2025 08:53:38 -0400 Subject: [PATCH] prcs v2 controller with impl --- noxis-rs/src/utils/prcs.rs | 94 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index a434c43..726d703 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -3,6 +3,100 @@ use log::{error, warn}; use std::process::{Command, Output}; use std::sync::Arc; use tokio::time::Duration; +use crate::options::structs::{TrackingProcess, ProcessState, Events, NegativeOutcomes}; +use std::collections::HashSet; +use tokio::sync::mpsc::Receiver as MpscReciever; + +pub mod v2 { + use log::info; + use crate::options::structs::DependencyType; + + use super::*; + pub struct ProcessController<'a> { + name: &'a str, + obj: Arc, + state: ProcessState, + event_reader: MpscReciever>, + negative_events: HashSet<&'a str>, + } + + impl<'a> ProcessController<'a> { + pub async fn process(&mut self) { + if let Ok(event) = self.event_reader.try_recv() { + match event { + Events::Positive(target) => { + if self.negative_events.contains(target) { + self.negative_events.remove(target); + } + }, + Events::Negative(event) => { + match event { + NegativeOutcomes::FileWasChanged(target, dep_type, trigger) | + NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) | + NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => { + if !self.negative_events.contains(target) { + self.negative_events.insert(target); + + self.trigger_on( + target, + trigger, + dep_type + ).await; + } + }, + } + }, + } + } + match self.state { + ProcessState::Holding => { + if self.negative_events.len() == 0 { + info!("No negative dependecies events on {} process. Unfreezing ...", self.name); + unfreeze_process(self.name).await; + } + }, + ProcessState::Stopped => { + if self.negative_events.len() == 0 { + info!("No negative dependecies events on {} process. Starting ...", self.name); + if let Err(_) = start_process(self.name, &self.obj.path).await { + error!("Cannot start process {} due to {}", self.name, "system unrecognized error"); + } + } + }, + _ => {}, + } + } + + async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) { + match trigger { + "stay" => { + info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name); + }, + "stop" => { + if is_active(self.name).await { + info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name); + terminate_process(self.name).await; + self.state = ProcessState::Stopped; + } + }, + "hold" => { + if !is_frozen(self.name).await { + info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name); + freeze_process(self.name).await; + self.state = ProcessState::Holding; + } + }, + "restart" => { + info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name); + let _ = restart_process(self.name, &self.obj.path).await; + }, + _ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name), + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + async fn trigger_on_servcie(&mut self, file_name: &str, trigger: &str) {} + } +} /// # Fn `get_pid` /// ## for initializing process of unstoppable grubbing metrics.