From 541b0f52dd197d9c73eb2e06bb5488c31f7eb17e Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 23 Apr 2025 10:34:07 -0400 Subject: [PATCH] supervisor work --- noxis-rs/src/utils.rs | 62 ++++++++++++++++++++++++++- noxis-rs/src/utils/files.rs | 12 ++++-- noxis-rs/src/utils/prcs.rs | 77 ++++++++++++++++++++++------------ noxis-rs/src/utils/services.rs | 10 ++++- 4 files changed, 129 insertions(+), 32 deletions(-) diff --git a/noxis-rs/src/utils.rs b/noxis-rs/src/utils.rs index 468075c..3430fac 100644 --- a/noxis-rs/src/utils.rs +++ b/noxis-rs/src/utils.rs @@ -23,12 +23,58 @@ use tokio::sync::mpsc; use tokio::time::Duration; use tokio::sync::broadcast::Receiver; use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender}; +// controllers import +use prcs::v2::ProcessesController; +use files::v2::FilesController; +use services::v2::ServicesController; const GET_ID_CMD: &str = "hostname"; pub mod v2 { + use std::collections::{HashMap, LinkedList}; + use crate::options::structs::{Events, FileTriggersForController, ProcessUnit}; + use super::*; + struct Supervisor<'a> { + prcs : LinkedList>, + files : LinkedList>, + services : LinkedList>, + } + + impl<'a> Supervisor<'a> { + pub fn new(config: &'a Processes) -> Supervisor<'a> { + let mut p = LinkedList::new(); + let mut f = LinkedList::new(); + let mut s = LinkedList::new(); + + let _ = config.processes.iter() + .map(|prc| { + let (rx, tx) = mpsc::channel::>(10); + let temp = ProcessesController::new(&prc.name, tx); + if !p.contains(&temp) { + p.push_back(temp); + } + let rx = Arc::new(rx); + // files + let _ = prc.dependencies.files.iter() + .map(|file| async { + let mut hm = HashMap::new(); + let triggers = FileTriggersForController { on_change: &file.triggers.on_change, on_delete: &file.triggers.on_delete}; + hm.insert(&prc.name, (triggers, rx.clone())); + let tempfile = FilesController::new(&file.filename, hm).with_path(file.src).await; + }); + // servs + let _ = prc.dependencies.services.iter() + .map(|serv| { + + }); + }); + + Supervisor { prcs: p, files: f, services: s } + } + } + // spawn tasks // spawn prc // spawn files @@ -53,8 +99,22 @@ pub mod v2 { // LinkedList // LinkedList Ok(()) - } + } + + // async fn generate_controllers<'a>(config: Processes) -> (HashSet>, HashSet>, HashSet>) { + // let mut prcs: HashSet> = HashSet::new(); + // let mut files: HashSet> = HashSet::new(); + // let mut services: HashSet> = HashSet::new(); + // for prc in config.processes { + // let (rx, tx) = mpsc::channel::>(10); + // // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path); + // let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path); + // let a = new_prc.process().await; + + // } + // (prcs, files, services) + // } // spawn prc check with semaphore check async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) } diff --git a/noxis-rs/src/utils/files.rs b/noxis-rs/src/utils/files.rs index fad24f0..00d4033 100644 --- a/noxis-rs/src/utils/files.rs +++ b/noxis-rs/src/utils/files.rs @@ -21,7 +21,7 @@ // type EventHandlers<'a> = HashMap type EventHandlers<'a> = HashMap<&'a str, (Triggers<'a>, MpscSender<'a>)>; - struct FilesController<'a> { + pub struct FilesController<'a> { name : &'a str, path : String, watcher : Option, @@ -30,6 +30,12 @@ code_name : String, } + impl<'a> PartialEq for FilesController<'a> { + fn eq(&self, other: &Self) -> bool { + self.path == other.path && self.name == other.name + } + } + impl<'a> FilesController<'a> { pub fn new(name: &'a str, triggers: EventHandlers<'a>) -> FilesController<'a> { Self { @@ -40,7 +46,7 @@ code_name : name.to_string(), } } - pub async fn with_path(&mut self, path: impl AsRef) -> anyhow::Result<()> { + pub async fn with_path(mut self, path: impl AsRef) -> anyhow::Result> { self.path = path.as_ref().to_string_lossy().into_owned(); self.watcher = { match create_watcher(self.name, &self.path).await { @@ -52,7 +58,7 @@ } }; self.code_name = format!("{}{}", &self.path, &self.code_name); - Ok(()) + Ok(self) } async fn trigger_on(&'a mut self, trigger_type: Option) { let _ = self.triggers.iter() diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index c190303..b2a966f 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -2,24 +2,47 @@ use log::{error, warn}; use std::process::{Command, Output}; use std::sync::Arc; use tokio::time::Duration; -use crate::options::structs::{TrackingProcess, ProcessState, Events, NegativeOutcomes, ProcessUnit}; +use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUnit}; use std::collections::HashSet; use tokio::sync::mpsc::Receiver as MpscReciever; pub mod v2 { use log::info; use crate::options::structs::DependencyType; + use std::path::Path; use super::*; - pub struct ProcessController<'a> { + + pub struct ProcessesController<'a> { name: &'a str, - obj: Arc, + bin: String, + // obj: Arc, state: ProcessState, event_reader: MpscReciever>, negative_events: HashSet<&'a str>, } - impl<'a> ProcessController<'a> { + impl<'a> PartialEq for ProcessesController<'a> { + fn eq(&self, other: &Self) -> bool { + self.bin == other.bin + } + } + + impl<'a> ProcessesController<'a> { + pub fn new(name: &'a str, event_reader: MpscReciever>) -> ProcessesController<'a> { + ProcessesController { + name, + bin: String::new(), + state : ProcessState::Stopped, + event_reader, + negative_events : HashSet::new(), + } + } + pub fn with_exe(mut self, bin: impl AsRef) -> ProcessesController<'a> { + self.bin = bin.as_ref().to_string_lossy().into_owned(); + self + } + async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) { match trigger { "stay" => { @@ -41,7 +64,7 @@ pub mod v2 { }, "restart" => { info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name); - let _ = restart_process(self.name, &self.obj.path).await; + let _ = restart_process(self.name, &self.bin).await; }, _ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name), } @@ -49,8 +72,29 @@ pub mod v2 { } } - impl<'a> ProcessUnit<'a> for ProcessController<'a> { + impl<'a> ProcessUnit<'a> for ProcessesController<'a> { async fn process(&mut self) { + if self.negative_events.len() == 0 { + match self.state { + ProcessState::Holding => { + info!("No negative dependecies events on {} process. Unfreezing ...", self.name); + if let Err(er) = unfreeze_process(self.name).await { + error!("Cannot unfreeze process {} : {}", self.name, er); + } else { + self.state = ProcessState::Pending; + } + }, + ProcessState::Stopped => { + info!("No negative dependecies events on {} process. Starting ...", self.name); + if let Err(er) = start_process(self.name, &self.bin).await { + error!("Cannot start process {} : {}", self.name, er); + } else { + self.state = ProcessState::Pending; + } + }, + _ => {}, + } + } while let Ok(event) = self.event_reader.try_recv() { match event { Events::Positive(target) => { @@ -77,27 +121,6 @@ pub mod v2 { }, } } - if self.negative_events.len() == 0 { - match self.state { - ProcessState::Holding => { - info!("No negative dependecies events on {} process. Unfreezing ...", self.name); - if let Err(er) = unfreeze_process(self.name).await { - error!("Cannot unfreeze process {} : {}", self.name, er); - } else { - self.state = ProcessState::Pending; - } - }, - ProcessState::Stopped => { - info!("No negative dependecies events on {} process. Starting ...", self.name); - if let Err(er) = start_process(self.name, &self.obj.path).await { - error!("Cannot start process {} : {}", self.name, er); - } else { - self.state = ProcessState::Pending; - } - }, - _ => {}, - } - } } } } diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index 9f54db3..0dd56f6 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -22,8 +22,9 @@ pub mod v2 { type ConnectionQueue<'a> = BTreeMap>; #[derive(Debug)] - struct ServicesController<'a> { + pub struct ServicesController<'a> { // i.e. yandex.ru + #[allow(unused)] name : &'a str, // i.e. yandex.ru:443 access_url : String, @@ -34,6 +35,13 @@ pub mod v2 { // Map of processes with their (trigger and mpsc sender) event_registrator : EventHandlers<'a>, } + + impl<'a> PartialEq for ServicesController<'a> { + fn eq(&self, other: &Self) -> bool { + self.access_url == other.access_url + } + } + impl<'a> ServicesController<'a> { pub fn new() -> ServicesController<'a> { ServicesController {