diff --git a/noxis-rs/Cargo.toml b/noxis-rs/Cargo.toml index 38380c0..73ed76b 100644 --- a/noxis-rs/Cargo.toml +++ b/noxis-rs/Cargo.toml @@ -21,3 +21,4 @@ dotenv = "0.15.0" futures = "0.3.31" async-trait = "0.1.88" crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] } +lazy_static = "1.5.0" diff --git a/noxis-rs/src/utils.rs b/noxis-rs/src/utils.rs index 03513db..2ef0d08 100644 --- a/noxis-rs/src/utils.rs +++ b/noxis-rs/src/utils.rs @@ -4,31 +4,23 @@ pub mod metrics; pub mod prcs; pub mod services; -// TODO : saving current flags state - -use crate::options::structs::{CustomError, TrackingProcess, Processes}; -// use files::create_watcher; -// use files::file_handler; -// use inotify::Inotify; -use log::{error, warn, info}; -use prcs::{ - freeze_process, is_active, is_frozen, restart_process, terminate_process, - unfreeze_process, -}; -// use services::service_handler; +use crate::options::structs::Processes; +use log::{error, info}; use std::process::Command; use std::sync::Arc; -// use tokio::join; use tokio::sync::mpsc; use tokio::time::Duration; -// use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender}; -// controllers import use prcs::v2::ProcessesController; use files::v2::FilesController; use services::v2::ServicesController; use async_trait::async_trait; +use lazy_static::lazy_static; -const GET_ID_CMD: &str = "hostname"; +lazy_static! { + static ref GET_ID_CMD : &'static str = "hostname"; +} + +// const GET_ID_CMD: &str = "hostname"; pub mod v2 { use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque}; @@ -187,237 +179,8 @@ pub mod v2 { supervisor.process().await; Ok(()) } - - - // async fn generate_controllers<'a>(config: Processes) -> (HashSet>, HashSet>, HashSet>) { - // let mut prcs: HashSet> = HashSet::new(); - // let mut files: HashSet> = HashSet::new(); - // let mut services: HashSet> = HashSet::new(); - // for prc in config.processes { - // let (rx, tx) = mpsc::channel::>(10); - // // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path); - // let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path); - // let a = new_prc.process().await; - - // } - // (prcs, files, services) - // } - // spawn prc check with semaphore check - async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) } - - // spawn file check with semaphore check - async fn files_monitoriing() -> anyhow::Result<()> { Ok(()) } - - // spawn service check with semaphore check - async fn services_monitoriing() -> anyhow::Result<()> { Ok(()) } } -/// # Fn `run_daemons` -/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel -/// -/// *input* : `Arc`, `Arc>`, `&mut mpsc::Receiver`, -/// -/// *output* : () -/// -/// *initiator* : main thread -/// -/// *managing* : Arc to current process struct, Arc to managing channel writer, mut ref to managing channel reader -/// -/// *depends on* : all module `prcs`'s functions, fn `running_handler`, fn `utils::files::create_watcher` -/// -/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process -/// -// pub async fn run_daemons( -// proc: Arc, -// tx: Arc>, -// rx: &mut mpsc::Receiver, -// ) { -// // creating watchers + ---buffers--- -// let mut watchers: Vec = vec![]; -// for file in proc.dependencies.files.clone().into_iter() { -// if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { -// watchers.push(watcher); -// } else { -// let _ = tx.send(121).await; -// } -// // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); -// } -// let watchers_clone: Arc>> = -// Arc::new(tokio::sync::Mutex::new(watchers)); - -// loop { -// let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); -// tokio::select! { -// _ = run_hand => continue, -// _val = rx.recv() => { -// if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { -// return; -// } -// }, -// } -// tokio::task::yield_now().await; -// } -// } - -async fn process_protocol_symbol(proc: Arc, val: u8) -> Result<(), CustomError>{ - match val { - // 1 - File-dependency handling error -> terminating (after waiting) - 1 => { - if is_active(&proc.name).await { - error!("File-dependency handling error: Terminating {} process ..." , &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(500)).await; - } - // return; - }, - // 2 - File-dependency handling error -> holding (after waiting) - 2 => { - if !is_frozen(&proc.name).await { - error!("File-dependency handling error: Freezing {} process ..." , &proc.name); - freeze_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - } - }, - // 3 - Running process error - 3 => { - error!("Error due to starting {} process", &proc.name); - return Err(CustomError::Fatal) - }, - // 4 - Timeout of waiting service-dependency -> staying (after waiting) - 4 => { - // warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name); - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // 5 - Timeout of waiting service-dependency -> terminating (after waiting) - 5 => { - if is_active(&proc.name).await { - error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(500)).await; - } - }, - // 6 - Timeout of waiting service-dependency -> holding (after waiting) - 6 => { - // println!("holding {}-{}", proc.name, is_active(&proc.name).await); - if !is_frozen(&proc.name).await { - error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); - freeze_process(&proc.name).await; - tokio::time::sleep(Duration::from_secs(1)).await; - } - }, - // // 7 - File-dependency change -> terminating (after check) - 7 => { - error!("File-dependency warning (file changed). Terminating {} process...", &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - return Err(CustomError::Fatal) - }, - // // 8 - File-dependency change -> restarting (after check) - 8 => { - warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name); - let _ = restart_process(&proc.name, &proc.path).await; - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // // 9 - File-dependency change -> staying (after check) - 9 => { - warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name); - tokio::time::sleep(Duration::from_millis(100)).await; - }, - - // 10 - Process unfreaze call via file handler (or service handler) - 10 | 11 => { - if is_frozen(&proc.name).await { - warn!("Unfreezing process {} call...", &proc.name); - unfreeze_process(&proc.name).await; - } - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // 11 - Process unfreaze call via service handler - // 11 => { - // if is_frozen(&proc.name).await { - // warn!("Unfreezing process {} call...", &proc.name); - // unfreeze_process(&proc.name).await; - // } - // tokio::time::sleep(Duration::from_millis(100)).await; - // }, - // 101 - Impermissible trigger values in JSON - 101 => { - error!("Impermissible trigger values in JSON in {}'s block. Killing thread...", &proc.name); - if is_active(&proc.name).await { - terminate_process(&proc.name).await; - } - return Err(CustomError::Fatal) - }, - // - // 121 - Cannot create valid watcher for file dependency - // todo : think about valid situation - 121 => { - error!("Cannot create valid watcher for file dependency. Terminating {} process...", &proc.name); - let _ = terminate_process(&proc.name).await; - return Err(CustomError::Fatal) - }, - // 111 - global thread termination with killing current child in a face - // of a current process - 111 => { - warn!("Terminating {}'s child processes...", &proc.name); - match is_active(&proc.name).await { - true => { - terminate_process(&proc.name).await; - }, - false => { - log::info!("Process {} is already terminated!", proc.name); - }, - } - }, - _ => {}, - } - Ok(()) -} -// check process status daemon -/// # Fn `run_daemons` -/// ## func to async exec subjobs of checking process, services and files states -/// -/// *input* : `Arc`, `Arc>`, `Arc>>` -/// -/// *output* : () -/// -/// *initiator* : fn `run_daemons` -/// -/// *managing* : Arc to current process struct, Arc to Mutex to list of file watchers -/// -/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}` -/// -// pub async fn running_handler( -// prc: Arc, -// tx: Arc>, -// watchers: Arc>>, -// ) { -// // services and files check (once) -// let files_check = file_handler( -// &prc.name, -// &prc.dependencies.files, -// tx.clone(), -// watchers.clone(), -// ); -// let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); - -// let res = join!(files_check, services_check); -// // if inactive -> spawn checks -> active is true -// if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() { -// if start_process(&prc.name, &prc.path).await.is_err() { -// tx.send(3).await.unwrap(); -// return; -// } -// } -// // if frozen -> spawn checks -> unfreeze is true -// else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() { -// tx.send(10).await.unwrap(); -// return; -// } -// // tokio::time::sleep(Duration::from_millis(100)).await; -// tokio::task::yield_now().await; -// } - // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' /// # Fn `get_container_id` /// ## for getting container id used in logs @@ -433,7 +196,7 @@ async fn process_protocol_symbol(proc: Arc, val: u8) -> Result< /// *depends on* : - /// pub fn get_container_id() -> Option { - match Command::new(GET_ID_CMD).output() { + match Command::new(*GET_ID_CMD).output() { Ok(output) => { if !output.status.success() { return None;