pub mod files; pub mod hagent; pub mod metrics; pub mod prcs; pub mod services; // TODO : saving current flags state use crate::options::structs::CustomError; use crate::options::structs::TrackingProcess; use files::create_watcher; use files::file_handler; use inotify::Inotify; use log::{error, warn}; use prcs::{ freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process, unfreeze_process, }; use services::service_handler; use std::process::Command; use std::sync::Arc; use tokio::join; use tokio::sync::mpsc; use tokio::time::Duration; const GET_ID_CMD: &str = "hostname"; /// # Fn `run_daemons` /// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel /// /// *input* : `Arc`, `Arc>`, `&mut mpsc::Receiver`, /// /// *output* : () /// /// *initiator* : main thread /// /// *managing* : Arc to current process struct, Arc to managing channel writer, mut ref to managing channel reader /// /// *depends on* : all module `prcs`'s functions, fn `running_handler`, fn `utils::files::create_watcher` /// /// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process /// pub async fn run_daemons( proc: Arc, tx: Arc>, rx: &mut mpsc::Receiver, ) { // creating watchers + ---buffers--- let mut watchers: Vec = vec![]; for file in proc.dependencies.files.clone().into_iter() { if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { watchers.push(watcher); } else { let _ = tx.send(121).await; } // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); } let watchers_clone: Arc>> = Arc::new(tokio::sync::Mutex::new(watchers)); loop { let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); tokio::select! { _ = run_hand => continue, _val = rx.recv() => { if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { return; } }, } tokio::task::yield_now().await; } } async fn process_protocol_symbol(proc: Arc, val: u8) -> Result<(), CustomError>{ match val { // 1 - File-dependency handling error -> terminating (after waiting) 1 => { if is_active(&proc.name).await { error!("File-dependency handling error: Terminating {} process ..." , &proc.name); terminate_process(&proc.name).await; tokio::time::sleep(Duration::from_millis(500)).await; } // return; }, // 2 - File-dependency handling error -> holding (after waiting) 2 => { if !is_frozen(&proc.name).await { error!("File-dependency handling error: Freezing {} process ..." , &proc.name); freeze_process(&proc.name).await; tokio::time::sleep(Duration::from_millis(100)).await; } }, // 3 - Running process error 3 => { error!("Error due to starting {} process", &proc.name); return Err(CustomError::Fatal) }, // 4 - Timeout of waiting service-dependency -> staying (after waiting) 4 => { // warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name); tokio::time::sleep(Duration::from_millis(100)).await; }, // 5 - Timeout of waiting service-dependency -> terminating (after waiting) 5 => { if is_active(&proc.name).await { error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); terminate_process(&proc.name).await; tokio::time::sleep(Duration::from_millis(500)).await; } }, // 6 - Timeout of waiting service-dependency -> holding (after waiting) 6 => { // println!("holding {}-{}", proc.name, is_active(&proc.name).await); if !is_frozen(&proc.name).await { error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); freeze_process(&proc.name).await; tokio::time::sleep(Duration::from_secs(1)).await; } }, // // 7 - File-dependency change -> terminating (after check) 7 => { error!("File-dependency warning (file changed). Terminating {} process...", &proc.name); terminate_process(&proc.name).await; tokio::time::sleep(Duration::from_millis(100)).await; return Err(CustomError::Fatal) }, // // 8 - File-dependency change -> restarting (after check) 8 => { warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name); let _ = restart_process(&proc.name, &proc.path).await; tokio::time::sleep(Duration::from_millis(100)).await; }, // // 9 - File-dependency change -> staying (after check) 9 => { // no need to trash logs warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name); tokio::time::sleep(Duration::from_millis(100)).await; }, // 10 - Process unfreaze call via file handler (or service handler) 10 | 11 => { if is_frozen(&proc.name).await { warn!("Unfreezing process {} call...", &proc.name); unfreeze_process(&proc.name).await; } tokio::time::sleep(Duration::from_millis(100)).await; }, // 11 - Process unfreaze call via service handler // 11 => { // if is_frozen(&proc.name).await { // warn!("Unfreezing process {} call...", &proc.name); // unfreeze_process(&proc.name).await; // } // tokio::time::sleep(Duration::from_millis(100)).await; // }, // 101 - Impermissible trigger values in JSON 101 => { error!("Impermissible trigger values in JSON in {}'s block. Killing thread...", &proc.name); if is_active(&proc.name).await { terminate_process(&proc.name).await; } return Err(CustomError::Fatal) }, // // 121 - Cannot create valid watcher for file dependency // todo : think about valid situation 121 => { error!("Cannot create valid watcher for file dependency. Terminating {} process...", &proc.name); let _ = terminate_process(&proc.name).await; return Err(CustomError::Fatal) }, // 111 - global thread termination with killing current child in a face // of a current process 111 => { warn!("Terminating {}'s child processes...", &proc.name); match is_active(&proc.name).await { true => { terminate_process(&proc.name).await; }, false => { log::info!("Process {} is already terminated!", proc.name); }, } }, _ => {}, } Ok(()) } // check process status daemon /// # Fn `run_daemons` /// ## func to async exec subjobs of checking process, services and files states /// /// *input* : `Arc`, `Arc>`, `Arc>>` /// /// *output* : () /// /// *initiator* : fn `run_daemons` /// /// *managing* : Arc to current process struct, Arc to Mutex to list of file watchers /// /// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}` /// pub async fn running_handler( prc: Arc, tx: Arc>, watchers: Arc>>, ) { // services and files check (once) let files_check = file_handler( &prc.name, &prc.dependencies.files, tx.clone(), watchers.clone(), ); let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); let res = join!(files_check, services_check); // if inactive -> spawn checks -> active is true if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() { if start_process(&prc.name, &prc.path).await.is_err() { tx.send(3).await.unwrap(); return; } } // if frozen -> spawn checks -> unfreeze is true else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() { tx.send(10).await.unwrap(); return; } // tokio::time::sleep(Duration::from_millis(100)).await; tokio::task::yield_now().await; } // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' /// # Fn `get_container_id` /// ## for getting container id used in logs /// /// *input* : - /// /// *output* : Some(String) if cont-id was grubbed | None - if not /// /// *initiator* : fn `options::logger::setup_logger` /// /// *managing* : - /// /// *depends on* : - /// pub fn get_container_id() -> Option { match Command::new(GET_ID_CMD).output() { Ok(output) => { if !output.status.success() { return None; } let id = String::from_utf8_lossy(&output.stdout).to_string(); if id.is_empty() { return None; } Some(String::from_utf8_lossy(&output.stdout).to_string()) } Err(_) => None, } } #[cfg(test)] mod utils_unittests { use super::get_container_id; #[test] fn check_if_container_id_can_be_grabed() { assert!(get_container_id().is_some()); } }