pub mod files; pub mod hagent; pub mod metrics; pub mod prcs; pub mod services; // TODO : saving current flags state use crate::options::structs::{CustomError, TrackingProcess, Processes}; // use files::create_watcher; // use files::file_handler; // use inotify::Inotify; use log::{error, warn, info}; use prcs::{ freeze_process, is_active, is_frozen, restart_process, terminate_process, unfreeze_process, }; // use services::service_handler; use std::process::Command; use std::sync::Arc; // use tokio::join; use tokio::sync::mpsc; use tokio::time::Duration; // use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender}; // controllers import use prcs::v2::ProcessesController; use files::v2::FilesController; use services::v2::ServicesController; use async_trait::async_trait; const GET_ID_CMD: &str = "hostname"; pub mod v2 { use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque}; use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers}; use super::*; #[derive(Debug)] enum ControllerResult { Process(Option), File(Option), Service(Option), } #[derive(Debug)] struct Supervisor { prcs : LinkedList, files : LinkedList, services : LinkedList, config : Arc, } impl Supervisor { pub fn new() -> Supervisor { Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new(), config: Arc::new(Processes::default()) } } pub async fn with_config(mut self, config: Processes) -> Supervisor { self.config = Arc::from(config); let _ = self.config.processes.iter() .for_each(|prc| { let (rx, tx) = mpsc::channel::(10); let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path); if !self.prcs.contains(&temp) { self.prcs.push_back(temp); } let rx = Arc::new(rx); let proc_name: Arc = Arc::from(prc.name.clone()); let _ = prc.dependencies.files.iter() .for_each(|file| { let mut hm = HashMap::new(); let triggers = FileTriggersForController { on_change: Arc::from(file.triggers.on_change.clone()), on_delete: Arc::from(file.triggers.on_delete.clone())}; hm.insert(proc_name.clone(), (triggers, rx.clone())); let tempfile = FilesController::new(&file.filename.as_str(), hm) .with_path(&file.src); if let Ok(file) = tempfile { if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) { current_file.add_event(file); } else { self.files.push_back(file); } } }); // servs let _ = prc.dependencies.services.iter() .for_each(|serv| { let access_url = ServicesController::get_access_url(&serv.hostname, serv.port.as_ref()); // preparations let rx = rx.clone(); let serv_cont = ServicesController::new().with_access_name( &serv.hostname, &access_url ); // triggers let arc: Arc = Arc::from(serv.triggers.on_lost.clone()); let triggers = Triggers::new_service(arc, serv.triggers.wait); if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) { proc.add_process(&prc.name, triggers, rx); } else { // vecdeque for queue let mut vec: VecDeque> = VecDeque::new(); vec.push_back(proc_name.clone()); // connection_queue let mut connection_queue: BTreeMap>> = BTreeMap::new(); connection_queue.insert(serv.triggers.wait, vec); // event_reg let mut hm = HashMap::new(); hm.insert(proc_name.clone(), (triggers, rx)); let serv_cont = serv_cont.with_params(connection_queue, hm); self.services.push_back(serv_cont); } }); }); self } pub fn get_stats(&self) -> String { format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len()) } } #[async_trait] impl ProcessUnit for Supervisor { async fn process(&mut self) { info!("Initializing monitoring ..."); loop { // // todo: CHANNEL check and reaction // // dbg!(&self); let mut tasks: Vec> = vec![]; // let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap()); // let res = tokio::join!(prc.process(), file.process(), serv.process()); if let Some(mut val) = self.prcs.pop_front() { tasks.push( tokio::spawn( async move { val.process().await; ControllerResult::Process(Some(val)) }) ); } if let Some(mut val) = self.files.pop_front() { tasks.push( tokio::spawn( async move { val.process().await; ControllerResult::File(Some(val)) }) ); } if let Some(mut val) = self.services.pop_front() { tasks.push( tokio::spawn( async move { val.process().await; ControllerResult::Service(Some(val)) }) ); } for task in tasks { match task.await { Ok(ControllerResult::Process(Some(val))) => self.prcs.push_back(val), Ok(ControllerResult::File(Some(val))) => self.files.push_back(val), Ok(ControllerResult::Service(Some(val))) => self.services.push_back(val), Err(er) => error!("Controller task crushed : {er}. Cannot push back to the exec queue ..."), _ => { /* DEAD END (CAN NOT BE EXECUTED) */}, } } tokio::time::sleep(Duration::from_millis(100)).await; } } } // spawn tasks // spawn prc // spawn files // spawn services // ## for ... i.await in loop pub async fn init_monitoring( config: Processes ) -> anyhow::Result<()> { let mut supervisor = Supervisor::new().with_config(config).await; info!("Monitoring: {} ", &supervisor.get_stats()); supervisor.process().await; Ok(()) } // async fn generate_controllers<'a>(config: Processes) -> (HashSet>, HashSet>, HashSet>) { // let mut prcs: HashSet> = HashSet::new(); // let mut files: HashSet> = HashSet::new(); // let mut services: HashSet> = HashSet::new(); // for prc in config.processes { // let (rx, tx) = mpsc::channel::>(10); // // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path); // let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path); // let a = new_prc.process().await; // } // (prcs, files, services) // } // spawn prc check with semaphore check async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) } // spawn file check with semaphore check async fn files_monitoriing() -> anyhow::Result<()> { Ok(()) } // spawn service check with semaphore check async fn services_monitoriing() -> anyhow::Result<()> { Ok(()) } } /// # Fn `run_daemons` /// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel /// /// *input* : `Arc`, `Arc>`, `&mut mpsc::Receiver`, /// /// *output* : () /// /// *initiator* : main thread /// /// *managing* : Arc to current process struct, Arc to managing channel writer, mut ref to managing channel reader /// /// *depends on* : all module `prcs`'s functions, fn `running_handler`, fn `utils::files::create_watcher` /// /// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process /// // pub async fn run_daemons( // proc: Arc, // tx: Arc>, // rx: &mut mpsc::Receiver, // ) { // // creating watchers + ---buffers--- // let mut watchers: Vec = vec![]; // for file in proc.dependencies.files.clone().into_iter() { // if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { // watchers.push(watcher); // } else { // let _ = tx.send(121).await; // } // // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); // } // let watchers_clone: Arc>> = // Arc::new(tokio::sync::Mutex::new(watchers)); // loop { // let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); // tokio::select! { // _ = run_hand => continue, // _val = rx.recv() => { // if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { // return; // } // }, // } // tokio::task::yield_now().await; // } // } async fn process_protocol_symbol(proc: Arc, val: u8) -> Result<(), CustomError>{ match val { // 1 - File-dependency handling error -> terminating (after waiting) 1 => { if is_active(&proc.name).await { error!("File-dependency handling error: Terminating {} process ..." , &proc.name); terminate_process(&proc.name).await; tokio::time::sleep(Duration::from_millis(500)).await; } // return; }, // 2 - File-dependency handling error -> holding (after waiting) 2 => { if !is_frozen(&proc.name).await { error!("File-dependency handling error: Freezing {} process ..." , &proc.name); freeze_process(&proc.name).await; tokio::time::sleep(Duration::from_millis(100)).await; } }, // 3 - Running process error 3 => { error!("Error due to starting {} process", &proc.name); return Err(CustomError::Fatal) }, // 4 - Timeout of waiting service-dependency -> staying (after waiting) 4 => { // warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name); tokio::time::sleep(Duration::from_millis(100)).await; }, // 5 - Timeout of waiting service-dependency -> terminating (after waiting) 5 => { if is_active(&proc.name).await { error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); terminate_process(&proc.name).await; tokio::time::sleep(Duration::from_millis(500)).await; } }, // 6 - Timeout of waiting service-dependency -> holding (after waiting) 6 => { // println!("holding {}-{}", proc.name, is_active(&proc.name).await); if !is_frozen(&proc.name).await { error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); freeze_process(&proc.name).await; tokio::time::sleep(Duration::from_secs(1)).await; } }, // // 7 - File-dependency change -> terminating (after check) 7 => { error!("File-dependency warning (file changed). Terminating {} process...", &proc.name); terminate_process(&proc.name).await; tokio::time::sleep(Duration::from_millis(100)).await; return Err(CustomError::Fatal) }, // // 8 - File-dependency change -> restarting (after check) 8 => { warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name); let _ = restart_process(&proc.name, &proc.path).await; tokio::time::sleep(Duration::from_millis(100)).await; }, // // 9 - File-dependency change -> staying (after check) 9 => { warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name); tokio::time::sleep(Duration::from_millis(100)).await; }, // 10 - Process unfreaze call via file handler (or service handler) 10 | 11 => { if is_frozen(&proc.name).await { warn!("Unfreezing process {} call...", &proc.name); unfreeze_process(&proc.name).await; } tokio::time::sleep(Duration::from_millis(100)).await; }, // 11 - Process unfreaze call via service handler // 11 => { // if is_frozen(&proc.name).await { // warn!("Unfreezing process {} call...", &proc.name); // unfreeze_process(&proc.name).await; // } // tokio::time::sleep(Duration::from_millis(100)).await; // }, // 101 - Impermissible trigger values in JSON 101 => { error!("Impermissible trigger values in JSON in {}'s block. Killing thread...", &proc.name); if is_active(&proc.name).await { terminate_process(&proc.name).await; } return Err(CustomError::Fatal) }, // // 121 - Cannot create valid watcher for file dependency // todo : think about valid situation 121 => { error!("Cannot create valid watcher for file dependency. Terminating {} process...", &proc.name); let _ = terminate_process(&proc.name).await; return Err(CustomError::Fatal) }, // 111 - global thread termination with killing current child in a face // of a current process 111 => { warn!("Terminating {}'s child processes...", &proc.name); match is_active(&proc.name).await { true => { terminate_process(&proc.name).await; }, false => { log::info!("Process {} is already terminated!", proc.name); }, } }, _ => {}, } Ok(()) } // check process status daemon /// # Fn `run_daemons` /// ## func to async exec subjobs of checking process, services and files states /// /// *input* : `Arc`, `Arc>`, `Arc>>` /// /// *output* : () /// /// *initiator* : fn `run_daemons` /// /// *managing* : Arc to current process struct, Arc to Mutex to list of file watchers /// /// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}` /// // pub async fn running_handler( // prc: Arc, // tx: Arc>, // watchers: Arc>>, // ) { // // services and files check (once) // let files_check = file_handler( // &prc.name, // &prc.dependencies.files, // tx.clone(), // watchers.clone(), // ); // let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); // let res = join!(files_check, services_check); // // if inactive -> spawn checks -> active is true // if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() { // if start_process(&prc.name, &prc.path).await.is_err() { // tx.send(3).await.unwrap(); // return; // } // } // // if frozen -> spawn checks -> unfreeze is true // else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() { // tx.send(10).await.unwrap(); // return; // } // // tokio::time::sleep(Duration::from_millis(100)).await; // tokio::task::yield_now().await; // } // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' /// # Fn `get_container_id` /// ## for getting container id used in logs /// /// *input* : - /// /// *output* : Some(String) if cont-id was grubbed | None - if not /// /// *initiator* : fn `options::logger::setup_logger` /// /// *managing* : - /// /// *depends on* : - /// pub fn get_container_id() -> Option { match Command::new(GET_ID_CMD).output() { Ok(output) => { if !output.status.success() { return None; } let id = String::from_utf8_lossy(&output.stdout).to_string(); if id.is_empty() { return None; } Some(String::from_utf8_lossy(&output.stdout).to_string()) } Err(_) => None, } } #[cfg(test)] mod utils_unittests { use super::get_container_id; #[test] fn check_if_container_id_can_be_grabed() { assert!(get_container_id().is_some()); } }