metrics added (without processes)

migrate
prplV 2025-05-13 16:20:09 +03:00
parent 4b9db02528
commit 6ff17d9620
4 changed files with 232 additions and 188 deletions

View File

@ -16,9 +16,13 @@ use options::preboot::PrebootParams;
use tokio::sync::{broadcast, oneshot};
use options::config::v2::init_config_mechanism;
use utils::v2::init_monitoring;
use metrics::init_metrics_grubber;
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()>{
init_metrics_grubber().await;
todo!();
let preboot = Arc::new(PrebootParams::validate());
let _ = setup_logger();

View File

@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use async_trait::async_trait;
use std::sync::Arc;
#[derive(Debug)]
pub enum DependencyType {
File,

View File

@ -2,18 +2,136 @@
// cpu load, ram/rom load and net activity
// use std::sync::Mutex;
use std::sync::Arc;
use crate::options::structs::TrackingProcess;
use sysinfo::{Process, System};
use tokio::join;
use crate::options::structs::{ProcessMetrics, ContainerMetrics};
use super::get_container_id;
use std::{collections::BTreeMap, net, sync::Arc};
use crate::options::structs::{ProcessState, TrackingProcess};
use sysinfo::{System, Disks as DisksList, Networks};
use crate::options::structs::Dependencies;
use serde::Serialize;
use super::prcs::v2::Pid;
// use pcap::{Device, Capture, Active};
// use std::net::Ipv4Addr;
// use anyhow::{Result, Ok};
// type PacketBuffer = Arc<Mutex<Vec<PacketInfo>>>;
type CoreUsage = BTreeMap<usize, CoreInfo>;
type Disks = Vec<Disk>;
type Ifaces = Vec<Network>;
pub type MetricProcesses = Vec<ProcessesExtended>;
#[derive(Serialize, Debug)]
struct FullMetrics {
hostname : String,
os : String,
kernel : String,
cpu : Cpu,
ram : Ram,
disks : Disks,
networks : Ifaces,
processes : MetricProcesses,
}
#[derive(Debug)]
struct HostInfo {
hostname : String,
os : String,
kernel : String,
}
#[derive(Serialize, Debug)]
struct Cpu {
global_usage : f32,
usage : CoreUsage,
}
#[derive(Serialize, Debug)]
struct CoreInfo {
name: String,
brand : String,
frequency : u64,
vendor_id : String,
usage : f32,
}
#[derive(Serialize, Debug)]
struct Ram {
free_mem : u64,
free_swap : u64,
total_mem : u64,
total_swap : u64
}
#[derive(Serialize, Debug)]
struct Disk {
name : String,
kind : String,
fs : String,
mount_point : String,
total_space : u64,
available_space : u64,
is_removable : bool,
is_readonly : bool,
}
// vec<Network>
#[derive(Serialize, Debug)]
struct Network {
iname : String,
mac : String,
recieved : u64,
transmitted : u64,
total_recieved_bytes : u64,
total_transmitted_bytes : u64,
total_recieved_packets : u64,
total_transmitted_packets : u64,
errors_on_recieved : u64,
errors_on_transmitted : u64,
}
#[derive(Serialize, Debug)]
pub struct ProcessesExtended {
name : String,
status : String,
pid : Pid,
dependencies : Dependencies,
cpu_usage : f32,
ram_usage : f32,
virtual_mem_usage : u64,
disks_usage_read_bytes: u64,
disks_usage_write_bytes: u64,
}
impl ProcessesExtended {
pub fn from_old_with_params(
old : Arc<TrackingProcess>,
pid : Pid,
status : ProcessState,
) -> Self {
Self {
name : old.name.clone(),
status : status.to_string(),
pid,
dependencies : old.dependencies.clone(),
cpu_usage : 0.0,
ram_usage : 0.0,
virtual_mem_usage : 0,
disks_usage_read_bytes: 0,
disks_usage_write_bytes: 0,
}
}
fn add_metrics(&mut self, system : &mut System) {
if let Some(prc) = system.process(self.pid.new_sysinfo_pid()) {
self.cpu_usage = prc.cpu_usage() / system.cpus().len() as f32;
self.ram_usage = (system.total_memory() as f32) / (prc.memory() as f32);
self.disks_usage_read_bytes = prc.disk_usage().total_read_bytes;
self.disks_usage_write_bytes = prc.disk_usage().total_written_bytes;
self.virtual_mem_usage = prc.virtual_memory();
}
}
}
/// # Fn `init_metrics_grubber`
/// ## for initializing process of unstoppable grubbing metrics.
///
@ -28,204 +146,117 @@ use super::get_container_id;
/// *depends on* : -
///
#[allow(dead_code)]
pub async fn init_metrics_grubber() {
pub async fn init_metrics_grubber(
/* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */
) {
let mut system = System::new();
// let mut buffer: Vec<PacketInfo> = vec![];
// let shared_buf: PacketBuffer = Arc::new(Mutex::new(buffer));
get_all_metrics(&mut system).await;
}
async fn get_all_metrics(system: &mut System) {
system.refresh_all();
// let temp = String::from_utf8(get_pid("systemd").await.unwrap().stdout).unwrap();
// let prc = system.process(Pid::from_str(&temp).unwrap()).unwrap();
// prc.
// let _ = capture_packets(shared_buf.clone()).await;
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
dbg!(get_host_info().await);
dbg!(get_cpu_metrics(system).await);
dbg!(get_ram_metrics(system).await);
dbg!(get_all_disks_metrics().await);
dbg!(get_all_ifaces_metrics().await);
}
#[allow(dead_code)]
#[allow(unused_variables)]
async fn gather_metrics(proc: Arc<Process>) {
async fn get_host_info() -> HostInfo {
HostInfo {
hostname : System::host_name().unwrap_or_default(),
os : System::long_os_version().unwrap_or_default(),
kernel : System::kernel_version().unwrap_or_default(),
}
}
// DEPRECATED : for net monitoring
// async fn capture_packets(buffer: PacketBuffer) -> Result<()> {
// let mut cap = Capture::from_device(Device::lookup()?.unwrap())?
// .promisc(true)
// .open()?;
async fn get_cpu_metrics(system: &mut System) -> Cpu {
system.refresh_cpu_all();
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
// cap.filter("not broadcast and not multicast", true)?;
let mut buffer = CoreUsage::new();
let global_usage = system.global_cpu_usage();
// while let core::result::Result::Ok(packet) = cap.next_packet() {
// if let Some((src, dst, prot)) = get_packet_info(&packet.data).await {
// let packet_info = PacketInfo::new(String::from(prot), dst, src, packet.header.len as usize);
// let mut locked_buffer = buffer.lock().unwrap();
// println!("{:?}", &packet_info);
// locked_buffer.push(packet_info);
// }
// }
// Ok(())
// }
// async fn get_packet_info(data: &[u8]) -> Option<(Ipv4Addr, Ipv4Addr, &str)> {
// if data.len() >= 20 {
// let src_ip = Ipv4Addr::new(data[12], data[13], data[14], data[15]);
// let dst_ip = Ipv4Addr::new(data[16], data[17], data[18], data[19]);
// let protocol = match data[9] {
// 1 => "ICMP",
// 6 => "TCP",
// 17 => "UDP",
// _ => "Unknown",
// };
// Some((src_ip, dst_ip, protocol))
// } else {
// None
// }
// }
system.cpus()
.iter()
.enumerate()
.for_each(|(id, cpu)| {
let core_info = CoreInfo {
// id,
brand : cpu.brand().to_string(),
name : cpu.name().to_string(),
frequency : cpu.frequency(),
vendor_id : cpu.vendor_id().to_string(),
usage : cpu.cpu_usage(),
};
// buffer.push(core_info);
buffer.entry(id).or_insert(core_info);
});
/// # Fn `get_all_container_metrics`
/// ## for gathering all container (whole system metrics)
///
/// *input* : `Arc<System>`, `Arc<Vec<TrackingProcess>>`
///
/// *output* : `ContainerMetrics`
///
/// *initiator* : main thread ??
///
/// *managing* : ref counter to `System` object, ref counter to list of processes
///
/// *depends on* : `TrackingProcess`
///
#[allow(dead_code)]
async fn get_all_container_metrics(sys: Arc<System>, prcs: Arc<Vec<TrackingProcess>>) -> ContainerMetrics {
let metrics = join!(
get_cpu_metrics_container(sys.clone()),
get_ram_metrics_container(sys.clone()),
get_subsystems(prcs.clone())
);
ContainerMetrics::new(
&get_container_id().unwrap_or(String::from("unknown")),
metrics.0,
metrics.1,
metrics.2
)
Cpu {
global_usage,
usage: buffer
}
}
/// # Fn `get_cpu_metrics_container`
/// ## for gathering container cpu metrics
///
/// *input* : `Arc<System>`
///
/// *output* : `f32`
///
/// *initiator* : main thread ??
///
/// *managing* : ref counter to `System` object
///
/// *depends on* : -
///
#[allow(dead_code)]
async fn get_cpu_metrics_container(sys: Arc<System>) -> f32 {
sys.global_cpu_usage()
async fn get_ram_metrics(system: &mut System) -> Ram {
system.refresh_memory();
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
Ram {
free_mem : system.free_memory(),
free_swap : system.free_swap(),
total_mem : system.total_memory(),
total_swap : system.total_swap(),
}
}
/// # Fn `get_ram_metrics_container`
/// ## for gathering container ram metrics
///
/// *input* : `Arc<System>`
///
/// *output* : `f32`
///
/// *initiator* : main thread ??
///
/// *managing* : ref counter to `System` object
///
/// *depends on* : -
///
#[allow(dead_code)]
async fn get_ram_metrics_container(sys: Arc<System>) -> f32 {
(sys.used_memory() / sys.total_memory()) as f32 * 100.0
}
// async fn get_mem_metrics_container(sys: Arc<System>) -> f32 {
// sys.
// }
/// # Fn `get_subsystems`
/// ## for gathering info about container subsystems (processes)
///
/// *input* : `Arc<Vec<TrackingProcess>>`
///
/// *output* : `Vec<String>`
///
/// *initiator* : main thread ??
///
/// *managing* : ref counter to list of `TrackingProcess`
///
/// *depends on* : `TrackingProcess`
///
#[allow(dead_code)]
async fn get_subsystems(prcs: Arc<Vec<TrackingProcess>>) -> Vec<String> {
prcs.iter().map(|process| process.name.clone()).collect()
async fn get_all_disks_metrics() -> Disks {
let disks = DisksList::new_with_refreshed_list();
let mut buffer = Disks::new();
disks.list()
.iter()
.for_each(|disk| {
let disk = Disk {
name : disk.name().to_string_lossy().into_owned(),
kind: disk.kind().to_string(),
fs : disk.file_system().to_string_lossy().into_owned(),
mount_point : disk.mount_point().to_string_lossy().into_owned(),
total_space : disk.total_space(),
available_space : disk.available_space(),
is_removable : disk.is_removable(),
is_readonly : disk.is_read_only()
};
buffer.push(disk);
});
buffer
}
/// # Fn `get_all_metrics_process`
/// ## for gathering all process' metrics
///
/// *input* : `Arc<Process>`, `Arc<System>`
///
/// *output* : `ProcessMetrics`
///
/// *initiator* : main thread ??
///
/// *managing* : two ref counters to `Process` and `System`
///
/// *depends on* : -
///
#[allow(dead_code)]
async fn get_all_metrics_process(proc: Arc<Process>, sys: Arc<System>) -> ProcessMetrics {
let metrics = join!(
get_cpu_metrics_process(proc.clone()),
get_ram_metrics_process(proc.clone(), sys.clone())
);
ProcessMetrics::new(
proc.name().to_str().unwrap_or("unknown"),
metrics.0,
metrics.1
)
async fn get_all_ifaces_metrics() -> Ifaces {
let mut ifaces = Ifaces::new();
let networks = Networks::new_with_refreshed_list();
networks.iter()
.for_each(|(iface_name, data)| {
let mac = data.mac_address().to_string();
let iface = Network {
iname : iface_name.to_owned(),
mac : mac,
recieved : data.received(),
transmitted : data.transmitted(),
total_recieved_bytes : data.total_received(),
total_transmitted_bytes : data.total_transmitted(),
total_recieved_packets : data.total_packets_received(),
total_transmitted_packets : data.total_packets_transmitted(),
errors_on_recieved : data.errors_on_received(),
errors_on_transmitted : data.errors_on_transmitted(),
};
ifaces.push(iface);
});
ifaces
}
/// # Fn `get_cpu_metrics_process`
/// ## for gathering process cpu metrics
///
/// *input* : `Arc<Process>`
///
/// *output* : `f32`
///
/// *initiator* : main thread ??
///
/// *managing* : ref counter to `Process` object
///
/// *depends on* : -
///
async fn get_cpu_metrics_process(proc: Arc<Process>) -> f32 {
proc.cpu_usage()
}
/// # Fn `get_ram_metrics_process`
/// ## for gathering process ram metrics
///
/// *input* : `Arc<Process>`
///
/// *output* : `f32`
///
/// *initiator* : main thread ??
///
/// *managing* : ref counter to `Process` object
///
/// *depends on* : -
///
async fn get_ram_metrics_process(proc: Arc<Process>, sys: Arc<System>) -> f32 {
(proc.memory() as f64 / sys.total_memory() as f64) as f32 * 100.0 as f32
}
async fn get_all_processes_metrics(system: &mut System) {}
#[cfg(test)]
mod metrics_unittets {

View File

@ -6,6 +6,7 @@ use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUni
use std::collections::HashSet;
use tokio::sync::mpsc::Receiver as MpscReciever;
use async_trait::async_trait;
use serde::Serialize;
pub mod v2 {
use log::info;
@ -14,8 +15,8 @@ pub mod v2 {
use super::*;
#[derive(Debug)]
struct Pid(i64);
#[derive(Debug, Serialize, Clone, Copy)]
pub struct Pid(i64);
impl std::fmt::Display for Pid {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@ -36,6 +37,9 @@ pub mod v2 {
};
Pid(temp.parse::<i64>().unwrap_or_else(|_| { -1 }))
}
pub fn new_sysinfo_pid(&self) -> sysinfo::Pid {
sysinfo::Pid::from_u32(self.0 as u32)
}
}
#[derive(Debug)]
@ -71,6 +75,10 @@ pub mod v2 {
self
}
pub fn get_pid(&self) -> Pid {
self.pid
}
async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) {
match trigger {
"stay" => {