Compare commits

...

4 Commits

Author SHA1 Message Date
prplV e89d1be8e4 docs: utils 2024-11-13 15:42:39 +03:00
prplV adb3858ee4 docs: services + prcs fix 2024-11-13 15:29:00 +03:00
prplV c14a6f943b docs: prcs 2024-11-13 15:15:39 +03:00
prplV 174779bfcb docs: metrics (tmp) 2024-11-13 15:02:38 +03:00
4 changed files with 288 additions and 17 deletions

View File

@ -22,9 +22,21 @@ use tokio::time::Duration;
const GET_ID_CMD: &str = "hostname";
/// # async func to run 3 main daemons (now it's more like tree-form than classical 0.1.0 form )
/// > hint : give mpsc with capacity 1 to jump over potential errors during running process
/// > ** in [developing](https://github.com/prplV/runner-rs "REPOSITORY") **
/// # Fn `run_daemons`
/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel
///
/// *input* : `Arc<TrackingProcess>`, `Arc<mpsc::Sender<u8>>`, `&mut mpsc::Receiver<u8>`,
///
/// *output* : ()
///
/// *initiator* : main thread
///
/// *managing* : Arc to current process struct, Arc to managing channel writer, mut ref to managing channel reader
///
/// *depends on* : all module `prcs`'s functions, fn `running_handler`, fn `utils::files::create_watcher`
///
/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process
///
pub async fn run_daemons(
proc: Arc<TrackingProcess>,
tx: Arc<mpsc::Sender<u8>>,
@ -167,6 +179,19 @@ pub async fn run_daemons(
tokio::task::yield_now().await;
}
// check process status daemon
/// # Fn `run_daemons`
/// ## func to async exec subjobs of checking process, services and files states
///
/// *input* : `Arc<TrackingProcess>`, `Arc<mpsc::Sender<u8>>`, `Arc<tokio::sync::Mutex<Vec<Inotify>>>`
///
/// *output* : ()
///
/// *initiator* : fn `run_daemons`
///
/// *managing* : Arc to current process struct, Arc to Mutex to list of file watchers
///
/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}`
///
pub async fn running_handler(
prc: Arc<TrackingProcess>,
tx: Arc<mpsc::Sender<u8>>,
@ -199,6 +224,19 @@ pub async fn running_handler(
}
// todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}'
/// # Fn `get_container_id`
/// ## for getting container id used in logs
///
/// *input* : -
///
/// *output* : Some(String) if cont-id was grubbed | None - if not
///
/// *initiator* : fn `options::logger::setup_logger`
///
/// *managing* : -
///
/// *depends on* : -
///
pub fn get_container_id() -> Option<String> {
match Command::new(GET_ID_CMD).output() {
Ok(output) => {

View File

@ -14,18 +14,18 @@ use crate::utils::get_container_id;
// type PacketBuffer = Arc<Mutex<Vec<PacketInfo>>>;
/// # Fn init_metrics_grubber
/// # Fn `init_metrics_grubber`
/// ## for initializing process of unstoppable grubbing metrics.
///
/// *input* : `Result<()>`
/// *input* : `Arc<Mutex<UnixSocket>>` ??
///
/// *output* : `Err` if it cant create grubbers | `Ok` on finish
///
/// *initiator* : main thread
/// *initiator* : main thread ??
///
/// *managing* : object of unix-socket reader
///
/// *depends on* : network activity
/// *depends on* : -
///
pub async fn init_metrics_grubber() {
let mut system = System::new();
@ -79,8 +79,19 @@ async fn gather_metrics(proc: Arc<Process>) {
// }
// !!!
// for container (whole system metrics)
/// # Fn `get_all_container_metrics`
/// ## for gathering all container (whole system metrics)
///
/// *input* : `Arc<System>`, `Arc<Vec<TrackingProcess>>`
///
/// *output* : `ContainerMetrics`
///
/// *initiator* : main thread ??
///
/// *managing* : ref counter to `System` object, ref counter to list of processes
///
/// *depends on* : `TrackingProcess`
///
async fn get_all_container_metrics(sys: Arc<System>, prcs: Arc<Vec<TrackingProcess>>) -> ContainerMetrics {
let metrics = join!(
get_cpu_metrics_container(sys.clone()),
@ -94,22 +105,74 @@ async fn get_all_container_metrics(sys: Arc<System>, prcs: Arc<Vec<TrackingProce
metrics.2
)
}
/// # Fn `get_cpu_metrics_container`
/// ## for gathering container cpu metrics
///
/// *input* : `Arc<System>`
///
/// *output* : `f32`
///
/// *initiator* : main thread ??
///
/// *managing* : ref counter to `System` object
///
/// *depends on* : -
///
async fn get_cpu_metrics_container(sys: Arc<System>) -> f32 {
sys.global_cpu_usage()
}
/// # Fn `get_ram_metrics_container`
/// ## for gathering container ram metrics
///
/// *input* : `Arc<System>`
///
/// *output* : `f32`
///
/// *initiator* : main thread ??
///
/// *managing* : ref counter to `System` object
///
/// *depends on* : -
///
async fn get_ram_metrics_container(sys: Arc<System>) -> f32 {
(sys.used_memory() / sys.total_memory()) as f32 * 100.0
}
// async fn get_mem_metrics_container(sys: Arc<System>) -> f32 {
// sys.
// }
/// # Fn `get_subsystems`
/// ## for gathering info about container subsystems (processes)
///
/// *input* : `Arc<Vec<TrackingProcess>>`
///
/// *output* : `Vec<String>`
///
/// *initiator* : main thread ??
///
/// *managing* : ref counter to list of `TrackingProcess`
///
/// *depends on* : `TrackingProcess`
///
async fn get_subsystems(prcs: Arc<Vec<TrackingProcess>>) -> Vec<String> {
prcs.iter().map(|process| process.name.clone()).collect()
}
// !!!
// for process (process metrics)
// %
/// # Fn `get_all_metrics_process`
/// ## for gathering all process' metrics
///
/// *input* : `Arc<Process>`, `Arc<System>`
///
/// *output* : `ProcessMetrics`
///
/// *initiator* : main thread ??
///
/// *managing* : two ref counters to `Process` and `System`
///
/// *depends on* : -
///
async fn get_all_metrics_process(proc: Arc<Process>, sys: Arc<System>) -> ProcessMetrics {
let metrics = join!(
get_cpu_metrics_process(proc.clone()),
@ -121,10 +184,37 @@ async fn get_all_metrics_process(proc: Arc<Process>, sys: Arc<System>) -> Proces
metrics.1
)
}
/// # Fn `get_cpu_metrics_process`
/// ## for gathering process cpu metrics
///
/// *input* : `Arc<Process>`
///
/// *output* : `f32`
///
/// *initiator* : main thread ??
///
/// *managing* : ref counter to `Process` object
///
/// *depends on* : -
///
async fn get_cpu_metrics_process(proc: Arc<Process>) -> f32 {
proc.cpu_usage()
}
// %
/// # Fn `get_ram_metrics_process`
/// ## for gathering process ram metrics
///
/// *input* : `Arc<Process>`
///
/// *output* : `f32`
///
/// *initiator* : main thread ??
///
/// *managing* : ref counter to `Process` object
///
/// *depends on* : -
///
async fn get_ram_metrics_process(proc: Arc<Process>, sys: Arc<System>) -> f32 {
(proc.memory() as f64 / sys.total_memory() as f64) as f32 * 100.0 as f32
}

View File

@ -1,10 +1,22 @@
use crate::options::structs::CustomError;
use log::{error, warn};
use std::io;
use std::process::{Command, Output};
use std::sync::Arc;
use tokio::time::Duration;
/// # Fn `get_pid`
/// ## for initializing process of unstoppable grubbing metrics.
///
/// *input* : `&str`
///
/// *output* : `None` if cant get process PID | `Some(Output)` on success
///
/// *initiator* : fn `is_frozen`, fn `utils::files::file_handler`, fn `utils::files::service_handler`
///
/// *managing* : process name
///
/// *depends on* : -
///
pub async fn get_pid(name: &str) -> Option<Output> {
let name = Arc::new(name.to_string());
let res =
@ -20,8 +32,20 @@ pub async fn get_pid(name: &str) -> Option<Output> {
None
}
}
// ! can be with bug !!!
// * APPROVED
/// # Fn `is_active`
/// ## for checking process's activity state
///
/// *input* : `&str`
///
/// *output* : `true` if process running | `false` if not
///
/// *initiator* : fn `utils::files::file_handler`, fn `utils::files::service_handler`
///
/// *managing* : process name
///
/// *depends on* : -
///
pub async fn is_active(name: &str) -> bool {
let arc_name = Arc::new(name.to_string());
tokio::task::spawn_blocking(move || {
@ -38,7 +62,19 @@ pub async fn is_active(name: &str) -> bool {
.unwrap()
}
// T is for stopped processes
/// # Fn `is_frozen`
/// ## for checking process's hibernation state
///
/// *input* : `&str`
///
/// *output* : `true` if process is frozen | `false` if not
///
/// *initiator* : fn `utils::files::file_handler`, fn `utils::files::service_handler`
///
/// *managing* : process name
///
/// *depends on* : fn `get_pid`
///
pub async fn is_frozen(name: &str) -> bool {
let temp: Output;
if let Some(output) = get_pid(name).await {
@ -66,6 +102,20 @@ pub async fn is_frozen(name: &str) -> bool {
.unwrap()
}
}
/// # Fn `terminate_process`
/// ## for stop current process
///
/// *input* : `&str`
///
/// *output* : ()
///
/// *initiator* : fn `utils::files::file_handler`, fn `utils::files::service_handler`, fn `utils::run_daemons`
///
/// *managing* : process name
///
/// *depends on* : -
///
pub async fn terminate_process(name: &str) {
let _ = Command::new("pkill")
.arg(name)
@ -76,6 +126,19 @@ pub async fn terminate_process(name: &str) {
});
}
/// # Fn `terminate_process`
/// ## for freeze/hibernate current process
///
/// *input* : `&str`
///
/// *output* : ()
///
/// *initiator* : fn `utils::run_daemons`
///
/// *managing* : process name
///
/// *depends on* : -
///
pub async fn freeze_process(name: &str) {
let _ = Command::new("pkill")
.args(["-STOP", name])
@ -85,6 +148,20 @@ pub async fn freeze_process(name: &str) {
std::process::exit(101);
});
}
/// # Fn `unfreeze_process`
/// ## for unfreeze/hibernate current process
///
/// *input* : `&str`
///
/// *output* : ()
///
/// *initiator* : fn `utils::run_daemons`
///
/// *managing* : process name
///
/// *depends on* : -
///
pub async fn unfreeze_process(name: &str) {
let _ = Command::new("pkill")
.args(["-CONT", name])
@ -94,12 +171,39 @@ pub async fn unfreeze_process(name: &str) {
std::process::exit(101);
});
}
/// # Fn `restart_process`
/// ## for restarting current process
///
/// *input* : `&str`, &str
///
/// *output* : ()
///
/// *initiator* : fn `utils::run_daemons`
///
/// *managing* : process name and path to its exec file
///
/// *depends on* : fn `start_process`, fn `terminate_process`
///
pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> {
terminate_process(name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
start_process(name, path).await
}
/// # Fn `start_process`
/// ## for starting current process
///
/// *input* : `&str`, &str
///
/// *output* : ()
///
/// *initiator* : fn `restart_process`
///
/// *managing* : process name and path to its exec file
///
/// *depends on* : -
///
pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
// let runsh = format!("{} {}", "exec", path);
let mut command = Command::new(path);

View File

@ -6,6 +6,19 @@ use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
/// # Fn `service_handler`
/// ## function to realize mechanism of current process' dep services monitoring
///
/// *input* : `&str`, `&Vec<Services>`, `Arc<mpsc::Sender<u8>>`
///
/// *output* : ()
///
/// *initiator* : fn `utils::running_handler`
///
/// *managing* : process name, ref of vec of dep services, ref counter to managing channel writer
///
/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting`
///
pub async fn service_handler(
name: &str,
services: &Vec<Services>,
@ -54,6 +67,19 @@ pub async fn service_handler(
Ok(())
}
/// # Fn `looped_service_connecting`
/// ## for service's state check in loop (with delay and restriction of attempts)
///
/// *input* : `&str`, `&Services`
///
/// *output* : Ok(()) if service now available | Err(er) if still not
///
/// *initiator* : fn `service_handler`
///
/// *managing* : process name, current service struct
///
/// *depends on* : fn `check_service`
///
async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
if serv.triggers.wait == 0 {
loop {
@ -103,6 +129,19 @@ async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), Cu
}
}
/// # Fn `check_service`
/// ## for check current service's availiability
///
/// *input* : `&str`, `&u32`
///
/// *output* : Ok(()) if service now available | Err(er) if still not
///
/// *initiator* : fn `service_handler`, fn `looped_service_connecting`
///
/// *managing* : hostname, port
///
/// *depends on* : -
///
// ! have to be rewritten
// todo: rewrite use
async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> {