pub mod bus; pub mod files; pub mod hagent; pub mod metrics; pub mod prcs; pub mod services; use crate::options::structs::bus::{BusMessage, BusMessageContentType, InternalCli}; use crate::options::structs::Processes; use async_trait::async_trait; use files::v2::FilesController; use lazy_static::lazy_static; use log::{error, info}; use prcs::v2::ProcessesController; use services::v2::ServicesController; use std::process::Command; use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::Duration; lazy_static! { static ref GET_ID_CMD: &'static str = "hostname"; } // const GET_ID_CMD: &str = "hostname"; pub mod v2 { use super::*; use crate::utils::metrics::processes::{ProcessesAll, ProcessesQuery}; use crate::{ options::structs::{ bus::CLiCommand, Events, FileTriggersForController, ProcessUnit, Triggers, }, utils::metrics::processes::deps::{Dependencies, FilesExtended, ServicesExtended}, }; use std::any::Any; use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque}; type BusReciever = tokio::sync::mpsc::Receiver; type BusSender = Arc>; #[derive(Debug)] enum ControllerResult { Process(Option), File(Option), Service(Option), } #[derive(Debug)] struct Supervisor { prcs: LinkedList, files: LinkedList, services: LinkedList, config: Arc, bus: (BusReciever, BusSender), } impl Supervisor { pub fn new(bus_reciever: BusReciever, bus_sender: BusSender) -> Supervisor { Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new(), config: Arc::new(Processes::default()), bus: (bus_reciever, bus_sender), } } pub async fn with_config(mut self, config: Processes) -> Supervisor { self.config = Arc::from(config); let _ = self.config.processes.iter().for_each(|prc| { let (rx, tx) = mpsc::channel::(10); let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path); if !self.prcs.contains(&temp) { self.prcs.push_back(temp); } let rx = Arc::new(rx); let proc_name: Arc = Arc::from(prc.name.clone()); let _ = prc.dependencies.files.iter().for_each(|file| { let mut hm = HashMap::new(); let triggers = FileTriggersForController { on_change: Arc::from(file.triggers.on_change.clone()), on_delete: Arc::from(file.triggers.on_delete.clone()), }; hm.insert(proc_name.clone(), (triggers, rx.clone())); let tempfile = FilesController::new(&file.filename.as_str(), hm).with_path(&file.src); if let Ok(file) = tempfile { if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) { current_file.add_event(file); } else { self.files.push_back(file); } } }); // servs let _ = prc.dependencies.services.iter().for_each(|serv| { let access_url = ServicesController::get_access_url(&serv.hostname, serv.port.as_ref()); // preparations let rx = rx.clone(); let serv_cont = ServicesController::new().with_access_name(&serv.hostname, &access_url); // triggers let arc: Arc = Arc::from(serv.triggers.on_lost.clone()); let triggers = Triggers::new_service(arc, serv.triggers.wait); if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) { proc.add_process(&prc.name, triggers, rx); } else { // vecdeque for queue let mut vec: VecDeque> = VecDeque::new(); vec.push_back(proc_name.clone()); // connection_queue let mut connection_queue: BTreeMap>> = BTreeMap::new(); connection_queue.insert(serv.triggers.wait, vec); // event_reg let mut hm = HashMap::new(); hm.insert(proc_name.clone(), (triggers, rx)); let serv_cont = serv_cont.with_params(connection_queue, hm); self.services.push_back(serv_cont); } }); }); self } pub fn get_stats(&self) -> String { format!( "processes: {}, files: {}, services: {}", self.prcs.len(), self.files.len(), self.services.len() ) } pub async fn extract_extended_procs( config: Arc, prcs_list: &LinkedList, files_list: &LinkedList, servs_list: &LinkedList, ) -> Vec { let mut procs = Vec::new(); for prc in config.processes.iter() { if let Some(prc_cont) = prcs_list .iter() .find(|&prc_cont| prc.name == *prc_cont.name) { let mut vec_files = Vec::new(); let mut vec_services = Vec::new(); prc.dependencies .files .iter() .map(|file| (file, format!("{}{}", file.src, file.filename))) .for_each(|(file, code_name)| { if let Some(file_cont) = files_list .iter() .find(|&file_cont| *file_cont.get_code_name() == code_name) { vec_files.push(FilesExtended { name: file.filename.to_string(), path: file.src.to_string(), status: file_cont.get_state(), triggers: file.triggers.to_owned(), }); } }); prc.dependencies .services .iter() .map(|serv| { ( serv, format!("{}{}", serv.hostname, { if let Some(port) = serv.port { format!(":{}", port) } else { String::new() } }), ) }) .for_each(|(serv, acces_url)| { if let Some(serv_cont) = servs_list .iter() .find(|&serv_cont| *serv_cont.get_arc_access_url() == acces_url) { vec_services.push(ServicesExtended { name: serv.hostname.to_owned(), access_name: (*serv_cont.get_arc_access_url()).to_owned(), status: serv_cont.get_state(), triggers: serv.triggers.to_owned(), }); } }); procs.push(ProcessesAll { name: prc_cont.name.clone().to_string(), state: prc_cont.get_state(), pid: prc_cont.get_pid(), dependencies: Dependencies { files: vec_files, services: vec_services, }, }); } } procs } } #[async_trait] impl ProcessUnit for Supervisor { async fn process(&mut self) { info!("Initializing monitoring ..."); loop { // let rec = &mut self.bus.0; while let Ok(request) = rec.try_recv() { if let BusMessage::Request(_, _, cont) = request { let cont: Box = cont; match cont.downcast::() { Ok(cli) => { let mut count = 0; let fut = (&mut self.prcs) .into_iter() .find(|prc| prc.name == Arc::from(cli.prc.as_ref())) .map(|prc| async { let count = &mut count; *count += 1; let res = match cli.cmd { CLiCommand::Start => prc.start_by_user_call().await, CLiCommand::Stop => prc.stop_by_user_call().await, CLiCommand::Restart => prc.restart_by_user_call().await, CLiCommand::Freeze => prc.freeze_by_user_call().await, CLiCommand::Unfreeze => { prc.unfreeze_by_user_call().await } }; let sender = self.bus.1.clone(); let resp_content = match res { Ok(_) => Ok(format!( "Ok on user call abour process {}", prc.name )), Err(er) => Err(anyhow::Error::msg(format!( "Error: User call for process {} failed : {}", prc.name, er ))), }; let _ = sender.send(BusMessage::Response( crate::options::structs::bus::BusMessageDirection::ToCli, BusMessageContentType::Result, Box::new(resp_content) )).await; 1 }); if let Some(fut) = fut { fut.await; } else { let _ = self.bus.1.clone().send(BusMessage::Response( crate::options::structs::bus::BusMessageDirection::ToCli, BusMessageContentType::RawString, Box::new( Err(anyhow::Error::msg(format!("No process named `{}` was found in controlled scope", cli.prc))) ) )).await; } } Err(boxed) => { if let Ok(query) = boxed.downcast::() { match *query { ProcessesQuery::QueryAll => { let procs = Self::extract_extended_procs( self.config.clone(), &self.prcs, &self.files, &self.services, ) .await; let _ = self.bus.1.clone().send(BusMessage::Response( crate::options::structs::bus::BusMessageDirection::ToMetrics, BusMessageContentType::ProcessQuery, Box::new( ProcessesQuery::All(procs) ) )).await; } ProcessesQuery::QueryGeneral => { let mut vec = Vec::new(); for prc in &self.prcs { vec.push(prc.get_general_info().await); } let _ = self.bus.1.clone().send(BusMessage::Response( crate::options::structs::bus::BusMessageDirection::ToMetrics, BusMessageContentType::ProcessQuery, Box::new( ProcessesQuery::General(vec) ) )).await; } _ => { let _ = self.bus.1.clone().send(BusMessage::Response( crate::options::structs::bus::BusMessageDirection::ToCli, BusMessageContentType::RawString, Box::new( Err(anyhow::Error::msg("Unknown request format was send to the Supervisor")) ) )).await; } } } } } } } let mut tasks: Vec> = vec![]; if let Some(mut val) = self.prcs.pop_front() { tasks.push(tokio::spawn(async move { val.process().await; ControllerResult::Process(Some(val)) })); } if let Some(mut val) = self.files.pop_front() { tasks.push(tokio::spawn(async move { val.process().await; ControllerResult::File(Some(val)) })); } if let Some(mut val) = self.services.pop_front() { tasks.push(tokio::spawn(async move { val.process().await; ControllerResult::Service(Some(val)) })); } for task in tasks { match task.await { Ok(ControllerResult::Process(Some(val))) => self.prcs.push_back(val), Ok(ControllerResult::File(Some(val))) => self.files.push_back(val), Ok(ControllerResult::Service(Some(val))) => self.services.push_back(val), Err(er) => error!("Controller task crushed : {er}. Cannot push back to the exec queue ..."), _ => { /* DEAD END (CAN NOT BE EXECUTED) */}, } } tokio::time::sleep(Duration::from_millis(100)).await; } } } pub async fn init_monitoring( config: Processes, bus_reciever: BusReciever, bus_sender: BusSender, ) -> anyhow::Result<()> { let mut supervisor = Supervisor::new(bus_reciever, bus_sender) .with_config(config) .await; info!("Monitoring: {} ", &supervisor.get_stats()); supervisor.process().await; Ok(()) } } // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' /// # Fn `get_container_id` /// ## for getting container id used in logs /// /// *input* : - /// /// *output* : Some(String) if cont-id was grubbed | None - if not /// /// *initiator* : fn `options::logger::setup_logger` /// /// *managing* : - /// /// *depends on* : - /// pub fn get_container_id() -> Option { match Command::new(*GET_ID_CMD).output() { Ok(output) => { if !output.status.success() { return None; } let id = String::from_utf8_lossy(&output.stdout).to_string(); if id.is_empty() { return None; } Some(String::from_utf8_lossy(&output.stdout).trim().to_string()) } Err(_) => None, } } #[cfg(test)] mod utils_unittests { use super::get_container_id; #[test] fn check_if_container_id_can_be_grabed() { assert!(get_container_id().is_some()); } }