///! Submodule needed to get metrics such as ///! cpu load, ram/rom load and net activity use crate::{ options::structs::ProcessState, utils::metrics::processes::{ProcessesGeneral, ProcessesQuery}, }; use log::warn; use noxis_cli::metrics_models::MetricsMode; use std::{any::Any, collections::BTreeMap, sync::Arc}; // use chrono::Duration; use super::prcs::v2::Pid; use crate::options::structs::bus::{BusMessage, BusMessageContentType, BusMessageDirection}; use serde::Serialize; use std::fmt::Debug; use sysinfo::{Disks as DisksList, Networks, System}; // use noxis_cli::metrics_models::MetricsMode; pub type MetricProcesses = Vec; type CoreUsage = BTreeMap; type Disks = Vec; type Ifaces = Vec; type BusReciever = tokio::sync::mpsc::Receiver; type BusSender = Arc>; /// # Fn `init_metrics_grubber` /// ## for initializing process of unstoppable grubbing metrics. /// /// *input* : `Arc>` ?? /// /// *output* : `Err` if it cant create grubbers | `Ok` on finish /// /// *initiator* : main thread ?? /// /// *managing* : object of unix-socket reader /// /// *depends on* : - /// pub async fn init_metrics_grubber( /* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */ bus_sender: BusSender, bus_reciever: BusReciever, ) -> anyhow::Result<()> { let mut system = System::new(); let mut disks = DisksList::new_with_refreshed_list(); let mut networks = Networks::new_with_refreshed_list(); // get_all_metrics(&mut system).await; /* TODO */ let mut bus_reciever = bus_reciever; loop { let msg = bus_reciever.try_recv(); if let Ok(BusMessage::Request(_, _, cont)) = msg { system.refresh_all(); disks.refresh_list(); networks.refresh_list(); let cont: Box = cont; match cont.downcast::() { Err(_) => { warn!("Unrecognized Metric mode was given"); let _ = bus_sender .send(BusMessage::Response( BusMessageDirection::ToCli, BusMessageContentType::Result, Box::new(Err(anyhow::Error::msg(format!( "Unrecognized Metric mode was given" )))), )) .await; } Ok(mode) => { tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; let metric: Box = match *mode { MetricsMode::Full => { let mut refs = get_all_metrics(&mut system, bus_sender.clone(), &disks, &networks) .await; if let Some(prcs) = bus_reciever.recv().await { if let BusMessage::Response(_, _, cont) = prcs { let cont: Box = cont; if let Ok(cont) = cont.downcast::() { if let ProcessesQuery::General(info) = *cont { refs.processes = info; } } } } Box::new(refs) } MetricsMode::Host => { Box::new(get_global_host_info(&mut system, &disks, &networks).await) } MetricsMode::Cpu => Box::new(get_cpu_metrics(&mut system).await), MetricsMode::Ram => Box::new(get_ram_metrics(&mut system).await), MetricsMode::Rom => Box::new(get_all_disks_metrics(&disks).await), MetricsMode::Network => Box::new(get_all_ifaces_metrics(&networks).await), // inspect processes MetricsMode::Processes => { todo!(); } }; // let metric: Box = Box::new(metric); let metric = metric.serialze_into_output(); let _ = bus_sender .send(BusMessage::Response( BusMessageDirection::ToCli, BusMessageContentType::MetricsObj, Box::new(metric), )) .await; } } } else if let Ok(BusMessage::Response(_, _, cont)) = msg { let cont: Box = cont; if let Ok(info) = cont.downcast::() { if let ProcessesQuery::All(info) = *info { let procs: Vec<_> = info .into_iter() .map(|prc| ProcessExtended::from_process_query_all(&mut system, prc)) .collect(); let _ = bus_sender .send(BusMessage::Response( BusMessageDirection::ToCli, BusMessageContentType::Result, Box::>::new(Ok(serde_json::to_string_pretty( &procs, )?)), )) .await; } else { let _ = bus_sender .send(BusMessage::Response( BusMessageDirection::ToCli, BusMessageContentType::Result, Box::new(Err(anyhow::Error::msg(format!( "Unknown type was send by the Supervisor" )))), )) .await; } } else { let _ = bus_sender .send(BusMessage::Response( BusMessageDirection::ToCli, BusMessageContentType::Result, Box::new(Err(anyhow::Error::msg(format!( "Unknown type was send by the Supervisor" )))), )) .await; } } tokio::time::sleep(std::time::Duration::from_millis(100)).await; } } async fn get_all_metrics( system: &mut System, sender: BusSender, disks: &DisksList, networks: &Networks, ) -> FullMetrics { let host = get_host_info().await; let cpu = get_cpu_metrics(system).await; let ram = get_ram_metrics(system).await; let disks = get_all_disks_metrics(&disks).await; let ifaces = get_all_ifaces_metrics(&networks).await; let prcs: Vec = Vec::new(); let _ = sender .send(BusMessage::Request( BusMessageDirection::ToSupervisor, BusMessageContentType::ProcessQuery, Box::new(ProcessesQuery::QueryGeneral), )) .await; FullMetrics::create(host, cpu, ram, disks, ifaces, prcs) } async fn get_global_host_info( system: &mut System, disks: &DisksList, networks: &Networks, ) -> HostGeneral { HostGeneral { hostname: System::host_name().unwrap_or_default(), os: System::long_os_version().unwrap_or_default(), kernel: System::kernel_version().unwrap_or_default(), cpu_percentage: system.global_cpu_usage(), ram_available: system.total_memory() - system.free_memory(), disk_percentage: { let total = disks .iter() .map(|disk| disk.available_space() * 100 / disk.total_space()) .collect::>(); total.iter().sum::() / (total.len() as u64) }, net_stat: { let total = networks .iter() .map(|(_, iface_data)| iface_data.received() + iface_data.transmitted()) .collect::>(); total.iter().sum::() / ((total.len() * 2) as u64) }, } } async fn get_host_info() -> HostInfo { HostInfo { hostname: System::host_name().unwrap_or_default(), os: System::long_os_version().unwrap_or_default(), kernel: System::kernel_version().unwrap_or_default(), } } async fn get_cpu_metrics(system: &mut System) -> Cpu { let mut buffer = CoreUsage::new(); let global_usage = system.global_cpu_usage(); system.cpus().iter().enumerate().for_each(|(id, cpu)| { let core_info = CoreInfo { // id, brand: cpu.brand().to_string(), name: cpu.name().to_string(), frequency: cpu.frequency(), vendor_id: cpu.vendor_id().to_string(), usage: cpu.cpu_usage(), }; buffer.entry(id).or_insert(core_info); }); Cpu { global_usage, usage: buffer, } } async fn get_ram_metrics(system: &mut System) -> Ram { Ram { free_mem: system.free_memory(), free_swap: system.free_swap(), total_mem: system.total_memory(), total_swap: system.total_swap(), } } async fn get_all_disks_metrics(disks: &DisksList) -> Disks { // let disks = DisksList::new_with_refreshed_list(); let mut buffer = Disks::new(); disks.list().iter().for_each(|disk| { let disk = Disk { name: disk.name().to_string_lossy().into_owned(), kind: disk.kind().to_string(), fs: disk.file_system().to_string_lossy().into_owned(), mount_point: disk.mount_point().to_string_lossy().into_owned(), total_space: disk.total_space(), available_space: disk.available_space(), is_removable: disk.is_removable(), is_readonly: disk.is_read_only(), }; buffer.push(disk); }); buffer } async fn get_all_ifaces_metrics(networks: &Networks) -> Ifaces { let mut ifaces = Ifaces::new(); networks.iter().for_each(|(iface_name, data)| { let mac = data.mac_address().to_string(); let ip_addrs = data .ip_networks() .iter() .map(|ipaddr| format!("{}/{}", ipaddr.addr, ipaddr.prefix)) .collect::>(); let iface = Network { iname: iface_name.to_owned(), mac: mac, ip_addresses: ip_addrs, recieved: data.received(), transmitted: data.transmitted(), total_recieved_bytes: data.total_received(), total_transmitted_bytes: data.total_transmitted(), total_recieved_packets: data.total_packets_received(), total_transmitted_packets: data.total_packets_transmitted(), errors_on_recieved: data.errors_on_received(), errors_on_transmitted: data.errors_on_transmitted(), }; ifaces.push(iface); }); ifaces } pub mod processes { use crate::options::structs::ProcessState; use crate::utils::prcs::v2::Pid; #[derive(Debug, serde::Serialize)] pub enum ProcessesQuery { General(Vec), All(Vec), QueryGeneral, QueryAll, } #[derive(Debug, serde::Serialize)] pub struct ProcessesGeneral { pub name: String, pub state: ProcessState, pub pid: Pid, } #[derive(Debug, serde::Serialize)] pub struct ProcessesAll { pub name: String, pub state: ProcessState, pub pid: Pid, pub dependencies: deps::Dependencies, } pub mod deps { use crate::options::structs::{FileTriggers, ServiceState, ServiceTriggers}; use crate::utils::files::v2::FileState; // use super::*; #[derive(Debug, serde::Serialize)] pub struct FilesExtended { pub name: String, pub path: String, pub status: FileState, pub backup_file : String, pub triggers: FileTriggers, } #[derive(Debug, serde::Serialize)] pub struct ServicesExtended { pub name: String, pub access_name: String, pub status: ServiceState, pub triggers: ServiceTriggers, } #[derive(Debug, serde::Serialize)] pub struct Dependencies { pub files: Vec, pub services: Vec, } } } pub trait MetricsExportable: Send + Sync + 'static + Debug + Any { fn serialze_into_output(&self) -> anyhow::Result; } #[derive(Serialize, Debug)] struct FullMetrics { hostname: String, os: String, kernel: String, cpu: Cpu, ram: Ram, disks: Disks, networks: Ifaces, pub processes: Vec, } impl FullMetrics { fn create( host: HostInfo, cpu: Cpu, ram: Ram, disks: Disks, ifaces: Ifaces, processes: Vec, ) -> Self { Self { hostname: host.hostname, os: host.os, kernel: host.kernel, cpu, ram, disks, networks: ifaces, processes, } } } impl MetricsExportable for FullMetrics { fn serialze_into_output(&self) -> anyhow::Result { Ok(serde_json::to_string_pretty(self)?) } } #[derive(Debug, Serialize)] struct HostInfo { hostname: String, os: String, kernel: String, } impl MetricsExportable for HostInfo { fn serialze_into_output(&self) -> anyhow::Result { Ok(serde_json::to_string_pretty(self)?) } } #[derive(Debug, Serialize)] struct HostGeneral { hostname: String, os: String, kernel: String, cpu_percentage: f32, ram_available: u64, disk_percentage: u64, net_stat: u64, } impl MetricsExportable for HostGeneral { fn serialze_into_output(&self) -> anyhow::Result { Ok(serde_json::to_string_pretty(self)?) } } #[derive(Serialize, Debug)] struct Cpu { global_usage: f32, usage: CoreUsage, } impl MetricsExportable for Cpu { fn serialze_into_output(&self) -> anyhow::Result { Ok(serde_json::to_string_pretty(self)?) } } #[derive(Serialize, Debug)] struct CoreInfo { name: String, brand: String, frequency: u64, vendor_id: String, usage: f32, } #[derive(Serialize, Debug)] struct Ram { free_mem: u64, free_swap: u64, total_mem: u64, total_swap: u64, } impl MetricsExportable for Ram { fn serialze_into_output(&self) -> anyhow::Result { Ok(serde_json::to_string_pretty(self)?) } } #[derive(Serialize, Debug)] struct Disk { name: String, kind: String, fs: String, mount_point: String, total_space: u64, available_space: u64, is_removable: bool, is_readonly: bool, } impl MetricsExportable for Disks { fn serialze_into_output(&self) -> anyhow::Result { Ok(serde_json::to_string_pretty(self)?) } } // vec #[derive(Serialize, Debug)] struct Network { iname: String, mac: String, ip_addresses: Vec, recieved: u64, transmitted: u64, total_recieved_bytes: u64, total_transmitted_bytes: u64, total_recieved_packets: u64, total_transmitted_packets: u64, errors_on_recieved: u64, errors_on_transmitted: u64, } impl MetricsExportable for Ifaces { fn serialze_into_output(&self) -> anyhow::Result { Ok(serde_json::to_string_pretty(self)?) } } #[derive(Serialize, Debug)] pub struct ProcessExtended { name: String, status: ProcessState, pid: Pid, start_time: String, duration: String, dependencies: processes::deps::Dependencies, cpu_usage: f32, ram_usage: u64, virtual_mem_usage: u64, disks_usage_read_bytes: u64, disks_usage_write_bytes: u64, } impl ProcessExtended { pub fn from_process_query_all(system: &mut System, proc: processes::ProcessesAll) -> Self { system.refresh_processes(sysinfo::ProcessesToUpdate::All, true); return if let Some(prc) = system.process(proc.pid.new_sysinfo_pid()) { let disk_usage = prc.disk_usage(); let duration = chrono::Duration::new(prc.run_time() as i64, 0); let start_time = chrono::DateTime::from_timestamp(prc.start_time() as i64, 0); Self { name: proc.name, status: proc.state, pid: proc.pid, start_time : { match start_time { Some(date) => date.to_string(), None => String::new() } }, duration: { match duration { Some(duration) => { format!("{}:{}:{}:{}", duration.num_days(), duration.num_hours() % 24, duration.num_minutes() % 60, duration.num_seconds() % 60 ) }, None => String::new() } }, dependencies: proc.dependencies, cpu_usage: prc.cpu_usage(), ram_usage: prc.memory(), virtual_mem_usage: prc.virtual_memory(), disks_usage_read_bytes: disk_usage.read_bytes, disks_usage_write_bytes: disk_usage.written_bytes, } } else { Self { name: proc.name, status: proc.state, pid: proc.pid, start_time : String::new(), duration: String::new(), dependencies: proc.dependencies, cpu_usage: 0.0, ram_usage: 0, virtual_mem_usage: 0, disks_usage_read_bytes: 0, disks_usage_write_bytes: 0, } }; } } impl MetricsExportable for MetricProcesses { fn serialze_into_output(&self) -> anyhow::Result { Ok(serde_json::to_string_pretty(self)?) } } #[cfg(test)] mod metrics_unittets { #[tokio::test] // Option output // echo func async fn get_cpu_load() { assert!(true); } #[tokio::test] // Option output // echo func async fn get_ram_load() { assert!(true); } #[tokio::test] // Option output // echo func async fn get_mem_load() { assert!(true); } #[tokio::test] // can't be tested this way because of 0-0 loop of checking buffer // async func with loop inside async fn get_net_stat() { assert!(true); } #[tokio::test] async fn get_metrics_grubber() { assert!(true); } // Option> output // echo func #[tokio::test] async fn get_info_about_subsystems() { assert!(true); } }