From c76b615341ca1a3c82bec15043be4c8a8417647a Mon Sep 17 00:00:00 2001 From: prplV Date: Fri, 30 May 2025 18:07:35 +0300 Subject: [PATCH] with bus prc manipulation --- noxis-rs/src/main.rs | 114 ++++-------- noxis-rs/src/options/cli_pipeline.rs | 108 +++++++++++- noxis-rs/src/options/structs.rs | 26 ++- noxis-rs/src/utils.rs | 82 +++++++-- noxis-rs/src/utils/bus.rs | 11 +- noxis-rs/src/utils/metrics.rs | 255 ++++++++++++++------------- noxis-rs/src/utils/prcs.rs | 5 + 7 files changed, 367 insertions(+), 234 deletions(-) diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 3f1c6fd..32565f8 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -4,14 +4,16 @@ mod utils; use log::{error, info}; use options::logger::setup_logger; use options::signals::set_valid_destructor; -use options::structs::Processes; +use options::structs::ProcessUnit; +use options::structs::{Processes, bus::BusMessage}; use options::cli_pipeline::init_cli_pipeline; use std::sync::Arc; use std::time::Duration; use options::preboot::PrebootParams; -use tokio::sync::{broadcast, oneshot}; +use tokio::sync::{broadcast, oneshot, mpsc}; use options::config::v2::init_config_mechanism; use utils::v2::init_monitoring; +use utils::bus::Bus; #[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() -> anyhow::Result<()>{ @@ -27,6 +29,25 @@ async fn main() -> anyhow::Result<()>{ let (tx_oneshot, rx_oneshot) = oneshot::channel::(); let mut handler: Vec> = vec![]; + // to BUS channel + let (tx_to_bus, mut rx_to_bus) = mpsc::channel::(5); + // from BUS channels + let (tx_to_cli, mut rx_to_cli) = mpsc::channel::(5); + let (tx_to_supervisor, mut rx_to_supervisor) = mpsc::channel::(5); + let (tx_to_metrics, mut rx_to_metrics) = mpsc::channel::(5); + + let tx_to_bus = Arc::new(tx_to_bus); + let tx_to_cli = Arc::new(tx_to_cli); + let tx_to_supervisor = Arc::new(tx_to_supervisor); + let tx_to_metrics = Arc::new(tx_to_metrics); + + let bus_module = tokio::spawn(async move { + let mut bus = Bus::new(rx_to_bus, tx_to_cli.clone(), tx_to_supervisor.clone(), tx_to_metrics.clone()); + bus.process().await; + error!("Info Bus crushed !"); + }); + handler.push(bus_module); + // initilaizing task for config manipulations let preboot_config = preboot.clone(); let config_module = tokio::spawn(async move { @@ -39,6 +60,7 @@ async fn main() -> anyhow::Result<()>{ handler.push(config_module); // initilaizing task for cli manipulation + let tx_bus = tx_to_bus.clone(); let preboot_cli = preboot.clone(); let cli_module = tokio::spawn(async move { let config = { @@ -54,7 +76,9 @@ async fn main() -> anyhow::Result<()>{ if let Err(er) = init_cli_pipeline( preboot_cli, Arc::new(config), - tx_oneshot + tx_oneshot, + rx_to_cli, + tx_bus.clone() ).await { error!("CLI pipeline failed due to {}", er) } @@ -70,6 +94,7 @@ async fn main() -> anyhow::Result<()>{ }); handler.push(ctrlc); + let tx_bus = tx_to_bus.clone(); let monitoring = tokio::spawn(async move { let config = { let mut tick = tokio::time::interval(Duration::from_millis(500)); @@ -81,7 +106,11 @@ async fn main() -> anyhow::Result<()>{ } } }; - if let Err(er) = init_monitoring(config).await { + if let Err(er) = init_monitoring( + config, + rx_to_supervisor, + tx_bus + ).await { error!("Monitoring mod failed due to {}", er); } }); @@ -90,80 +119,5 @@ async fn main() -> anyhow::Result<()>{ for i in handler { let _ = i.await; } - - // setting up redis connection \ - // then conf checks to choose the most actual \ - // let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| { - // error!("No actual configuration for runner. Stopping..."); - // std::process::exit(1); - // }); - // - // info!( - // "Current runner configuration: {}", - // &processes.date_of_creation - // ); - // info!("Runner is ready. Initializing..."); - // - // if processes.processes.is_empty() { - // error!("Processes list is null, runner-rs initialization is stopped"); - // return Err(Error::msg("Empty processes segment in config")); - // } - // let mut handler: Vec> = vec![]; - // // is in need to send to the signals handler thread - // let mut senders: Vec>> = vec![]; - // - // for proc in processes.processes.iter() { - // info!( - // "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", - // proc.name, - // proc.path, - // proc.dependencies.files.len(), - // proc.dependencies.services.len() - // ); - // - // // creating msg channel - // // can or should be executed in new thread - // let (tx, mut rx) = mpsc::channel::(1); - // let proc = Arc::new(proc.clone()); - // let tx = Arc::new(tx.clone()); - // - // senders.push(Arc::clone(&tx.clone())); - // - // let event = tokio::spawn(async move { - // run_daemons(proc.clone(), tx.clone(), &mut rx).await; - // }); - // handler.push(event); - // } - // - // // destructor addition - // handler.push(tokio::spawn(async move { - // if set_valid_destructor(Arc::new(senders)).await.is_err() { - // error!("Linux signals handler creation failed. Terminating main thread..."); - // return; - // } - // - // tokio::time::sleep(Duration::from_millis(200)).await; - // info!("End of job. Terminating main thread..."); - // std::process::exit(0); - // })); - // - // // remote config update subscription - // handler.push(tokio::spawn(async move { - // let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await; - // })); - // - // // cli pipeline - // handler.push(tokio::spawn(async move { - // let _ = init_cli_pipeline().await; - // })); - // - // for i in handler { - // let _ = i.await; - // } Ok(()) -} - -// todo: integration tests -// todo: config pulling mechanism rework (socket) -// todo: tasks management after killing all processes -// todo: +} \ No newline at end of file diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index 199a9ba..578c6fa 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -2,16 +2,23 @@ use log::{error, info}; use tokio::net::{ UnixStream, UnixListener }; use tokio::sync::{Mutex, OnceCell}; use tokio::time::{sleep, Duration}; +use std::any::Any; use std::fs; use std::sync::Arc; use tokio::io::{ AsyncWriteExt, AsyncReadExt}; -use noxis_cli::Cli; +use noxis_cli::{Cli, ProcessAction}; +use crate::options::structs::bus::InternalCli; + use super::structs::Processes; +use super::structs::bus::BusMessage; use super::preboot::PrebootParams; type ConfigGateway = tokio::sync::oneshot::Sender; type ProcessedConfigGateway = Arc>>; +type BusReciever = tokio::sync::mpsc::Receiver; +type BusSender = Arc>; +type ReadyBusReciever = Arc>>; /// # Fn `init_cli_pipeline` /// ## for catching all input requests from CLI @@ -30,6 +37,8 @@ pub async fn init_cli_pipeline( params: Arc, config : Arc, config_gateway : ConfigGateway, + bus_reciever : BusReciever, + bus_sender : BusSender, ) -> anyhow::Result<()> { let socket_path = ¶ms.self_socket; let _ = fs::remove_file(socket_path); @@ -39,6 +48,7 @@ pub async fn init_cli_pipeline( OnceCell::new_with(Some(config_gateway)) ) ); + let bus_reciever = Arc::new(Mutex::new(bus_reciever)); match UnixListener::bind(socket_path) { Ok(list) => { @@ -52,8 +62,10 @@ pub async fn init_cli_pipeline( let params = params.clone(); let config = config.clone(); let config_gateway = config_gateway.clone(); + let bus_reciever = bus_reciever.clone(); + let bus_sender = bus_sender.clone(); tokio::spawn(async move { - process_connection(socket, params.clone(), config.clone(), config_gateway.clone()).await; + process_connection(socket, params.clone(), config.clone(), config_gateway.clone(), bus_reciever, bus_sender).await; }); }, Err(er) => { @@ -84,7 +96,14 @@ pub async fn init_cli_pipeline( /// /// *depends on* : `tokio::net::TcpStream` /// -async fn process_connection(mut stream: UnixStream, params: Arc, config : Arc, cfg_gateway : ProcessedConfigGateway) { +async fn process_connection( + mut stream: UnixStream, + params: Arc, + config : Arc, + cfg_gateway : ProcessedConfigGateway, + bus_reciever : ReadyBusReciever, + bus_sender : BusSender, +) { let mut buf = vec![0; 1024]; match stream.read(&mut buf).await { Ok(0) => { @@ -96,7 +115,7 @@ async fn process_connection(mut stream: UnixStream, params: Arc, match serde_json::from_slice::(&buf) { Ok(cli) => { info!("Received CLI request: {:?}", cli); - let response = match process_cli_cmd(cli, params.clone(), config, cfg_gateway.clone()).await { + let response = match process_cli_cmd(cli, params.clone(), config, cfg_gateway.clone(), bus_reciever.clone(), bus_sender.clone()).await { Ok(response) => { response }, @@ -121,7 +140,14 @@ async fn process_connection(mut stream: UnixStream, params: Arc, } -async fn process_cli_cmd(cli : Cli, params: Arc, global_config : Arc, cfg_gateway: ProcessedConfigGateway) -> anyhow::Result { +async fn process_cli_cmd( + cli : Cli, + params: Arc, + global_config : Arc, + cfg_gateway: ProcessedConfigGateway, + bus_reciever : ReadyBusReciever, + bus_sender : BusSender, +) -> anyhow::Result { use noxis_cli::{Commands, ConfigAction}; return match cli.command { Commands::Config(config) => { @@ -170,6 +196,78 @@ async fn process_cli_cmd(cli : Cli, params: Arc, global_config : // _ => Err(anyhow::Error::msg("Unrecognized command from CLI")) } }, + Commands::Process(prc) => { + use crate::options::structs::bus::{BusMessageDirection, BusMessageContentType, CLiCommand}; + + let proc_name = prc.process; + let req = BusMessage::Request( + BusMessageDirection::ToSupervisor, + BusMessageContentType::Cli, + Box::new( + match prc.action { + ProcessAction::Start => { + InternalCli { + prc : proc_name, + cmd : CLiCommand::Start, + } + }, + ProcessAction::Stop => { + InternalCli { + prc : proc_name, + cmd : CLiCommand::Stop, + } + }, + ProcessAction::Restart => { + InternalCli { + prc : proc_name, + cmd : CLiCommand::Restart, + } + }, + ProcessAction::Freeze => { + InternalCli { + prc : proc_name, + cmd : CLiCommand::Freeze, + } + }, + ProcessAction::Unfreeze => { + InternalCli { + prc : proc_name, + cmd : CLiCommand::Unfreeze, + } + }, + /* TODO: ALL CMDS */ + _ => { + InternalCli { + prc : proc_name, + cmd : CLiCommand::Restart, + } + }, + } + ) + ); + let mut bus = bus_reciever.lock().await; + bus_sender.send(req).await?; + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + let resp = tokio::time::timeout(std::time::Duration::from_secs(10), async move { + loop { + if let Ok(cont) = bus.try_recv() { + return cont + } + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + }).await?; + + if let BusMessage::Response(_, _, content) = resp { + let content: Box = content; + if let Ok(resp) = content.downcast::() { + return Ok(*resp) + } else { + // TODO : REWRITE THIS + return Ok(String::from("OK")); + } + } + Ok(String::from("OK")) + }, /* */ Commands::Status => Ok(String::from("Ok")), _ => Ok(String::from("Ok")) diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index 16faf23..f40be37 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -4,7 +4,7 @@ use std::net::Ipv4Addr; use serde::{Deserialize, Serialize}; use async_trait::async_trait; use std::sync::Arc; - +use std::any::Any; pub mod bus { use std::fmt::Debug; @@ -35,7 +35,22 @@ pub mod bus { MetricsObj, } - pub trait BusContent: Send + Sync + 'static + Debug { + #[derive(Debug)] + pub enum CLiCommand { + Start, + Stop, + Restart, + Freeze, + Unfreeze + } + + #[derive(Debug)] + pub struct InternalCli { + pub prc : String, + pub cmd : CLiCommand, + } + + pub trait BusContent: Send + Sync + 'static + Debug + Any { fn get_bus_type(&self) -> BusMessageContentType; } @@ -49,11 +64,16 @@ pub mod bus { BusMessageContentType::Cli } } - impl BusContent for dyn MetricsExportable { + impl BusContent for InternalCli { fn get_bus_type(&self) -> BusMessageContentType { BusMessageContentType::Cli } } + impl BusContent for dyn MetricsExportable { + fn get_bus_type(&self) -> BusMessageContentType { + BusMessageContentType::MetricsObj + } + } } #[derive(Debug)] diff --git a/noxis-rs/src/utils.rs b/noxis-rs/src/utils.rs index 66f2f09..a58a4d4 100644 --- a/noxis-rs/src/utils.rs +++ b/noxis-rs/src/utils.rs @@ -16,6 +16,7 @@ use files::v2::FilesController; use services::v2::ServicesController; use async_trait::async_trait; use lazy_static::lazy_static; +use crate::options::structs::bus::{BusMessage, InternalCli, BusMessageContentType}; lazy_static! { static ref GET_ID_CMD : &'static str = "hostname"; @@ -25,8 +26,12 @@ lazy_static! { pub mod v2 { use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque}; - use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers}; + use crate::options::structs::{bus::CLiCommand, Events, FileTriggersForController, ProcessUnit, Triggers}; use super::*; + use std::any::Any; + + type BusReciever = tokio::sync::mpsc::Receiver; + type BusSender = Arc>; #[derive(Debug)] enum ControllerResult { @@ -41,11 +46,12 @@ pub mod v2 { files : LinkedList, services : LinkedList, config : Arc, + bus : (BusReciever, BusSender), } impl Supervisor { - pub fn new() -> Supervisor { - Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new(), config: Arc::new(Processes::default()) } + pub fn new(bus_reciever : BusReciever, bus_sender: BusSender) -> Supervisor { + Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new(), config: Arc::new(Processes::default()), bus : (bus_reciever, bus_sender) } } pub async fn with_config(mut self, config: Processes) -> Supervisor { self.config = Arc::from(config); @@ -123,12 +129,61 @@ pub mod v2 { info!("Initializing monitoring ..."); loop { // - // todo: CHANNEL check and reaction - // - // dbg!(&self); + let rec = &mut self.bus.0; + while let Ok(request) = rec.try_recv(){ + if let BusMessage::Request(_, _, cont) = request { + let cont: Box = cont; + if let Ok(cli) = cont.downcast::() { + let mut count = 0; + let fut = (&mut self.prcs).into_iter() + .find(|prc| prc.name == Arc::from(cli.prc.as_ref())) + .map(|prc| async { + let count = &mut count; + *count += 1; + let res = match cli.cmd { + CLiCommand::Start => { + prc.start_by_user_call().await + }, + CLiCommand::Stop => { + prc.stop_by_user_call().await + }, + CLiCommand::Restart => { + prc.restart_by_user_call().await + }, + CLiCommand::Freeze => { + prc.freeze_by_user_call().await + }, + CLiCommand::Unfreeze => { + prc.unfreeze_by_user_call().await + }, + }; + let sender = self.bus.1.clone(); + let resp_content = match res { + Ok(_) => format!("Ok on user call abour process {}", prc.name), + Err(er) => format!("Error: User call for process {} failed : {}", prc.name, er), + }; + let _ = sender.send(BusMessage::Response( + crate::options::structs::bus::BusMessageDirection::ToCli, + BusMessageContentType::RawString, + Box::new(resp_content) + )).await; + 1 + }); + if let Some(fut) = fut { + fut.await; + } + else { + let _ = self.bus.1.clone().send(BusMessage::Response( + crate::options::structs::bus::BusMessageDirection::ToCli, + BusMessageContentType::RawString, + Box::new(format!("No process named `{}` was found in controlled scope", cli.prc)) + )).await; + } + } + // TODO: GET PRCS METRICS DOWNCASTING + } + } let mut tasks: Vec> = vec![]; - // let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap()); - // let res = tokio::join!(prc.process(), file.process(), serv.process()); if let Some(mut val) = self.prcs.pop_front() { tasks.push( tokio::spawn( async move { @@ -167,15 +222,12 @@ pub mod v2 { } } - // spawn tasks - // spawn prc - // spawn files - // spawn services - // ## for ... i.await in loop pub async fn init_monitoring( - config: Processes + config: Processes, + bus_reciever : BusReciever, + bus_sender : BusSender, ) -> anyhow::Result<()> { - let mut supervisor = Supervisor::new().with_config(config).await; + let mut supervisor = Supervisor::new(bus_reciever, bus_sender).with_config(config).await; info!("Monitoring: {} ", &supervisor.get_stats()); supervisor.process().await; Ok(()) diff --git a/noxis-rs/src/utils/bus.rs b/noxis-rs/src/utils/bus.rs index 7f060a4..b981636 100644 --- a/noxis-rs/src/utils/bus.rs +++ b/noxis-rs/src/utils/bus.rs @@ -19,6 +19,7 @@ impl Highway { Self { to_cli, to_supervisor, to_metrics } } async fn send(&self, msg: BusMessage) -> anyhow::Result<()> { + trace!("redirecting message - {:?} ...", &msg); let dir = match &msg { BusMessage::Request(dir, ..) | BusMessage::Response(dir, ..) => dir, }; @@ -48,7 +49,7 @@ pub struct Bus { } impl Bus { - fn new(inner: Inner, to_cli: Outter, to_supervisor: Outter, to_metrics: Outter) -> Self { + pub fn new(inner: Inner, to_cli: Outter, to_supervisor: Outter, to_metrics: Outter) -> Self { Self { inner, highway: Highway::new(to_cli, to_supervisor, to_metrics) } } } @@ -59,14 +60,14 @@ impl ProcessUnit for Bus { loop { // TODO while let Ok(content) = self.inner.try_recv() { - debug!("New message to the Bus : {:?}", &content); + debug!("new message to the Bus : {:?}", &content); let msg = match content { BusMessage::Request(direction, content_type, content) => { - trace!("Bus has got a new Request with direction {:?} and type {:?}", &direction, &content_type); - BusMessage::Response(direction, content_type, content) + trace!("bus has got a new Request with direction {:?} and type {:?}", &direction, &content_type); + BusMessage::Request(direction, content_type, content) }, BusMessage::Response(direction, content_type, content) => { - trace!("Bus has got a new Response with direction {:?} and type {:?}", &direction, &content_type); + trace!("bus has got a new Response with direction {:?} and type {:?}", &direction, &content_type); BusMessage::Response(direction, content_type, content) }, }; diff --git a/noxis-rs/src/utils/metrics.rs b/noxis-rs/src/utils/metrics.rs index 9db5eb9..5daeade 100644 --- a/noxis-rs/src/utils/metrics.rs +++ b/noxis-rs/src/utils/metrics.rs @@ -9,6 +9,7 @@ use crate::options::structs::Dependencies; use serde::Serialize; use super::prcs::v2::Pid; use std::fmt::Debug; +use crate::options::structs::bus::BusMessage; // use pcap::{Device, Capture, Active}; // use std::net::Ipv4Addr; // use anyhow::{Result, Ok}; @@ -19,6 +20,134 @@ type CoreUsage = BTreeMap; type Disks = Vec; type Ifaces = Vec; pub type MetricProcesses = Vec; +type BusReciever = tokio::sync::mpsc::Receiver; +type BusSender = Arc>; + +/// # Fn `init_metrics_grubber` +/// ## for initializing process of unstoppable grubbing metrics. +/// +/// *input* : `Arc>` ?? +/// +/// *output* : `Err` if it cant create grubbers | `Ok` on finish +/// +/// *initiator* : main thread ?? +/// +/// *managing* : object of unix-socket reader +/// +/// *depends on* : - +/// +#[allow(dead_code)] +pub async fn init_metrics_grubber( + /* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */ +) { + let mut system = System::new(); + get_all_metrics(&mut system).await; +} + +async fn get_all_metrics(system: &mut System) { + system.refresh_all(); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + dbg!(get_host_info().await); + dbg!(get_cpu_metrics(system).await); + dbg!(get_ram_metrics(system).await); + dbg!(get_all_disks_metrics().await); + dbg!(get_all_ifaces_metrics().await); +} + +async fn get_host_info() -> HostInfo { + HostInfo { + hostname : System::host_name().unwrap_or_default(), + os : System::long_os_version().unwrap_or_default(), + kernel : System::kernel_version().unwrap_or_default(), + } +} + +async fn get_cpu_metrics(system: &mut System) -> Cpu { + system.refresh_cpu_all(); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + let mut buffer = CoreUsage::new(); + let global_usage = system.global_cpu_usage(); + + system.cpus() + .iter() + .enumerate() + .for_each(|(id, cpu)| { + let core_info = CoreInfo { + // id, + brand : cpu.brand().to_string(), + name : cpu.name().to_string(), + frequency : cpu.frequency(), + vendor_id : cpu.vendor_id().to_string(), + usage : cpu.cpu_usage(), + }; + // buffer.push(core_info); + buffer.entry(id).or_insert(core_info); + }); + + Cpu { + global_usage, + usage: buffer + } +} + +async fn get_ram_metrics(system: &mut System) -> Ram { + system.refresh_memory(); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + Ram { + free_mem : system.free_memory(), + free_swap : system.free_swap(), + total_mem : system.total_memory(), + total_swap : system.total_swap(), + } +} + +async fn get_all_disks_metrics() -> Disks { + let disks = DisksList::new_with_refreshed_list(); + let mut buffer = Disks::new(); + disks.list() + .iter() + .for_each(|disk| { + let disk = Disk { + name : disk.name().to_string_lossy().into_owned(), + kind: disk.kind().to_string(), + fs : disk.file_system().to_string_lossy().into_owned(), + mount_point : disk.mount_point().to_string_lossy().into_owned(), + total_space : disk.total_space(), + available_space : disk.available_space(), + is_removable : disk.is_removable(), + is_readonly : disk.is_read_only() + }; + buffer.push(disk); + }); + buffer +} + +async fn get_all_ifaces_metrics() -> Ifaces { + let mut ifaces = Ifaces::new(); + let networks = Networks::new_with_refreshed_list(); + networks.iter() + .for_each(|(iface_name, data)| { + let mac = data.mac_address().to_string(); + let iface = Network { + iname : iface_name.to_owned(), + mac : mac, + recieved : data.received(), + transmitted : data.transmitted(), + total_recieved_bytes : data.total_received(), + total_transmitted_bytes : data.total_transmitted(), + total_recieved_packets : data.total_packets_received(), + total_transmitted_packets : data.total_packets_transmitted(), + errors_on_recieved : data.errors_on_received(), + errors_on_transmitted : data.errors_on_transmitted(), + }; + ifaces.push(iface); + }); + ifaces +} + +async fn get_all_processes_metrics(system: &mut System) {} pub enum MetricType { FullMetrics, @@ -190,132 +319,6 @@ impl MetricsExportable for MetricProcesses { } } -/// # Fn `init_metrics_grubber` -/// ## for initializing process of unstoppable grubbing metrics. -/// -/// *input* : `Arc>` ?? -/// -/// *output* : `Err` if it cant create grubbers | `Ok` on finish -/// -/// *initiator* : main thread ?? -/// -/// *managing* : object of unix-socket reader -/// -/// *depends on* : - -/// -#[allow(dead_code)] -pub async fn init_metrics_grubber( - /* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */ -) { - let mut system = System::new(); - get_all_metrics(&mut system).await; -} - -async fn get_all_metrics(system: &mut System) { - system.refresh_all(); - tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; - dbg!(get_host_info().await); - dbg!(get_cpu_metrics(system).await); - dbg!(get_ram_metrics(system).await); - dbg!(get_all_disks_metrics().await); - dbg!(get_all_ifaces_metrics().await); -} - -async fn get_host_info() -> HostInfo { - HostInfo { - hostname : System::host_name().unwrap_or_default(), - os : System::long_os_version().unwrap_or_default(), - kernel : System::kernel_version().unwrap_or_default(), - } -} - -async fn get_cpu_metrics(system: &mut System) -> Cpu { - system.refresh_cpu_all(); - tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; - - let mut buffer = CoreUsage::new(); - let global_usage = system.global_cpu_usage(); - - system.cpus() - .iter() - .enumerate() - .for_each(|(id, cpu)| { - let core_info = CoreInfo { - // id, - brand : cpu.brand().to_string(), - name : cpu.name().to_string(), - frequency : cpu.frequency(), - vendor_id : cpu.vendor_id().to_string(), - usage : cpu.cpu_usage(), - }; - // buffer.push(core_info); - buffer.entry(id).or_insert(core_info); - }); - - Cpu { - global_usage, - usage: buffer - } -} - -async fn get_ram_metrics(system: &mut System) -> Ram { - system.refresh_memory(); - tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; - - Ram { - free_mem : system.free_memory(), - free_swap : system.free_swap(), - total_mem : system.total_memory(), - total_swap : system.total_swap(), - } -} - -async fn get_all_disks_metrics() -> Disks { - let disks = DisksList::new_with_refreshed_list(); - let mut buffer = Disks::new(); - disks.list() - .iter() - .for_each(|disk| { - let disk = Disk { - name : disk.name().to_string_lossy().into_owned(), - kind: disk.kind().to_string(), - fs : disk.file_system().to_string_lossy().into_owned(), - mount_point : disk.mount_point().to_string_lossy().into_owned(), - total_space : disk.total_space(), - available_space : disk.available_space(), - is_removable : disk.is_removable(), - is_readonly : disk.is_read_only() - }; - buffer.push(disk); - }); - buffer -} - -async fn get_all_ifaces_metrics() -> Ifaces { - let mut ifaces = Ifaces::new(); - let networks = Networks::new_with_refreshed_list(); - networks.iter() - .for_each(|(iface_name, data)| { - let mac = data.mac_address().to_string(); - let iface = Network { - iname : iface_name.to_owned(), - mac : mac, - recieved : data.received(), - transmitted : data.transmitted(), - total_recieved_bytes : data.total_received(), - total_transmitted_bytes : data.total_transmitted(), - total_recieved_packets : data.total_packets_received(), - total_transmitted_packets : data.total_packets_transmitted(), - errors_on_recieved : data.errors_on_received(), - errors_on_transmitted : data.errors_on_transmitted(), - }; - ifaces.push(iface); - }); - ifaces -} - -async fn get_all_processes_metrics(system: &mut System) {} - #[cfg(test)] mod metrics_unittets { #[tokio::test] diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index dfb6c7e..2fe958f 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -154,6 +154,7 @@ pub mod v2 { #[allow(unused)] pub async fn stop_by_user_call(&mut self) -> anyhow::Result<()> { terminate_process(&self.name).await?; + warn!("Process {} was stopped by user call ...", self.name); self.state = ProcessState::StoppedByCli; self.pid = Pid::new(); Ok(()) @@ -161,12 +162,14 @@ pub mod v2 { #[allow(unused)] pub async fn freeze_by_user_call(&mut self) -> anyhow::Result<()> { freeze_process(&self.name).await?; + warn!("Process {} was frozen by user call ...", self.name); self.state = ProcessState::HoldingByCli; Ok(()) } #[allow(unused)] pub async fn start_by_user_call(&mut self) -> anyhow::Result<()> { let pid = start_process(&self.name, &self.bin).await?; + warn!("Process {} was started by user call ...", self.name); self.state = ProcessState::Pending; self.pid = Pid(pid); Ok(()) @@ -174,12 +177,14 @@ pub mod v2 { #[allow(unused)] pub async fn unfreeze_by_user_call(&mut self) -> anyhow::Result<()> { unfreeze_process(&self.name).await?; + warn!("Process {} was unfrozen by user call ...", self.name); self.state = ProcessState::Pending; Ok(()) } #[allow(unused)] pub async fn restart_by_user_call(&mut self) -> anyhow::Result<()> { let pid = restart_process(&self.name, &self.bin).await?; + warn!("Process {} was restarted by user call ...", self.name); self.pid = Pid(pid); Ok(()) }