TODO else if response in metrics

migrate
prplV 2025-06-03 18:20:35 +03:00
parent 8ba911385f
commit 888fb41885
16 changed files with 1683 additions and 1223 deletions

View File

@ -184,6 +184,10 @@ pub mod metrics_models {
)] )]
Full, Full,
// system // system
#[command(
about = "To capture general host info",
)]
Host,
#[command( #[command(
about = "To capture detailed CPU metrics", about = "To capture detailed CPU metrics",
)] )]

View File

@ -2,22 +2,22 @@ mod options;
mod utils; mod utils;
use log::{error, info}; use log::{error, info};
use options::cli_pipeline::init_cli_pipeline;
use options::config::v2::init_config_mechanism;
use options::logger::setup_logger; use options::logger::setup_logger;
use options::preboot::PrebootParams;
use options::signals::set_valid_destructor; use options::signals::set_valid_destructor;
use options::structs::ProcessUnit; use options::structs::ProcessUnit;
use options::structs::{Processes, bus::BusMessage}; use options::structs::{bus::BusMessage, Processes};
use options::cli_pipeline::init_cli_pipeline;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use options::preboot::PrebootParams; use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::sync::{broadcast, oneshot, mpsc};
use options::config::v2::init_config_mechanism;
use utils::v2::init_monitoring;
use utils::bus::Bus; use utils::bus::Bus;
use utils::metrics::init_metrics_grubber; use utils::metrics::init_metrics_grubber;
use utils::v2::init_monitoring;
#[tokio::main(flavor = "multi_thread", worker_threads = 4)] #[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()>{ async fn main() -> anyhow::Result<()> {
let preboot = Arc::new(PrebootParams::validate()); let preboot = Arc::new(PrebootParams::validate());
let _ = setup_logger(); let _ = setup_logger();
@ -43,7 +43,12 @@ async fn main() -> anyhow::Result<()>{
let tx_to_metrics = Arc::new(tx_to_metrics); let tx_to_metrics = Arc::new(tx_to_metrics);
let bus_module = tokio::spawn(async move { let bus_module = tokio::spawn(async move {
let mut bus = Bus::new(rx_to_bus, tx_to_cli.clone(), tx_to_supervisor.clone(), tx_to_metrics.clone()); let mut bus = Bus::new(
rx_to_bus,
tx_to_cli.clone(),
tx_to_supervisor.clone(),
tx_to_metrics.clone(),
);
bus.process().await; bus.process().await;
error!("Info Bus crushed !"); error!("Info Bus crushed !");
}); });
@ -52,11 +57,7 @@ async fn main() -> anyhow::Result<()>{
// initilaizing task for config manipulations // initilaizing task for config manipulations
let preboot_config = preboot.clone(); let preboot_config = preboot.clone();
let config_module = tokio::spawn(async move { let config_module = tokio::spawn(async move {
let _ = init_config_mechanism( let _ = init_config_mechanism(rx_oneshot, tx_brd, preboot_config).await;
rx_oneshot,
tx_brd,
preboot_config
).await;
}); });
handler.push(config_module); handler.push(config_module);
@ -71,7 +72,7 @@ async fn main() -> anyhow::Result<()>{
break match rx_cli_brd.try_recv() { break match rx_cli_brd.try_recv() {
Ok(conf) => conf, Ok(conf) => conf,
Err(_) => continue, Err(_) => continue,
} };
} }
}; };
if let Err(er) = init_cli_pipeline( if let Err(er) = init_cli_pipeline(
@ -79,8 +80,10 @@ async fn main() -> anyhow::Result<()>{
Arc::new(config), Arc::new(config),
tx_oneshot, tx_oneshot,
rx_to_cli, rx_to_cli,
tx_bus.clone() tx_bus.clone(),
).await { )
.await
{
error!("CLI pipeline failed due to {}", er) error!("CLI pipeline failed due to {}", er)
} }
}); });
@ -89,10 +92,7 @@ async fn main() -> anyhow::Result<()>{
// metrics // metrics
let tx_bus = tx_to_bus.clone(); let tx_bus = tx_to_bus.clone();
let metrics_module = tokio::spawn(async move { let metrics_module = tokio::spawn(async move {
if let Err(er) = init_metrics_grubber( if let Err(er) = init_metrics_grubber(tx_bus.clone(), rx_to_metrics).await {
tx_bus.clone(),
rx_to_metrics
).await {
error!("Metrics module crushed : {}", er); error!("Metrics module crushed : {}", er);
} }
}); });
@ -116,14 +116,10 @@ async fn main() -> anyhow::Result<()>{
break match rx_brd.try_recv() { break match rx_brd.try_recv() {
Ok(conf) => conf, Ok(conf) => conf,
Err(_) => continue, Err(_) => continue,
} };
} }
}; };
if let Err(er) = init_monitoring( if let Err(er) = init_monitoring(config, rx_to_supervisor, tx_bus).await {
config,
rx_to_supervisor,
tx_bus
).await {
error!("Monitoring mod failed due to {}", er); error!("Monitoring mod failed due to {}", er);
} }
}); });

View File

@ -1,8 +1,8 @@
// ! gathering optional items module // ! gathering optional items module
pub mod cli_pipeline;
pub mod config; pub mod config;
pub mod logger; pub mod logger;
pub mod preboot;
pub mod signals; pub mod signals;
pub mod structs; pub mod structs;
pub mod preboot;
pub mod cli_pipeline;

View File

