monitor/noxis-rs/src/utils/metrics.rs

582 lines
18 KiB
Rust

//! Submodule that collects host metrics:
//! CPU load, RAM and disk (ROM) usage, and network activity.
use crate::{
options::structs::ProcessState,
utils::metrics::processes::{ProcessesGeneral, ProcessesQuery},
};
use log::warn;
use noxis_cli::metrics_models::MetricsMode;
use std::{any::Any, collections::BTreeMap, sync::Arc};
// use chrono::Duration;
use super::prcs::v2::Pid;
use crate::options::structs::bus::{BusMessage, BusMessageContentType, BusMessageDirection};
use serde::Serialize;
use std::fmt::Debug;
use sysinfo::{Disks as DisksList, Networks, System};
// use noxis_cli::metrics_models::MetricsMode;
/// List of processes enriched with their resource usage (see `ProcessExtended`).
pub type MetricProcesses = Vec<ProcessExtended>;
/// Per-core CPU details, keyed by the core's position in `System::cpus()`.
type CoreUsage = BTreeMap<usize, CoreInfo>;
/// Snapshot of all disks reported by `sysinfo`.
type Disks = Vec<Disk>;
/// Snapshot of all network interfaces reported by `sysinfo`.
type Ifaces = Vec<Network>;
/// Receiving half of the bus channel the metrics grubber reads from.
type BusReciever = tokio::sync::mpsc::Receiver<BusMessage>;
/// Shared sending half of the bus channel the metrics grubber writes to.
type BusSender = Arc<tokio::sync::mpsc::Sender<BusMessage>>;
/// # Fn `init_metrics_grubber`
/// ## for initializing process of unstoppable grubbing metrics.
///
/// *input* : `Arc<Mutex<UnixSocket>>` ??
///
/// *output* : `Err` if it cant create grubbers | `Ok` on finish
///
/// *initiator* : main thread ??
///
/// *managing* : object of unix-socket reader
///
/// *depends on* : -
///
pub async fn init_metrics_grubber(
/* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */
bus_sender: BusSender,
bus_reciever: BusReciever,
) -> anyhow::Result<()> {
let mut system = System::new();
let mut disks = DisksList::new_with_refreshed_list();
let mut networks = Networks::new_with_refreshed_list();
// get_all_metrics(&mut system).await;
/* TODO */
let mut bus_reciever = bus_reciever;
loop {
let msg = bus_reciever.try_recv();
if let Ok(BusMessage::Request(_, _, cont)) = msg {
system.refresh_all();
disks.refresh_list();
networks.refresh_list();
let cont: Box<dyn Any + Send> = cont;
match cont.downcast::<MetricsMode>() {
Err(_) => {
warn!("Unrecognized Metric mode was given");
let _ = bus_sender
.send(BusMessage::Response(
BusMessageDirection::ToCli,
BusMessageContentType::Result,
Box::new(Err(anyhow::Error::msg(format!(
"Unrecognized Metric mode was given"
)))),
))
.await;
}
Ok(mode) => {
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
let metric: Box<dyn MetricsExportable> = match *mode {
MetricsMode::Full => {
let mut refs =
get_all_metrics(&mut system, bus_sender.clone(), &disks, &networks)
.await;
if let Some(prcs) = bus_reciever.recv().await {
if let BusMessage::Response(_, _, cont) = prcs {
let cont: Box<dyn Any> = cont;
if let Ok(cont) = cont.downcast::<ProcessesQuery>() {
if let ProcessesQuery::General(info) = *cont {
refs.processes = info;
}
}
}
}
Box::new(refs)
}
MetricsMode::Host => {
Box::new(get_global_host_info(&mut system, &disks, &networks).await)
}
MetricsMode::Cpu => Box::new(get_cpu_metrics(&mut system).await),
MetricsMode::Ram => Box::new(get_ram_metrics(&mut system).await),
MetricsMode::Rom => Box::new(get_all_disks_metrics(&disks).await),
MetricsMode::Network => Box::new(get_all_ifaces_metrics(&networks).await),
// inspect processes
MetricsMode::Processes => {
todo!();
}
};
// let metric: Box<dyn BusContent> = Box::new(metric);
let metric = metric.serialze_into_output();
let _ = bus_sender
.send(BusMessage::Response(
BusMessageDirection::ToCli,
BusMessageContentType::MetricsObj,
Box::new(metric),
))
.await;
}
}
} else if let Ok(BusMessage::Response(_, _, cont)) = msg {
let cont: Box<dyn Any + Send> = cont;
if let Ok(info) = cont.downcast::<ProcessesQuery>() {
if let ProcessesQuery::All(info) = *info {
let procs: Vec<_> = info
.into_iter()
.map(|prc| ProcessExtended::from_process_query_all(&mut system, prc))
.collect();
let _ = bus_sender
.send(BusMessage::Response(
BusMessageDirection::ToCli,
BusMessageContentType::Result,
Box::<anyhow::Result<String>>::new(Ok(serde_json::to_string_pretty(
&procs,
)?)),
))
.await;
} else {
let _ = bus_sender
.send(BusMessage::Response(
BusMessageDirection::ToCli,
BusMessageContentType::Result,
Box::new(Err(anyhow::Error::msg(format!(
"Unknown type was send by the Supervisor"
)))),
))
.await;
}
} else {
let _ = bus_sender
.send(BusMessage::Response(
BusMessageDirection::ToCli,
BusMessageContentType::Result,
Box::new(Err(anyhow::Error::msg(format!(
"Unknown type was send by the Supervisor"
)))),
))
.await;
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
/// Collects the host, CPU, RAM, disk and network snapshots and asks the
/// supervisor for the general process list.
///
/// Only the request is sent here (its send result is ignored); the returned
/// `FullMetrics` therefore always carries an empty `processes` list.
async fn get_all_metrics(
    system: &mut System,
    sender: BusSender,
    disks: &DisksList,
    networks: &Networks,
) -> FullMetrics {
    let host_info = get_host_info().await;
    let cpu_info = get_cpu_metrics(system).await;
    let ram_info = get_ram_metrics(system).await;
    let disk_list = get_all_disks_metrics(disks).await;
    let iface_list = get_all_ifaces_metrics(networks).await;

    let process_query = BusMessage::Request(
        BusMessageDirection::ToSupervisor,
        BusMessageContentType::ProcessQuery,
        Box::new(ProcessesQuery::QueryGeneral),
    );
    let _ = sender.send(process_query).await;

    FullMetrics::create(
        host_info,
        cpu_info,
        ram_info,
        disk_list,
        iface_list,
        Vec::new(),
    )
}
/// Builds the condensed host overview.
///
/// - `disk_percentage` is the mean, over all disks, of
///   `available_space * 100 / total_space`. Disks reporting a total size of
///   zero are left out, and the value is `0` when no disk remains.
/// - `net_stat` is the sum of `received + transmitted` over all interfaces
///   divided by twice the number of interfaces; `0` when there are none.
///
/// Host name, OS and kernel fall back to an empty string when unavailable.
async fn get_global_host_info(
    system: &mut System,
    disks: &DisksList,
    networks: &Networks,
) -> HostGeneral {
    // Sum and count in one pass. A zero-sized disk would otherwise cause a
    // division by zero, so it is filtered out before the division.
    let (disk_pct_sum, disk_count) = disks
        .iter()
        .filter(|disk| disk.total_space() > 0)
        .map(|disk| disk.available_space() * 100 / disk.total_space())
        .fold((0u64, 0u64), |(sum, count), pct| (sum + pct, count + 1));

    let (traffic_sum, iface_count) = networks
        .iter()
        .map(|(_, iface_data)| iface_data.received() + iface_data.transmitted())
        .fold((0u64, 0u64), |(sum, count), bytes| (sum + bytes, count + 1));

    HostGeneral {
        hostname: System::host_name().unwrap_or_default(),
        os: System::long_os_version().unwrap_or_default(),
        kernel: System::kernel_version().unwrap_or_default(),
        cpu_percentage: system.global_cpu_usage(),
        // NOTE(review): total - free is the memory in use, although the field
        // is named `ram_available` - confirm which one consumers expect.
        ram_available: system.total_memory() - system.free_memory(),
        // `checked_div` yields `None` for an empty list instead of panicking.
        disk_percentage: disk_pct_sum.checked_div(disk_count).unwrap_or(0),
        net_stat: traffic_sum.checked_div(iface_count * 2).unwrap_or(0),
    }
}
/// Reads the host name, long OS version and kernel version from `sysinfo`.
///
/// Each value falls back to an empty string when `sysinfo` cannot provide it.
async fn get_host_info() -> HostInfo {
    let hostname = System::host_name().unwrap_or_default();
    let os = System::long_os_version().unwrap_or_default();
    let kernel = System::kernel_version().unwrap_or_default();
    HostInfo {
        hostname,
        os,
        kernel,
    }
}
async fn get_cpu_metrics(system: &mut System) -> Cpu {
let mut buffer = CoreUsage::new();
let global_usage = system.global_cpu_usage();
system.cpus().iter().enumerate().for_each(|(id, cpu)| {
let core_info = CoreInfo {
// id,
brand: cpu.brand().to_string(),
name: cpu.name().to_string(),
frequency: cpu.frequency(),
vendor_id: cpu.vendor_id().to_string(),
usage: cpu.cpu_usage(),
};
buffer.entry(id).or_insert(core_info);
});
Cpu {
global_usage,
usage: buffer,
}
}
/// Captures the free and total amounts of memory and swap reported by `system`.
async fn get_ram_metrics(system: &mut System) -> Ram {
    let free_mem = system.free_memory();
    let free_swap = system.free_swap();
    let total_mem = system.total_memory();
    let total_swap = system.total_swap();
    Ram {
        free_mem,
        free_swap,
        total_mem,
        total_swap,
    }
}
/// Converts every disk in `disks` into a serializable `Disk` snapshot.
///
/// OS strings (name, file system, mount point) are converted lossily.
async fn get_all_disks_metrics(disks: &DisksList) -> Disks {
    disks
        .list()
        .iter()
        .map(|entry| Disk {
            name: entry.name().to_string_lossy().into_owned(),
            kind: entry.kind().to_string(),
            fs: entry.file_system().to_string_lossy().into_owned(),
            mount_point: entry.mount_point().to_string_lossy().into_owned(),
            total_space: entry.total_space(),
            available_space: entry.available_space(),
            is_removable: entry.is_removable(),
            is_readonly: entry.is_read_only(),
        })
        .collect()
}
/// Converts every interface in `networks` into a serializable `Network` snapshot.
///
/// IP addresses are rendered as `address/prefix`.
async fn get_all_ifaces_metrics(networks: &Networks) -> Ifaces {
    networks
        .iter()
        .map(|(iface_name, data)| {
            let mac = data.mac_address().to_string();
            let ip_addresses: Vec<String> = data
                .ip_networks()
                .iter()
                .map(|ipaddr| format!("{}/{}", ipaddr.addr, ipaddr.prefix))
                .collect();
            Network {
                iname: iface_name.to_owned(),
                mac,
                ip_addresses,
                recieved: data.received(),
                transmitted: data.transmitted(),
                total_recieved_bytes: data.total_received(),
                total_transmitted_bytes: data.total_transmitted(),
                total_recieved_packets: data.total_packets_received(),
                total_transmitted_packets: data.total_packets_transmitted(),
                errors_on_recieved: data.errors_on_received(),
                errors_on_transmitted: data.errors_on_transmitted(),
            }
        })
        .collect()
}
/// Process-related payloads exchanged over the bus with the supervisor.
pub mod processes {
use crate::options::structs::ProcessState;
use crate::utils::prcs::v2::Pid;
/// Content of a process-related bus message: either a query or its answer.
#[derive(Debug, serde::Serialize)]
pub enum ProcessesQuery {
/// Answer carrying the general (name / state / pid) info of processes;
/// the metrics grubber stores it in the full metrics snapshot.
General(Vec<ProcessesGeneral>),
/// Answer carrying the processes together with their dependencies;
/// the metrics grubber turns each entry into a `ProcessExtended`.
All(Vec<ProcessesAll>),
/// Query sent to the supervisor while collecting the full metrics snapshot.
QueryGeneral,
/// Presumably the query counterpart of `All`; it is not sent anywhere in
/// this file - TODO confirm against the supervisor side.
QueryAll,
}
/// General information about a single process.
#[derive(Debug, serde::Serialize)]
pub struct ProcessesGeneral {
pub name: String,
pub state: ProcessState,
pub pid: Pid,
}
/// General information about a single process plus its dependencies.
#[derive(Debug, serde::Serialize)]
pub struct ProcessesAll {
pub name: String,
pub state: ProcessState,
pub pid: Pid,
pub dependencies: deps::Dependencies,
}
/// Descriptions of the files and services a process depends on.
pub mod deps {
use crate::options::structs::{FileTriggers, ServiceState, ServiceTriggers};
use crate::utils::files::v2::FileState;
// use super::*;
/// A file dependency with its current state and configured triggers.
#[derive(Debug, serde::Serialize)]
pub struct FilesExtended {
pub name: String,
pub path: String,
pub status: FileState,
pub triggers: FileTriggers,
}
/// A service dependency with its current state and configured triggers.
#[derive(Debug, serde::Serialize)]
pub struct ServicesExtended {
pub name: String,
pub access_name: String,
pub status: ServiceState,
pub triggers: ServiceTriggers,
}
/// All dependencies of one process, grouped by kind.
#[derive(Debug, serde::Serialize)]
pub struct Dependencies {
pub files: Vec<FilesExtended>,
pub services: Vec<ServicesExtended>,
}
}
}
/// A metrics snapshot that can be rendered into the string sent over the bus.
///
/// The metrics grubber boxes the selected snapshot as
/// `Box<dyn MetricsExportable>` and sends the result of
/// `serialze_into_output` to the CLI.
pub trait MetricsExportable: Send + Sync + 'static + Debug + Any {
/// Serializes the snapshot into its output string.
///
/// Every implementation in this file produces pretty-printed JSON and
/// returns the `serde_json` error when serialization fails.
fn serialze_into_output(&self) -> anyhow::Result<String>;
}
#[derive(Serialize, Debug)]
struct FullMetrics {
hostname: String,
os: String,
kernel: String,
cpu: Cpu,
ram: Ram,
disks: Disks,
networks: Ifaces,
pub processes: Vec<ProcessesGeneral>,
}
impl FullMetrics {
fn create(
host: HostInfo,
cpu: Cpu,
ram: Ram,
disks: Disks,
ifaces: Ifaces,
processes: Vec<ProcessesGeneral>,
) -> Self {
Self {
hostname: host.hostname,
os: host.os,
kernel: host.kernel,
cpu,
ram,
disks,
networks: ifaces,
processes,
}
}
}
impl MetricsExportable for FullMetrics {
fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?)
}
}
/// Host identity: host name, long OS version and kernel version.
#[derive(Debug, Serialize)]
struct HostInfo {
    hostname: String,
    os: String,
    kernel: String,
}

impl MetricsExportable for HostInfo {
    /// Renders the host identity as pretty-printed JSON.
    fn serialze_into_output(&self) -> anyhow::Result<String> {
        let json = serde_json::to_string_pretty(self)?;
        Ok(json)
    }
}
/// Condensed host overview produced by `get_global_host_info`.
#[derive(Debug, Serialize)]
struct HostGeneral {
    hostname: String,
    os: String,
    kernel: String,
    /// Global CPU usage as reported by `sysinfo`.
    cpu_percentage: f32,
    /// Filled with `total_memory - free_memory` by `get_global_host_info`.
    ram_available: u64,
    /// Mean over all disks of the available-space percentage.
    disk_percentage: u64,
    /// Sum of received and transmitted bytes over all interfaces, divided by
    /// twice the interface count.
    net_stat: u64,
}

impl MetricsExportable for HostGeneral {
    /// Renders the overview as pretty-printed JSON.
    fn serialze_into_output(&self) -> anyhow::Result<String> {
        serde_json::to_string_pretty(self).map_err(Into::into)
    }
}
#[derive(Serialize, Debug)]
struct Cpu {
global_usage: f32,
usage: CoreUsage,
}
impl MetricsExportable for Cpu {
fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?)
}
}
/// Details of a single CPU core, copied from `sysinfo` by `get_cpu_metrics`.
#[derive(Serialize, Debug)]
struct CoreInfo {
name: String,
brand: String,
frequency: u64,
vendor_id: String,
usage: f32,
}
#[derive(Serialize, Debug)]
struct Ram {
free_mem: u64,
free_swap: u64,
total_mem: u64,
total_swap: u64,
}
impl MetricsExportable for Ram {
fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?)
}
}
#[derive(Serialize, Debug)]
struct Disk {
name: String,
kind: String,
fs: String,
mount_point: String,
total_space: u64,
available_space: u64,
is_removable: bool,
is_readonly: bool,
}
impl MetricsExportable for Disks {
fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?)
}
}
/// Snapshot of a single network interface, copied from `sysinfo` by
/// `get_all_ifaces_metrics`. The list type `Ifaces` is a `Vec<Network>`.
#[derive(Serialize, Debug)]
struct Network {
    iname: String,
    mac: String,
    /// Addresses rendered as `address/prefix`.
    ip_addresses: Vec<String>,
    recieved: u64,
    transmitted: u64,
    total_recieved_bytes: u64,
    total_transmitted_bytes: u64,
    total_recieved_packets: u64,
    total_transmitted_packets: u64,
    errors_on_recieved: u64,
    errors_on_transmitted: u64,
}

