processes done

migrate
prplV 2025-06-04 12:24:17 +03:00
parent 888fb41885
commit 1c1252ecfe
2 changed files with 103 additions and 33 deletions

View File

@ -154,7 +154,7 @@ pub mod v2 {
prc.dependencies prc.dependencies
.files .files
.iter() .iter()
.map(|file| (file, format!("{}{}", file.filename, file.src))) .map(|file| (file, format!("{}{}", file.src, file.filename)))
.for_each(|(file, code_name)| { .for_each(|(file, code_name)| {
if let Some(file_cont) = files_list if let Some(file_cont) = files_list
.iter() .iter()

View File

@ -3,7 +3,7 @@
use crate::{ use crate::{
options::structs::{ProcessState, TrackingProcess}, options::structs::{ProcessState, TrackingProcess},
utils::metrics::processes::{ProcessesGeneral, ProcessesQuery}, utils::metrics::processes::{ProcessesAll, ProcessesGeneral, ProcessesQuery},
}; };
use futures::lock::Mutex; use futures::lock::Mutex;
use log::warn; use log::warn;
@ -50,7 +50,8 @@ pub async fn init_metrics_grubber(
/* TODO */ /* TODO */
let mut bus_reciever = bus_reciever; let mut bus_reciever = bus_reciever;
loop { loop {
if let Ok(BusMessage::Request(_, _, cont)) = bus_reciever.try_recv() { let msg = bus_reciever.try_recv();
if let Ok(BusMessage::Request(_, _, cont)) = msg {
system.refresh_all(); system.refresh_all();
disks.refresh_list(); disks.refresh_list();
networks.refresh_list(); networks.refresh_list();
@ -94,9 +95,10 @@ pub async fn init_metrics_grubber(
MetricsMode::Ram => Box::new(get_ram_metrics(&mut system).await), MetricsMode::Ram => Box::new(get_ram_metrics(&mut system).await),
MetricsMode::Rom => Box::new(get_all_disks_metrics(&disks).await), MetricsMode::Rom => Box::new(get_all_disks_metrics(&disks).await),
MetricsMode::Network => Box::new(get_all_ifaces_metrics(&networks).await), MetricsMode::Network => Box::new(get_all_ifaces_metrics(&networks).await),
// MetricsMode::Processes => {}, // inspect processes
// TODO -> MetricsMode::Processes => {
_ => todo!(), todo!();
},
}; };
// let metric: Box<dyn BusContent> = Box::new(metric); // let metric: Box<dyn BusContent> = Box::new(metric);
let metric = metric.serialze_into_output(); let metric = metric.serialze_into_output();
@ -110,9 +112,58 @@ pub async fn init_metrics_grubber(
.await; .await;
} }
} }
} else if let Ok(BusMessage::Response(_, _, cont)) = msg {
dbg!(&cont);
let cont: Box<dyn Any + Send> = cont;
if let Ok(info) = cont.downcast::<ProcessesQuery>() {
if let ProcessesQuery::All(info) = *info {
let procs: Vec<_> = info
.into_iter()
.map(|prc| ProcessExtended::from_process_query_all(&mut system, prc))
.collect();
let _ = bus_sender
.send(BusMessage::Response(
BusMessageDirection::ToCli,
BusMessageContentType::Result,
Box::<anyhow::Result<String>>::new(
Ok(serde_json::to_string_pretty(&procs)?)
)
),
)
.await;
} else {
let _ = bus_sender
.send(BusMessage::Response(
BusMessageDirection::ToCli,
BusMessageContentType::Result,
Box::new(
Err(
anyhow::Error::msg(format!(
"Unknown type was send by the Supervisor"
)
)
)
),
))
.await;
}
} else {
let _ = bus_sender
.send(BusMessage::Response(
BusMessageDirection::ToCli,
BusMessageContentType::Result,
Box::new(
Err(
anyhow::Error::msg(format!(
"Unknown type was send by the Supervisor"
)
)
)
),
))
.await;
}
} }
// TODO else if response in metrics
// else if let Response ....
tokio::time::sleep(std::time::Duration::from_millis(100)).await; tokio::time::sleep(std::time::Duration::from_millis(100)).await;
} }
} }
@ -481,38 +532,57 @@ impl MetricsExportable for Ifaces {
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
pub struct ProcessExtended { pub struct ProcessExtended {
name: String, name: String,
status: String, status: ProcessState,
pid: Pid, pid: Pid,
dependencies: Dependencies, dependencies: processes::deps::Dependencies,
cpu_usage: f32, cpu_usage: f32,
ram_usage: f32, ram_usage: u64,
virtual_mem_usage: u64, virtual_mem_usage: u64,
disks_usage_read_bytes: u64, disks_usage_read_bytes: u64,
disks_usage_write_bytes: u64, disks_usage_write_bytes: u64,
} }
impl ProcessExtended { impl ProcessExtended {
pub fn from_old_with_params(old: Arc<TrackingProcess>, pid: Pid, status: ProcessState) -> Self { fn add_metrics(&mut self, system: &mut System) {
if let Some(prc) = system.process(self.pid.new_sysinfo_pid()) {
self.cpu_usage = prc.cpu_usage() / system.cpus().len() as f32;
self.ram_usage = system.total_memory() / prc.memory();
self.disks_usage_read_bytes = prc.disk_usage().total_read_bytes;
self.disks_usage_write_bytes = prc.disk_usage().total_written_bytes;
self.virtual_mem_usage = prc.virtual_memory();
}
}
pub fn from_process_query_all(
system: &mut System,
proc : processes::ProcessesAll
) -> Self {
system.refresh_processes(sysinfo::ProcessesToUpdate::All, true);
return if let Some(prc) = system.process(proc.pid.new_sysinfo_pid()) {
let disk_usage = prc.disk_usage();
Self { Self {
name: old.name.clone(), name: proc.name,
status: status.to_string(), status: proc.state,
pid, pid: proc.pid,
dependencies: old.dependencies.clone(), dependencies: proc.dependencies,
cpu_usage: prc.cpu_usage(),
ram_usage: prc.memory(),
virtual_mem_usage: prc.virtual_memory(),
disks_usage_read_bytes: disk_usage.read_bytes,
disks_usage_write_bytes: disk_usage.written_bytes,
}
} else {
Self {
name: proc.name,
status: proc.state,
pid: proc.pid,
dependencies: proc.dependencies,
cpu_usage: 0.0, cpu_usage: 0.0,
ram_usage: 0.0, ram_usage: 0,
virtual_mem_usage: 0, virtual_mem_usage: 0,
disks_usage_read_bytes: 0, disks_usage_read_bytes: 0,
disks_usage_write_bytes: 0, disks_usage_write_bytes: 0,
} }
} }
fn add_metrics(&mut self, system: &mut System) {
if let Some(prc) = system.process(self.pid.new_sysinfo_pid()) {
self.cpu_usage = prc.cpu_usage() / system.cpus().len() as f32;
self.ram_usage = (system.total_memory() as f32) / (prc.memory() as f32);
self.disks_usage_read_bytes = prc.disk_usage().total_read_bytes;
self.disks_usage_write_bytes = prc.disk_usage().total_written_bytes;
self.virtual_mem_usage = prc.virtual_memory();
}
} }
} }