@ -1,15 +1,15 @@
use super::structs::bus::BusMessage;
use super::structs::Processes;
use crate::options::structs::bus::InternalCli;
use log::{error, info}; use log::{error, info};
use tokio::net::{ UnixStream, UnixListener }; use noxis_cli::{Cli, ProcessAction};
use tokio::sync::{Mutex, OnceCell};
use tokio::time::{sleep, Duration};
use std::any::Any; use std::any::Any;
use std::fs; use std::fs;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::{ AsyncWriteExt, AsyncReadExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use noxis_cli::{Cli, ProcessAction}; use tokio::net::{UnixListener, UnixStream};
use crate::options::structs::bus::InternalCli; use tokio::sync::{Mutex, OnceCell};
use super::structs::Processes; use tokio::time::{sleep, Duration};
use super::structs::bus::BusMessage;
use super::preboot::PrebootParams; use super::preboot::PrebootParams;
@ -34,19 +34,15 @@ type ReadyBusReciever = Arc<Mutex<tokio::sync::mpsc::Receiver<BusMessage>>>;
/// ///
pub async fn init_cli_pipeline( pub async fn init_cli_pipeline(
params: Arc<PrebootParams>, params: Arc<PrebootParams>,
config : Arc<Processes>, config: Arc<Processes>,
config_gateway : ConfigGateway, config_gateway: ConfigGateway,
bus_reciever : BusReciever, bus_reciever: BusReciever,
bus_sender : BusSender, bus_sender: BusSender,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let socket_path = &params.self_socket; let socket_path = &params.self_socket;
let _ = fs::remove_file(socket_path); let _ = fs::remove_file(socket_path);
let config_gateway = Arc::new( let config_gateway = Arc::new(Mutex::new(OnceCell::new_with(Some(config_gateway))));
Mutex::new(
OnceCell::new_with(Some(config_gateway))
)
);
let bus_reciever = Arc::new(Mutex::new(bus_reciever)); let bus_reciever = Arc::new(Mutex::new(bus_reciever));
match UnixListener::bind(socket_path) { match UnixListener::bind(socket_path) {
@ -64,21 +60,29 @@ pub async fn init_cli_pipeline(
let bus_reciever = bus_reciever.clone(); let bus_reciever = bus_reciever.clone();
let bus_sender = bus_sender.clone(); let bus_sender = bus_sender.clone();
tokio::spawn(async move { tokio::spawn(async move {
process_connection(socket, params.clone(), config.clone(), config_gateway.clone(), bus_reciever, bus_sender).await; process_connection(
socket,
params.clone(),
config.clone(),
config_gateway.clone(),
bus_reciever,
bus_sender,
)
.await;
}); });
}, }
Err(er) => { Err(er) => {
error!("Cannot poll connection to CLI due to {}", er); error!("Cannot poll connection to CLI due to {}", er);
sleep(Duration::from_millis(300)).await; sleep(Duration::from_millis(300)).await;
}, }
} }
} }
// Ok(()) // Ok(())
}, }
Err(er) => { Err(er) => {
error!("Failed to open UnixListener for CLI"); error!("Failed to open UnixListener for CLI");
Err(er.into()) Err(er.into())
}, }
} }
} }
@ -98,31 +102,38 @@ pub async fn init_cli_pipeline(
async fn process_connection( async fn process_connection(
mut stream: UnixStream, mut stream: UnixStream,
params: Arc<PrebootParams>, params: Arc<PrebootParams>,
config : Arc<Processes>, config: Arc<Processes>,
cfg_gateway : ProcessedConfigGateway, cfg_gateway: ProcessedConfigGateway,
bus_reciever : ReadyBusReciever, bus_reciever: ReadyBusReciever,
bus_sender : BusSender, bus_sender: BusSender,
) { ) {
let mut buf = vec![0; 1024]; let mut buf = vec![0; 1024];
match stream.read(&mut buf).await { match stream.read(&mut buf).await {
Ok(0) => { Ok(0) => {
info!("Client disconnected "); info!("Client disconnected ");
}, }
Ok(n) => { Ok(n) => {
buf.truncate(n); buf.truncate(n);
info!("CLI have sent {} bytes", n); info!("CLI have sent {} bytes", n);
match serde_json::from_slice::<Cli>(&buf) { match serde_json::from_slice::<Cli>(&buf) {
Ok(cli) => { Ok(cli) => {
info!("Received CLI request: {:?}", cli); info!("Received CLI request: {:?}", cli);
let response = match process_cli_cmd(cli, params.clone(), config, cfg_gateway.clone(), bus_reciever.clone(), bus_sender.clone()).await { let response = match process_cli_cmd(
Ok(response) => { cli,
response params.clone(),
}, config,
cfg_gateway.clone(),
bus_reciever.clone(),
bus_sender.clone(),
)
.await
{
Ok(response) => response,
Err(er) => { Err(er) => {
let error_msg = format!("Error: {}", er); let error_msg = format!("Error: {}", er);
error!("{}", &error_msg); error!("{}", &error_msg);
error_msg error_msg
}, }
}; };
for line in response.lines() { for line in response.lines() {
if let Err(er) = stream.write_all(line.as_bytes()).await { if let Err(er) = stream.write_all(line.as_bytes()).await {
@ -134,36 +145,35 @@ async fn process_connection(
error!("Failed to parse CLI request: {}", e); error!("Failed to parse CLI request: {}", e);
} }
} }
}, }
Err(e) => error!("Failed to read from socket: {}", e), Err(e) => error!("Failed to read from socket: {}", e),
} }
let _ = stream.shutdown().await; let _ = stream.shutdown().await;
} }
async fn process_cli_cmd( async fn process_cli_cmd(
cli : Cli, cli: Cli,
params: Arc<PrebootParams>, params: Arc<PrebootParams>,
global_config : Arc<Processes>, global_config: Arc<Processes>,
cfg_gateway: ProcessedConfigGateway, cfg_gateway: ProcessedConfigGateway,
bus_reciever : ReadyBusReciever, bus_reciever: ReadyBusReciever,
bus_sender : BusSender, bus_sender: BusSender,
) -> anyhow::Result<String> { ) -> anyhow::Result<String> {
use noxis_cli::{Commands, ConfigAction}; use noxis_cli::{Commands, ConfigAction};
return match cli.command { return match cli.command {
Commands::Config(config) => { Commands::Config(config) => {
match config.action { match config.action {
ConfigAction::Show(env ) => { ConfigAction::Show(env) => {
if env.is_env { if env.is_env {
Ok(serde_json::to_string_pretty(params.as_ref())?) Ok(serde_json::to_string_pretty(params.as_ref())?)
} else { } else {
/* */ /* */
Ok(serde_json::to_string_pretty(global_config.as_ref())?) Ok(serde_json::to_string_pretty(global_config.as_ref())?)
} }
}, }
ConfigAction::Reset => { ConfigAction::Reset => Err(anyhow::Error::msg(
Err(anyhow::Error::msg("It's temporarly forbidden to reset current config using CLI-util")) "It's temporarly forbidden to reset current config using CLI-util",
}, )),
ConfigAction::Local(cfg) => { ConfigAction::Local(cfg) => {
if cfg.is_json { if cfg.is_json {
/* */ /* */
@ -180,71 +190,102 @@ async fn process_cli_cmd(
match lock.take() { match lock.take() {
Some(channel) => { Some(channel) => {
let _ = channel.send(new_config); let _ = channel.send(new_config);
}, }
None => error!("Cannot update confif due to channel sender loss"), None => error!(
"Cannot update confif due to channel sender loss"
),
} }
}); });
Ok(format!("Ok. Saving and reloading with version {}", new_version)) Ok(format!(
}, "Ok. Saving and reloading with version {}",
_ => Err(anyhow::Error::msg(format!("Local config (version: {}) is more actual", global_config.get_version()))), new_version
))
} }
_ => Err(anyhow::Error::msg(format!(
"Local config (version: {}) is more actual",
global_config.get_version()
))),
};
} else { } else {
Err(anyhow::Error::msg("It's temporarly forbidden to set config in non-json mode")) Err(anyhow::Error::msg(
"It's temporarly forbidden to set config in non-json mode",
))
} }
}, }
ConfigAction::Remote => {Ok(params.remote_server_url.clone())}, ConfigAction::Remote => Ok(params.remote_server_url.clone()),
/* */ /* */
// _ => Err(anyhow::Error::msg("Unrecognized command from CLI")) // _ => Err(anyhow::Error::msg("Unrecognized command from CLI"))
} }
}, }
Commands::Processes => {
use crate::options::structs::bus::{BusMessageContentType, BusMessageDirection};
use crate::utils::metrics::processes::ProcessesQuery;
let _ = bus_sender
.send(BusMessage::Request(
BusMessageDirection::ToSupervisor,
BusMessageContentType::ProcessQuery,
Box::new(ProcessesQuery::QueryAll),
))
.await;
let mut bus = bus_reciever.lock().await;
let resp = tokio::time::timeout(std::time::Duration::from_secs(5), async move {
loop {
if let Ok(cont) = bus.try_recv() {
return cont;
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
})
.await?;
if let BusMessage::Response(_, _, content) = resp {
let content: Box<dyn Any> = content;
if let Ok(resp) = content.downcast::<anyhow::Result<String>>() {
return Ok((*resp)?);
}
}
Err(anyhow::Error::msg(format!(
"Unknown type of response from the Supervisor"
)))
}
Commands::Process(prc) => { Commands::Process(prc) => {
use crate::options::structs::bus::{BusMessageDirection, BusMessageContentType, CLiCommand}; use crate::options::structs::bus::{
BusMessageContentType, BusMessageDirection, CLiCommand,
};
let proc_name = prc.process; let proc_name = prc.process;
let req = BusMessage::Request( let req = BusMessage::Request(
BusMessageDirection::ToSupervisor, BusMessageDirection::ToSupervisor,
BusMessageContentType::Cli, BusMessageContentType::Cli,
Box::new( Box::new(match prc.action {
match prc.action { ProcessAction::Start => InternalCli {
ProcessAction::Start => { prc: proc_name,
InternalCli { cmd: CLiCommand::Start,
prc : proc_name,
cmd : CLiCommand::Start,
}
}, },
ProcessAction::Stop => { ProcessAction::Stop => InternalCli {
InternalCli { prc: proc_name,
prc : proc_name, cmd: CLiCommand::Stop,
cmd : CLiCommand::Stop,
}
}, },
ProcessAction::Restart => { ProcessAction::Restart => InternalCli {
InternalCli { prc: proc_name,
prc : proc_name, cmd: CLiCommand::Restart,
cmd : CLiCommand::Restart,
}
}, },
ProcessAction::Freeze => { ProcessAction::Freeze => InternalCli {
InternalCli { prc: proc_name,
prc : proc_name, cmd: CLiCommand::Freeze,
cmd : CLiCommand::Freeze,
}
}, },
ProcessAction::Unfreeze => { ProcessAction::Unfreeze => InternalCli {
InternalCli { prc: proc_name,
prc : proc_name, cmd: CLiCommand::Unfreeze,
cmd : CLiCommand::Unfreeze,
}
}, },
/* TODO: ALL CMDS */ /* TODO: ALL CMDS */
_ => { _ => InternalCli {
InternalCli { prc: proc_name,
prc : proc_name, cmd: CLiCommand::Restart,
cmd : CLiCommand::Restart,
}
}, },
} }),
)
); );
let mut bus = bus_reciever.lock().await; let mut bus = bus_reciever.lock().await;
bus_sender.send(req).await?; bus_sender.send(req).await?;
@ -252,52 +293,58 @@ async fn process_cli_cmd(
let resp = tokio::time::timeout(std::time::Duration::from_secs(5), async move { let resp = tokio::time::timeout(std::time::Duration::from_secs(5), async move {
loop { loop {
if let Ok(cont) = bus.try_recv() { if let Ok(cont) = bus.try_recv() {
return cont return cont;
} }
tokio::time::sleep(std::time::Duration::from_millis(500)).await; tokio::time::sleep(std::time::Duration::from_millis(500)).await;
} }
}).await?; })
.await?;
if let BusMessage::Response(_, _, content) = resp { if let BusMessage::Response(_, _, content) = resp {
let content: Box<dyn Any> = content; let content: Box<dyn Any> = content;
if let Ok(resp) = content.downcast::<anyhow::Result<String>>() { if let Ok(resp) = content.downcast::<anyhow::Result<String>>() {
return Ok((*resp)?) return Ok((*resp)?);
} }
} }
Err(anyhow::Error::msg(format!("Unknown type of response from supervisor"))) Err(anyhow::Error::msg(format!(
}, "Unknown type of response from the Supervisor"
/* */ )))
}
Commands::Status => Ok(String::from("Ok")), Commands::Status => Ok(String::from("Ok")),
Commands::Inspect(mode) => { Commands::Inspect(mode) => {
use crate::options::structs::bus::{BusMessageDirection, BusMessageContentType}; use crate::options::structs::bus::{BusMessageContentType, BusMessageDirection};
let mode = mode.mode; let mode = mode.mode;
if let Ok(()) = bus_sender.send(BusMessage::Request( if let Ok(()) = bus_sender
.send(BusMessage::Request(
BusMessageDirection::ToMetrics, BusMessageDirection::ToMetrics,
BusMessageContentType::MetricsModeTransfered, BusMessageContentType::MetricsModeTransfered,
Box::new(mode) Box::new(mode),
)).await { ))
.await
{
let mut bus_reciever = bus_reciever.lock().await; let mut bus_reciever = bus_reciever.lock().await;
sleep(Duration::from_millis(300)).await; sleep(Duration::from_millis(300)).await;
let resp = tokio::time::timeout(std::time::Duration::from_secs(5), async move { let resp = tokio::time::timeout(std::time::Duration::from_secs(5), async move {
loop { loop {
if let Ok(cont) = bus_reciever.try_recv() { if let Ok(cont) = bus_reciever.try_recv() {
return cont return cont;
} }
tokio::time::sleep(std::time::Duration::from_millis(500)).await; tokio::time::sleep(std::time::Duration::from_millis(500)).await;
} }
}).await?; })
.await?;
if let BusMessage::Response(_, _, content) = resp { if let BusMessage::Response(_, _, content) = resp {
let content: Box<dyn Any> = content; let content: Box<dyn Any> = content;
if let Ok(resp) = content.downcast::<anyhow::Result<String>>() { if let Ok(resp) = content.downcast::<anyhow::Result<String>>() {
// let mut file = std::fs::File::create("output.json")?; return Ok((*resp)?);
// file.write_all(resp.unwrap_or_else(|_| String::from("no")).)?;
return Ok((*resp)?)
} }
} }
return Err(anyhow::Error::msg(format!("Unknown type of response from CLI"))); return Err(anyhow::Error::msg(format!(
"Unknown type of response from CLI"
)));
} }
Ok(String::from("Ok")) Ok(String::from("Ok"))
},
_ => Ok(String::from("Ok"))
} }
_ => Ok(String::from("Ok")),
};
} }

View File

@ -1,27 +1,29 @@
use super::preboot::PrebootParams;
use super::structs::*; use super::structs::*;
use crate::utils::files::create_watcher;
use inotify::EventMask;
use log::{error, info, warn}; use log::{error, info, warn};
use redis::{Client, Connection}; use redis::{Client, Connection};
use std::fs::File;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::Write; use std::io::Write;
use std::os::unix::process::CommandExt; use std::os::unix::process::CommandExt;
use std::process::Command; use std::process::Command;
use std::sync::Arc; use std::sync::Arc;
use std::{env, fs}; use std::{env, fs};
use super::preboot::PrebootParams;
use tokio::time::{Duration, sleep};
use tokio::sync::{ use tokio::sync::{
broadcast::Receiver as BroadcastReceiver,
broadcast::Sender as BroadcastSender,
oneshot, oneshot,
oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender }, oneshot::{Receiver as OneShotReciever, Sender as OneShotSender},
broadcast::Sender as BroadcastSender, broadcast::Receiver as BroadcastReceiver }; };
use crate::utils::files::create_watcher; use tokio::time::{sleep, Duration};
use std::fs::File;
use inotify::EventMask;
// const CONFIG_PATH: &str = "settings.json"; // const CONFIG_PATH: &str = "settings.json";
pub mod v2 { pub mod v2 {
use std::path::PathBuf;
use crate::utils::get_container_id; use crate::utils::get_container_id;
use std::path::PathBuf;
use super::*; use super::*;
@ -29,10 +31,9 @@ pub mod v2 {
// to handle cli config changes // to handle cli config changes
cli_oneshot: OneShotReciever<Processes>, cli_oneshot: OneShotReciever<Processes>,
// to share local config with PRCS, CLI_PIPELINE and CONFIG modules // to share local config with PRCS, CLI_PIPELINE and CONFIG modules
brd_tx : BroadcastSender<Processes>, brd_tx: BroadcastSender<Processes>,
// preboot params (args) // preboot params (args)
params : Arc<PrebootParams> params: Arc<PrebootParams>, /*...*/
/*...*/
) { ) {
// channel for pubsub to handle local config pulling // channel for pubsub to handle local config pulling
let local_config_brd_reciever = brd_tx.subscribe(); let local_config_brd_reciever = brd_tx.subscribe();
@ -44,40 +45,24 @@ pub mod v2 {
// dbg!("before lc"); // dbg!("before lc");
let params_clone = params.clone(); let params_clone = params.clone();
let for_lc_path = params.clone(); let for_lc_path = params.clone();
let lc_path = for_lc_path let lc_path = for_lc_path.config.to_str().unwrap_or("settings.json");
.config
.to_str()
.unwrap_or("settings.json");
// future to init work with local config // future to init work with local config
let lc_future = tokio::spawn( let lc_future = tokio::spawn(
// let params = params.clone(); // let params = params.clone();
local_config_reciever( local_config_reciever(params_clone, rx_pb_lc, rx_cli_lc, Arc::new(brd_tx)),
params_clone,
rx_pb_lc,
rx_cli_lc,
Arc::new(brd_tx)
)
); );
// dbg!("before pb"); // dbg!("before pb");
// future to init work with pub sub mechanism // future to init work with pub sub mechanism
let pubsub_future = tokio::spawn( let pubsub_future = tokio::spawn(pubsub_config_reciever(
pubsub_config_reciever(
tx_pb_lc, tx_pb_lc,
params.clone(), params.clone(),
local_config_brd_reciever local_config_brd_reciever,
) ));
);
// dbg!("before cli"); // dbg!("before cli");
// future to catch new configs from cli pipeline // future to catch new configs from cli pipeline
let cli_future = tokio::spawn( let cli_future = tokio::spawn(from_cli_config_reciever(cli_oneshot, tx_cli_lc));
from_cli_config_reciever(
cli_oneshot,
tx_cli_lc
)
);
// let _ = lc_future.await; // let _ = lc_future.await;
// dbg!("before select"); // dbg!("before select");
tokio::select! { tokio::select! {
@ -140,7 +125,10 @@ pub mod v2 {
pub async fn get_redis_connection(params: &str) -> Option<Connection> { pub async fn get_redis_connection(params: &str) -> Option<Connection> {
for i in 1..=3 { for i in 1..=3 {
let redis_url = format!("redis://{}/", params); let redis_url = format!("redis://{}/", params);
info!("Trying to connect Redis pubsub `{}`. Attempt {}", &redis_url, i); info!(
"Trying to connect Redis pubsub `{}`. Attempt {}",
&redis_url, i
);
if let Ok(client) = Client::open(redis_url) { if let Ok(client) = Client::open(redis_url) {
if let Ok(conn) = client.get_connection() { if let Ok(conn) = client.get_connection() {
info!("Successfully opened Redis connection"); info!("Successfully opened Redis connection");
@ -156,10 +144,10 @@ pub mod v2 {
// loop checking redis pubsub // loop checking redis pubsub
async fn pubsub_config_reciever( async fn pubsub_config_reciever(
// to stop checking local config // to stop checking local config
local_conf_tx : OneShotSender<bool>, local_conf_tx: OneShotSender<bool>,
params : Arc<PrebootParams>, params: Arc<PrebootParams>,
tx_brd_local : BroadcastReceiver<Processes>, tx_brd_local: BroadcastReceiver<Processes>,
) -> anyhow::Result<()>{ ) -> anyhow::Result<()> {
/*...*/ /*...*/
// dbg!("start of pb"); // dbg!("start of pb");
let mut tx_brd_local = tx_brd_local; let mut tx_brd_local = tx_brd_local;
@ -180,19 +168,24 @@ pub mod v2 {
Some(mut conn) => { Some(mut conn) => {
let mut pub_sub = conn.as_pubsub(); let mut pub_sub = conn.as_pubsub();
let channel_name = get_container_id().unwrap_or(String::from("default")); let channel_name = get_container_id().unwrap_or(String::from("default"));
let channel_name = channel_name.trim(); match pub_sub.subscribe(&channel_name) {
match pub_sub.subscribe(channel_name) {
Err(er) => { Err(er) => {
error!("Cannot subscribe pubsub channel due to {}", &er); error!("Cannot subscribe pubsub channel due to {}", &er);
return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er))) return Err(anyhow::Error::msg(format!(
}, "Cannot subscribe pubsub channel due to {}",
er
)));
}
Ok(_) => { Ok(_) => {
info!("Successfully subscribed to {} pubsub channel", channel_name); info!(
"Successfully subscribed to {} pubsub channel",
&channel_name
);
let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(1))); let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(1)));
loop { loop {
if let Ok(msg) = pub_sub.get_message() { if let Ok(msg) = pub_sub.get_message() {
// dbg!("ok on get message"); // dbg!("ok on get message");
let payload : Result<String, _> = msg.get_payload(); let payload: Result<String, _> = msg.get_payload();
match payload { match payload {
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "), Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
Ok(payload) => { Ok(payload) => {
@ -226,9 +219,9 @@ pub mod v2 {
// delay // delay
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
},
} }
}, }
}
None => { None => {
sleep(Duration::from_secs(20)).await; sleep(Duration::from_secs(20)).await;
} }
@ -238,10 +231,10 @@ pub mod v2 {
// //
async fn local_config_reciever( async fn local_config_reciever(
params : Arc<PrebootParams>, params: Arc<PrebootParams>,
pubsub_oneshot : OneShotReciever<bool>, pubsub_oneshot: OneShotReciever<bool>,
cli_oneshot : OneShotReciever<bool>, cli_oneshot: OneShotReciever<bool>,
brd_tx : Arc<BroadcastSender<Processes>>, brd_tx: Arc<BroadcastSender<Processes>>,
/*...*/ /*...*/
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
/*...*/ /*...*/
@ -251,10 +244,7 @@ pub mod v2 {
// fill with default empty config, mut to change later // fill with default empty config, mut to change later
let mut _current_config = Processes::default(); let mut _current_config = Processes::default();
// PathBuf to &str to work with local config path as slice // PathBuf to &str to work with local config path as slice
let local_config_path = params let local_config_path = params.config.to_str().unwrap_or("settings.json");
.config
.to_str()
.unwrap_or("settings.json");
match load_processes(local_config_path) { match load_processes(local_config_path) {
// if local exists // if local exists
@ -264,13 +254,13 @@ pub mod v2 {
if let Err(er) = brd_tx.send(_current_config.clone()) { if let Err(er) = brd_tx.send(_current_config.clone()) {
error!("Cannot share local config with broadcast due to {}", er); error!("Cannot share local config with broadcast due to {}", er);
} }
}, }
// if local is not exist // if local is not exist
None => { None => {
warn!("Local config wasn't found. Waiting for new ..."); warn!("Local config wasn't found. Waiting for new ...");
return Err(anyhow::Error::msg("No local config")); return Err(anyhow::Error::msg("No local config"));
// ... // ...
}, }
} }
// 100% local exists here // 100% local exists here
@ -329,7 +319,9 @@ pub mod v2 {
} }
// exporting data // exporting data
if need_to_export_config { if need_to_export_config {
if let Err(er) = export_saved_config_data_locally(&params.config, &_current_config).await { if let Err(er) =
export_saved_config_data_locally(&params.config, &_current_config).await
{
error!("Cannot save actual imported config due to {}", er); error!("Cannot save actual imported config due to {}", er);
} else { } else {
// recreation watcher (draining activity buffer mechanism) // recreation watcher (draining activity buffer mechanism)
@ -344,19 +336,20 @@ pub mod v2 {
sleep(Duration::from_millis(300)).await; sleep(Duration::from_millis(300)).await;
// tokio::task::yield_now().await; // tokio::task::yield_now().await;
} }
}, }
Err(_) => { Err(_) => {
error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path); error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path);
return Err(anyhow::Error::msg("Cannot create watcher on local config file")); return Err(anyhow::Error::msg(
}, "Cannot create watcher on local config file",
));
}
} }
} }
// [:IN-TEST] // [:IN-TEST]
async fn from_cli_config_reciever( async fn from_cli_config_reciever(
cli_oneshot: OneShotReciever<Processes>, cli_oneshot: OneShotReciever<Processes>,
to_local_tx: OneShotSender<bool> to_local_tx: OneShotSender<bool>,
) -> Option<Processes> { ) -> Option<Processes> {
/* match awaits til channel*/ /* match awaits til channel*/
// dbg!("start of cli"); // dbg!("start of cli");
@ -364,33 +357,31 @@ pub mod v2 {
if !cli_oneshot.is_empty() { if !cli_oneshot.is_empty() {
match cli_oneshot.await { match cli_oneshot.await {
Ok(config_from_cli) => { Ok(config_from_cli) => {
info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation); info!(
"New actual config `{}` from CLI was pulled. Saving and restaring ...",
&config_from_cli.date_of_creation
);
let _ = to_local_tx.send(true); let _ = to_local_tx.send(true);
return Some(config_from_cli) return Some(config_from_cli);
}, }
_ => return None, _ => return None,
} }
} }
sleep(Duration::from_millis(300)).await; sleep(Duration::from_millis(300)).await;
} }
} }
async fn export_saved_config_data_locally( async fn export_saved_config_data_locally(
config_file_path: &PathBuf, config_file_path: &PathBuf,
current_config: &Processes current_config: &Processes,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut file = File::create(config_file_path)?; let mut file = File::create(config_file_path)?;
file.write_all( file.write_all(serde_json::to_string_pretty(current_config)?.as_bytes())?;
serde_json::to_string_pretty(current_config)?.as_bytes()
)?;
Ok(()) Ok(())
// Ok(()) // Ok(())
} }
} }
/// # Fn `load_processes` /// # Fn `load_processes`
/// ## for reading and parsing *local* storing config /// ## for reading and parsing *local* storing config
/// ///

View File

@ -65,8 +65,7 @@ trait FromEnv {
impl FromEnv for LevelFilter { impl FromEnv for LevelFilter {
fn from_env() -> LevelFilter { fn from_env() -> LevelFilter {
return match std::env::var("NOXIS_MAX_LOG_LEVEL") { return match std::env::var("NOXIS_MAX_LOG_LEVEL") {
Ok(var) => { Ok(var) => match var.to_ascii_lowercase().trim().as_ref() {
match var.to_ascii_lowercase().trim().as_ref() {
"trace" => LevelFilter::Trace, "trace" => LevelFilter::Trace,
"debug" => LevelFilter::Debug, "debug" => LevelFilter::Debug,
"info" => LevelFilter::Info, "info" => LevelFilter::Info,
@ -74,10 +73,9 @@ impl FromEnv for LevelFilter {
"warn" => LevelFilter::Warn, "warn" => LevelFilter::Warn,
"off" => LevelFilter::Off, "off" => LevelFilter::Off,
_ => LevelFilter::Info, _ => LevelFilter::Info,
}
}, },
Err(_) => LevelFilter::Info, Err(_) => LevelFilter::Info,
} };
} }
} }

View File

@ -2,11 +2,11 @@
//! Module to handle `pre-boot params` of the monitor (calling also as `settings`) //! Module to handle `pre-boot params` of the monitor (calling also as `settings`)
//! //!
#[allow(unused_imports)] #[allow(unused_imports)]
use anyhow::{Result, Error}; use anyhow::{Error, Result};
use log::warn;
use std::path::PathBuf;
use std::env::var;
use dotenv::dotenv; use dotenv::dotenv;
use log::warn;
use std::env::var;
use std::path::PathBuf;
/// # Enum `MetricsPrebootParams` /// # Enum `MetricsPrebootParams`
/// ## for setting up metrics mode as preboot param from command prompt /// ## for setting up metrics mode as preboot param from command prompt
@ -181,14 +181,14 @@ impl std::fmt::Display for MetricsPrebootParams {
pub struct PrebootParams { pub struct PrebootParams {
// pub no_hostagent : bool, // pub no_hostagent : bool,
pub no_logs: bool, pub no_logs: bool,
pub refresh_logs : bool, pub refresh_logs: bool,
pub no_sub : bool, pub no_sub: bool,
// pub socket_path : PathBuf, // pub socket_path : PathBuf,
pub log_to : PathBuf, pub log_to: PathBuf,
pub remote_server_url : String, pub remote_server_url: String,
pub config : PathBuf, pub config: PathBuf,
pub metrics: MetricsPrebootParams, pub metrics: MetricsPrebootParams,
pub self_socket : PathBuf, pub self_socket: PathBuf,
} }
/// # implementation for `MetricsPrebootParams` /// # implementation for `MetricsPrebootParams`
@ -204,19 +204,19 @@ impl PrebootParams {
// Err(_) => false, // Err(_) => false,
// } // }
// }, // },
no_logs : { no_logs: {
match var("NOXIS_NO_LOGS") { match var("NOXIS_NO_LOGS") {
Ok(_) => true, Ok(_) => true,
Err(_) => false, Err(_) => false,
} }
}, },
refresh_logs : { refresh_logs: {
match var("NOXIS_REFRESH_LOGS") { match var("NOXIS_REFRESH_LOGS") {
Ok(_) => true, Ok(_) => true,
Err(_) => false, Err(_) => false,
} }
}, },
no_sub : { no_sub: {
match var("NOXIS_NO_SUB") { match var("NOXIS_NO_SUB") {
Ok(_) => true, Ok(_) => true,
Err(_) => false, Err(_) => false,
@ -229,45 +229,48 @@ impl PrebootParams {
// Err(_) => PathBuf::from("/var/run/enode/hostagent.sock"), // Err(_) => PathBuf::from("/var/run/enode/hostagent.sock"),
// } // }
// }, // },
log_to : { log_to: {
match var("NOXIS_LOG_TO") { match var("NOXIS_LOG_TO") {
Ok(val) => PathBuf::from(val), Ok(val) => PathBuf::from(val),
Err(_) => PathBuf::from("./"), Err(_) => PathBuf::from("./"),
} }
}, },
remote_server_url : { remote_server_url: {
match var("NOXIS_REMOTE_SERVER_URL") { match var("NOXIS_REMOTE_SERVER_URL") {
Ok(val) => val, Ok(val) => val,
Err(_) => String::from("localhost"), Err(_) => String::from("localhost"),
} }
}, },
config : { config: {
match var("NOXIS_CONFIG_PATH") { match var("NOXIS_CONFIG_PATH") {
Ok(val) => PathBuf::from(val), Ok(val) => PathBuf::from(val),
Err(_) => PathBuf::from("./settings.json"), Err(_) => PathBuf::from("./settings.json"),
} }
}, },
metrics : { metrics: {
match var("NOXIS_METRICS_MODE") { match var("NOXIS_METRICS_MODE") {
Ok(val) => MetricsPrebootParams::from_env(&val), Ok(val) => MetricsPrebootParams::from_env(&val),
Err(_) => MetricsPrebootParams::Full, Err(_) => MetricsPrebootParams::Full,
} }
}, },
self_socket : { self_socket: {
match var("NOXIS_SOCKET_PATH") { match var("NOXIS_SOCKET_PATH") {
Ok(val) => PathBuf::from(val), Ok(val) => PathBuf::from(val),
Err(_) => { Err(_) => {
let default = std::env::current_dir().expect("Crushed on getting current_dir path. Check fs state!"); let default = std::env::current_dir()
warn!("$NOXIS_SOCKET_PATH wans't set. Default value - {}", default.display()); .expect("Crushed on getting current_dir path. Check fs state!");
warn!(
"$NOXIS_SOCKET_PATH wans't set. Default value - {}",
default.display()
);
PathBuf::from(default) PathBuf::from(default)
}, }
} }
}, },
} }
} }
} }
// unit tests of preboot params parsing mech // unit tests of preboot params parsing mech
// #[cfg(test)] // #[cfg(test)]
// mod preboot_unitests{ // mod preboot_unitests{

View File

@ -1,24 +1,32 @@
#![allow(dead_code)] #![allow(dead_code)]
use std::net::Ipv4Addr;
use serde::{Deserialize, Serialize};
use async_trait::async_trait; use async_trait::async_trait;
use std::sync::Arc; use serde::{Deserialize, Serialize};
use std::any::Any; use std::any::Any;
use std::net::Ipv4Addr;
use std::sync::Arc;
pub mod bus { pub mod bus {
use std::fmt::Debug; use std::fmt::Debug;
use super::*; use super::*;
use noxis_cli::{Cli, metrics_models::MetricsMode}; use crate::utils::metrics::processes::{ProcessesAll, ProcessesGeneral, ProcessesQuery};
use crate::utils::metrics::MetricsExportable; use crate::utils::metrics::MetricsExportable;
use noxis_cli::{metrics_models::MetricsMode, Cli};
pub type BusMessageContent = Box<dyn BusContent>; pub type BusMessageContent = Box<dyn BusContent>;
#[derive(Debug)] #[derive(Debug)]
pub enum BusMessage { pub enum BusMessage {
Request(BusMessageDirection, BusMessageContentType, BusMessageContent), Request(
Response(BusMessageDirection, BusMessageContentType, BusMessageContent), BusMessageDirection,
BusMessageContentType,
BusMessageContent,
),
Response(
BusMessageDirection,
BusMessageContentType,
BusMessageContent,
),
} }
#[derive(Debug)] #[derive(Debug)]
@ -35,6 +43,7 @@ pub mod bus {
MetricsObj, MetricsObj,
Result, Result,
MetricsModeTransfered, MetricsModeTransfered,
ProcessQuery,
} }
#[derive(Debug)] #[derive(Debug)]
@ -43,13 +52,13 @@ pub mod bus {
Stop, Stop,
Restart, Restart,
Freeze, Freeze,
Unfreeze Unfreeze,
} }
#[derive(Debug)] #[derive(Debug)]
pub struct InternalCli { pub struct InternalCli {
pub prc : String, pub prc: String,
pub cmd : CLiCommand, pub cmd: CLiCommand,
} }
pub trait BusContent: Send + Sync + 'static + Debug + Any { pub trait BusContent: Send + Sync + 'static + Debug + Any {
@ -90,6 +99,11 @@ pub mod bus {
BusMessageContentType::MetricsModeTransfered BusMessageContentType::MetricsModeTransfered
} }
} }
impl BusContent for ProcessesQuery {
fn get_bus_type(&self) -> BusMessageContentType {
BusMessageContentType::ProcessQuery
}
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -98,11 +112,20 @@ pub enum DependencyType {
Service, Service,
} }
#[derive(Debug)] #[derive(Debug, Serialize, Clone, Copy)]
pub enum ServiceState { pub enum ServiceState {
Ok, Ok,
Unavailable Unavailable,
} }
impl std::fmt::Display for ServiceState {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
return match self {
ServiceState::Ok => write!(f, "Ok"),
ServiceState::Unavailable => write!(f, "Unavailable"),
};
}
}
pub struct ServiceWaitConfig(u32); pub struct ServiceWaitConfig(u32);
impl Default for ServiceWaitConfig { impl Default for ServiceWaitConfig {
@ -121,48 +144,87 @@ impl std::fmt::Display for FileTriggerType {
return match self { return match self {
FileTriggerType::OnChange => write!(f, "File was changed"), FileTriggerType::OnChange => write!(f, "File was changed"),
FileTriggerType::OnDelete => write!(f, "File was moved or deleted"), FileTriggerType::OnDelete => write!(f, "File was moved or deleted"),
} };
} }
} }
impl<'a> FileTriggerType { impl<'a> FileTriggerType {
pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events { pub fn event(&self, file_name: Arc<str>, trigger: Arc<str>) -> Events {
return match self { return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)), FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger)), file_name,
DependencyType::File,
trigger,
)),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(
file_name,
DependencyType::File,
trigger,
)),
};
} }
} pub fn event_from_file_trigger_controller(
pub fn event_from_file_trigger_controller(&self, file_name: Arc<str>, trigger: &FileTriggersForController) -> Events { &self,
file_name: Arc<str>,
trigger: &FileTriggersForController,
) -> Events {
return match self { return match self {
FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())), FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger.on_delete.clone())), file_name,
} DependencyType::File,
trigger.on_change.clone(),
)),
FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(
file_name,
DependencyType::File,
trigger.on_delete.clone(),
)),
};
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub enum Triggers { pub enum Triggers {
File { on_change: Arc<str>, on_delete: Arc<str> }, File {
Service {on_lost: Arc<str>, wait: u32}, on_change: Arc<str>,
on_delete: Arc<str>,
},
Service {
on_lost: Arc<str>,
wait: u32,
},
} }
impl Triggers { impl Triggers {
pub fn new_file(on_change: Arc<str>, on_delete: Arc<str>) -> Triggers { pub fn new_file(on_change: Arc<str>, on_delete: Arc<str>) -> Triggers {
Triggers::File { on_change, on_delete } Triggers::File {
on_change,
on_delete,
}
} }
pub fn new_service(on_lost: Arc<str>, wait_time: u32) -> Triggers { pub fn new_service(on_lost: Arc<str>, wait_time: u32) -> Triggers {
Triggers::Service{on_lost, wait: wait_time} Triggers::Service {
on_lost,
wait: wait_time,
}
} }
pub fn to_service_negative_event(&self, service_name: Arc<str>) -> Option<Events> { pub fn to_service_negative_event(&self, service_name: Arc<str>) -> Option<Events> {
if let Triggers::Service { on_lost, .. } = self { if let Triggers::Service { on_lost, .. } = self {
return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, on_lost.clone()))) return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(
service_name,
DependencyType::Service,
on_lost.clone(),
)));
} }
None None
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct FileTriggersForController{ pub on_change: Arc<str>, pub on_delete: Arc<str> } pub struct FileTriggersForController {
pub on_change: Arc<str>,
pub on_delete: Arc<str>,
}
pub struct ServiceTriggersForController(Arc<str>); pub struct ServiceTriggersForController(Arc<str>);
impl std::fmt::Display for DependencyType { impl std::fmt::Display for DependencyType {
@ -170,11 +232,11 @@ impl std::fmt::Display for DependencyType {
return match self { return match self {
DependencyType::File => write!(f, "File"), DependencyType::File => write!(f, "File"),
DependencyType::Service => write!(f, "Service"), DependencyType::Service => write!(f, "Service"),
} };
} }
} }
#[derive(Debug)] #[derive(Debug, serde::Serialize, Clone, Copy)]
pub enum ProcessState { pub enum ProcessState {
Pending, Pending,
Holding, Holding,
@ -186,19 +248,18 @@ impl std::fmt::Display for ProcessState {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
return match self { return match self {
ProcessState::Pending => write!(f, "Running"), ProcessState::Pending => write!(f, "Running"),
ProcessState::Holding => write!(f, "Holding"), ProcessState::Holding => write!(f, "Frozen"),
ProcessState::Stopped => write!(f, "Stopped"), ProcessState::Stopped => write!(f, "Stopped"),
ProcessState::StoppedByCli => write!(f, "Forcibly stopped"), ProcessState::StoppedByCli => write!(f, "Stopped by Admin"),
ProcessState::HoldingByCli => write!(f, "Forcibly holding"), ProcessState::HoldingByCli => write!(f, "Frozen by Admin"),
} };
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub enum Events { pub enum Events {
Positive(Arc<str>), Positive(Arc<str>),
Negative(NegativeOutcomes) Negative(NegativeOutcomes),
} }
#[derive(Debug)] #[derive(Debug)]
pub enum NegativeOutcomes { pub enum NegativeOutcomes {
@ -249,8 +310,8 @@ pub struct Processes {
impl Default for Processes { impl Default for Processes {
fn default() -> Self { fn default() -> Self {
Self { Self {
date_of_creation : String::new(), date_of_creation: String::new(),
processes : Vec::new(), processes: Vec::new(),
} }
} }
} }
@ -418,24 +479,23 @@ pub struct FileTriggers {
/// ///
/// *depends on* : `ContainerMetrics`, `ProcessMetrics` /// *depends on* : `ContainerMetrics`, `ProcessMetrics`
/// ///
#[derive(Debug, Clone, Serialize,)] #[derive(Debug, Clone, Serialize)]
pub struct Metrics { pub struct Metrics {
pub container_metrics : ContainerMetrics, pub container_metrics: ContainerMetrics,
pub processes_metrics : Vec<ProcessMetrics>, pub processes_metrics: Vec<ProcessMetrics>,
// pub net_metrics : Vec<PacketInfo>, // pub net_metrics : Vec<PacketInfo>,
} }
/// ## Metrics struct's constructor /// ## Metrics struct's constructor
impl Metrics { impl Metrics {
pub fn new(cm: ContainerMetrics, prm: Vec<ProcessMetrics>) -> Self { pub fn new(cm: ContainerMetrics, prm: Vec<ProcessMetrics>) -> Self {
Metrics { Metrics {
container_metrics : cm, container_metrics: cm,
processes_metrics : prm, processes_metrics: prm,
// net_metrics : net, // net_metrics : net,
} }
} }
} }
/// # Container metrics struct /// # Container metrics struct
/// ## for gathering all container metrics /// ## for gathering all container metrics
/// ///
@ -445,20 +505,20 @@ impl Metrics {
/// ///
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct ContainerMetrics { pub struct ContainerMetrics {
container_id : String, container_id: String,
cpu_load : f32, cpu_load: f32,
ram_load : f32, ram_load: f32,
// pub net_activity : ??? // pub net_activity : ???
processes : Vec<String>, processes: Vec<String>,
} }
/// ## Container struct's constructor /// ## Container struct's constructor
impl ContainerMetrics { impl ContainerMetrics {
pub fn new(container_id : &str, cpu: f32, ram: f32, subsystems: Vec<String>,) -> Self{ pub fn new(container_id: &str, cpu: f32, ram: f32, subsystems: Vec<String>) -> Self {
ContainerMetrics { ContainerMetrics {
container_id : String::from(container_id), container_id: String::from(container_id),
cpu_load : cpu, cpu_load: cpu,
ram_load : ram, ram_load: ram,
processes : subsystems, processes: subsystems,
} }
} }
} }
@ -472,17 +532,17 @@ impl ContainerMetrics {
/// ///
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct ProcessMetrics { pub struct ProcessMetrics {
pub process_name : String, pub process_name: String,
cpu_load : f32, cpu_load: f32,
ram_load : f32, ram_load: f32,
} }
/// ## Process struct's constructor /// ## Process struct's constructor
impl ProcessMetrics { impl ProcessMetrics {
pub fn new(process_name :&str, cpu: f32, ram: f32) -> Self { pub fn new(process_name: &str, cpu: f32, ram: f32) -> Self {
ProcessMetrics { ProcessMetrics {
process_name : String::from(process_name), process_name: String::from(process_name),
cpu_load : cpu, cpu_load: cpu,
ram_load : ram, ram_load: ram,
} }
} }
} }
@ -496,19 +556,19 @@ impl ProcessMetrics {
/// ///
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct PacketInfo { pub struct PacketInfo {
protocol : String, protocol: String,
dst_ip : Ipv4Addr, dst_ip: Ipv4Addr,
src_ip : Ipv4Addr, src_ip: Ipv4Addr,
size : usize, size: usize,
} }
/// ## PacketInfo's constructor /// ## PacketInfo's constructor
impl PacketInfo { impl PacketInfo {
pub fn new(prt: String, dest: Ipv4Addr, src: Ipv4Addr, size_of_packet: usize) -> Self { pub fn new(prt: String, dest: Ipv4Addr, src: Ipv4Addr, size_of_packet: usize) -> Self {
PacketInfo { PacketInfo {
protocol : prt, protocol: prt,
dst_ip : dest, dst_ip: dest,
src_ip : src, src_ip: src,
size : size_of_packet, size: size_of_packet,
} }
} }
} }

View File

@ -1,34 +1,40 @@
pub mod bus;
pub mod files; pub mod files;
pub mod hagent; pub mod hagent;
pub mod metrics; pub mod metrics;
pub mod prcs; pub mod prcs;
pub mod services; pub mod services;
pub mod bus;
use crate::options::structs::bus::{BusMessage, BusMessageContentType, InternalCli};
use crate::options::structs::Processes; use crate::options::structs::Processes;
use async_trait::async_trait;
use files::v2::FilesController;
use lazy_static::lazy_static;
use log::{error, info}; use log::{error, info};
use prcs::v2::ProcessesController;
use services::v2::ServicesController;
use std::process::Command; use std::process::Command;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::Duration; use tokio::time::Duration;
use prcs::v2::ProcessesController;
use files::v2::FilesController;
use services::v2::ServicesController;
use async_trait::async_trait;
use lazy_static::lazy_static;
use crate::options::structs::bus::{BusMessage, InternalCli, BusMessageContentType};
lazy_static! { lazy_static! {
static ref GET_ID_CMD : &'static str = "hostname"; static ref GET_ID_CMD: &'static str = "hostname";
} }
// const GET_ID_CMD: &str = "hostname"; // const GET_ID_CMD: &str = "hostname";
pub mod v2 { pub mod v2 {
use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque};
use crate::options::structs::{bus::CLiCommand, Events, FileTriggersForController, ProcessUnit, Triggers};
use super::*; use super::*;
use crate::utils::metrics::processes::{ProcessesAll, ProcessesQuery};
use crate::{
options::structs::{
bus::CLiCommand, Events, FileTriggersForController, ProcessUnit, Triggers,
},
utils::metrics::processes::deps::{Dependencies, FilesExtended, ServicesExtended},
};
use std::any::Any; use std::any::Any;
use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque};
type BusReciever = tokio::sync::mpsc::Receiver<BusMessage>; type BusReciever = tokio::sync::mpsc::Receiver<BusMessage>;
type BusSender = Arc<tokio::sync::mpsc::Sender<BusMessage>>; type BusSender = Arc<tokio::sync::mpsc::Sender<BusMessage>>;
@ -42,21 +48,26 @@ pub mod v2 {
#[derive(Debug)] #[derive(Debug)]
struct Supervisor { struct Supervisor {
prcs : LinkedList<ProcessesController>, prcs: LinkedList<ProcessesController>,
files : LinkedList<FilesController>, files: LinkedList<FilesController>,
services : LinkedList<ServicesController>, services: LinkedList<ServicesController>,
config : Arc<Processes>, config: Arc<Processes>,
bus : (BusReciever, BusSender), bus: (BusReciever, BusSender),
} }
impl Supervisor { impl Supervisor {
pub fn new(bus_reciever : BusReciever, bus_sender: BusSender) -> Supervisor { pub fn new(bus_reciever: BusReciever, bus_sender: BusSender) -> Supervisor {
Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new(), config: Arc::new(Processes::default()), bus : (bus_reciever, bus_sender) } Supervisor {
prcs: LinkedList::new(),
files: LinkedList::new(),
services: LinkedList::new(),
config: Arc::new(Processes::default()),
bus: (bus_reciever, bus_sender),
}
} }
pub async fn with_config(mut self, config: Processes) -> Supervisor { pub async fn with_config(mut self, config: Processes) -> Supervisor {
self.config = Arc::from(config); self.config = Arc::from(config);
let _ = self.config.processes.iter() let _ = self.config.processes.iter().for_each(|prc| {
.for_each(|prc| {
let (rx, tx) = mpsc::channel::<Events>(10); let (rx, tx) = mpsc::channel::<Events>(10);
let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path); let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path);
if !self.prcs.contains(&temp) { if !self.prcs.contains(&temp) {
@ -65,15 +76,16 @@ pub mod v2 {
let rx = Arc::new(rx); let rx = Arc::new(rx);
let proc_name: Arc<str> = Arc::from(prc.name.clone()); let proc_name: Arc<str> = Arc::from(prc.name.clone());
let _ = prc.dependencies.files.iter() let _ = prc.dependencies.files.iter().for_each(|file| {
.for_each(|file| {
let mut hm = HashMap::new(); let mut hm = HashMap::new();
let triggers = FileTriggersForController { on_change: Arc::from(file.triggers.on_change.clone()), on_delete: Arc::from(file.triggers.on_delete.clone())}; let triggers = FileTriggersForController {
on_change: Arc::from(file.triggers.on_change.clone()),
on_delete: Arc::from(file.triggers.on_delete.clone()),
};
hm.insert(proc_name.clone(), (triggers, rx.clone())); hm.insert(proc_name.clone(), (triggers, rx.clone()));
let tempfile = FilesController::new(&file.filename.as_str(), hm) let tempfile =
.with_path(&file.src); FilesController::new(&file.filename.as_str(), hm).with_path(&file.src);
if let Ok(file) = tempfile { if let Ok(file) = tempfile {
if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) { if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) {
@ -85,15 +97,13 @@ pub mod v2 {
}); });
// servs // servs
let _ = prc.dependencies.services.iter() let _ = prc.dependencies.services.iter().for_each(|serv| {
.for_each(|serv| { let access_url =
let access_url = ServicesController::get_access_url(&serv.hostname, serv.port.as_ref()); ServicesController::get_access_url(&serv.hostname, serv.port.as_ref());
// preparations // preparations
let rx = rx.clone(); let rx = rx.clone();
let serv_cont = ServicesController::new().with_access_name( let serv_cont =
&serv.hostname, ServicesController::new().with_access_name(&serv.hostname, &access_url);
&access_url
);
// triggers // triggers
let arc: Arc<str> = Arc::from(serv.triggers.on_lost.clone()); let arc: Arc<str> = Arc::from(serv.triggers.on_lost.clone());
let triggers = Triggers::new_service(arc, serv.triggers.wait); let triggers = Triggers::new_service(arc, serv.triggers.wait);
@ -105,7 +115,8 @@ pub mod v2 {
let mut vec: VecDeque<Arc<str>> = VecDeque::new(); let mut vec: VecDeque<Arc<str>> = VecDeque::new();
vec.push_back(proc_name.clone()); vec.push_back(proc_name.clone());
// connection_queue // connection_queue
let mut connection_queue: BTreeMap<u32, VecDeque<Arc<str>>> = BTreeMap::new(); let mut connection_queue: BTreeMap<u32, VecDeque<Arc<str>>> =
BTreeMap::new();
connection_queue.insert(serv.triggers.wait, vec); connection_queue.insert(serv.triggers.wait, vec);
// event_reg // event_reg
let mut hm = HashMap::new(); let mut hm = HashMap::new();
@ -119,7 +130,84 @@ pub mod v2 {
self self
} }
pub fn get_stats(&self) -> String { pub fn get_stats(&self) -> String {
format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len()) format!(
"processes: {}, files: {}, services: {}",
self.prcs.len(),
self.files.len(),
self.services.len()
)
}
pub async fn extract_extended_procs(
config : Arc<Processes>,
prcs_list : &LinkedList<ProcessesController>,
files_list : &LinkedList<FilesController>,
servs_list : &LinkedList<ServicesController>,
) -> Vec<ProcessesAll> {
let mut procs = Vec::new();
for prc in config.processes.iter() {
if let Some(prc_cont) = prcs_list
.iter()
.find(|&prc_cont| prc.name == *prc_cont.name)
{
let mut vec_files = Vec::new();
let mut vec_services = Vec::new();
prc.dependencies
.files
.iter()
.map(|file| (file, format!("{}{}", file.filename, file.src)))
.for_each(|(file, code_name)| {
if let Some(file_cont) = files_list
.iter()
.find(|&file_cont| *file_cont.get_code_name() == code_name)
{
vec_files.push(FilesExtended {
name: file.filename.to_string(),
path: file.src.to_string(),
status: file_cont.get_state(),
triggers: file.triggers.to_owned(),
});
}
});
prc.dependencies
.services
.iter()
.map(|serv| {
(
serv,
format!("{}{}", serv.hostname, {
if let Some(port) = serv.port {
format!(":{}", port)
} else {
String::new()
}
}),
)
})
.for_each(|(serv, acces_url)| {
if let Some(serv_cont) = servs_list
.iter()
.find(|&serv_cont| *serv_cont.get_arc_access_url() == acces_url)
{
vec_services.push(ServicesExtended {
name: serv.hostname.to_owned(),
access_name: (*serv_cont.get_arc_access_url()).to_owned(),
status: serv_cont.get_state(),
triggers: serv.triggers.to_owned(),
});
}
});
procs.push(ProcessesAll {
name: prc_cont.name.clone().to_string(),
state: prc_cont.get_state(),
pid: prc_cont.get_pid(),
dependencies: Dependencies {
files: vec_files,
services: vec_services,
},
});
}
}
procs
} }
} }
@ -130,37 +218,37 @@ pub mod v2 {
loop { loop {
// //
let rec = &mut self.bus.0; let rec = &mut self.bus.0;
while let Ok(request) = rec.try_recv(){ while let Ok(request) = rec.try_recv() {
if let BusMessage::Request(_, _, cont) = request { if let BusMessage::Request(_, _, cont) = request {
let cont: Box<dyn Any + Send> = cont; let cont: Box<dyn Any + Send> = cont;
if let Ok(cli) = cont.downcast::<InternalCli>() { match cont.downcast::<InternalCli>() {
Ok(cli) => {
let mut count = 0; let mut count = 0;
let fut = (&mut self.prcs).into_iter() let fut = (&mut self.prcs)
.into_iter()
.find(|prc| prc.name == Arc::from(cli.prc.as_ref())) .find(|prc| prc.name == Arc::from(cli.prc.as_ref()))
.map(|prc| async { .map(|prc| async {
let count = &mut count; let count = &mut count;
*count += 1; *count += 1;
let res = match cli.cmd { let res = match cli.cmd {
CLiCommand::Start => { CLiCommand::Start => prc.start_by_user_call().await,
prc.start_by_user_call().await CLiCommand::Stop => prc.stop_by_user_call().await,
}, CLiCommand::Restart => prc.restart_by_user_call().await,
CLiCommand::Stop => { CLiCommand::Freeze => prc.freeze_by_user_call().await,
prc.stop_by_user_call().await
},
CLiCommand::Restart => {
prc.restart_by_user_call().await
},
CLiCommand::Freeze => {
prc.freeze_by_user_call().await
},
CLiCommand::Unfreeze => { CLiCommand::Unfreeze => {
prc.unfreeze_by_user_call().await prc.unfreeze_by_user_call().await
}, }
}; };
let sender = self.bus.1.clone(); let sender = self.bus.1.clone();
let resp_content = match res { let resp_content = match res {
Ok(_) => Ok(format!("Ok on user call abour process {}", prc.name)), Ok(_) => Ok(format!(
Err(er) => Err(anyhow::Error::msg(format!("Error: User call for process {} failed : {}", prc.name, er))), "Ok on user call abour process {}",
prc.name
)),
Err(er) => Err(anyhow::Error::msg(format!(
"Error: User call for process {} failed : {}",
prc.name, er
))),
}; };
let _ = sender.send(BusMessage::Response( let _ = sender.send(BusMessage::Response(
crate::options::structs::bus::BusMessageDirection::ToCli, crate::options::structs::bus::BusMessageDirection::ToCli,
@ -171,8 +259,7 @@ pub mod v2 {
}); });
if let Some(fut) = fut { if let Some(fut) = fut {
fut.await; fut.await;
} } else {
else {
let _ = self.bus.1.clone().send(BusMessage::Response( let _ = self.bus.1.clone().send(BusMessage::Response(
crate::options::structs::bus::BusMessageDirection::ToCli, crate::options::structs::bus::BusMessageDirection::ToCli,
BusMessageContentType::RawString, BusMessageContentType::RawString,
@ -182,33 +269,70 @@ pub mod v2 {
)).await; )).await;
} }
} }
// TODO: GET PRCS METRICS DOWNCASTING Err(boxed) => {
if let Ok(query) = boxed.downcast::<ProcessesQuery>() {
match *query {
ProcessesQuery::QueryAll => {
let procs = Self::extract_extended_procs(
self.config.clone(),
&self.prcs,
&self.files,
&self.services,
).await;
let _ = self.bus.1.clone().send(BusMessage::Response(
crate::options::structs::bus::BusMessageDirection::ToMetrics,
BusMessageContentType::ProcessQuery,
Box::new(
ProcessesQuery::All(procs)
)
)).await;
}
ProcessesQuery::QueryGeneral => {
let mut vec = Vec::new();
for prc in &self.prcs {
vec.push(prc.get_general_info().await);
}
let _ = self.bus.1.clone().send(BusMessage::Response(
crate::options::structs::bus::BusMessageDirection::ToMetrics,
BusMessageContentType::ProcessQuery,
Box::new(
ProcessesQuery::General(vec)
)
)).await;
}
_ => {
let _ = self.bus.1.clone().send(BusMessage::Response(
crate::options::structs::bus::BusMessageDirection::ToCli,
BusMessageContentType::RawString,
Box::new(
Err(anyhow::Error::msg("Unknown request format was send to the Supervisor"))
)
)).await;
}
}
}
}
}
} }
} }
let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![]; let mut tasks: Vec<tokio::task::JoinHandle<ControllerResult>> = vec![];
if let Some(mut val) = self.prcs.pop_front() { if let Some(mut val) = self.prcs.pop_front() {
tasks.push( tasks.push(tokio::spawn(async move {
tokio::spawn( async move {
val.process().await; val.process().await;
ControllerResult::Process(Some(val)) ControllerResult::Process(Some(val))
}) }));
);
} }
if let Some(mut val) = self.files.pop_front() { if let Some(mut val) = self.files.pop_front() {
tasks.push( tasks.push(tokio::spawn(async move {
tokio::spawn( async move {
val.process().await; val.process().await;
ControllerResult::File(Some(val)) ControllerResult::File(Some(val))
}) }));
);
} }
if let Some(mut val) = self.services.pop_front() { if let Some(mut val) = self.services.pop_front() {
tasks.push( tasks.push(tokio::spawn(async move {
tokio::spawn( async move {
val.process().await; val.process().await;
ControllerResult::Service(Some(val)) ControllerResult::Service(Some(val))
}) }));
);
} }
for task in tasks { for task in tasks {
match task.await { match task.await {
@ -226,10 +350,12 @@ pub mod v2 {
pub async fn init_monitoring( pub async fn init_monitoring(
config: Processes, config: Processes,
bus_reciever : BusReciever, bus_reciever: BusReciever,
bus_sender : BusSender, bus_sender: BusSender,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut supervisor = Supervisor::new(bus_reciever, bus_sender).with_config(config).await; let mut supervisor = Supervisor::new(bus_reciever, bus_sender)
.with_config(config)
.await;
info!("Monitoring: {} ", &supervisor.get_stats()); info!("Monitoring: {} ", &supervisor.get_stats());
supervisor.process().await; supervisor.process().await;
Ok(()) Ok(())
@ -260,7 +386,7 @@ pub fn get_container_id() -> Option<String> {
if id.is_empty() { if id.is_empty() {
return None; return None;
} }
Some(String::from_utf8_lossy(&output.stdout).to_string()) Some(String::from_utf8_lossy(&output.stdout).trim().to_string())
} }
Err(_) => None, Err(_) => None,
} }

View File

@ -1,29 +1,33 @@
use std::sync::Arc; use std::sync::Arc;
use crate::options::structs::bus::{BusMessageDirection, BusMessage}; use crate::options::structs::bus::{BusMessage, BusMessageDirection};
use crate::options::structs::ProcessUnit; use crate::options::structs::ProcessUnit;
use tokio::sync::mpsc::{Sender, Receiver}; use log::{debug, error, trace};
use log::{trace, debug, error}; use tokio::sync::mpsc::{Receiver, Sender};
type Inner = Receiver<BusMessage>; type Inner = Receiver<BusMessage>;
type Outter = Arc<Sender<BusMessage>>; type Outter = Arc<Sender<BusMessage>>;
#[derive(Debug )] #[derive(Debug)]
pub struct Highway { pub struct Highway {
to_cli : Outter, to_cli: Outter,
to_supervisor : Outter, to_supervisor: Outter,
to_metrics : Outter, to_metrics: Outter,
} }
impl Highway { impl Highway {
fn new(to_cli: Outter, to_supervisor: Outter, to_metrics: Outter) -> Self { fn new(to_cli: Outter, to_supervisor: Outter, to_metrics: Outter) -> Self {
Self { to_cli, to_supervisor, to_metrics } Self {
to_cli,
to_supervisor,
to_metrics,
}
} }
async fn send(&self, msg: BusMessage) -> anyhow::Result<()> { async fn send(&self, msg: BusMessage) -> anyhow::Result<()> {
let dir = match &msg { let dir = match &msg {
BusMessage::Request(dir, ..) | BusMessage::Response(dir, ..) => { BusMessage::Request(dir, ..) | BusMessage::Response(dir, ..) => {
trace!("redirecting message to {:?} ...", dir); trace!("redirecting message to {:?} ...", dir);
dir dir
}, }
}; };
match dir { match dir {
BusMessageDirection::ToCli => self.send_cli(msg).await, BusMessageDirection::ToCli => self.send_cli(msg).await,
@ -46,13 +50,16 @@ impl Highway {
} }
pub struct Bus { pub struct Bus {
inner : Inner, inner: Inner,
highway : Highway, highway: Highway,
} }
impl Bus { impl Bus {
pub fn new(inner: Inner, to_cli: Outter, to_supervisor: Outter, to_metrics: Outter) -> Self { pub fn new(inner: Inner, to_cli: Outter, to_supervisor: Outter, to_metrics: Outter) -> Self {
Self { inner, highway: Highway::new(to_cli, to_supervisor, to_metrics) } Self {
inner,
highway: Highway::new(to_cli, to_supervisor, to_metrics),
}
} }
} }
@ -64,13 +71,21 @@ impl ProcessUnit for Bus {
// debug!("new message to the Bus : {:?}", &content); // debug!("new message to the Bus : {:?}", &content);
let msg = match content { let msg = match content {
BusMessage::Request(direction, content_type, content) => { BusMessage::Request(direction, content_type, content) => {
trace!("bus has got a new Request with direction {:?} and type {:?}", &direction, &content_type); trace!(
"bus has got a new Request with direction {:?} and type {:?}",
&direction,
&content_type
);
BusMessage::Request(direction, content_type, content) BusMessage::Request(direction, content_type, content)
}, }
BusMessage::Response(direction, content_type, content) => { BusMessage::Response(direction, content_type, content) => {
trace!("bus has got a new Response with direction {:?} and type {:?}", &direction, &content_type); trace!(
"bus has got a new Response with direction {:?} and type {:?}",
&direction,
&content_type
);
BusMessage::Response(direction, content_type, content) BusMessage::Response(direction, content_type, content)
}, }
}; };
if let Err(er) = self.highway.send(msg).await { if let Err(er) = self.highway.send(msg).await {
error!("Cannot redirect message : {}", er); error!("Cannot redirect message : {}", er);

View File

@ -1,34 +1,37 @@
use crate::options::structs::CustomError; use crate::options::structs::CustomError;
use crate::options::structs::Events;
use async_trait::async_trait;
use inotify::{EventMask, Inotify, WatchMask}; use inotify::{EventMask, Inotify, WatchMask};
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::Sender as Sender; use tokio::sync::mpsc::Sender;
use crate::options::structs::Events;
use async_trait::async_trait;
pub mod v2 { pub mod v2 {
use log::{error, info, warn};
use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit};
use super::*; use super::*;
use crate::options::structs::{
FileTriggerType, FileTriggersForController as Triggers, ProcessUnit,
};
use log::{error, info, warn};
use serde::Serialize;
use std::{collections::HashMap, path::Path}; use std::{collections::HashMap, path::Path};
type MpscSender = Arc<Sender<Events>>; type MpscSender = Arc<Sender<Events>>;
type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>; type EventHandlers = HashMap<Arc<str>, (Triggers, MpscSender)>;
#[derive(Debug)] #[derive(Debug, Serialize, Clone, Copy)]
enum FileState { pub enum FileState {
Ok, Ok,
NotFound, NotFound,
} }
#[derive(Debug)] #[derive(Debug)]
pub struct FilesController { pub struct FilesController {
name : Arc<str>, name: Arc<str>,
path : String, path: String,
code_name : Arc<str>, code_name: Arc<str>,
state : FileState, state: FileState,
watcher : Option<Inotify>, watcher: Option<Inotify>,
triggers : EventHandlers, triggers: EventHandlers,
} }
impl PartialEq for FilesController { impl PartialEq for FilesController {
@ -42,12 +45,12 @@ pub mod v2 {
pub fn new(name: &str, triggers: EventHandlers) -> FilesController { pub fn new(name: &str, triggers: EventHandlers) -> FilesController {
let name: Arc<str> = Arc::from(name); let name: Arc<str> = Arc::from(name);
Self { Self {
name : name.clone(), name: name.clone(),
path : String::new(), path: String::new(),
state : FileState::Ok, state: FileState::Ok,
watcher : None, watcher: None,
triggers, triggers,
code_name : name.clone(), code_name: name.clone(),
} }
} }
#[inline(always)] #[inline(always)]
@ -57,15 +60,18 @@ pub mod v2 {
match create_watcher(&self.name, &self.path) { match create_watcher(&self.name, &self.path) {
Ok(val) => Some(val), Ok(val) => Some(val),
Err(er) => { Err(er) => {
error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er); error!(
return Err(er) "Cannot create watcher for {} ({}) due to {}",
self.name, &self.path, er
);
return Err(er);
} }
} }
}; };
self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name)); self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name));
Ok(self) Ok(self)
} }
pub fn add_event(&mut self, file_controller : FilesController) { pub fn add_event(&mut self, file_controller: FilesController) {
for (k, v) in file_controller.triggers { for (k, v) in file_controller.triggers {
self.triggers.entry(k).or_insert(v); self.triggers.entry(k).or_insert(v);
} }
@ -73,24 +79,34 @@ pub mod v2 {
async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) { async fn trigger_on(&mut self, trigger_type: Option<FileTriggerType>) {
for (prc_name, (triggers, channel)) in &self.triggers { for (prc_name, (triggers, channel)) in &self.triggers {
let msg = match &trigger_type { let msg = match &trigger_type {
None => { None => Events::Positive(self.code_name.clone()),
Events::Positive(self.code_name.clone())
},
Some(event) => { Some(event) => {
info!("Event on file {} ({}) : {}. Notifying `{}` ...", &self.name, &self.path, event, &prc_name); info!(
"Event on file {} ({}) : {}. Notifying `{}` ...",
&self.name, &self.path, event, &prc_name
);
event.event_from_file_trigger_controller(self.code_name.clone(), &triggers) event.event_from_file_trigger_controller(self.code_name.clone(), &triggers)
}, }
}; };
let _ = channel.send(msg).await; let _ = channel.send(msg).await;
} }
} }
pub fn get_state(&self) -> FileState {
self.state
}
pub fn get_code_name(&self) -> Arc<str> {
self.code_name.clone()
}
} }
#[async_trait] #[async_trait]
impl ProcessUnit for FilesController { impl ProcessUnit for FilesController {
async fn process(&mut self) { async fn process(&mut self) {
if let Ok(_) = check_file(&self.name, &self.path).await { if let Ok(_) = check_file(&self.name, &self.path).await {
if let FileState::NotFound = self.state { if let FileState::NotFound = self.state {
info!("File {} ({}) was found in determined scope. Notifying ...", self.name, self.code_name); info!(
"File {} ({}) was found in determined scope. Notifying ...",
self.name, self.code_name
);
self.state = FileState::Ok; self.state = FileState::Ok;
self.trigger_on(None).await; self.trigger_on(None).await;
} }
@ -98,7 +114,8 @@ pub mod v2 {
Some(notify) => { Some(notify) => {
let mut buffer = [0; 128]; let mut buffer = [0; 128];
if let Ok(notif_events) = notify.read_events(&mut buffer) { if let Ok(notif_events) = notify.read_events(&mut buffer) {
let (need_to_recreate, was_modifired) = notif_events.fold((false, false), |(a, b), mask| { let (need_to_recreate, was_modifired) =
notif_events.fold((false, false), |(a, b), mask| {
( (
a || mask.mask == EventMask::DELETE_SELF, a || mask.mask == EventMask::DELETE_SELF,
b || mask.mask == EventMask::MODIFY, b || mask.mask == EventMask::MODIFY,
@ -110,25 +127,27 @@ pub mod v2 {
self.watcher = match create_watcher(&self.name, &self.path) { self.watcher = match create_watcher(&self.name, &self.path) {
Ok(notifier) => Some(notifier), Ok(notifier) => Some(notifier),
Err(er) => { Err(er) => {
error!("Failed to recreate watcher for {} ({}) due to {}", error!(
self.name, "Failed to recreate watcher for {} ({}) due to {}",
&self.path, self.name, &self.path, er
er
); );
None None
}, }
} }
} }
self.trigger_on(Some(FileTriggerType::OnChange)).await; self.trigger_on(Some(FileTriggerType::OnChange)).await;
return; return;
} }
} }
}, }
None => { /* DEAD END */}, None => { /* DEAD END */ }
} }
} else { } else {
if let FileState::Ok = self.state { if let FileState::Ok = self.state {
warn!("File {} ({}) was not found in determined scope", self.name, &self.path); warn!(
"File {} ({}) was not found in determined scope",
self.name, &self.path
);
self.state = FileState::NotFound; self.state = FileState::NotFound;
self.trigger_on(Some(FileTriggerType::OnDelete)).await; self.trigger_on(Some(FileTriggerType::OnDelete)).await;
} }
@ -139,40 +158,40 @@ pub mod v2 {
} }
} }
/// # Fn `create_watcher` /// # Fn `create_watcher`
/// ## for creating watcher on file's delete | update events /// ## for creating watcher on file's delete | update events
/// ///
/// *input* : `&str`, `&str` /// *input* : `&str`, `&str`
/// ///
/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction /// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction
/// ///
/// *initiator* : fn `file_handler`, fn `utils::run_daemons` /// *initiator* : fn `file_handler`, fn `utils::run_daemons`
/// ///
/// *managing* : current file's name: &str, path in local storage to current file: &str /// *managing* : current file's name: &str, path in local storage to current file: &str
/// ///
/// *depends on* : - /// *depends on* : -
/// ///
pub fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> { pub fn create_watcher(filename: &str, path: &str) -> anyhow::Result<Inotify> {
let src = format!("{}{}", path, filename); let src = format!("{}{}", path, filename);
let inotify: Inotify = Inotify::init()?; let inotify: Inotify = Inotify::init()?;
inotify.watches().add(&src, WatchMask::ALL_EVENTS)?; inotify.watches().add(&src, WatchMask::ALL_EVENTS)?;
Ok(inotify) Ok(inotify)
} }
/// # Fn `check_file` /// # Fn `check_file`
/// ## for checking existance of current file /// ## for checking existance of current file
/// ///
/// *input* : `&str`, `&str` /// *input* : `&str`, `&str`
/// ///
/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error /// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error
/// ///
/// *initiator* : fn `file_handler` /// *initiator* : fn `file_handler`
/// ///
/// *managing* : current file's name: `&str` and current file's path in local storage: `&str` /// *managing* : current file's name: `&str` and current file's path in local storage: `&str`
/// ///
/// *depends on* : network activity /// *depends on* : network activity
/// ///
pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let arc_name = Arc::new(filename.to_string()); let arc_name = Arc::new(filename.to_string());
let arc_path = Arc::new(path.to_string()); let arc_path = Arc::new(path.to_string());
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
@ -188,10 +207,10 @@ pub mod v2 {
.unwrap_or_else(|_| { .unwrap_or_else(|_| {
panic!("Corrupted while file check process"); panic!("Corrupted while file check process");
}) })
} }
#[cfg(test)] #[cfg(test)]
mod files_unittests { mod files_unittests {
use super::*; use super::*;
#[tokio::test] #[tokio::test]
async fn try_to_create_watcher() { async fn try_to_create_watcher() {
@ -213,4 +232,4 @@ pub mod v2 {
let res = check_file("invalid-file", "/path/to/the/no/dir").await; let res = check_file("invalid-file", "/path/to/the/no/dir").await;
assert!(res.is_err()); assert!(res.is_err());
} }
} }

View File

@ -1,8 +1,8 @@
// //
// module needed to check host-agent health condition and to communicate with it // module needed to check host-agent health condition and to communicate with it
// //
use anyhow::{Error, Ok, Result};
use tokio::{io::Interest, net::UnixStream}; use tokio::{io::Interest, net::UnixStream};
use anyhow::{Ok, Result, Error};
// to kill lint bug // to kill lint bug
#[allow(unused_imports)] #[allow(unused_imports)]
use tokio::net::UnixListener; use tokio::net::UnixListener;
@ -61,7 +61,7 @@ async fn ha_healthcheck(socket: &UnixStream) -> Result<(), Error> {
/// *depends on* : - /// *depends on* : -
/// ///
#[allow(dead_code)] #[allow(dead_code)]
async fn ha_send_data(socket: &UnixStream, data: &str) -> Result<(), Error > { async fn ha_send_data(socket: &UnixStream, data: &str) -> Result<(), Error> {
socket.ready(Interest::WRITABLE).await?; socket.ready(Interest::WRITABLE).await?;
socket.writable().await?; socket.writable().await?;
socket.try_write(data.as_bytes())?; socket.try_write(data.as_bytes())?;
@ -91,7 +91,7 @@ mod hagent_unittets {
// --Result<maybe Response> // --Result<maybe Response>
// one-shot func // one-shot func
async fn hagent_communication_test() { async fn hagent_communication_test() {
use crate::options::structs::{ProcessMetrics, ContainerMetrics, Metrics}; use crate::options::structs::{ContainerMetrics, Metrics, ProcessMetrics};
let procm = ProcessMetrics::new("test-prc", 15.0, 5.0); let procm = ProcessMetrics::new("test-prc", 15.0, 5.0);
let contm = ContainerMetrics::new("test", 32.0, 12.0, vec![procm.process_name.clone()]); let contm = ContainerMetrics::new("test", 32.0, 12.0, vec![procm.process_name.clone()]);
@ -105,10 +105,11 @@ mod hagent_unittets {
let sock = sock.unwrap(); let sock = sock.unwrap();
assert!(ha_healthcheck(&sock).await.is_ok()); assert!(ha_healthcheck(&sock).await.is_ok());
assert!(ha_send_data(&sock, &metrics).await.is_ok()); assert!(ha_send_data(&sock, &metrics).await.is_ok());
} }
#[tokio::test] #[tokio::test]
async fn open_unixsocket_test() { async fn open_unixsocket_test() {
assert!(open_unix_socket("non/valid/socket/file.sock").await.is_err()); assert!(open_unix_socket("non/valid/socket/file.sock")
.await
.is_err());
} }
} }

View File

@ -1,17 +1,21 @@
// submodule needed to get metrics such as // submodule needed to get metrics such as
// cpu load, ram/rom load and net activity // cpu load, ram/rom load and net activity
use std::{any::Any, collections::BTreeMap, sync::Arc}; use crate::{
use crate::options::structs::{ProcessState, TrackingProcess}; options::structs::{ProcessState, TrackingProcess},
utils::metrics::processes::{ProcessesGeneral, ProcessesQuery},
};
use futures::lock::Mutex;
use log::warn; use log::warn;
use noxis_cli::metrics_models::MetricsMode; use noxis_cli::metrics_models::MetricsMode;
use std::{any::Any, collections::BTreeMap, sync::Arc};
// use chrono::Duration; // use chrono::Duration;
use sysinfo::{System, Disks as DisksList, Networks}; use super::prcs::v2::Pid;
use crate::options::structs::bus::{BusMessage, BusMessageContentType, BusMessageDirection};
use crate::options::structs::Dependencies; use crate::options::structs::Dependencies;
use serde::Serialize; use serde::Serialize;
use super::prcs::v2::Pid;
use std::fmt::Debug; use std::fmt::Debug;
use crate::options::structs::bus::{BusMessage, BusMessageDirection, BusMessageContentType}; use sysinfo::{Disks as DisksList, Networks, System};
// use noxis_cli::metrics_models::MetricsMode; // use noxis_cli::metrics_models::MetricsMode;
pub type MetricProcesses = Vec<ProcessExtended>; pub type MetricProcesses = Vec<ProcessExtended>;
@ -36,158 +40,286 @@ type BusSender = Arc<tokio::sync::mpsc::Sender<BusMessage>>;
/// ///
pub async fn init_metrics_grubber( pub async fn init_metrics_grubber(
/* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */ /* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */
bus_sender : BusSender, bus_sender: BusSender,
bus_recirever : BusReciever, bus_reciever: BusReciever,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut system = System::new(); let mut system = System::new();
let mut disks = DisksList::new_with_refreshed_list();
let mut networks = Networks::new_with_refreshed_list();
// get_all_metrics(&mut system).await; // get_all_metrics(&mut system).await;
/* TODO */ /* TODO */
let mut bus_recirever = bus_recirever; let mut bus_reciever = bus_reciever;
loop { loop {
if let Ok(BusMessage::Request(_, _, cont)) = bus_recirever.try_recv() { if let Ok(BusMessage::Request(_, _, cont)) = bus_reciever.try_recv() {
system.refresh_all();
disks.refresh_list();
networks.refresh_list();
let cont: Box<dyn Any + Send> = cont; let cont: Box<dyn Any + Send> = cont;
match cont.downcast::<MetricsMode>() { match cont.downcast::<MetricsMode>() {
Err(_) => { Err(_) => {
warn!("Unrecognized Metric mode was given"); warn!("Unrecognized Metric mode was given");
let _ = bus_sender.send(BusMessage::Response( let _ = bus_sender
.send(BusMessage::Response(
BusMessageDirection::ToCli, BusMessageDirection::ToCli,
BusMessageContentType::Result, BusMessageContentType::Result,
Box::new( Box::new(Err(anyhow::Error::msg(format!(
Err(anyhow::Error::msg(format!("Unrecognized Metric mode was given")) "Unrecognized Metric mode was given"
)))),
)) ))
)).await; .await;
} }
Ok(mode) => { Ok(mode) => {
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
let metric: Box<dyn MetricsExportable> = match *mode { let metric: Box<dyn MetricsExportable> = match *mode {
MetricsMode::Full => Box::new(get_all_metrics(&mut system).await), MetricsMode::Full => {
let mut refs =
get_all_metrics(&mut system, bus_sender.clone(), &disks, &networks)
.await;
if let Some(prcs) = bus_reciever.recv().await {
if let BusMessage::Response(_, _, cont) = prcs {
let cont: Box<dyn Any> = cont;
if let Ok(cont) = cont.downcast::<ProcessesQuery>() {
if let ProcessesQuery::General(info) = *cont {
refs.processes = info;
}
}
}
}
Box::new(refs)
}
MetricsMode::Host => {
Box::new(get_global_host_info(&mut system, &disks, &networks).await)
}
MetricsMode::Cpu => Box::new(get_cpu_metrics(&mut system).await), MetricsMode::Cpu => Box::new(get_cpu_metrics(&mut system).await),
MetricsMode::Ram => Box::new(get_ram_metrics(&mut system).await), MetricsMode::Ram => Box::new(get_ram_metrics(&mut system).await),
MetricsMode::Rom => Box::new(get_all_disks_metrics().await), MetricsMode::Rom => Box::new(get_all_disks_metrics(&disks).await),
MetricsMode::Network => Box::new(get_all_ifaces_metrics().await), MetricsMode::Network => Box::new(get_all_ifaces_metrics(&networks).await),
// MetricsMode::Processes => {}, // MetricsMode::Processes => {},
// TODO ->
_ => todo!(), _ => todo!(),
}; };
// let metric: Box<dyn BusContent> = Box::new(metric); // let metric: Box<dyn BusContent> = Box::new(metric);
let metric = metric.serialze_into_output(); let metric = metric.serialze_into_output();
let _ = bus_sender.send(BusMessage::Response( let _ = bus_sender
.send(BusMessage::Response(
BusMessageDirection::ToCli, BusMessageDirection::ToCli,
BusMessageContentType::MetricsObj, BusMessageContentType::MetricsObj,
Box::new(metric) Box::new(metric),
)).await; ))
}, .await;
} }
} }
}
// TODO else if response in metrics
// else if let Response ....
tokio::time::sleep(std::time::Duration::from_millis(100)).await; tokio::time::sleep(std::time::Duration::from_millis(100)).await;
} }
} }
async fn get_all_metrics(system: &mut System) -> FullMetrics { async fn get_all_metrics(
system.refresh_all(); system: &mut System,
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; sender: BusSender,
disks: &DisksList,
networks: &Networks,
) -> FullMetrics {
let host = get_host_info().await; let host = get_host_info().await;
let cpu = get_cpu_metrics(system).await; let cpu = get_cpu_metrics(system).await;
let ram = get_ram_metrics(system).await; let ram = get_ram_metrics(system).await;
let disks = get_all_disks_metrics().await; let disks = get_all_disks_metrics(&disks).await;
let ifaces = get_all_ifaces_metrics().await; let ifaces = get_all_ifaces_metrics(&networks).await;
let prcs = MetricProcesses::new(); let prcs: Vec<ProcessesGeneral> = Vec::new();
let _ = sender
.send(BusMessage::Request(
BusMessageDirection::ToSupervisor,
BusMessageContentType::ProcessQuery,
Box::new(ProcessesQuery::QueryGeneral),
))
.await;
FullMetrics::create(host, cpu, ram, disks, ifaces, prcs) FullMetrics::create(host, cpu, ram, disks, ifaces, prcs)
} }
async fn get_global_host_info(
system: &mut System,
disks: &DisksList,
networks: &Networks,
) -> HostGeneral {
HostGeneral {
hostname: System::host_name().unwrap_or_default(),
os: System::long_os_version().unwrap_or_default(),
kernel: System::kernel_version().unwrap_or_default(),
cpu_percentage: system.global_cpu_usage(),
ram_available: system.total_memory() - system.free_memory(),
disk_percentage: {
let total = disks
.iter()
.map(|disk| disk.available_space() * 100 / disk.total_space())
.collect::<Vec<u64>>();
total.iter().sum::<u64>() / (total.len() as u64)
},
net_stat: {
let total = networks
.iter()
.map(|(_, iface_data)| iface_data.received() + iface_data.transmitted())
.collect::<Vec<u64>>();
total.iter().sum::<u64>() / ((total.len() * 2) as u64)
},
}
}
async fn get_host_info() -> HostInfo { async fn get_host_info() -> HostInfo {
HostInfo { HostInfo {
hostname : System::host_name().unwrap_or_default(), hostname: System::host_name().unwrap_or_default(),
os : System::long_os_version().unwrap_or_default(), os: System::long_os_version().unwrap_or_default(),
kernel : System::kernel_version().unwrap_or_default(), kernel: System::kernel_version().unwrap_or_default(),
} }
} }
async fn get_cpu_metrics(system: &mut System) -> Cpu { async fn get_cpu_metrics(system: &mut System) -> Cpu {
system.refresh_cpu_all();
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
let mut buffer = CoreUsage::new(); let mut buffer = CoreUsage::new();
let global_usage = system.global_cpu_usage(); let global_usage = system.global_cpu_usage();
system.cpus() system.cpus().iter().enumerate().for_each(|(id, cpu)| {
.iter()
.enumerate()
.for_each(|(id, cpu)| {
let core_info = CoreInfo { let core_info = CoreInfo {
// id, // id,
brand : cpu.brand().to_string(), brand: cpu.brand().to_string(),
name : cpu.name().to_string(), name: cpu.name().to_string(),
frequency : cpu.frequency(), frequency: cpu.frequency(),
vendor_id : cpu.vendor_id().to_string(), vendor_id: cpu.vendor_id().to_string(),
usage : cpu.cpu_usage(), usage: cpu.cpu_usage(),
}; };
// buffer.push(core_info);
buffer.entry(id).or_insert(core_info); buffer.entry(id).or_insert(core_info);
}); });
Cpu { Cpu {
global_usage, global_usage,
usage: buffer usage: buffer,
} }
} }
async fn get_ram_metrics(system: &mut System) -> Ram { async fn get_ram_metrics(system: &mut System) -> Ram {
system.refresh_memory();
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
Ram { Ram {
free_mem : system.free_memory(), free_mem: system.free_memory(),
free_swap : system.free_swap(), free_swap: system.free_swap(),
total_mem : system.total_memory(), total_mem: system.total_memory(),
total_swap : system.total_swap(), total_swap: system.total_swap(),
} }
} }
async fn get_all_disks_metrics() -> Disks { async fn get_all_disks_metrics(disks: &DisksList) -> Disks {
let disks = DisksList::new_with_refreshed_list(); // let disks = DisksList::new_with_refreshed_list();
let mut buffer = Disks::new(); let mut buffer = Disks::new();
disks.list() disks.list().iter().for_each(|disk| {
.iter()
.for_each(|disk| {
let disk = Disk { let disk = Disk {
name : disk.name().to_string_lossy().into_owned(), name: disk.name().to_string_lossy().into_owned(),
kind: disk.kind().to_string(), kind: disk.kind().to_string(),
fs : disk.file_system().to_string_lossy().into_owned(), fs: disk.file_system().to_string_lossy().into_owned(),
mount_point : disk.mount_point().to_string_lossy().into_owned(), mount_point: disk.mount_point().to_string_lossy().into_owned(),
total_space : disk.total_space(), total_space: disk.total_space(),
available_space : disk.available_space(), available_space: disk.available_space(),
is_removable : disk.is_removable(), is_removable: disk.is_removable(),
is_readonly : disk.is_read_only() is_readonly: disk.is_read_only(),
}; };
buffer.push(disk); buffer.push(disk);
}); });
buffer buffer
} }
async fn get_all_ifaces_metrics() -> Ifaces { async fn get_all_ifaces_metrics(networks: &Networks) -> Ifaces {
let mut ifaces = Ifaces::new(); let mut ifaces = Ifaces::new();
let networks = Networks::new_with_refreshed_list(); networks.iter().for_each(|(iface_name, data)| {
networks.iter()
.for_each(|(iface_name, data)| {
let mac = data.mac_address().to_string(); let mac = data.mac_address().to_string();
let ip_addrs = data
.ip_networks()
.iter()
.map(|ipaddr| format!("{}/{}", ipaddr.addr, ipaddr.prefix))
.collect::<Vec<String>>();
let iface = Network { let iface = Network {
iname : iface_name.to_owned(), iname: iface_name.to_owned(),
mac : mac, mac: mac,
recieved : data.received(), ip_addresses: ip_addrs,
transmitted : data.transmitted(), recieved: data.received(),
total_recieved_bytes : data.total_received(), transmitted: data.transmitted(),
total_transmitted_bytes : data.total_transmitted(), total_recieved_bytes: data.total_received(),
total_recieved_packets : data.total_packets_received(), total_transmitted_bytes: data.total_transmitted(),
total_transmitted_packets : data.total_packets_transmitted(), total_recieved_packets: data.total_packets_received(),
errors_on_recieved : data.errors_on_received(), total_transmitted_packets: data.total_packets_transmitted(),
errors_on_transmitted : data.errors_on_transmitted(), errors_on_recieved: data.errors_on_received(),
errors_on_transmitted: data.errors_on_transmitted(),
}; };
ifaces.push(iface); ifaces.push(iface);
}); });
ifaces ifaces
} }
async fn get_all_processes_metrics(system: &mut System) { /* TODO */} async fn get_all_processes_metrics(system: &mut System) { /* TODO */
}
pub mod processes {
use crate::options::structs::ProcessState;
use crate::utils::prcs::v2::Pid;
#[derive(Debug, serde::Serialize)]
pub enum ProcessesQuery {
General(Vec<ProcessesGeneral>),
All(Vec<ProcessesAll>),
QueryGeneral,
QueryAll,
}
impl ProcessesQuery {
pub fn serialze_to_bus(&self) -> Option<String> {
match self {
ProcessesQuery::General(prc) => serde_json::to_string_pretty(prc).ok(),
ProcessesQuery::All(prc) => serde_json::to_string_pretty(prc).ok(),
ProcessesQuery::QueryGeneral => None,
ProcessesQuery::QueryAll => None,
}
}
}
#[derive(Debug, serde::Serialize)]
pub struct ProcessesGeneral {
pub name: String,
pub state: ProcessState,
pub pid: Pid,
}
#[derive(Debug, serde::Serialize)]
pub struct ProcessesAll {
pub name: String,
pub state: ProcessState,
pub pid: Pid,
pub dependencies: deps::Dependencies,
}
pub mod deps {
use crate::options::structs::{FileTriggers, ServiceState, ServiceTriggers};
use crate::utils::files::v2::FileState;
// use super::*;
#[derive(Debug, serde::Serialize)]
pub struct FilesExtended {
pub name: String,
pub path: String,
pub status: FileState,
pub triggers: FileTriggers,
}
#[derive(Debug, serde::Serialize)]
pub struct ServicesExtended {
pub name: String,
pub access_name: String,
pub status: ServiceState,
pub triggers: ServiceTriggers,
}
#[derive(Debug, serde::Serialize)]
pub struct Dependencies {
pub files: Vec<FilesExtended>,
pub services: Vec<ServicesExtended>,
}
}
}
pub enum MetricType { pub enum MetricType {
FullMetrics, FullMetrics,
@ -200,46 +332,42 @@ pub enum MetricType {
} }
pub trait MetricsExportable: Send + Sync + 'static + Debug + Any { pub trait MetricsExportable: Send + Sync + 'static + Debug + Any {
fn get_metric_type(&self) -> MetricType;
fn serialze_into_output(&self) -> anyhow::Result<String>; fn serialze_into_output(&self) -> anyhow::Result<String>;
} }
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
struct FullMetrics { struct FullMetrics {
hostname : String, hostname: String,
os : String, os: String,
kernel : String, kernel: String,
cpu : Cpu, cpu: Cpu,
ram : Ram, ram: Ram,
disks : Disks, disks: Disks,
networks : Ifaces, networks: Ifaces,
processes : MetricProcesses, pub processes: Vec<ProcessesGeneral>,
} }
impl FullMetrics { impl FullMetrics {
fn create( fn create(
host: HostInfo, host: HostInfo,
cpu : Cpu, cpu: Cpu,
ram : Ram, ram: Ram,
disks : Disks, disks: Disks,
ifaces : Ifaces, ifaces: Ifaces,
processes : MetricProcesses, processes: Vec<ProcessesGeneral>,
) -> Self { ) -> Self {
Self { Self {
hostname : host.hostname, hostname: host.hostname,
os : host.os, os: host.os,
kernel : host.kernel, kernel: host.kernel,
cpu, cpu,
ram, ram,
disks, disks,
networks : ifaces, networks: ifaces,
processes processes,
} }
} }
} }
impl MetricsExportable for FullMetrics { impl MetricsExportable for FullMetrics {
fn get_metric_type(&self) -> MetricType {
MetricType::FullMetrics
}
fn serialze_into_output(&self) -> anyhow::Result<String> { fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?) Ok(serde_json::to_string_pretty(self)?)
} }
@ -247,15 +375,29 @@ impl MetricsExportable for FullMetrics {
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
struct HostInfo { struct HostInfo {
hostname : String, hostname: String,
os : String, os: String,
kernel : String, kernel: String,
} }
impl MetricsExportable for HostInfo { impl MetricsExportable for HostInfo {
fn get_metric_type(&self) -> MetricType { fn serialze_into_output(&self) -> anyhow::Result<String> {
MetricType::HostInfo Ok(serde_json::to_string_pretty(self)?)
} }
}
#[derive(Debug, Serialize)]
struct HostGeneral {
hostname: String,
os: String,
kernel: String,
cpu_percentage: f32,
ram_available: u64,
disk_percentage: u64,
net_stat: u64,
}
impl MetricsExportable for HostGeneral {
fn serialze_into_output(&self) -> anyhow::Result<String> { fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?) Ok(serde_json::to_string_pretty(self)?)
} }
@ -263,41 +405,34 @@ impl MetricsExportable for HostInfo {
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
struct Cpu { struct Cpu {
global_usage : f32, global_usage: f32,
usage : CoreUsage, usage: CoreUsage,
} }
impl MetricsExportable for Cpu { impl MetricsExportable for Cpu {
fn get_metric_type(&self) -> MetricType {
MetricType::Cpu
}
fn serialze_into_output(&self) -> anyhow::Result<String> { fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?) Ok(serde_json::to_string_pretty(self)?)
} }
} }
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
struct CoreInfo { struct CoreInfo {
name: String, name: String,
brand : String, brand: String,
frequency : u64, frequency: u64,
vendor_id : String, vendor_id: String,
usage : f32, usage: f32,
} }
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
struct Ram { struct Ram {
free_mem : u64, free_mem: u64,
free_swap : u64, free_swap: u64,
total_mem : u64, total_mem: u64,
total_swap : u64 total_swap: u64,
} }
impl MetricsExportable for Ram{ impl MetricsExportable for Ram {
fn get_metric_type(&self) -> MetricType {
MetricType::Ram
}
fn serialze_into_output(&self) -> anyhow::Result<String> { fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?) Ok(serde_json::to_string_pretty(self)?)
} }
@ -305,20 +440,17 @@ impl MetricsExportable for Ram{
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
struct Disk { struct Disk {
name : String, name: String,
kind : String, kind: String,
fs : String, fs: String,
mount_point : String, mount_point: String,
total_space : u64, total_space: u64,
available_space : u64, available_space: u64,
is_removable : bool, is_removable: bool,
is_readonly : bool, is_readonly: bool,
} }
impl MetricsExportable for Disks{ impl MetricsExportable for Disks {
fn get_metric_type(&self) -> MetricType {
MetricType::Disks
}
fn serialze_into_output(&self) -> anyhow::Result<String> { fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?) Ok(serde_json::to_string_pretty(self)?)
} }
@ -327,22 +459,20 @@ impl MetricsExportable for Disks{
// vec<Network> // vec<Network>
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
struct Network { struct Network {
iname : String, iname: String,
mac : String, mac: String,
recieved : u64, ip_addresses: Vec<String>,
transmitted : u64, recieved: u64,
total_recieved_bytes : u64, transmitted: u64,
total_transmitted_bytes : u64, total_recieved_bytes: u64,
total_recieved_packets : u64, total_transmitted_bytes: u64,
total_transmitted_packets : u64, total_recieved_packets: u64,
errors_on_recieved : u64, total_transmitted_packets: u64,
errors_on_transmitted : u64, errors_on_recieved: u64,
errors_on_transmitted: u64,
} }
impl MetricsExportable for Ifaces { impl MetricsExportable for Ifaces {
fn get_metric_type(&self) -> MetricType {
MetricType::Ifaces
}
fn serialze_into_output(&self) -> anyhow::Result<String> { fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?) Ok(serde_json::to_string_pretty(self)?)
} }
@ -350,36 +480,32 @@ impl MetricsExportable for Ifaces {
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
pub struct ProcessExtended { pub struct ProcessExtended {
name : String, name: String,
status : String, status: String,
pid : Pid, pid: Pid,
dependencies : Dependencies, dependencies: Dependencies,
cpu_usage : f32, cpu_usage: f32,
ram_usage : f32, ram_usage: f32,
virtual_mem_usage : u64, virtual_mem_usage: u64,
disks_usage_read_bytes: u64, disks_usage_read_bytes: u64,
disks_usage_write_bytes: u64, disks_usage_write_bytes: u64,
} }
impl ProcessExtended { impl ProcessExtended {
pub fn from_old_with_params( pub fn from_old_with_params(old: Arc<TrackingProcess>, pid: Pid, status: ProcessState) -> Self {
old : Arc<TrackingProcess>,
pid : Pid,
status : ProcessState,
) -> Self {
Self { Self {
name : old.name.clone(), name: old.name.clone(),
status : status.to_string(), status: status.to_string(),
pid, pid,
dependencies : old.dependencies.clone(), dependencies: old.dependencies.clone(),
cpu_usage : 0.0, cpu_usage: 0.0,
ram_usage : 0.0, ram_usage: 0.0,
virtual_mem_usage : 0, virtual_mem_usage: 0,
disks_usage_read_bytes: 0, disks_usage_read_bytes: 0,
disks_usage_write_bytes: 0, disks_usage_write_bytes: 0,
} }
} }
fn add_metrics(&mut self, system : &mut System) { fn add_metrics(&mut self, system: &mut System) {
if let Some(prc) = system.process(self.pid.new_sysinfo_pid()) { if let Some(prc) = system.process(self.pid.new_sysinfo_pid()) {
self.cpu_usage = prc.cpu_usage() / system.cpus().len() as f32; self.cpu_usage = prc.cpu_usage() / system.cpus().len() as f32;
self.ram_usage = (system.total_memory() as f32) / (prc.memory() as f32); self.ram_usage = (system.total_memory() as f32) / (prc.memory() as f32);
@ -391,9 +517,6 @@ impl ProcessExtended {
} }
impl MetricsExportable for MetricProcesses { impl MetricsExportable for MetricProcesses {
fn get_metric_type(&self) -> MetricType {
MetricType::Processes
}
fn serialze_into_output(&self) -> anyhow::Result<String> { fn serialze_into_output(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string_pretty(self)?) Ok(serde_json::to_string_pretty(self)?)
} }

View File

@ -1,18 +1,19 @@
use crate::options::structs::{Events, NegativeOutcomes, ProcessState, ProcessUnit};
use async_trait::async_trait;
use log::{error, warn}; use log::{error, warn};
use serde::Serialize;
use std::collections::HashSet;
use std::process::{Command, Output}; use std::process::{Command, Output};
use std::sync::Arc; use std::sync::Arc;
use tokio::time::Duration;
use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUnit};
use std::collections::HashSet;
use tokio::sync::mpsc::Receiver as MpscReciever; use tokio::sync::mpsc::Receiver as MpscReciever;
use async_trait::async_trait; use tokio::time::Duration;
use serde::Serialize;
pub mod v2 { pub mod v2 {
use log::info;
use tokio::time::sleep;
use crate::options::structs::DependencyType; use crate::options::structs::DependencyType;
use crate::utils::metrics::processes::{ProcessesAll, ProcessesGeneral, ProcessesQuery};
use log::info;
use std::path::Path; use std::path::Path;
use tokio::time::sleep;
use super::*; use super::*;
@ -23,7 +24,7 @@ pub mod v2 {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
return write!(f, "{}", self.0); return write!(f, "{}", self.0);
} }
} }
impl Pid { impl Pid {
fn new() -> Self { fn new() -> Self {
@ -38,9 +39,8 @@ pub mod v2 {
#[derive(Debug)] #[derive(Debug)]
pub struct ProcessesController { pub struct ProcessesController {
pub name: Arc<str>, pub name: Arc<str>,
pub pid : Pid, pid: Pid,
bin: String, bin: String,
// obj: Arc<TrackingProcess>,
state: ProcessState, state: ProcessState,
event_reader: MpscReciever<Events>, event_reader: MpscReciever<Events>,
negative_events: HashSet<Arc<str>>, negative_events: HashSet<Arc<str>>,
@ -56,12 +56,12 @@ pub mod v2 {
#[inline(always)] #[inline(always)]
pub fn new(name: &str, event_reader: MpscReciever<Events>) -> ProcessesController { pub fn new(name: &str, event_reader: MpscReciever<Events>) -> ProcessesController {
ProcessesController { ProcessesController {
name : Arc::from(name), name: Arc::from(name),
pid : Pid::new(), pid: Pid::new(),
bin : String::new(), bin: String::new(),
state : ProcessState::Stopped, state: ProcessState::Stopped,
event_reader, event_reader,
negative_events : HashSet::new(), negative_events: HashSet::new(),
} }
} }
#[inline(always)] #[inline(always)]
@ -69,85 +69,106 @@ pub mod v2 {
self.bin = bin.as_ref().to_string_lossy().into_owned(); self.bin = bin.as_ref().to_string_lossy().into_owned();
self self
} }
#[allow(unused)]
pub fn get_pid(&self) -> Pid { pub fn get_pid(&self) -> Pid {
self.pid self.pid
} }
pub fn get_state(&self) -> ProcessState {
self.state
}
async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) { async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) {
match trigger { match trigger {
"stay" => { "stay" => {
info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name); info!(
}, "Event on {} `{}` for {}. Ignoring ...",
dep_type, dep_name, self.name
);
}
"stop" => { "stop" => {
if is_active(&self.name).await { if is_active(&self.name).await {
info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name); info!(
"Event on {} `{}` for {}. Stopping ...",
dep_type, dep_name, self.name
);
match terminate_process(&self.name).await { match terminate_process(&self.name).await {
Ok(_) => { Ok(_) => {
info!("Process {} was stopped ...", &self.name); info!("Process {} was stopped ...", &self.name);
self.state = ProcessState::Stopped; self.state = ProcessState::Stopped;
self.pid = Pid::new(); self.pid = Pid::new();
}, }
Err(er) => { Err(er) => {
error!("Cannot stop process {} : {}", self.name, er); error!("Cannot stop process {} : {}", self.name, er);
},
} }
} }
}, }
}
"user-stop" => { "user-stop" => {
if is_active(&self.name).await { if is_active(&self.name).await {
info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Stop Call", self.name); info!(
"Event on {} `{}` for {}. Stopping ...",
dep_type, "User Stop Call", self.name
);
match terminate_process(&self.name).await { match terminate_process(&self.name).await {
Ok(_) => { Ok(_) => {
info!("Process {} was forcefully stopped ...", &self.name); info!("Process {} was forcefully stopped ...", &self.name);
self.state = ProcessState::StoppedByCli; self.state = ProcessState::StoppedByCli;
self.pid = Pid::new(); self.pid = Pid::new();
}, }
Err(er) => { Err(er) => {
error!("Cannot forcefully stop process {} : {}", self.name, er); error!("Cannot forcefully stop process {} : {}", self.name, er);
},
} }
} }
}, }
}
"user-hold" => { "user-hold" => {
if is_active(&self.name).await { if is_active(&self.name).await {
info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Hold Call", self.name); info!(
"Event on {} `{}` for {}. Stopping ...",
dep_type, "User Hold Call", self.name
);
match freeze_process(&self.name).await { match freeze_process(&self.name).await {
Ok(_) => { Ok(_) => {
info!("Process {} was forcefully frozen ...", &self.name); info!("Process {} was forcefully frozen ...", &self.name);
self.state = ProcessState::HoldingByCli; self.state = ProcessState::HoldingByCli;
// self.pid = Pid::new(); }
},
Err(er) => { Err(er) => {
error!("Cannot forcefully freeze process {} : {}", self.name, er); error!("Cannot forcefully freeze process {} : {}", self.name, er);
},
} }
} }
}, }
}
"hold" => { "hold" => {
if !is_frozen(&self.name).await { if !is_frozen(&self.name).await {
info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name); info!(
"Event on {} `{}` for {}. Freezing ...",
dep_type, dep_name, self.name
);
match freeze_process(&self.name).await { match freeze_process(&self.name).await {
Ok(_) => { Ok(_) => {
info!("Process {} was frozen ...", &self.name); info!("Process {} was frozen ...", &self.name);
self.state = ProcessState::Holding; self.state = ProcessState::Holding;
}, }
Err(er) => { Err(er) => {
error!("Cannot freeze process {} : {}", self.name, er); error!("Cannot freeze process {} : {}", self.name, er);
},
} }
} }
}, }
}
"restart" => { "restart" => {
info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name); info!(
"Event on {} `{}` for {}. Restarting ...",
dep_type, dep_name, self.name
);
let pid = restart_process(&self.name, &self.bin).await; let pid = restart_process(&self.name, &self.bin).await;
sleep(Duration::from_millis(100)).await; sleep(Duration::from_millis(100)).await;
if let Ok(pid) = pid { if let Ok(pid) = pid {
self.pid = Pid(pid); self.pid = Pid(pid);
info!("{}: New PID - {}", self.name, self.pid); info!("{}: New PID - {}", self.name, self.pid);
} }
}, }
_ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name), _ => error!(
"Impermissible trigger in file-trigger for {}. Ignoring event ...",
self.name
),
} }
tokio::time::sleep(Duration::from_micros(100)).await; tokio::time::sleep(Duration::from_micros(100)).await;
} }
@ -173,12 +194,12 @@ pub mod v2 {
warn!("Process {} was started by user call ...", self.name); warn!("Process {} was started by user call ...", self.name);
self.state = ProcessState::Pending; self.state = ProcessState::Pending;
self.pid = Pid(pid); self.pid = Pid(pid);
return Ok(()) return Ok(());
} else { } else {
warn!("Attempt to start process {} by user call was stopped due to existance of negative incidents ...", self.name); warn!("Attempt to start process {} by user call was stopped due to existance of negative incidents ...", self.name);
return Err(anyhow::Error::msg( return Err(anyhow::Error::msg(
format!("Attempt to start process {} by user call was stopped due to existance of negative incidents ...", self.name) format!("Attempt to start process {} by user call was stopped due to existance of negative incidents ...", self.name)
)) ));
} }
} }
#[allow(unused)] #[allow(unused)]
@ -192,7 +213,7 @@ pub mod v2 {
warn!("Attempt to unfreeze process {} by user call was stopped due to existance of negative incidents ...", self.name); warn!("Attempt to unfreeze process {} by user call was stopped due to existance of negative incidents ...", self.name);
return Err(anyhow::Error::msg( return Err(anyhow::Error::msg(
format!("Attempt to unfreeze process {} by user call was stopped due to existance of negative incidents ...", self.name) format!("Attempt to unfreeze process {} by user call was stopped due to existance of negative incidents ...", self.name)
)) ));
} }
} }
#[allow(unused)] #[allow(unused)]
@ -202,6 +223,14 @@ pub mod v2 {
self.pid = Pid(pid); self.pid = Pid(pid);
Ok(()) Ok(())
} }
pub async fn get_general_info(&self) -> ProcessesGeneral {
ProcessesGeneral {
name: self.name.to_string(),
state: self.state,
pid: self.pid,
}
}
} }
#[async_trait] #[async_trait]
@ -212,50 +241,62 @@ pub mod v2 {
let state = &self.state; let state = &self.state;
match (state, conditions) { match (state, conditions) {
(ProcessState::Holding, (_, _)) => { (ProcessState::Holding, (_, _)) => {
info!("No negative dependecies events on {} frozen process. Unfreezing ...", self.name); info!(
"No negative dependecies events on {} frozen process. Unfreezing ...",
self.name
);
if let Err(er) = unfreeze_process(&self.name).await { if let Err(er) = unfreeze_process(&self.name).await {
error!("Cannot unfreeze process {} : {}", self.name, er); error!("Cannot unfreeze process {} : {}", self.name, er);
} else { } else {
self.state = ProcessState::Pending; self.state = ProcessState::Pending;
info!("Process {} was unfreezed", &self.name); info!("Process {} was unfreezed", &self.name);
} }
}, }
(ProcessState::Stopped, (_, _)) => { (ProcessState::Stopped, (_, _)) => {
info!("No negative dependecies events on stopped {} process. Starting ...", self.name); info!(
"No negative dependecies events on stopped {} process. Starting ...",
self.name
);
match start_process(&self.name, &self.bin).await { match start_process(&self.name, &self.bin).await {
Ok(pid) => { Ok(pid) => {
self.state = ProcessState::Pending; self.state = ProcessState::Pending;
self.pid = Pid(pid); self.pid = Pid(pid);
info!("{}: New PID - {}", self.name, self.pid); info!("{}: New PID - {}", self.name, self.pid);
}, }
Err(er) => { Err(er) => {
error!("Cannot start process {} : {}", self.name, er); error!("Cannot start process {} : {}", self.name, er);
},
} }
}, }
}
(ProcessState::Pending, (false, false)) => { (ProcessState::Pending, (false, false)) => {
info!("{} process was impermissibly stopped. Starting ...", self.name); info!(
"{} process was impermissibly stopped. Starting ...",
self.name
);
match start_process(&self.name, &self.bin).await { match start_process(&self.name, &self.bin).await {
Ok(pid) => { Ok(pid) => {
self.state = ProcessState::Pending; self.state = ProcessState::Pending;
self.pid = Pid(pid); self.pid = Pid(pid);
info!("{}: New PID - {}", self.name, self.pid); info!("{}: New PID - {}", self.name, self.pid);
}, }
Err(er) => { Err(er) => {
error!("Cannot start process {} : {}", self.name, er); error!("Cannot start process {} : {}", self.name, er);
},
} }
}, }
}
(ProcessState::Pending, (true, true)) => { (ProcessState::Pending, (true, true)) => {
info!("No negative dependecies events on {} process. Unfreezing ...", self.name); info!(
"No negative dependecies events on {} process. Unfreezing ...",
self.name
);
if let Err(er) = unfreeze_process(&self.name).await { if let Err(er) = unfreeze_process(&self.name).await {
error!("Cannot unfreeze process {} : {}", self.name, er); error!("Cannot unfreeze process {} : {}", self.name, er);
} else { } else {
self.state = ProcessState::Pending; self.state = ProcessState::Pending;
info!("Process {} was unfreezed", &self.name); info!("Process {} was unfreezed", &self.name);
} }
}, }
_ => {}, _ => {}
} }
} }
while let Ok(event) = self.event_reader.try_recv() { while let Ok(event) = self.event_reader.try_recv() {
@ -264,22 +305,16 @@ pub mod v2 {
if self.negative_events.contains(&target) { if self.negative_events.contains(&target) {
self.negative_events.remove(&target); self.negative_events.remove(&target);
} }
}, }
Events::Negative(event) => { Events::Negative(event) => match event {
match event { NegativeOutcomes::FileWasChanged(target, dep_type, trigger)
NegativeOutcomes::FileWasChanged(target, dep_type, trigger) | | NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger)
NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) | | NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => {
NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => {
if !self.negative_events.contains(&target) { if !self.negative_events.contains(&target) {
self.negative_events.insert(target.clone()); self.negative_events.insert(target.clone());
self.trigger_on( self.trigger_on(&target, &trigger, dep_type).await;
&target,
&trigger,
dep_type
).await;
} }
},
} }
}, },
} }
@ -402,11 +437,12 @@ pub async fn is_frozen(name: &str) -> bool {
/// ///
pub async fn terminate_process(name: &str) -> anyhow::Result<()> { pub async fn terminate_process(name: &str) -> anyhow::Result<()> {
if !is_active(name).await { if !is_active(name).await {
return Err(anyhow::Error::msg(format!("Process {} is already stopped", name))) return Err(anyhow::Error::msg(format!(
"Process {} is already stopped",
name
)));
} }
let _ = Command::new("pkill") let _ = Command::new("pkill").arg(name).output()?;
.arg(name)
.output()?;
Ok(()) Ok(())
} }
@ -424,9 +460,7 @@ pub async fn terminate_process(name: &str) -> anyhow::Result<()> {
/// *depends on* : - /// *depends on* : -
/// ///
pub async fn freeze_process(name: &str) -> anyhow::Result<()> { pub async fn freeze_process(name: &str) -> anyhow::Result<()> {
let _ = Command::new("pkill") let _ = Command::new("pkill").args(["-STOP", name]).output()?;
.args(["-STOP", name])
.output()?;
Ok(()) Ok(())
} }
@ -444,9 +478,7 @@ pub async fn freeze_process(name: &str) -> anyhow::Result<()> {
/// *depends on* : - /// *depends on* : -
/// ///
pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> { pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> {
let _ = Command::new("pkill") let _ = Command::new("pkill").args(["-CONT", name]).output()?;
.args(["-CONT", name])
.output()?;
Ok(()) Ok(())
} }
@ -484,7 +516,10 @@ pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<u32> {
/// ///
pub async fn start_process(name: &str, path: &str) -> anyhow::Result<u32> { pub async fn start_process(name: &str, path: &str) -> anyhow::Result<u32> {
if is_active(name).await { if is_active(name).await {
return Err(anyhow::Error::msg(format!("Process {} is already running", name))) return Err(anyhow::Error::msg(format!(
"Process {} is already running",
name
)));
} }
let mut command = Command::new(path); let mut command = Command::new(path);
// command.arg(path); // command.arg(path);
@ -495,9 +530,10 @@ pub async fn start_process(name: &str, path: &str) -> anyhow::Result<u32> {
warn!("Process {} is running now!", name); warn!("Process {} is running now!", name);
Ok(pid) Ok(pid)
} }
Err(er) => { Err(er) => Err(anyhow::Error::msg(format!(
Err(anyhow::Error::msg(format!("Cannot start process {} : {}", name, er))) "Cannot start process {} : {}",
} name, er
))),
} }
} }
@ -516,8 +552,7 @@ mod process_unittests {
// let _ = std::io::stdout().write_all(b""); // let _ = std::io::stdout().write_all(b"");
let res1 = start_process("restart-prc", "./tests/examples/restart-prc").await; let res1 = start_process("restart-prc", "./tests/examples/restart-prc").await;
assert!(res1.is_ok()); assert!(res1.is_ok());
let res2 = let res2 = restart_process("restart-prc", "./tests/examples/restart-prc").await;
restart_process("restart-prc", "./tests/examples/restart-prc").await;
assert!(res2.is_ok()); assert!(res2.is_ok());
let _ = terminate_process("restart-prc").await; let _ = terminate_process("restart-prc").await;
let res3 = is_active("restart-prc").await; let res3 = is_active("restart-prc").await;

View File

@ -1,20 +1,20 @@
use async_trait::async_trait;
use futures::future::Future;
use log::{error, warn}; use log::{error, warn};
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::sync::Arc;
use tokio::time::Duration;
use tokio::sync::mpsc::Sender as Sender;
use async_trait::async_trait;
use std::pin::Pin; use std::pin::Pin;
use futures::future::Future; use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::time::Duration;
pub mod v2 { pub mod v2 {
use futures::FutureExt; use futures::FutureExt;
use log::info; use log::info;
use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState}; use crate::options::structs::{Events, ProcessUnit, ServiceState, Triggers};
use super::*; use super::*;
use std::collections::{HashMap, BTreeMap, VecDeque}; use std::collections::{BTreeMap, HashMap, VecDeque};
type MpscSender = Arc<Sender<Events>>; type MpscSender = Arc<Sender<Events>>;
// type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>; // type EventHandlers<'a> = Vec<MpscSender<Events<'a>>>;
@ -26,15 +26,15 @@ pub mod v2 {
pub struct ServicesController { pub struct ServicesController {
// i.e. yandex.ru // i.e. yandex.ru
#[allow(unused)] #[allow(unused)]
name : String, name: String,
// i.e. yandex.ru:443 // i.e. yandex.ru:443
access_url : Arc<str>, access_url: Arc<str>,
// "OK" or "Unavailable" // "OK" or "Unavailable"
state: ServiceState, state: ServiceState,
// btree map with key as max wait time and it's key to hashmap // btree map with key as max wait time and it's key to hashmap
config: ConnectionQueue, config: ConnectionQueue,
// Map of processes with their (trigger and mpsc sender) // Map of processes with their (trigger and mpsc sender)
event_registrator : EventHandlers, event_registrator: EventHandlers,
} }
impl PartialEq for ServicesController { impl PartialEq for ServicesController {
@ -47,19 +47,15 @@ pub mod v2 {
#[inline(always)] #[inline(always)]
pub fn new() -> ServicesController { pub fn new() -> ServicesController {
ServicesController { ServicesController {
name : String::new(), name: String::new(),
access_url : Arc::from(String::new()), access_url: Arc::from(String::new()),
state : ServiceState::Unavailable, state: ServiceState::Unavailable,
config: ConnectionQueue::new(), config: ConnectionQueue::new(),
event_registrator : EventHandlers::new(), event_registrator: EventHandlers::new(),
} }
} }
#[inline(always)] #[inline(always)]
pub fn with_access_name( pub fn with_access_name(mut self, hostname: &str, access_url: &str) -> ServicesController {
mut self,
hostname: &str,
access_url: &str,
) -> ServicesController {
self.name = hostname.to_string(); self.name = hostname.to_string();
self.access_url = Arc::from(access_url); self.access_url = Arc::from(access_url);
self self
@ -76,18 +72,21 @@ pub mod v2 {
} }
pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String { pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String {
format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))) format!(
"{}{}",
hostname,
port.map_or_else(|| "".to_string(), |p| format!(":{}", p))
)
} }
pub fn add_process( pub fn get_state(&self) -> ServiceState {
&mut self, self.state
proc_name: &str, }
trigger: Triggers, pub fn add_process(&mut self, proc_name: &str, trigger: Triggers, sender: MpscSender) {
sender: MpscSender,
) {
let proc_name: Arc<str> = Arc::from(proc_name); let proc_name: Arc<str> = Arc::from(proc_name);
// queue add // queue add
if let Triggers::Service { wait, .. } = trigger { if let Triggers::Service { wait, .. } = trigger {
self.config.entry(wait) self.config
.entry(wait)
.and_modify(|el| el.push_back(proc_name.clone())) .and_modify(|el| el.push_back(proc_name.clone()))
.or_insert({ .or_insert({
let mut temp = VecDeque::new(); let mut temp = VecDeque::new();
@ -96,14 +95,15 @@ pub mod v2 {
}); });
} }
// event add // event add
self.event_registrator.entry(proc_name).or_insert((trigger, sender)); self.event_registrator
.entry(proc_name)
.or_insert((trigger, sender));
} }
async fn check_state(&self) -> anyhow::Result<()> { async fn check_state(&self) -> anyhow::Result<()> {
let url = self.access_url.clone(); let url = self.access_url.clone();
let resolve_future = tokio::task::spawn_blocking(move || { let resolve_future = tokio::task::spawn_blocking(move || url.to_socket_addrs());
url.to_socket_addrs() let addrs: Vec<_> =
}); match tokio::time::timeout(Duration::from_secs(1), resolve_future).await {
let addrs: Vec<_> = match tokio::time::timeout(Duration::from_secs(1), resolve_future).await {
Ok(Ok(addrs)) => addrs?.collect(), Ok(Ok(addrs)) => addrs?.collect(),
Ok(Err(er)) => return Err(er.into()), Ok(Err(er)) => return Err(er.into()),
Err(_) => return Err(anyhow::Error::msg("DNS resolution timeout")), Err(_) => return Err(anyhow::Error::msg("DNS resolution timeout")),
@ -113,12 +113,20 @@ pub mod v2 {
return Err(anyhow::Error::msg("No addresses resolved")); return Err(anyhow::Error::msg("No addresses resolved"));
} }
let tasks: Vec<_> = addrs.into_iter().map(|addr| async move { let tasks: Vec<_> = addrs
match tokio::time::timeout(Duration::from_secs(2), tokio::net::TcpStream::connect(&addr)).await { .into_iter()
.map(|addr| async move {
match tokio::time::timeout(
Duration::from_secs(2),
tokio::net::TcpStream::connect(&addr),
)
.await
{
Ok(Ok(_)) => Some(addr), Ok(Ok(_)) => Some(addr),
_ => None, _ => None,
} }
}).collect(); })
.collect();
let mut any_success = false; let mut any_success = false;
for task in futures::future::join_all(tasks).await { for task in futures::future::join_all(tasks).await {
if task.is_some() { if task.is_some() {
@ -127,7 +135,10 @@ pub mod v2 {
} }
} }
if !any_success { if !any_success {
return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url))); return Err(anyhow::Error::msg(format!(
"No access to service `{}`",
&self.access_url
)));
} }
Ok(()) Ok(())
@ -135,7 +146,9 @@ pub mod v2 {
async fn trigger_on(&mut self) { async fn trigger_on(&mut self) {
match self.state { match self.state {
ServiceState::Ok => { ServiceState::Ok => {
let futures : Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self.event_registrator.iter() let futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
.event_registrator
.iter()
.map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt))) .map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt)))
.map(|(prc, (serv, sender_opt))| async move { .map(|(prc, (serv, sender_opt))| async move {
info!("Notifying process {} ...", prc); info!("Notifying process {} ...", prc);
@ -145,11 +158,11 @@ pub mod v2 {
.collect(); .collect();
futures::future::join_all(futures).await; futures::future::join_all(futures).await;
}, }
ServiceState::Unavailable => { ServiceState::Unavailable => {
// looped check and notifying // looped check and notifying
self.looped_check().await; self.looped_check().await;
}, }
} }
} }
async fn looped_check(self: &mut Self) { async fn looped_check(self: &mut Self) {
@ -160,11 +173,16 @@ pub mod v2 {
let mut attempt: u32 = 1; let mut attempt: u32 = 1;
let access_url = Arc::new(self.access_url.clone()); let access_url = Arc::new(self.access_url.clone());
if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async { if let Err(_) = tokio::time::timeout(
tokio::time::Duration::from_secs((longest + 1) as u64),
async {
// let access_url = access_url.clone(); // let access_url = access_url.clone();
loop { loop {
interapter.tick().await; interapter.tick().await;
info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt); info!(
"Trying to connect to {} (attempt: {}) ...",
&access_url, attempt
);
attempt += 1; attempt += 1;
let state_check_result = self.check_state().await; let state_check_result = self.check_state().await;
@ -172,8 +190,12 @@ pub mod v2 {
if state_check_result.is_ok() { if state_check_result.is_ok() {
info!("Connection to {} is `OK` now", &access_url); info!("Connection to {} is `OK` now", &access_url);
self.state = ServiceState::Ok; self.state = ServiceState::Ok;
let futures : Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self.event_registrator.iter() let futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = self
.map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt))) .event_registrator
.iter()
.map(|(prc, (_, sender_opt))| {
(prc, (self.access_url.clone(), sender_opt))
})
.map(|(prc, (serv, sender_opt))| async move { .map(|(prc, (serv, sender_opt))| async move {
info!("Notifying process {} ...", prc); info!("Notifying process {} ...", prc);
let _ = sender_opt.send(Events::Positive(serv.clone())); let _ = sender_opt.send(Events::Positive(serv.clone()));
@ -186,31 +208,49 @@ pub mod v2 {
} else { } else {
let now = timer.elapsed(); let now = timer.elapsed();
let iterator = self.config.iter() let iterator = self
.filter(|(&wait, _)| tokio::time::Duration::from_secs(wait as u64) <= now) .config
.iter()
.filter(|(&wait, _)| {
tokio::time::Duration::from_secs(wait as u64) <= now
})
.flat_map(|(_, a)| a.iter().cloned()) .flat_map(|(_, a)| a.iter().cloned())
.collect::<VecDeque<Arc<str>>>(); .collect::<VecDeque<Arc<str>>>();
for name in iterator { for name in iterator {
let proc_name = name.to_string(); let proc_name = name.to_string();
info!("Trying to notify process `{}` ...", &proc_name); info!("Trying to notify process `{}` ...", &proc_name);
let sender_opt = self.event_registrator.get(&name) let sender_opt =
.map(|(trigger, sender)| self.event_registrator.get(&name).map(|(trigger, sender)| {
(trigger.to_service_negative_event(self.access_url.clone()), sender) (
); trigger
.to_service_negative_event(self.access_url.clone()),
sender,
)
});
if let Some((tr, tx)) = sender_opt { if let Some((tr, tx)) = sender_opt {
let _ = tx.send(tr.unwrap()).await; let _ = tx.send(tr.unwrap()).await;
} else { } else {
error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url) error!(
"Cannot find {} channel sender in {} service",
name.clone(),
&self.access_url
)
} }
} }
} }
} }
}).await { },
)
.await
{
info!("Timeout of establishing connection to {}. ", &access_url); info!("Timeout of establishing connection to {}. ", &access_url);
} }
} }
pub fn get_arc_access_url(&self) -> Arc<str> {
self.access_url.clone()
}
} }
#[async_trait] #[async_trait]
impl ProcessUnit for ServicesController { impl ProcessUnit for ServicesController {
@ -222,14 +262,16 @@ pub mod v2 {
warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len()); warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len());
self.state = ServiceState::Ok; self.state = ServiceState::Ok;
self.trigger_on().await; self.trigger_on().await;
}, }
(ServiceState::Ok, Err(_)) => { (ServiceState::Ok, Err(_)) => {
warn!("Unreachable for connection service `{}`. Initializing reconnect mechanism ...", &self.access_url); warn!("Unreachable for connection service `{}`. Initializing reconnect mechanism ...", &self.access_url);
self.state = ServiceState::Unavailable; self.state = ServiceState::Unavailable;
self.trigger_on().await; self.trigger_on().await;
}, }
(ServiceState::Unavailable, Err(_)) => warn!("Service {} is still unreachable", &self.access_url), (ServiceState::Unavailable, Err(_)) => {
_ => { /* DEAD END WITH NO INTEREST */ }, warn!("Service {} is still unreachable", &self.access_url)
}
_ => { /* DEAD END WITH NO INTEREST */ }
} }
} }
} }