From e9f9bbc0b41cf737854dab29a2c0befce36b5000 Mon Sep 17 00:00:00 2001 From: prplV Date: Fri, 30 May 2025 14:40:59 +0300 Subject: [PATCH] info bus is ready now --- noxis-rs/src/options/structs.rs | 50 +++++++++++++++++++++ noxis-rs/src/utils.rs | 1 + noxis-rs/src/utils/bus.rs | 80 +++++++++++++++++++++++++++++++++ noxis-rs/src/utils/metrics.rs | 66 +++++++++++++++++++++++++-- 4 files changed, 193 insertions(+), 4 deletions(-) create mode 100644 noxis-rs/src/utils/bus.rs diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index 51855d5..16faf23 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -6,6 +6,56 @@ use async_trait::async_trait; use std::sync::Arc; +pub mod bus { + use std::fmt::Debug; + + use super::*; + use noxis_cli::Cli; + use crate::utils::metrics::MetricsExportable; + + pub type BusMessageContent = Box; + + #[derive(Debug)] + pub enum BusMessage { + Request(BusMessageDirection, BusMessageContentType, BusMessageContent), + Response(BusMessageDirection, BusMessageContentType, BusMessageContent), + } + + #[derive(Debug)] + pub enum BusMessageDirection { + ToCli, + ToSupervisor, + ToMetrics, + } + + #[derive(Debug)] + pub enum BusMessageContentType { + RawString, + Cli, + MetricsObj, + } + + pub trait BusContent: Send + Sync + 'static + Debug { + fn get_bus_type(&self) -> BusMessageContentType; + } + + impl BusContent for String { + fn get_bus_type(&self) -> BusMessageContentType { + BusMessageContentType::RawString + } + } + impl BusContent for Cli { + fn get_bus_type(&self) -> BusMessageContentType { + BusMessageContentType::Cli + } + } + impl BusContent for dyn MetricsExportable { + fn get_bus_type(&self) -> BusMessageContentType { + BusMessageContentType::Cli + } + } +} + #[derive(Debug)] pub enum DependencyType { File, diff --git a/noxis-rs/src/utils.rs b/noxis-rs/src/utils.rs index 2ef0d08..66f2f09 100644 --- a/noxis-rs/src/utils.rs +++ b/noxis-rs/src/utils.rs @@ -3,6 +3,7 @@ pub mod hagent; pub mod metrics; pub mod prcs; pub mod services; +pub mod bus; use crate::options::structs::Processes; use log::{error, info}; diff --git a/noxis-rs/src/utils/bus.rs b/noxis-rs/src/utils/bus.rs new file mode 100644 index 0000000..7f060a4 --- /dev/null +++ b/noxis-rs/src/utils/bus.rs @@ -0,0 +1,80 @@ +use std::sync::Arc; + +use crate::options::structs::bus::{BusMessageDirection, BusMessage}; +use crate::options::structs::ProcessUnit; +use tokio::sync::mpsc::{Sender, Receiver}; +use log::{trace, debug, error}; + +type Inner = Receiver; +type Outter = Arc>; + +#[derive(Debug )] +pub struct Highway { + to_cli : Outter, + to_supervisor : Outter, + to_metrics : Outter, +} +impl Highway { + fn new(to_cli: Outter, to_supervisor: Outter, to_metrics: Outter) -> Self { + Self { to_cli, to_supervisor, to_metrics } + } + async fn send(&self, msg: BusMessage) -> anyhow::Result<()> { + let dir = match &msg { + BusMessage::Request(dir, ..) | BusMessage::Response(dir, ..) => dir, + }; + match dir { + BusMessageDirection::ToCli => self.send_cli(msg).await, + BusMessageDirection::ToSupervisor => self.send_supervisor(msg).await, + BusMessageDirection::ToMetrics => self.send_metrics(msg).await, + } + } + async fn send_cli(&self, msg: BusMessage) -> anyhow::Result<()> { + self.to_cli.send(msg).await?; + Ok(()) + } + async fn send_supervisor(&self, msg: BusMessage) -> anyhow::Result<()> { + self.to_supervisor.send(msg).await?; + Ok(()) + } + async fn send_metrics(&self, msg: BusMessage) -> anyhow::Result<()> { + self.to_metrics.send(msg).await?; + Ok(()) + } +} + +pub struct Bus { + inner : Inner, + highway : Highway, +} + +impl Bus { + fn new(inner: Inner, to_cli: Outter, to_supervisor: Outter, to_metrics: Outter) -> Self { + Self { inner, highway: Highway::new(to_cli, to_supervisor, to_metrics) } + } +} + +#[async_trait::async_trait] +impl ProcessUnit for Bus { + async fn process(&mut self) { + loop { + // TODO + while let Ok(content) = self.inner.try_recv() { + debug!("New message to the Bus : {:?}", &content); + let msg = match content { + BusMessage::Request(direction, content_type, content) => { + trace!("Bus has got a new Request with direction {:?} and type {:?}", &direction, &content_type); + BusMessage::Response(direction, content_type, content) + }, + BusMessage::Response(direction, content_type, content) => { + trace!("Bus has got a new Response with direction {:?} and type {:?}", &direction, &content_type); + BusMessage::Response(direction, content_type, content) + }, + }; + if let Err(er) = self.highway.send(msg).await { + error!("Cannot redirect message : {}", er); + } + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + } +} diff --git a/noxis-rs/src/utils/metrics.rs b/noxis-rs/src/utils/metrics.rs index aa7774d..9db5eb9 100644 --- a/noxis-rs/src/utils/metrics.rs +++ b/noxis-rs/src/utils/metrics.rs @@ -8,6 +8,7 @@ use sysinfo::{System, Disks as DisksList, Networks}; use crate::options::structs::Dependencies; use serde::Serialize; use super::prcs::v2::Pid; +use std::fmt::Debug; // use pcap::{Device, Capture, Active}; // use std::net::Ipv4Addr; // use anyhow::{Result, Ok}; @@ -17,8 +18,21 @@ use super::prcs::v2::Pid; type CoreUsage = BTreeMap; type Disks = Vec; type Ifaces = Vec; -pub type MetricProcesses = Vec; +pub type MetricProcesses = Vec; +pub enum MetricType { + FullMetrics, + HostInfo, + Cpu, + Ram, + Disks, + Ifaces, + Processes, +} + +pub trait MetricsExportable: Send + Sync + 'static + Debug { + fn get_metric_type(&self) -> MetricType; +} #[derive(Serialize, Debug)] struct FullMetrics { @@ -32,6 +46,12 @@ struct FullMetrics { processes : MetricProcesses, } +impl MetricsExportable for FullMetrics{ + fn get_metric_type(&self) -> MetricType { + MetricType::FullMetrics + } +} + #[derive(Debug)] struct HostInfo { hostname : String, @@ -39,6 +59,13 @@ struct HostInfo { kernel : String, } +impl MetricsExportable for HostInfo{ + fn get_metric_type(&self) -> MetricType { + MetricType::HostInfo + } +} + + #[derive(Serialize, Debug)] struct Cpu { @@ -46,6 +73,13 @@ struct Cpu { usage : CoreUsage, } +impl MetricsExportable for Cpu{ + fn get_metric_type(&self) -> MetricType { + MetricType::Cpu + } +} + + #[derive(Serialize, Debug)] struct CoreInfo { name: String, @@ -63,6 +97,12 @@ struct Ram { total_swap : u64 } +impl MetricsExportable for Ram{ + fn get_metric_type(&self) -> MetricType { + MetricType::Ram + } +} + #[derive(Serialize, Debug)] struct Disk { name : String, @@ -75,7 +115,13 @@ struct Disk { is_readonly : bool, } - // vec +impl MetricsExportable for Disks{ + fn get_metric_type(&self) -> MetricType { + MetricType::Disks + } +} + +// vec #[derive(Serialize, Debug)] struct Network { iname : String, @@ -89,9 +135,15 @@ struct Network { errors_on_recieved : u64, errors_on_transmitted : u64, } + +impl MetricsExportable for Ifaces { + fn get_metric_type(&self) -> MetricType { + MetricType::Ifaces + } +} #[derive(Serialize, Debug)] -pub struct ProcessesExtended { +pub struct ProcessExtended { name : String, status : String, pid : Pid, @@ -103,7 +155,7 @@ pub struct ProcessesExtended { disks_usage_write_bytes: u64, } -impl ProcessesExtended { +impl ProcessExtended { pub fn from_old_with_params( old : Arc, pid : Pid, @@ -132,6 +184,12 @@ impl ProcessesExtended { } } +impl MetricsExportable for MetricProcesses { + fn get_metric_type(&self) -> MetricType { + MetricType::Processes + } +} + /// # Fn `init_metrics_grubber` /// ## for initializing process of unstoppable grubbing metrics. ///