cli <-> metrics done (almost)

migrate
prplV 2025-06-02 17:19:17 +03:00
parent c76b615341
commit 56f8474c01
8 changed files with 213 additions and 57 deletions

View File

@ -1,4 +1,5 @@
use clap::{Parser, Subcommand};
use metrics_models::MetricsMode;
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
pub struct Cli {
@ -42,11 +43,20 @@ pub enum Commands {
about = "To manage current process that is being monitoring",
)]
Process(ProcessCommand),
// config command =
#[command(
about = "To manage config settings",
)]
Config(ConfigCommand),
#[command(
about = "To get Noxis metrics",
)]
Metrics(MetricsCommand),
}
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
pub struct MetricsCommand {
#[command(subcommand)]
pub mode : MetricsMode,
}
#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)]
@ -167,11 +177,11 @@ pub enum ProcessAction {
}
pub mod metrics_models {
#[derive(Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
pub enum MetricsMode {
Full,
// system
Cpu,
Memory,
Ram,
Rom,
Network,

View File

@ -1,6 +1,5 @@
use tokio::net::UnixStream;
use tokio::io::{AsyncWriteExt, AsyncReadExt};
use tokio::time::{Duration, sleep};
use anyhow::Result;
use super::Cli;
use super::cli_error::NoxisCliError;
@ -20,11 +19,14 @@ pub async fn try_send(cli: Cli) -> Result<()> {
.await
.map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?;
let mut response = [0; 1024];
stream.read(&mut response)
.await
let mut response = Vec::new();
stream.read_to_end(&mut response).await
.map_err(|er| NoxisCliError::CliResponseReadError(er.to_string()))?;
println!("{}", String::from_utf8_lossy(&response));
let response = String::from_utf8_lossy(&response);
for line in response.lines() {
println!("{}", line);
}
Ok(())
}

View File

@ -14,6 +14,7 @@ use tokio::sync::{broadcast, oneshot, mpsc};
use options::config::v2::init_config_mechanism;
use utils::v2::init_monitoring;
use utils::bus::Bus;
use utils::metrics::init_metrics_grubber;
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()>{
@ -30,11 +31,11 @@ async fn main() -> anyhow::Result<()>{
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
// to BUS channel
let (tx_to_bus, mut rx_to_bus) = mpsc::channel::<BusMessage>(5);
let (tx_to_bus, rx_to_bus) = mpsc::channel::<BusMessage>(5);
// from BUS channels
let (tx_to_cli, mut rx_to_cli) = mpsc::channel::<BusMessage>(5);
let (tx_to_supervisor, mut rx_to_supervisor) = mpsc::channel::<BusMessage>(5);
let (tx_to_metrics, mut rx_to_metrics) = mpsc::channel::<BusMessage>(5);
let (tx_to_cli, rx_to_cli) = mpsc::channel::<BusMessage>(5);
let (tx_to_supervisor, rx_to_supervisor) = mpsc::channel::<BusMessage>(5);
let (tx_to_metrics, rx_to_metrics) = mpsc::channel::<BusMessage>(5);
let tx_to_bus = Arc::new(tx_to_bus);
let tx_to_cli = Arc::new(tx_to_cli);
@ -85,6 +86,18 @@ async fn main() -> anyhow::Result<()>{
});
handler.push(cli_module);
// metrics
let tx_bus = tx_to_bus.clone();
let metrics_module = tokio::spawn(async move {
if let Err(er) = init_metrics_grubber(
tx_bus.clone(),
rx_to_metrics
).await {
error!("Metrics module crushed : {}", er);
}
});
handler.push(metrics_module);
// initilaizing task for deinitializing `Noxis`
let ctrlc = tokio::spawn(async move {
if let Err(er) = set_valid_destructor(vec![].into()).await {

View File

@ -8,7 +8,6 @@ use std::sync::Arc;
use tokio::io::{ AsyncWriteExt, AsyncReadExt};
use noxis_cli::{Cli, ProcessAction};
use crate::options::structs::bus::InternalCli;
use super::structs::Processes;
use super::structs::bus::BusMessage;
@ -125,8 +124,10 @@ async fn process_connection(
error_msg
},
};
if let Err(e) = stream.write_all(response.as_bytes()).await {
error!("Failed to send response: {}", e);
for line in response.lines() {
if let Err(er) = stream.write_all(line.as_bytes()).await {
error!("Failed to send response: {}", er);
}
}
}
Err(e) => {
@ -248,7 +249,7 @@ async fn process_cli_cmd(
let mut bus = bus_reciever.lock().await;
bus_sender.send(req).await?;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let resp = tokio::time::timeout(std::time::Duration::from_secs(10), async move {
let resp = tokio::time::timeout(std::time::Duration::from_secs(5), async move {
loop {
if let Ok(cont) = bus.try_recv() {
return cont
@ -259,17 +260,44 @@ async fn process_cli_cmd(
if let BusMessage::Response(_, _, content) = resp {
let content: Box<dyn Any> = content;
if let Ok(resp) = content.downcast::<String>() {
return Ok(*resp)
} else {
// TODO : REWRITE THIS
return Ok(String::from("OK"));
if let Ok(resp) = content.downcast::<anyhow::Result<String>>() {
return Ok((*resp)?)
}
}
Ok(String::from("OK"))
Err(anyhow::Error::msg(format!("Unknown type of response from supervisor")))
},
/* */
Commands::Status => Ok(String::from("Ok")),
Commands::Metrics(mode) => {
use crate::options::structs::bus::{BusMessageDirection, BusMessageContentType};
let mode = mode.mode;
if let Ok(()) = bus_sender.send(BusMessage::Request(
BusMessageDirection::ToMetrics,
BusMessageContentType::MetricsModeTransfered,
Box::new(mode)
)).await {
let mut bus_reciever = bus_reciever.lock().await;
sleep(Duration::from_millis(300)).await;
let resp = tokio::time::timeout(std::time::Duration::from_secs(5), async move {
loop {
if let Ok(cont) = bus_reciever.try_recv() {
return cont
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}).await?;
if let BusMessage::Response(_, _, content) = resp {
let content: Box<dyn Any> = content;
if let Ok(resp) = content.downcast::<anyhow::Result<String>>() {
// let mut file = std::fs::File::create("output.json")?;
// file.write_all(resp.unwrap_or_else(|_| String::from("no")).)?;
return Ok((*resp)?)
}
}
return Err(anyhow::Error::msg(format!("Unknown type of response from CLI")));
}
Ok(String::from("Ok"))
},
_ => Ok(String::from("Ok"))
}
}

View File

@ -10,10 +10,10 @@ pub mod bus {
use std::fmt::Debug;
use super::*;
use noxis_cli::Cli;
use noxis_cli::{Cli, metrics_models::MetricsMode};
use crate::utils::metrics::MetricsExportable;
pub type BusMessageContent = Box<dyn BusContent>;
pub type BusMessageContent = Box<dyn BusContent >;
#[derive(Debug)]
pub enum BusMessage {
@ -33,6 +33,8 @@ pub mod bus {
RawString,
Cli,
MetricsObj,
Result,
MetricsModeTransfered,
}
#[derive(Debug)]
@ -53,7 +55,11 @@ pub mod bus {
pub trait BusContent: Send + Sync + 'static + Debug + Any {
fn get_bus_type(&self) -> BusMessageContentType;
}
impl BusContent for anyhow::Result<String> {
fn get_bus_type(&self) -> BusMessageContentType {
BusMessageContentType::Result
}
}
impl BusContent for String {
fn get_bus_type(&self) -> BusMessageContentType {
BusMessageContentType::RawString
@ -74,6 +80,16 @@ pub mod bus {
BusMessageContentType::MetricsObj
}
}
impl BusContent for Box<dyn MetricsExportable> {
fn get_bus_type(&self) -> BusMessageContentType {
BusMessageContentType::MetricsObj
}
}
impl BusContent for MetricsMode {
fn get_bus_type(&self) -> BusMessageContentType {
BusMessageContentType::MetricsModeTransfered
}
}
}
#[derive(Debug)]

View File

@ -159,12 +159,12 @@ pub mod v2 {
};
let sender = self.bus.1.clone();
let resp_content = match res {
Ok(_) => format!("Ok on user call abour process {}", prc.name),
Err(er) => format!("Error: User call for process {} failed : {}", prc.name, er),
Ok(_) => Ok(format!("Ok on user call abour process {}", prc.name)),
Err(er) => Err(anyhow::Error::msg(format!("Error: User call for process {} failed : {}", prc.name, er))),
};
let _ = sender.send(BusMessage::Response(
crate::options::structs::bus::BusMessageDirection::ToCli,
BusMessageContentType::RawString,
BusMessageContentType::Result,
Box::new(resp_content)
)).await;
1
@ -176,7 +176,9 @@ pub mod v2 {
let _ = self.bus.1.clone().send(BusMessage::Response(
crate::options::structs::bus::BusMessageDirection::ToCli,
BusMessageContentType::RawString,
Box::new(format!("No process named `{}` was found in controlled scope", cli.prc))
Box::new(
Err(anyhow::Error::msg(format!("No process named `{}` was found in controlled scope", cli.prc)))
)
)).await;
}
}

View File

@ -1,25 +1,23 @@
// submodule needed to get metrics such as
// cpu load, ram/rom load and net activity
// use std::sync::Mutex;
use std::{collections::BTreeMap, sync::Arc};
use std::{any::Any, collections::BTreeMap, sync::Arc};
use crate::options::structs::{ProcessState, TrackingProcess};
use log::warn;
use noxis_cli::metrics_models::MetricsMode;
// use chrono::Duration;
use sysinfo::{System, Disks as DisksList, Networks};
use crate::options::structs::Dependencies;
use serde::Serialize;
use super::prcs::v2::Pid;
use std::fmt::Debug;
use crate::options::structs::bus::BusMessage;
// use pcap::{Device, Capture, Active};
// use std::net::Ipv4Addr;
// use anyhow::{Result, Ok};
// type PacketBuffer = Arc<Mutex<Vec<PacketInfo>>>;
use crate::options::structs::bus::{BusMessage, BusMessageDirection, BusMessageContentType};
// use noxis_cli::metrics_models::MetricsMode;
pub type MetricProcesses = Vec<ProcessExtended>;
type CoreUsage = BTreeMap<usize, CoreInfo>;
type Disks = Vec<Disk>;
type Ifaces = Vec<Network>;
pub type MetricProcesses = Vec<ProcessExtended>;
type BusReciever = tokio::sync::mpsc::Receiver<BusMessage>;
type BusSender = Arc<tokio::sync::mpsc::Sender<BusMessage>>;
@ -36,22 +34,64 @@ type BusSender = Arc<tokio::sync::mpsc::Sender<BusMessage>>;
///
/// *depends on* : -
///
#[allow(dead_code)]
pub async fn init_metrics_grubber(
/* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */
) {
bus_sender : BusSender,
bus_recirever : BusReciever,
) -> anyhow::Result<()> {
let mut system = System::new();
get_all_metrics(&mut system).await;
// get_all_metrics(&mut system).await;
/* TODO */
let mut bus_recirever = bus_recirever;
loop {
if let Ok(BusMessage::Request(_, _, cont)) = bus_recirever.try_recv() {
let cont: Box<dyn Any + Send> = cont;
match cont.downcast::<MetricsMode>() {
Err(_) => {
warn!("Unrecognized Metric mode was given");
let _ = bus_sender.send(BusMessage::Response(
BusMessageDirection::ToCli,
BusMessageContentType::Result,
Box::new(
Err(anyhow::Error::msg(format!("Unrecognized Metric mode was given"))
))
)).await;
}
Ok(mode) => {
let metric: Box<dyn MetricsExportable> = match *mode {
MetricsMode::Full => Box::new(get_all_metrics(&mut system).await),
MetricsMode::Cpu => Box::new(get_cpu_metrics(&mut system).await),
MetricsMode::Ram => Box::new(get_ram_metrics(&mut system).await),
MetricsMode::Rom => Box::new(get_all_disks_metrics().await),
MetricsMode::Network => Box::new(get_all_ifaces_metrics().await),
// MetricsMode::Processes => {},
_ => todo!(),
};
// let metric: Box<dyn BusContent> = Box::new(metric);
let metric = metric.serialze_into_output();
let _ = bus_sender.send(BusMessage::Response(
BusMessageDirection::ToCli,
BusMessageContentType::MetricsObj,
Box::new(metric)
)).await;
},
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
async fn get_all_metrics(system: &mut System) {
async fn get_all_metrics(system: &mut System) -> FullMetrics {
system.refresh_all();
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
dbg!(get_host_info().await);
dbg!(get_cpu_metrics(system).await);
dbg!(get_ram_metrics(system).await);
dbg!(get_all_disks_metrics().await);
dbg!(get_all_ifaces_metrics().await);
let host = get_host_info().await;
let cpu = get_cpu_metrics(system).await;
let ram = get_ram_metrics(system).await;
let disks = get_all_disks_metrics().await;
let ifaces = get_all_ifaces_metrics().await;
let prcs = MetricProcesses::new();
FullMetrics::create(host, cpu, ram, disks, ifaces, prcs)
}
async fn get_host_info() -> HostInfo {
@ -147,7 +187,7 @@ async fn get_all_ifaces_metrics() -> Ifaces {
ifaces
}
async fn get_all_processes_metrics(system: &mut System) {}
async fn get_all_processes_metrics(system: &mut System) { /* TODO */}
pub enum MetricType {
FullMetrics,
@ -159,8 +199,9 @@ pub enum MetricType {
Processes,
}
pub trait MetricsExportable: Send + Sync + 'static + Debug {
pub trait MetricsExportable: Send + Sync + 'static + Debug + Any {
fn get_metric_type(&self) -> MetricType;
fn serialze_into_output(&self) -> anyhow::Result<String>;
}
#[derive(Serialize, Debug)]
@ -174,38 +215,65 @@ struct FullMetrics {
networks : Ifaces,
processes : MetricProcesses,
}
impl MetricsExportable for FullMetrics{
impl FullMetrics {
fn create(
host: HostInfo,
cpu : Cpu,
ram : Ram,
disks : Disks,
ifaces : Ifaces,
processes : MetricProcesses,
) -> Self {
Self {
hostname : host.hostname,
os : host.os,
kernel : host.kernel,
cpu,
ram,
disks,
networks : ifaces,
processes
}
}
}
impl MetricsExportable for FullMetrics {
fn get_metric_type(&self) -> MetricType {
MetricType::FullMetrics
}
fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?)
}
}
#[derive(Debug)]
#[derive(Debug, Serialize)]
struct HostInfo {
hostname : String,
os : String,
kernel : String,
}
impl MetricsExportable for HostInfo{
impl MetricsExportable for HostInfo {
fn get_metric_type(&self) -> MetricType {
MetricType::HostInfo
}
fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?)
}
}
#[derive(Serialize, Debug)]
struct Cpu {
global_usage : f32,
usage : CoreUsage,
}
impl MetricsExportable for Cpu{
impl MetricsExportable for Cpu {
fn get_metric_type(&self) -> MetricType {
MetricType::Cpu
}
fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?)
}
}
@ -230,6 +298,9 @@ impl MetricsExportable for Ram{
fn get_metric_type(&self) -> MetricType {
MetricType::Ram
}
fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?)
}
}
#[derive(Serialize, Debug)]
@ -248,6 +319,9 @@ impl MetricsExportable for Disks{
fn get_metric_type(&self) -> MetricType {
MetricType::Disks
}
fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?)
}
}
// vec<Network>
@ -269,6 +343,9 @@ impl MetricsExportable for Ifaces {
fn get_metric_type(&self) -> MetricType {
MetricType::Ifaces
}
fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?)
}
}
#[derive(Serialize, Debug)]
@ -317,6 +394,9 @@ impl MetricsExportable for MetricProcesses {
fn get_metric_type(&self) -> MetricType {
MetricType::Processes
}
fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?)
}
}
#[cfg(test)]

View File

@ -387,6 +387,9 @@ pub async fn is_frozen(name: &str) -> bool {
/// *depends on* : -
///
pub async fn terminate_process(name: &str) -> anyhow::Result<()> {
if !is_active(name).await {
return Err(anyhow::Error::msg(format!("Process {} is already stopped", name)))
}
let _ = Command::new("pkill")
.arg(name)
.output()?;
@ -466,7 +469,9 @@ pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<u32> {
/// *depends on* : -
///
pub async fn start_process(name: &str, path: &str) -> anyhow::Result<u32> {
// let runsh = format!("{} {}", "exec", path);
if is_active(name).await {
return Err(anyhow::Error::msg(format!("Process {} is already running", name)))
}
let mut command = Command::new(path);
// command.arg(path);