impl MetricsExportable for Ifaces {
    /// Renders the whole interface list as a pretty-printed JSON array.
    fn serialze_into_output(&self) -> anyhow::Result<String> {
        let json = serde_json::to_string_pretty(self)?;
        Ok(json)
    }
}
/// A process description extended with its current resource usage.
#[derive(Serialize, Debug)]
pub struct ProcessExtended {
    name: String,
    status: ProcessState,
    pid: Pid,
    dependencies: processes::deps::Dependencies,
    cpu_usage: f32,
    ram_usage: u64,
    virtual_mem_usage: u64,
    disks_usage_read_bytes: u64,
    disks_usage_write_bytes: u64,
}

impl ProcessExtended {
    /// Extends `proc` with the usage figures `sysinfo` reports for its pid.
    ///
    /// All processes known to `system` are refreshed on every call. When the
    /// pid is not found afterwards, every usage figure is zero.
    pub fn from_process_query_all(system: &mut System, proc: processes::ProcessesAll) -> Self {
        system.refresh_processes(sysinfo::ProcessesToUpdate::All, true);

        let (cpu_usage, ram_usage, virtual_mem_usage, read_bytes, written_bytes) =
            match system.process(proc.pid.new_sysinfo_pid()) {
                Some(live) => {
                    let io = live.disk_usage();
                    (
                        live.cpu_usage(),
                        live.memory(),
                        live.virtual_memory(),
                        io.read_bytes,
                        io.written_bytes,
                    )
                }
                None => (0.0, 0, 0, 0, 0),
            };

        let processes::ProcessesAll {
            name,
            state,
            pid,
            dependencies,
        } = proc;

        Self {
            name,
            status: state,
            pid,
            dependencies,
            cpu_usage,
            ram_usage,
            virtual_mem_usage,
            disks_usage_read_bytes: read_bytes,
            disks_usage_write_bytes: written_bytes,
        }
    }
}
impl MetricsExportable for MetricProcesses {
fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?)
}
}
// Placeholder tests: every test below only asserts `true`, so none of the
// metric collectors in this file is exercised yet.
#[cfg(test)]
mod metrics_unittets {
#[tokio::test]
// Placeholder for a CPU load test.
// Original note: `Option<String>` output, echo func.
async fn get_cpu_load() {
assert!(true);
}
#[tokio::test]
// Placeholder for a RAM load test.
// Original note: `Option<String>` output, echo func.
async fn get_ram_load() {
assert!(true);
}
#[tokio::test]
// Placeholder for a memory load test.
// Original note: `Option<String>` output, echo func.
async fn get_mem_load() {
assert!(true);
}
#[tokio::test]
// Placeholder for a network statistics test.
// Original note: can't be tested this way because of the endless
// buffer-checking loop (async function with a loop inside).
async fn get_net_stat() {
assert!(true);
}
#[tokio::test]
// Placeholder for a test of the metrics grubber loop.
async fn get_metrics_grubber() {
assert!(true);
}
// Placeholder for a subsystem info test.
// Original note: `Option<Vec<TrackingProcess OR String>>` output, echo func.
#[tokio::test]
async fn get_info_about_subsystems() {
assert!(true);
}
}