From 56f8474c013f6d114ca00fd5e0901ce554de8df6 Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 2 Jun 2025 17:19:17 +0300 Subject: [PATCH] cli <-> metrics done (almost) --- noxis-cli/src/cli.rs | 14 ++- noxis-cli/src/cli_net.rs | 12 ++- noxis-rs/src/main.rs | 21 ++++- noxis-rs/src/options/cli_pipeline.rs | 50 +++++++--- noxis-rs/src/options/structs.rs | 22 ++++- noxis-rs/src/utils.rs | 10 +- noxis-rs/src/utils/metrics.rs | 134 +++++++++++++++++++++------ noxis-rs/src/utils/prcs.rs | 7 +- 8 files changed, 213 insertions(+), 57 deletions(-) diff --git a/noxis-cli/src/cli.rs b/noxis-cli/src/cli.rs index f5e7672..30feba1 100644 --- a/noxis-cli/src/cli.rs +++ b/noxis-cli/src/cli.rs @@ -1,4 +1,5 @@ use clap::{Parser, Subcommand}; +use metrics_models::MetricsMode; #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] pub struct Cli { @@ -42,11 +43,20 @@ pub enum Commands { about = "To manage current process that is being monitoring", )] Process(ProcessCommand), - // config command = #[command( about = "To manage config settings", )] Config(ConfigCommand), + #[command( + about = "To get Noxis metrics", + )] + Metrics(MetricsCommand), +} + +#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] +pub struct MetricsCommand { + #[command(subcommand)] + pub mode : MetricsMode, } #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] @@ -167,11 +177,11 @@ pub enum ProcessAction { } pub mod metrics_models { + #[derive(Debug, clap::Parser, serde::Serialize, serde::Deserialize)] pub enum MetricsMode { Full, // system Cpu, - Memory, Ram, Rom, Network, diff --git a/noxis-cli/src/cli_net.rs b/noxis-cli/src/cli_net.rs index 6d31d12..622e118 100644 --- a/noxis-cli/src/cli_net.rs +++ b/noxis-cli/src/cli_net.rs @@ -1,6 +1,5 @@ use tokio::net::UnixStream; use tokio::io::{AsyncWriteExt, AsyncReadExt}; -use tokio::time::{Duration, sleep}; use anyhow::Result; use super::Cli; use super::cli_error::NoxisCliError; @@ -20,11 +19,14 @@ pub async fn try_send(cli: Cli) -> Result<()> { .await .map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?; - let mut response = [0; 1024]; - stream.read(&mut response) - .await + let mut response = Vec::new(); + stream.read_to_end(&mut response).await .map_err(|er| NoxisCliError::CliResponseReadError(er.to_string()))?; - println!("{}", String::from_utf8_lossy(&response)); + let response = String::from_utf8_lossy(&response); + for line in response.lines() { + println!("{}", line); + } + Ok(()) } \ No newline at end of file diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 32565f8..969dff7 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -14,6 +14,7 @@ use tokio::sync::{broadcast, oneshot, mpsc}; use options::config::v2::init_config_mechanism; use utils::v2::init_monitoring; use utils::bus::Bus; +use utils::metrics::init_metrics_grubber; #[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() -> anyhow::Result<()>{ @@ -30,11 +31,11 @@ async fn main() -> anyhow::Result<()>{ let mut handler: Vec> = vec![]; // to BUS channel - let (tx_to_bus, mut rx_to_bus) = mpsc::channel::(5); + let (tx_to_bus, rx_to_bus) = mpsc::channel::(5); // from BUS channels - let (tx_to_cli, mut rx_to_cli) = mpsc::channel::(5); - let (tx_to_supervisor, mut rx_to_supervisor) = mpsc::channel::(5); - let (tx_to_metrics, mut rx_to_metrics) = mpsc::channel::(5); + let (tx_to_cli, rx_to_cli) = mpsc::channel::(5); + let (tx_to_supervisor, rx_to_supervisor) = mpsc::channel::(5); + let (tx_to_metrics, rx_to_metrics) = mpsc::channel::(5); let tx_to_bus = Arc::new(tx_to_bus); let tx_to_cli = Arc::new(tx_to_cli); @@ -85,6 +86,18 @@ async fn main() -> anyhow::Result<()>{ }); handler.push(cli_module); + // metrics + let tx_bus = tx_to_bus.clone(); + let metrics_module = tokio::spawn(async move { + if let Err(er) = init_metrics_grubber( + tx_bus.clone(), + rx_to_metrics + ).await { + error!("Metrics module crushed : {}", er); + } + }); + handler.push(metrics_module); + // initilaizing task for deinitializing `Noxis` let ctrlc = tokio::spawn(async move { if let Err(er) = set_valid_destructor(vec![].into()).await { diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index 578c6fa..c47bd4d 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -8,7 +8,6 @@ use std::sync::Arc; use tokio::io::{ AsyncWriteExt, AsyncReadExt}; use noxis_cli::{Cli, ProcessAction}; use crate::options::structs::bus::InternalCli; - use super::structs::Processes; use super::structs::bus::BusMessage; @@ -125,8 +124,10 @@ async fn process_connection( error_msg }, }; - if let Err(e) = stream.write_all(response.as_bytes()).await { - error!("Failed to send response: {}", e); + for line in response.lines() { + if let Err(er) = stream.write_all(line.as_bytes()).await { + error!("Failed to send response: {}", er); + } } } Err(e) => { @@ -248,7 +249,7 @@ async fn process_cli_cmd( let mut bus = bus_reciever.lock().await; bus_sender.send(req).await?; tokio::time::sleep(std::time::Duration::from_millis(200)).await; - let resp = tokio::time::timeout(std::time::Duration::from_secs(10), async move { + let resp = tokio::time::timeout(std::time::Duration::from_secs(5), async move { loop { if let Ok(cont) = bus.try_recv() { return cont @@ -256,20 +257,47 @@ async fn process_cli_cmd( tokio::time::sleep(std::time::Duration::from_millis(500)).await; } }).await?; - + if let BusMessage::Response(_, _, content) = resp { let content: Box = content; - if let Ok(resp) = content.downcast::() { - return Ok(*resp) - } else { - // TODO : REWRITE THIS - return Ok(String::from("OK")); + if let Ok(resp) = content.downcast::>() { + return Ok((*resp)?) } } - Ok(String::from("OK")) + Err(anyhow::Error::msg(format!("Unknown type of response from supervisor"))) }, /* */ Commands::Status => Ok(String::from("Ok")), + Commands::Metrics(mode) => { + use crate::options::structs::bus::{BusMessageDirection, BusMessageContentType}; + let mode = mode.mode; + if let Ok(()) = bus_sender.send(BusMessage::Request( + BusMessageDirection::ToMetrics, + BusMessageContentType::MetricsModeTransfered, + Box::new(mode) + )).await { + let mut bus_reciever = bus_reciever.lock().await; + sleep(Duration::from_millis(300)).await; + let resp = tokio::time::timeout(std::time::Duration::from_secs(5), async move { + loop { + if let Ok(cont) = bus_reciever.try_recv() { + return cont + } + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + }).await?; + if let BusMessage::Response(_, _, content) = resp { + let content: Box = content; + if let Ok(resp) = content.downcast::>() { + // let mut file = std::fs::File::create("output.json")?; + // file.write_all(resp.unwrap_or_else(|_| String::from("no")).)?; + return Ok((*resp)?) + } + } + return Err(anyhow::Error::msg(format!("Unknown type of response from CLI"))); + } + Ok(String::from("Ok")) + }, _ => Ok(String::from("Ok")) } } diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index f40be37..5963b09 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -10,10 +10,10 @@ pub mod bus { use std::fmt::Debug; use super::*; - use noxis_cli::Cli; + use noxis_cli::{Cli, metrics_models::MetricsMode}; use crate::utils::metrics::MetricsExportable; - pub type BusMessageContent = Box; + pub type BusMessageContent = Box; #[derive(Debug)] pub enum BusMessage { @@ -33,6 +33,8 @@ pub mod bus { RawString, Cli, MetricsObj, + Result, + MetricsModeTransfered, } #[derive(Debug)] @@ -53,7 +55,11 @@ pub mod bus { pub trait BusContent: Send + Sync + 'static + Debug + Any { fn get_bus_type(&self) -> BusMessageContentType; } - + impl BusContent for anyhow::Result { + fn get_bus_type(&self) -> BusMessageContentType { + BusMessageContentType::Result + } + } impl BusContent for String { fn get_bus_type(&self) -> BusMessageContentType { BusMessageContentType::RawString @@ -74,6 +80,16 @@ pub mod bus { BusMessageContentType::MetricsObj } } + impl BusContent for Box { + fn get_bus_type(&self) -> BusMessageContentType { + BusMessageContentType::MetricsObj + } + } + impl BusContent for MetricsMode { + fn get_bus_type(&self) -> BusMessageContentType { + BusMessageContentType::MetricsModeTransfered + } + } } #[derive(Debug)] diff --git a/noxis-rs/src/utils.rs b/noxis-rs/src/utils.rs index a58a4d4..a34747a 100644 --- a/noxis-rs/src/utils.rs +++ b/noxis-rs/src/utils.rs @@ -159,12 +159,12 @@ pub mod v2 { }; let sender = self.bus.1.clone(); let resp_content = match res { - Ok(_) => format!("Ok on user call abour process {}", prc.name), - Err(er) => format!("Error: User call for process {} failed : {}", prc.name, er), + Ok(_) => Ok(format!("Ok on user call abour process {}", prc.name)), + Err(er) => Err(anyhow::Error::msg(format!("Error: User call for process {} failed : {}", prc.name, er))), }; let _ = sender.send(BusMessage::Response( crate::options::structs::bus::BusMessageDirection::ToCli, - BusMessageContentType::RawString, + BusMessageContentType::Result, Box::new(resp_content) )).await; 1 @@ -176,7 +176,9 @@ pub mod v2 { let _ = self.bus.1.clone().send(BusMessage::Response( crate::options::structs::bus::BusMessageDirection::ToCli, BusMessageContentType::RawString, - Box::new(format!("No process named `{}` was found in controlled scope", cli.prc)) + Box::new( + Err(anyhow::Error::msg(format!("No process named `{}` was found in controlled scope", cli.prc))) + ) )).await; } } diff --git a/noxis-rs/src/utils/metrics.rs b/noxis-rs/src/utils/metrics.rs index 5daeade..edc3f16 100644 --- a/noxis-rs/src/utils/metrics.rs +++ b/noxis-rs/src/utils/metrics.rs @@ -1,25 +1,23 @@ // submodule needed to get metrics such as // cpu load, ram/rom load and net activity -// use std::sync::Mutex; -use std::{collections::BTreeMap, sync::Arc}; +use std::{any::Any, collections::BTreeMap, sync::Arc}; use crate::options::structs::{ProcessState, TrackingProcess}; +use log::warn; +use noxis_cli::metrics_models::MetricsMode; +// use chrono::Duration; use sysinfo::{System, Disks as DisksList, Networks}; use crate::options::structs::Dependencies; use serde::Serialize; use super::prcs::v2::Pid; use std::fmt::Debug; -use crate::options::structs::bus::BusMessage; -// use pcap::{Device, Capture, Active}; -// use std::net::Ipv4Addr; -// use anyhow::{Result, Ok}; - -// type PacketBuffer = Arc>>; +use crate::options::structs::bus::{BusMessage, BusMessageDirection, BusMessageContentType}; +// use noxis_cli::metrics_models::MetricsMode; +pub type MetricProcesses = Vec; type CoreUsage = BTreeMap; type Disks = Vec; type Ifaces = Vec; -pub type MetricProcesses = Vec; type BusReciever = tokio::sync::mpsc::Receiver; type BusSender = Arc>; @@ -36,22 +34,64 @@ type BusSender = Arc>; /// /// *depends on* : - /// -#[allow(dead_code)] pub async fn init_metrics_grubber( /* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */ -) { + bus_sender : BusSender, + bus_recirever : BusReciever, +) -> anyhow::Result<()> { let mut system = System::new(); - get_all_metrics(&mut system).await; + // get_all_metrics(&mut system).await; + /* TODO */ + let mut bus_recirever = bus_recirever; + loop { + if let Ok(BusMessage::Request(_, _, cont)) = bus_recirever.try_recv() { + let cont: Box = cont; + match cont.downcast::() { + Err(_) => { + warn!("Unrecognized Metric mode was given"); + let _ = bus_sender.send(BusMessage::Response( + BusMessageDirection::ToCli, + BusMessageContentType::Result, + Box::new( + Err(anyhow::Error::msg(format!("Unrecognized Metric mode was given")) + )) + )).await; + } + Ok(mode) => { + let metric: Box = match *mode { + MetricsMode::Full => Box::new(get_all_metrics(&mut system).await), + MetricsMode::Cpu => Box::new(get_cpu_metrics(&mut system).await), + MetricsMode::Ram => Box::new(get_ram_metrics(&mut system).await), + MetricsMode::Rom => Box::new(get_all_disks_metrics().await), + MetricsMode::Network => Box::new(get_all_ifaces_metrics().await), + // MetricsMode::Processes => {}, + _ => todo!(), + }; + // let metric: Box = Box::new(metric); + let metric = metric.serialze_into_output(); + + let _ = bus_sender.send(BusMessage::Response( + BusMessageDirection::ToCli, + BusMessageContentType::MetricsObj, + Box::new(metric) + )).await; + }, + } + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } } -async fn get_all_metrics(system: &mut System) { +async fn get_all_metrics(system: &mut System) -> FullMetrics { system.refresh_all(); tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; - dbg!(get_host_info().await); - dbg!(get_cpu_metrics(system).await); - dbg!(get_ram_metrics(system).await); - dbg!(get_all_disks_metrics().await); - dbg!(get_all_ifaces_metrics().await); + let host = get_host_info().await; + let cpu = get_cpu_metrics(system).await; + let ram = get_ram_metrics(system).await; + let disks = get_all_disks_metrics().await; + let ifaces = get_all_ifaces_metrics().await; + let prcs = MetricProcesses::new(); + FullMetrics::create(host, cpu, ram, disks, ifaces, prcs) } async fn get_host_info() -> HostInfo { @@ -147,7 +187,7 @@ async fn get_all_ifaces_metrics() -> Ifaces { ifaces } -async fn get_all_processes_metrics(system: &mut System) {} +async fn get_all_processes_metrics(system: &mut System) { /* TODO */} pub enum MetricType { FullMetrics, @@ -159,8 +199,9 @@ pub enum MetricType { Processes, } -pub trait MetricsExportable: Send + Sync + 'static + Debug { +pub trait MetricsExportable: Send + Sync + 'static + Debug + Any { fn get_metric_type(&self) -> MetricType; + fn serialze_into_output(&self) -> anyhow::Result; } #[derive(Serialize, Debug)] @@ -174,38 +215,65 @@ struct FullMetrics { networks : Ifaces, processes : MetricProcesses, } - -impl MetricsExportable for FullMetrics{ +impl FullMetrics { + fn create( + host: HostInfo, + cpu : Cpu, + ram : Ram, + disks : Disks, + ifaces : Ifaces, + processes : MetricProcesses, + ) -> Self { + Self { + hostname : host.hostname, + os : host.os, + kernel : host.kernel, + cpu, + ram, + disks, + networks : ifaces, + processes + } + } +} +impl MetricsExportable for FullMetrics { fn get_metric_type(&self) -> MetricType { MetricType::FullMetrics } + fn serialze_into_output(&self) -> anyhow::Result { + Ok(serde_json::to_string_pretty(self)?) + } } -#[derive(Debug)] +#[derive(Debug, Serialize)] struct HostInfo { hostname : String, os : String, kernel : String, } -impl MetricsExportable for HostInfo{ +impl MetricsExportable for HostInfo { fn get_metric_type(&self) -> MetricType { MetricType::HostInfo } + fn serialze_into_output(&self) -> anyhow::Result { + Ok(serde_json::to_string_pretty(self)?) + } } - - #[derive(Serialize, Debug)] struct Cpu { global_usage : f32, usage : CoreUsage, } -impl MetricsExportable for Cpu{ +impl MetricsExportable for Cpu { fn get_metric_type(&self) -> MetricType { MetricType::Cpu } + fn serialze_into_output(&self) -> anyhow::Result { + Ok(serde_json::to_string_pretty(self)?) + } } @@ -230,6 +298,9 @@ impl MetricsExportable for Ram{ fn get_metric_type(&self) -> MetricType { MetricType::Ram } + fn serialze_into_output(&self) -> anyhow::Result { + Ok(serde_json::to_string_pretty(self)?) + } } #[derive(Serialize, Debug)] @@ -248,6 +319,9 @@ impl MetricsExportable for Disks{ fn get_metric_type(&self) -> MetricType { MetricType::Disks } + fn serialze_into_output(&self) -> anyhow::Result { + Ok(serde_json::to_string_pretty(self)?) + } } // vec @@ -269,6 +343,9 @@ impl MetricsExportable for Ifaces { fn get_metric_type(&self) -> MetricType { MetricType::Ifaces } + fn serialze_into_output(&self) -> anyhow::Result { + Ok(serde_json::to_string_pretty(self)?) + } } #[derive(Serialize, Debug)] @@ -317,6 +394,9 @@ impl MetricsExportable for MetricProcesses { fn get_metric_type(&self) -> MetricType { MetricType::Processes } + fn serialze_into_output(&self) -> anyhow::Result { + Ok(serde_json::to_string_pretty(self)?) + } } #[cfg(test)] diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 2fe958f..d43c9e7 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -387,6 +387,9 @@ pub async fn is_frozen(name: &str) -> bool { /// *depends on* : - /// pub async fn terminate_process(name: &str) -> anyhow::Result<()> { + if !is_active(name).await { + return Err(anyhow::Error::msg(format!("Process {} is already stopped", name))) + } let _ = Command::new("pkill") .arg(name) .output()?; @@ -466,7 +469,9 @@ pub async fn restart_process(name: &str, path: &str) -> anyhow::Result { /// *depends on* : - /// pub async fn start_process(name: &str, path: &str) -> anyhow::Result { - // let runsh = format!("{} {}", "exec", path); + if is_active(name).await { + return Err(anyhow::Error::msg(format!("Process {} is already running", name))) + } let mut command = Command::new(path); // command.arg(path);