prcs v2 controller with impl

feature/configv2
prplV 2025-04-10 08:53:38 -04:00
parent 584404c050
commit 2b82fb7aac
1 changed files with 94 additions and 0 deletions

View File

@ -3,6 +3,100 @@ use log::{error, warn};
use std::process::{Command, Output}; use std::process::{Command, Output};
use std::sync::Arc; use std::sync::Arc;
use tokio::time::Duration; use tokio::time::Duration;
use crate::options::structs::{TrackingProcess, ProcessState, Events, NegativeOutcomes};
use std::collections::HashSet;
use tokio::sync::mpsc::Receiver as MpscReciever;
pub mod v2 {
use log::info;
use crate::options::structs::DependencyType;
use super::*;
pub struct ProcessController<'a> {
name: &'a str,
obj: Arc<TrackingProcess>,
state: ProcessState,
event_reader: MpscReciever<Events<'a>>,
negative_events: HashSet<&'a str>,
}
impl<'a> ProcessController<'a> {
pub async fn process(&mut self) {
if let Ok(event) = self.event_reader.try_recv() {
match event {
Events::Positive(target) => {
if self.negative_events.contains(target) {
self.negative_events.remove(target);
}
},
Events::Negative(event) => {
match event {
NegativeOutcomes::FileWasChanged(target, dep_type, trigger) |
NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) |
NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => {
if !self.negative_events.contains(target) {
self.negative_events.insert(target);
self.trigger_on(
target,
trigger,
dep_type
).await;
}
},
}
},
}
}
match self.state {
ProcessState::Holding => {
if self.negative_events.len() == 0 {
info!("No negative dependecies events on {} process. Unfreezing ...", self.name);
unfreeze_process(self.name).await;
}
},
ProcessState::Stopped => {
if self.negative_events.len() == 0 {
info!("No negative dependecies events on {} process. Starting ...", self.name);
if let Err(_) = start_process(self.name, &self.obj.path).await {
error!("Cannot start process {} due to {}", self.name, "system unrecognized error");
}
}
},
_ => {},
}
}
async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) {
match trigger {
"stay" => {
info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name);
},
"stop" => {
if is_active(self.name).await {
info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name);
terminate_process(self.name).await;
self.state = ProcessState::Stopped;
}
},
"hold" => {
if !is_frozen(self.name).await {
info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name);
freeze_process(self.name).await;
self.state = ProcessState::Holding;
}
},
"restart" => {
info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name);
let _ = restart_process(self.name, &self.obj.path).await;
},
_ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name),
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
async fn trigger_on_servcie(&mut self, file_name: &str, trigger: &str) {}
}
}
/// # Fn `get_pid` /// # Fn `get_pid`
/// ## for initializing process of unstoppable grubbing metrics. /// ## for initializing process of unstoppable grubbing metrics.