diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 4cc69f4..9a1d0da 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -16,9 +16,13 @@ use options::preboot::PrebootParams; use tokio::sync::{broadcast, oneshot}; use options::config::v2::init_config_mechanism; use utils::v2::init_monitoring; +use metrics::init_metrics_grubber; #[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() -> anyhow::Result<()>{ + init_metrics_grubber().await; + todo!(); + let preboot = Arc::new(PrebootParams::validate()); let _ = setup_logger(); diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index 62e6e3e..3423d25 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use async_trait::async_trait; use std::sync::Arc; + #[derive(Debug)] pub enum DependencyType { File, diff --git a/noxis-rs/src/utils/metrics.rs b/noxis-rs/src/utils/metrics.rs index 756e44b..47b733f 100644 --- a/noxis-rs/src/utils/metrics.rs +++ b/noxis-rs/src/utils/metrics.rs @@ -2,18 +2,136 @@ // cpu load, ram/rom load and net activity // use std::sync::Mutex; -use std::sync::Arc; -use crate::options::structs::TrackingProcess; -use sysinfo::{Process, System}; -use tokio::join; -use crate::options::structs::{ProcessMetrics, ContainerMetrics}; -use super::get_container_id; +use std::{collections::BTreeMap, net, sync::Arc}; +use crate::options::structs::{ProcessState, TrackingProcess}; +use sysinfo::{System, Disks as DisksList, Networks}; +use crate::options::structs::Dependencies; +use serde::Serialize; +use super::prcs::v2::Pid; // use pcap::{Device, Capture, Active}; // use std::net::Ipv4Addr; // use anyhow::{Result, Ok}; // type PacketBuffer = Arc>>; +type CoreUsage = BTreeMap; +type Disks = Vec; +type Ifaces = Vec; +pub type MetricProcesses = Vec; + + +#[derive(Serialize, Debug)] +struct FullMetrics { + hostname : String, + os : String, + kernel : String, + cpu : Cpu, + ram : Ram, + disks : Disks, + networks : Ifaces, + processes : MetricProcesses, +} + +#[derive(Debug)] +struct HostInfo { + hostname : String, + os : String, + kernel : String, +} + + +#[derive(Serialize, Debug)] +struct Cpu { + global_usage : f32, + usage : CoreUsage, +} + +#[derive(Serialize, Debug)] +struct CoreInfo { + name: String, + brand : String, + frequency : u64, + vendor_id : String, + usage : f32, +} + +#[derive(Serialize, Debug)] +struct Ram { + free_mem : u64, + free_swap : u64, + total_mem : u64, + total_swap : u64 +} + +#[derive(Serialize, Debug)] +struct Disk { + name : String, + kind : String, + fs : String, + mount_point : String, + total_space : u64, + available_space : u64, + is_removable : bool, + is_readonly : bool, +} + + // vec +#[derive(Serialize, Debug)] +struct Network { + iname : String, + mac : String, + recieved : u64, + transmitted : u64, + total_recieved_bytes : u64, + total_transmitted_bytes : u64, + total_recieved_packets : u64, + total_transmitted_packets : u64, + errors_on_recieved : u64, + errors_on_transmitted : u64, +} + +#[derive(Serialize, Debug)] +pub struct ProcessesExtended { + name : String, + status : String, + pid : Pid, + dependencies : Dependencies, + cpu_usage : f32, + ram_usage : f32, + virtual_mem_usage : u64, + disks_usage_read_bytes: u64, + disks_usage_write_bytes: u64, +} + +impl ProcessesExtended { + pub fn from_old_with_params( + old : Arc, + pid : Pid, + status : ProcessState, + ) -> Self { + Self { + name : old.name.clone(), + status : status.to_string(), + pid, + dependencies : old.dependencies.clone(), + cpu_usage : 0.0, + ram_usage : 0.0, + virtual_mem_usage : 0, + disks_usage_read_bytes: 0, + disks_usage_write_bytes: 0, + } + } + fn add_metrics(&mut self, system : &mut System) { + if let Some(prc) = system.process(self.pid.new_sysinfo_pid()) { + self.cpu_usage = prc.cpu_usage() / system.cpus().len() as f32; + self.ram_usage = (system.total_memory() as f32) / (prc.memory() as f32); + self.disks_usage_read_bytes = prc.disk_usage().total_read_bytes; + self.disks_usage_write_bytes = prc.disk_usage().total_written_bytes; + self.virtual_mem_usage = prc.virtual_memory(); + } + } +} + /// # Fn `init_metrics_grubber` /// ## for initializing process of unstoppable grubbing metrics. /// @@ -28,204 +146,117 @@ use super::get_container_id; /// *depends on* : - /// #[allow(dead_code)] -pub async fn init_metrics_grubber() { +pub async fn init_metrics_grubber( + /* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */ +) { let mut system = System::new(); - // let mut buffer: Vec = vec![]; - // let shared_buf: PacketBuffer = Arc::new(Mutex::new(buffer)); + get_all_metrics(&mut system).await; +} +async fn get_all_metrics(system: &mut System) { system.refresh_all(); - // let temp = String::from_utf8(get_pid("systemd").await.unwrap().stdout).unwrap(); - // let prc = system.process(Pid::from_str(&temp).unwrap()).unwrap(); - // prc. - // let _ = capture_packets(shared_buf.clone()).await; + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + dbg!(get_host_info().await); + dbg!(get_cpu_metrics(system).await); + dbg!(get_ram_metrics(system).await); + dbg!(get_all_disks_metrics().await); + dbg!(get_all_ifaces_metrics().await); } -#[allow(dead_code)] -#[allow(unused_variables)] -async fn gather_metrics(proc: Arc) { - +async fn get_host_info() -> HostInfo { + HostInfo { + hostname : System::host_name().unwrap_or_default(), + os : System::long_os_version().unwrap_or_default(), + kernel : System::kernel_version().unwrap_or_default(), + } } -// DEPRECATED : for net monitoring -// async fn capture_packets(buffer: PacketBuffer) -> Result<()> { -// let mut cap = Capture::from_device(Device::lookup()?.unwrap())? -// .promisc(true) -// .open()?; +async fn get_cpu_metrics(system: &mut System) -> Cpu { + system.refresh_cpu_all(); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; -// cap.filter("not broadcast and not multicast", true)?; + let mut buffer = CoreUsage::new(); + let global_usage = system.global_cpu_usage(); -// while let core::result::Result::Ok(packet) = cap.next_packet() { -// if let Some((src, dst, prot)) = get_packet_info(&packet.data).await { -// let packet_info = PacketInfo::new(String::from(prot), dst, src, packet.header.len as usize); -// let mut locked_buffer = buffer.lock().unwrap(); -// println!("{:?}", &packet_info); -// locked_buffer.push(packet_info); -// } -// } -// Ok(()) -// } -// async fn get_packet_info(data: &[u8]) -> Option<(Ipv4Addr, Ipv4Addr, &str)> { -// if data.len() >= 20 { -// let src_ip = Ipv4Addr::new(data[12], data[13], data[14], data[15]); -// let dst_ip = Ipv4Addr::new(data[16], data[17], data[18], data[19]); -// let protocol = match data[9] { -// 1 => "ICMP", -// 6 => "TCP", -// 17 => "UDP", -// _ => "Unknown", -// }; - -// Some((src_ip, dst_ip, protocol)) -// } else { -// None -// } -// } + system.cpus() + .iter() + .enumerate() + .for_each(|(id, cpu)| { + let core_info = CoreInfo { + // id, + brand : cpu.brand().to_string(), + name : cpu.name().to_string(), + frequency : cpu.frequency(), + vendor_id : cpu.vendor_id().to_string(), + usage : cpu.cpu_usage(), + }; + // buffer.push(core_info); + buffer.entry(id).or_insert(core_info); + }); - -/// # Fn `get_all_container_metrics` -/// ## for gathering all container (whole system metrics) -/// -/// *input* : `Arc`, `Arc>` -/// -/// *output* : `ContainerMetrics` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to `System` object, ref counter to list of processes -/// -/// *depends on* : `TrackingProcess` -/// -#[allow(dead_code)] -async fn get_all_container_metrics(sys: Arc, prcs: Arc>) -> ContainerMetrics { - let metrics = join!( - get_cpu_metrics_container(sys.clone()), - get_ram_metrics_container(sys.clone()), - get_subsystems(prcs.clone()) - ); - ContainerMetrics::new( - &get_container_id().unwrap_or(String::from("unknown")), - metrics.0, - metrics.1, - metrics.2 - ) + Cpu { + global_usage, + usage: buffer + } } -/// # Fn `get_cpu_metrics_container` -/// ## for gathering container cpu metrics -/// -/// *input* : `Arc` -/// -/// *output* : `f32` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to `System` object -/// -/// *depends on* : - -/// -#[allow(dead_code)] -async fn get_cpu_metrics_container(sys: Arc) -> f32 { - sys.global_cpu_usage() +async fn get_ram_metrics(system: &mut System) -> Ram { + system.refresh_memory(); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + Ram { + free_mem : system.free_memory(), + free_swap : system.free_swap(), + total_mem : system.total_memory(), + total_swap : system.total_swap(), + } } -/// # Fn `get_ram_metrics_container` -/// ## for gathering container ram metrics -/// -/// *input* : `Arc` -/// -/// *output* : `f32` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to `System` object -/// -/// *depends on* : - -/// -#[allow(dead_code)] -async fn get_ram_metrics_container(sys: Arc) -> f32 { - (sys.used_memory() / sys.total_memory()) as f32 * 100.0 -} -// async fn get_mem_metrics_container(sys: Arc) -> f32 { -// sys. -// } - -/// # Fn `get_subsystems` -/// ## for gathering info about container subsystems (processes) -/// -/// *input* : `Arc>` -/// -/// *output* : `Vec` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to list of `TrackingProcess` -/// -/// *depends on* : `TrackingProcess` -/// -#[allow(dead_code)] -async fn get_subsystems(prcs: Arc>) -> Vec { - prcs.iter().map(|process| process.name.clone()).collect() +async fn get_all_disks_metrics() -> Disks { + let disks = DisksList::new_with_refreshed_list(); + let mut buffer = Disks::new(); + disks.list() + .iter() + .for_each(|disk| { + let disk = Disk { + name : disk.name().to_string_lossy().into_owned(), + kind: disk.kind().to_string(), + fs : disk.file_system().to_string_lossy().into_owned(), + mount_point : disk.mount_point().to_string_lossy().into_owned(), + total_space : disk.total_space(), + available_space : disk.available_space(), + is_removable : disk.is_removable(), + is_readonly : disk.is_read_only() + }; + buffer.push(disk); + }); + buffer } -/// # Fn `get_all_metrics_process` -/// ## for gathering all process' metrics -/// -/// *input* : `Arc`, `Arc` -/// -/// *output* : `ProcessMetrics` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : two ref counters to `Process` and `System` -/// -/// *depends on* : - -/// -#[allow(dead_code)] -async fn get_all_metrics_process(proc: Arc, sys: Arc) -> ProcessMetrics { - let metrics = join!( - get_cpu_metrics_process(proc.clone()), - get_ram_metrics_process(proc.clone(), sys.clone()) - ); - ProcessMetrics::new( - proc.name().to_str().unwrap_or("unknown"), - metrics.0, - metrics.1 - ) +async fn get_all_ifaces_metrics() -> Ifaces { + let mut ifaces = Ifaces::new(); + let networks = Networks::new_with_refreshed_list(); + networks.iter() + .for_each(|(iface_name, data)| { + let mac = data.mac_address().to_string(); + let iface = Network { + iname : iface_name.to_owned(), + mac : mac, + recieved : data.received(), + transmitted : data.transmitted(), + total_recieved_bytes : data.total_received(), + total_transmitted_bytes : data.total_transmitted(), + total_recieved_packets : data.total_packets_received(), + total_transmitted_packets : data.total_packets_transmitted(), + errors_on_recieved : data.errors_on_received(), + errors_on_transmitted : data.errors_on_transmitted(), + }; + ifaces.push(iface); + }); + ifaces } -/// # Fn `get_cpu_metrics_process` -/// ## for gathering process cpu metrics -/// -/// *input* : `Arc` -/// -/// *output* : `f32` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to `Process` object -/// -/// *depends on* : - -/// -async fn get_cpu_metrics_process(proc: Arc) -> f32 { - proc.cpu_usage() -} - -/// # Fn `get_ram_metrics_process` -/// ## for gathering process ram metrics -/// -/// *input* : `Arc` -/// -/// *output* : `f32` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to `Process` object -/// -/// *depends on* : - -/// -async fn get_ram_metrics_process(proc: Arc, sys: Arc) -> f32 { - (proc.memory() as f64 / sys.total_memory() as f64) as f32 * 100.0 as f32 -} +async fn get_all_processes_metrics(system: &mut System) {} #[cfg(test)] mod metrics_unittets { diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 9e72d4e..76a09f6 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -6,6 +6,7 @@ use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUni use std::collections::HashSet; use tokio::sync::mpsc::Receiver as MpscReciever; use async_trait::async_trait; +use serde::Serialize; pub mod v2 { use log::info; @@ -14,8 +15,8 @@ pub mod v2 { use super::*; - #[derive(Debug)] - struct Pid(i64); + #[derive(Debug, Serialize, Clone, Copy)] + pub struct Pid(i64); impl std::fmt::Display for Pid { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -36,6 +37,9 @@ pub mod v2 { }; Pid(temp.parse::().unwrap_or_else(|_| { -1 })) } + pub fn new_sysinfo_pid(&self) -> sysinfo::Pid { + sysinfo::Pid::from_u32(self.0 as u32) + } } #[derive(Debug)] @@ -71,6 +75,10 @@ pub mod v2 { self } + pub fn get_pid(&self) -> Pid { + self.pid + } + async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) { match trigger { "stay" => {