diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 79e5b8c..b10a5da 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -58,9 +58,7 @@ async fn main() -> anyhow::Result<()>{ handler.push(ctrlc); let monitoring = tokio::spawn(async move { - let config = if !rx_brd.is_empty() { - rx_brd.recv().await? - } else { + let config = { let mut tick = tokio::time::interval(Duration::from_millis(500)); loop { tick.tick().await; diff --git a/noxis-rs/src/utils.rs b/noxis-rs/src/utils.rs index 3018f69..4405669 100644 --- a/noxis-rs/src/utils.rs +++ b/noxis-rs/src/utils.rs @@ -36,38 +36,39 @@ pub mod v2 { use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers}; use super::*; - enum ControllerResult<'a> { - Process(Option>), - File(Option>), - Service(Option>), + enum ControllerResult { + Process(Option), + File(Option), + Service(Option), } #[derive(Debug)] - struct Supervisor<'a> { - prcs : LinkedList>, - files : LinkedList>, - services : LinkedList>, + struct Supervisor { + prcs : LinkedList, + files : LinkedList, + services : LinkedList, } - impl<'a> Supervisor<'a> { - pub fn new() -> Supervisor<'a> { + impl Supervisor { + pub fn new() -> Supervisor { Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()} } - pub async fn with_config(mut self, config: &'a Processes) -> Supervisor<'a> { + pub async fn with_config(mut self, config: &Processes) -> Supervisor { let _ = config.processes.iter() .for_each(|prc| { - let (rx, tx) = mpsc::channel::>(10); + let (rx, tx) = mpsc::channel::(10); let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path); if !self.prcs.contains(&temp) { self.prcs.push_back(temp); } let rx = Arc::new(rx); + let proc_name: Arc = Arc::from(prc.name.clone()); let _ = prc.dependencies.files.iter() .for_each(|file| { let mut hm = HashMap::new(); - let triggers = FileTriggersForController { on_change: &file.triggers.on_change, on_delete: &file.triggers.on_delete}; - hm.insert(prc.name.as_str(), (triggers, rx.clone())); + let triggers = FileTriggersForController { on_change: Arc::from(file.triggers.on_change.clone()), on_delete: Arc::from(file.triggers.on_delete.clone())}; + hm.insert(proc_name.clone(), (triggers, rx.clone())); let tempfile = FilesController::new(&file.filename.as_str(), hm) .with_path(&file.src); @@ -90,23 +91,24 @@ pub mod v2 { let rx = rx.clone(); let serv_cont = ServicesController::new().with_access_name( &serv.hostname, - access_url + &access_url ); // triggers - let triggers = Triggers::new_service(&serv.triggers.on_lost, serv.triggers.wait); + let arc: Arc = Arc::from(serv.triggers.on_lost.clone()); + let triggers = Triggers::new_service(arc, serv.triggers.wait); if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) { proc.add_process(&prc.name, triggers, rx); } else { // vecdeque for queue - let mut vec: VecDeque<&'a str> = VecDeque::new(); - vec.push_back(&prc.name); + let mut vec: VecDeque> = VecDeque::new(); + vec.push_back(proc_name.clone()); // connection_queue - let mut connection_queue: BTreeMap> = BTreeMap::new(); + let mut connection_queue: BTreeMap>> = BTreeMap::new(); connection_queue.insert(serv.triggers.wait, vec); // event_reg let mut hm = HashMap::new(); - hm.insert(prc.name.as_str(), (triggers, rx)); + hm.insert(proc_name.clone(), (triggers, rx)); let serv_cont = serv_cont.with_params(connection_queue, hm); self.services.push_back(serv_cont); @@ -118,21 +120,48 @@ pub mod v2 { pub fn get_stats(&self) -> String { format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len()) } - async fn proccess_prc(&mut self) { - - } } #[async_trait] - impl<'a> ProcessUnit<'a> for Supervisor<'a> { - async fn process(&'a mut self) { + impl ProcessUnit for Supervisor { + async fn process(&mut self) { info!("Initializing monitoring ..."); loop { - // let mut tasks: Vec> = vec![]; + let mut tasks: Vec> = vec![]; // let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap()); // let res = tokio::join!(prc.process(), file.process(), serv.process()); if let Some(mut val) = self.prcs.pop_front() { - tokio::spawn(async move {val.process().await;}).await; + tasks.push( + tokio::spawn( async move { + val.process().await; + ControllerResult::Process(Some(val)) + }) + ); + } + if let Some(mut val) = self.files.pop_front() { + tasks.push( + tokio::spawn( async move { + val.process().await; + ControllerResult::File(Some(val)) + }) + ); + } + if let Some(mut val) = self.services.pop_front() { + tasks.push( + tokio::spawn( async move { + val.process().await; + ControllerResult::Service(Some(val)) + }) + ); + } + for task in tasks { + match task.await { + Ok(ControllerResult::Process(Some(val))) => self.prcs.push_back(val), + Ok(ControllerResult::File(Some(val))) => self.files.push_back(val), + Ok(ControllerResult::Service(Some(val))) => self.services.push_back(val), + Err(er) => error!("Controller task crushed : {er}. Cannot push back to the exec queue ..."), + _ => { /* DEAD END (CAN NOT BE EXECUTED) */}, + } } tokio::time::sleep(Duration::from_millis(100)).await; } @@ -149,7 +178,7 @@ pub mod v2 { ) -> anyhow::Result<()> { let mut supervisor = Supervisor::new().with_config(&config).await; info!("Monitoring: {} ", &supervisor.get_stats()); - supervisor.process().await; + supervisor.process().await; Ok(()) }