utils ref + lazy static

migrate
prplV 2025-05-29 17:24:05 +03:00
parent 19b3477560
commit 3d0b7766ac
2 changed files with 10 additions and 246 deletions

View File

@ -21,3 +21,4 @@ dotenv = "0.15.0"
futures = "0.3.31" futures = "0.3.31"
async-trait = "0.1.88" async-trait = "0.1.88"
crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] } crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] }
lazy_static = "1.5.0"

View File

@ -4,31 +4,23 @@ pub mod metrics;
pub mod prcs; pub mod prcs;
pub mod services; pub mod services;
// TODO : saving current flags state use crate::options::structs::Processes;
use log::{error, info};
use crate::options::structs::{CustomError, TrackingProcess, Processes};
// use files::create_watcher;
// use files::file_handler;
// use inotify::Inotify;
use log::{error, warn, info};
use prcs::{
freeze_process, is_active, is_frozen, restart_process, terminate_process,
unfreeze_process,
};
// use services::service_handler;
use std::process::Command; use std::process::Command;
use std::sync::Arc; use std::sync::Arc;
// use tokio::join;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::Duration; use tokio::time::Duration;
// use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender};
// controllers import
use prcs::v2::ProcessesController; use prcs::v2::ProcessesController;
use files::v2::FilesController; use files::v2::FilesController;
use services::v2::ServicesController; use services::v2::ServicesController;
use async_trait::async_trait; use async_trait::async_trait;
use lazy_static::lazy_static;
const GET_ID_CMD: &str = "hostname"; lazy_static! {
static ref GET_ID_CMD : &'static str = "hostname";
}
// const GET_ID_CMD: &str = "hostname";
pub mod v2 { pub mod v2 {
use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque}; use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque};
@ -187,237 +179,8 @@ pub mod v2 {
supervisor.process().await; supervisor.process().await;
Ok(()) Ok(())
} }
// async fn generate_controllers<'a>(config: Processes) -> (HashSet<ProcessesController<'a>>, HashSet<FilesController<'a>>, HashSet<ServicesController<'a>>) {
// let mut prcs: HashSet<ProcessesController<'a>> = HashSet::new();
// let mut files: HashSet<FilesController<'a>> = HashSet::new();
// let mut services: HashSet<ServicesController<'a>> = HashSet::new();
// for prc in config.processes {
// let (rx, tx) = mpsc::channel::<Events<'a>>(10);
// // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path);
// let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path);
// let a = new_prc.process().await;
// }
// (prcs, files, services)
// }
// spawn prc check with semaphore check
async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) }
// spawn file check with semaphore check
async fn files_monitoriing() -> anyhow::Result<()> { Ok(()) }
// spawn service check with semaphore check
async fn services_monitoriing() -> anyhow::Result<()> { Ok(()) }
} }
/// # Fn `run_daemons`
/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel
///
/// *input* : `Arc<TrackingProcess>`, `Arc<mpsc::Sender<u8>>`, `&mut mpsc::Receiver<u8>`,
///
/// *output* : ()
///
/// *initiator* : main thread
///
/// *managing* : Arc to current process struct, Arc to managing channel writer, mut ref to managing channel reader
///
/// *depends on* : all module `prcs`'s functions, fn `running_handler`, fn `utils::files::create_watcher`
///
/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process
///
// pub async fn run_daemons(
// proc: Arc<TrackingProcess>,
// tx: Arc<mpsc::Sender<u8>>,
// rx: &mut mpsc::Receiver<u8>,
// ) {
// // creating watchers + ---buffers---
// let mut watchers: Vec<Inotify> = vec![];
// for file in proc.dependencies.files.clone().into_iter() {
// if let Ok(watcher) = create_watcher(&file.filename, &file.src).await {
// watchers.push(watcher);
// } else {
// let _ = tx.send(121).await;
// }
// // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
// }
// let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> =
// Arc::new(tokio::sync::Mutex::new(watchers));
// loop {
// let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
// tokio::select! {
// _ = run_hand => continue,
// _val = rx.recv() => {
// if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() {
// return;
// }
// },
// }
// tokio::task::yield_now().await;
// }
// }
async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<(), CustomError>{
match val {
// 1 - File-dependency handling error -> terminating (after waiting)
1 => {
if is_active(&proc.name).await {
error!("File-dependency handling error: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(500)).await;
}
// return;
},
// 2 - File-dependency handling error -> holding (after waiting)
2 => {
if !is_frozen(&proc.name).await {
error!("File-dependency handling error: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
},
// 3 - Running process error
3 => {
error!("Error due to starting {} process", &proc.name);
return Err(CustomError::Fatal)
},
// 4 - Timeout of waiting service-dependency -> staying (after waiting)
4 => {
// warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name);
tokio::time::sleep(Duration::from_millis(100)).await;
},
// 5 - Timeout of waiting service-dependency -> terminating (after waiting)
5 => {
if is_active(&proc.name).await {
error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(500)).await;
}
},
// 6 - Timeout of waiting service-dependency -> holding (after waiting)
6 => {
// println!("holding {}-{}", proc.name, is_active(&proc.name).await);
if !is_frozen(&proc.name).await {
error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await;
tokio::time::sleep(Duration::from_secs(1)).await;
}
},
// // 7 - File-dependency change -> terminating (after check)
7 => {
error!("File-dependency warning (file changed). Terminating {} process...", &proc.name);
terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
return Err(CustomError::Fatal)
},
// // 8 - File-dependency change -> restarting (after check)
8 => {
warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name);
let _ = restart_process(&proc.name, &proc.path).await;
tokio::time::sleep(Duration::from_millis(100)).await;
},
// // 9 - File-dependency change -> staying (after check)
9 => {
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
tokio::time::sleep(Duration::from_millis(100)).await;
},
// 10 - Process unfreaze call via file handler (or service handler)
10 | 11 => {
if is_frozen(&proc.name).await {
warn!("Unfreezing process {} call...", &proc.name);
unfreeze_process(&proc.name).await;
}
tokio::time::sleep(Duration::from_millis(100)).await;
},
// 11 - Process unfreaze call via service handler
// 11 => {
// if is_frozen(&proc.name).await {
// warn!("Unfreezing process {} call...", &proc.name);
// unfreeze_process(&proc.name).await;
// }
// tokio::time::sleep(Duration::from_millis(100)).await;
// },
// 101 - Impermissible trigger values in JSON
101 => {
error!("Impermissible trigger values in JSON in {}'s block. Killing thread...", &proc.name);
if is_active(&proc.name).await {
terminate_process(&proc.name).await;
}
return Err(CustomError::Fatal)
},
//
// 121 - Cannot create valid watcher for file dependency
// todo : think about valid situation
121 => {
error!("Cannot create valid watcher for file dependency. Terminating {} process...", &proc.name);
let _ = terminate_process(&proc.name).await;
return Err(CustomError::Fatal)
},
// 111 - global thread termination with killing current child in a face
// of a current process
111 => {
warn!("Terminating {}'s child processes...", &proc.name);
match is_active(&proc.name).await {
true => {
terminate_process(&proc.name).await;
},
false => {
log::info!("Process {} is already terminated!", proc.name);
},
}
},
_ => {},
}
Ok(())
}
// check process status daemon
/// # Fn `run_daemons`
/// ## func to async exec subjobs of checking process, services and files states
///
/// *input* : `Arc<TrackingProcess>`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *output* : ()
///
/// *initiator* : fn `run_daemons`
///
/// *managing* : Arc to current process struct, Arc to Mutex to list of file watchers
///
/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}`
///
// pub async fn running_handler(
// prc: Arc<TrackingProcess>,
// tx: Arc<mpsc::Sender<u8>>,
// watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
// ) {
// // services and files check (once)
// let files_check = file_handler(
// &prc.name,
// &prc.dependencies.files,
// tx.clone(),
// watchers.clone(),
// );
// let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
// let res = join!(files_check, services_check);
// // if inactive -> spawn checks -> active is true
// if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
// if start_process(&prc.name, &prc.path).await.is_err() {
// tx.send(3).await.unwrap();
// return;
// }
// }
// // if frozen -> spawn checks -> unfreeze is true
// else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() {
// tx.send(10).await.unwrap();
// return;
// }
// // tokio::time::sleep(Duration::from_millis(100)).await;
// tokio::task::yield_now().await;
// }
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
/// # Fn `get_container_id` /// # Fn `get_container_id`
/// ## for getting container id used in logs /// ## for getting container id used in logs
@ -433,7 +196,7 @@ async fn process_protocol_symbol(proc: Arc<TrackingProcess>, val: u8) -> Result<
/// *depends on* : - /// *depends on* : -
/// ///
pub fn get_container_id() -> Option<String> { pub fn get_container_id() -> Option<String> {
match Command::new(GET_ID_CMD).output() { match Command::new(*GET_ID_CMD).output() {
Ok(output) => { Ok(output) => {
if !output.status.success() { if !output.status.success() {
return None; return None;