info bus is ready now

migrate
prplV 2025-05-30 14:40:59 +03:00
parent dafc6dcbeb
commit e9f9bbc0b4
4 changed files with 193 additions and 4 deletions

View File

@ -6,6 +6,56 @@ use async_trait::async_trait;
use std::sync::Arc; use std::sync::Arc;
pub mod bus {
use std::fmt::Debug;
use super::*;
use noxis_cli::Cli;
use crate::utils::metrics::MetricsExportable;
pub type BusMessageContent = Box<dyn BusContent>;
#[derive(Debug)]
pub enum BusMessage {
Request(BusMessageDirection, BusMessageContentType, BusMessageContent),
Response(BusMessageDirection, BusMessageContentType, BusMessageContent),
}
#[derive(Debug)]
pub enum BusMessageDirection {
ToCli,
ToSupervisor,
ToMetrics,
}
#[derive(Debug)]
pub enum BusMessageContentType {
RawString,
Cli,
MetricsObj,
}
pub trait BusContent: Send + Sync + 'static + Debug {
fn get_bus_type(&self) -> BusMessageContentType;
}
impl BusContent for String {
fn get_bus_type(&self) -> BusMessageContentType {
BusMessageContentType::RawString
}
}
impl BusContent for Cli {
fn get_bus_type(&self) -> BusMessageContentType {
BusMessageContentType::Cli
}
}
impl BusContent for dyn MetricsExportable {
fn get_bus_type(&self) -> BusMessageContentType {
BusMessageContentType::Cli
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub enum DependencyType { pub enum DependencyType {
File, File,

View File

@ -3,6 +3,7 @@ pub mod hagent;
pub mod metrics; pub mod metrics;
pub mod prcs; pub mod prcs;
pub mod services; pub mod services;
pub mod bus;
use crate::options::structs::Processes; use crate::options::structs::Processes;
use log::{error, info}; use log::{error, info};

80
noxis-rs/src/utils/bus.rs Normal file
View File

@ -0,0 +1,80 @@
use std::sync::Arc;
use crate::options::structs::bus::{BusMessageDirection, BusMessage};
use crate::options::structs::ProcessUnit;
use tokio::sync::mpsc::{Sender, Receiver};
use log::{trace, debug, error};
type Inner = Receiver<BusMessage>;
type Outter = Arc<Sender<BusMessage>>;
#[derive(Debug )]
pub struct Highway {
to_cli : Outter,
to_supervisor : Outter,
to_metrics : Outter,
}
impl Highway {
fn new(to_cli: Outter, to_supervisor: Outter, to_metrics: Outter) -> Self {
Self { to_cli, to_supervisor, to_metrics }
}
async fn send(&self, msg: BusMessage) -> anyhow::Result<()> {
let dir = match &msg {
BusMessage::Request(dir, ..) | BusMessage::Response(dir, ..) => dir,
};
match dir {
BusMessageDirection::ToCli => self.send_cli(msg).await,
BusMessageDirection::ToSupervisor => self.send_supervisor(msg).await,
BusMessageDirection::ToMetrics => self.send_metrics(msg).await,
}
}
async fn send_cli(&self, msg: BusMessage) -> anyhow::Result<()> {
self.to_cli.send(msg).await?;
Ok(())
}
async fn send_supervisor(&self, msg: BusMessage) -> anyhow::Result<()> {
self.to_supervisor.send(msg).await?;
Ok(())
}
async fn send_metrics(&self, msg: BusMessage) -> anyhow::Result<()> {
self.to_metrics.send(msg).await?;
Ok(())
}
}
pub struct Bus {
inner : Inner,
highway : Highway,
}
impl Bus {
fn new(inner: Inner, to_cli: Outter, to_supervisor: Outter, to_metrics: Outter) -> Self {
Self { inner, highway: Highway::new(to_cli, to_supervisor, to_metrics) }
}
}
#[async_trait::async_trait]
impl ProcessUnit for Bus {
async fn process(&mut self) {
loop {
// TODO
while let Ok(content) = self.inner.try_recv() {
debug!("New message to the Bus : {:?}", &content);
let msg = match content {
BusMessage::Request(direction, content_type, content) => {
trace!("Bus has got a new Request with direction {:?} and type {:?}", &direction, &content_type);
BusMessage::Response(direction, content_type, content)
},
BusMessage::Response(direction, content_type, content) => {
trace!("Bus has got a new Response with direction {:?} and type {:?}", &direction, &content_type);
BusMessage::Response(direction, content_type, content)
},
};
if let Err(er) = self.highway.send(msg).await {
error!("Cannot redirect message : {}", er);
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
}

View File

@ -8,6 +8,7 @@ use sysinfo::{System, Disks as DisksList, Networks};
use crate::options::structs::Dependencies; use crate::options::structs::Dependencies;
use serde::Serialize; use serde::Serialize;
use super::prcs::v2::Pid; use super::prcs::v2::Pid;
use std::fmt::Debug;
// use pcap::{Device, Capture, Active}; // use pcap::{Device, Capture, Active};
// use std::net::Ipv4Addr; // use std::net::Ipv4Addr;
// use anyhow::{Result, Ok}; // use anyhow::{Result, Ok};
@ -17,8 +18,21 @@ use super::prcs::v2::Pid;
type CoreUsage = BTreeMap<usize, CoreInfo>; type CoreUsage = BTreeMap<usize, CoreInfo>;
type Disks = Vec<Disk>; type Disks = Vec<Disk>;
type Ifaces = Vec<Network>; type Ifaces = Vec<Network>;
pub type MetricProcesses = Vec<ProcessesExtended>; pub type MetricProcesses = Vec<ProcessExtended>;
pub enum MetricType {
FullMetrics,
HostInfo,
Cpu,
Ram,
Disks,
Ifaces,
Processes,
}
pub trait MetricsExportable: Send + Sync + 'static + Debug {
fn get_metric_type(&self) -> MetricType;
}
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
struct FullMetrics { struct FullMetrics {
@ -32,6 +46,12 @@ struct FullMetrics {
processes : MetricProcesses, processes : MetricProcesses,
} }
impl MetricsExportable for FullMetrics{
fn get_metric_type(&self) -> MetricType {
MetricType::FullMetrics
}
}
#[derive(Debug)] #[derive(Debug)]
struct HostInfo { struct HostInfo {
hostname : String, hostname : String,
@ -39,6 +59,13 @@ struct HostInfo {
kernel : String, kernel : String,
} }
impl MetricsExportable for HostInfo{
fn get_metric_type(&self) -> MetricType {
MetricType::HostInfo
}
}
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
struct Cpu { struct Cpu {
@ -46,6 +73,13 @@ struct Cpu {
usage : CoreUsage, usage : CoreUsage,
} }
impl MetricsExportable for Cpu{
fn get_metric_type(&self) -> MetricType {
MetricType::Cpu
}
}
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
struct CoreInfo { struct CoreInfo {
name: String, name: String,
@ -63,6 +97,12 @@ struct Ram {
total_swap : u64 total_swap : u64
} }
impl MetricsExportable for Ram{
fn get_metric_type(&self) -> MetricType {
MetricType::Ram
}
}
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
struct Disk { struct Disk {
name : String, name : String,
@ -75,6 +115,12 @@ struct Disk {
is_readonly : bool, is_readonly : bool,
} }
impl MetricsExportable for Disks{
fn get_metric_type(&self) -> MetricType {
MetricType::Disks
}
}
// vec<Network> // vec<Network>
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
struct Network { struct Network {
@ -90,8 +136,14 @@ struct Network {
errors_on_transmitted : u64, errors_on_transmitted : u64,
} }
impl MetricsExportable for Ifaces {
fn get_metric_type(&self) -> MetricType {
MetricType::Ifaces
}
}
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
pub struct ProcessesExtended { pub struct ProcessExtended {
name : String, name : String,
status : String, status : String,
pid : Pid, pid : Pid,
@ -103,7 +155,7 @@ pub struct ProcessesExtended {
disks_usage_write_bytes: u64, disks_usage_write_bytes: u64,
} }
impl ProcessesExtended { impl ProcessExtended {
pub fn from_old_with_params( pub fn from_old_with_params(
old : Arc<TrackingProcess>, old : Arc<TrackingProcess>,
pid : Pid, pid : Pid,
@ -132,6 +184,12 @@ impl ProcessesExtended {
} }
} }
impl MetricsExportable for MetricProcesses {
fn get_metric_type(&self) -> MetricType {
MetricType::Processes
}
}
/// # Fn `init_metrics_grubber` /// # Fn `init_metrics_grubber`
/// ## for initializing process of unstoppable grubbing metrics. /// ## for initializing process of unstoppable grubbing metrics.
/// ///