diff --git a/noxis-cli/.env.example b/noxis-cli/.env.example new file mode 100644 index 0000000..b0d53a3 --- /dev/null +++ b/noxis-cli/.env.example @@ -0,0 +1 @@ +NOXIS_SOCKET_PATH = "/home/vladislavd/diplom_code/noxis-rs/noxis.sock" \ No newline at end of file diff --git a/noxis-cli/Cargo.toml b/noxis-cli/Cargo.toml index e02d5f8..712b2b6 100644 --- a/noxis-cli/Cargo.toml +++ b/noxis-cli/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] anyhow = "1.0.94" clap = { version = "4.5.22", features = ["derive"] } +dotenv = "0.15.0" serde = { version = "1.0.215", features = ["derive"] } serde_json = "1.0.133" thiserror = "2.0.11" diff --git a/noxis-cli/src/cli.rs b/noxis-cli/src/cli.rs index 5a82b64..f5e7672 100644 --- a/noxis-cli/src/cli.rs +++ b/noxis-cli/src/cli.rs @@ -79,7 +79,23 @@ pub enum ConfigAction { about = "To reset all config settings", )] Reset, + #[command( + about = "To get current Noxis configuration", + name = "ls" + )] + Show(EnvConfig), } +#[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] +pub struct EnvConfig { + // flag + #[arg( + long = "env", + action, + help = "to read environment vars configuration", + )] + pub is_env : bool, +} + #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] pub struct LocalConfig { @@ -148,4 +164,28 @@ pub enum ProcessAction { about = "To get info about current process's services-dependencies", )] Services, +} + +pub mod metrics_models { + pub enum MetricsMode { + Full, + // system + Cpu, + Memory, + Ram, + Rom, + Network, + // processes + Processes + // Config + } +} + +impl Cli { + pub fn validate_socket(mut self) -> Self { + if let Ok(path) = std::env::var("NOXIS_SOCKET_PATH") { + self.socket = path; + } + self + } } \ No newline at end of file diff --git a/noxis-cli/src/cli_error.rs b/noxis-cli/src/cli_error.rs index d5bae9b..ef9f802 100644 --- a/noxis-cli/src/cli_error.rs +++ b/noxis-cli/src/cli_error.rs @@ -2,7 +2,7 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum 
NoxisCliError { - #[error("Can't find socket `{0}`. Error : {1}")] + #[error("Can't find socket `{0}`. {1}")] NoxisDaemonMissing(String, String), #[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's runtime!")] PortIsNotWritable, diff --git a/noxis-cli/src/cli_net.rs b/noxis-cli/src/cli_net.rs index a3300ed..6d31d12 100644 --- a/noxis-cli/src/cli_net.rs +++ b/noxis-cli/src/cli_net.rs @@ -25,6 +25,6 @@ pub async fn try_send(cli: Cli) -> Result<()> { .await .map_err(|er| NoxisCliError::CliResponseReadError(er.to_string()))?; - println!("Received response: {}", String::from_utf8_lossy(&response)); + println!("{}", String::from_utf8_lossy(&response)); Ok(()) } \ No newline at end of file diff --git a/noxis-cli/src/main.rs b/noxis-cli/src/main.rs index 7961b75..2ad8ead 100644 --- a/noxis-cli/src/main.rs +++ b/noxis-cli/src/main.rs @@ -9,7 +9,8 @@ use anyhow::Result; #[tokio::main] async fn main() -> Result<()>{ - let cli = Cli::parse(); + dotenv::dotenv().ok(); + let cli = Cli::parse().validate_socket(); try_send(cli).await?; Ok(()) } diff --git a/noxis-rs/.env.example b/noxis-rs/.env.example index c956c65..8efbd9b 100644 --- a/noxis-rs/.env.example +++ b/noxis-rs/.env.example @@ -9,4 +9,6 @@ NOXIS_HAGENT_SOCKET_PATH = "/var/run/example/hostagent.sock" NOXIS_LOG_TO = "/var/log/noxis/noxis.log" NOXIS_REMOTE_SERVER_URL = "ip.ip.ip.ip:port" NOXIS_CONFIG_PATH = "./settings.json" -NOXIS_METRICS_MODE = "full" \ No newline at end of file +NOXIS_METRICS_MODE = "full" +NOXIS_SOCKET_PATH = "/path/to/noxis.sock" +NOXIS_MAX_LOG_LEVEL = "TRACE" \ No newline at end of file diff --git a/noxis-rs/Cargo.toml b/noxis-rs/Cargo.toml index 1ad650e..73ed76b 100644 --- a/noxis-rs/Cargo.toml +++ b/noxis-rs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "noxis-rs" -version = "0.11.26" +version = "0.12.0" edition = "2021" [dependencies] @@ -21,3 +21,4 @@ dotenv = "0.15.0" futures = "0.3.31" async-trait = "0.1.88" crossbeam = { version = "0.8.4", features = 
["crossbeam-channel"] } +lazy_static = "1.5.0" diff --git a/noxis-rs/settings.json b/noxis-rs/settings.json index b27d9c7..263600a 100644 --- a/noxis-rs/settings.json +++ b/noxis-rs/settings.json @@ -20,12 +20,12 @@ "hostname": "ya.ru", "port": 443, "triggers": { - "wait": 10, - "onLost": "restart" + "wait": 2, + "onLost": "stop" } } ] } } ] -} +} \ No newline at end of file diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 4cc69f4..3f1c6fd 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -1,17 +1,13 @@ mod options; mod utils; -use clap::Parser; use log::{error, info}; -use options::config::*; use options::logger::setup_logger; use options::signals::set_valid_destructor; use options::structs::Processes; use options::cli_pipeline::init_cli_pipeline; use std::sync::Arc; use std::time::Duration; -use tokio::sync::mpsc; -use utils::*; use options::preboot::PrebootParams; use tokio::sync::{broadcast, oneshot}; use options::config::v2::init_config_mechanism; @@ -25,23 +21,41 @@ async fn main() -> anyhow::Result<()>{ info!("Noxis is configurating..."); // let (tx_brd, mut rx_brd) = broadcast::channel::(1); + // for cli to get config + let mut rx_cli_brd = tx_brd.subscribe(); // cli <-> config let (tx_oneshot, rx_oneshot) = oneshot::channel::(); let mut handler: Vec> = vec![]; - // initilaizing task for config manipulations + // initilaizing task for config manipulations + let preboot_config = preboot.clone(); let config_module = tokio::spawn(async move { let _ = init_config_mechanism( rx_oneshot, tx_brd, - preboot.clone() + preboot_config ).await; }); handler.push(config_module); // initilaizing task for cli manipulation + let preboot_cli = preboot.clone(); let cli_module = tokio::spawn(async move { - if let Err(er) = init_cli_pipeline().await { + let config = { + let mut tick = tokio::time::interval(Duration::from_millis(500)); + loop { + tick.tick().await; + break match rx_cli_brd.try_recv() { + Ok(conf) => conf, + Err(_) => continue, + } + } 
+ }; + if let Err(er) = init_cli_pipeline( + preboot_cli, + Arc::new(config), + tx_oneshot + ).await { error!("CLI pipeline failed due to {}", er) } }); diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index 0c13e22..199a9ba 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -1,9 +1,17 @@ use log::{error, info}; use tokio::net::{ UnixStream, UnixListener }; +use tokio::sync::{Mutex, OnceCell}; use tokio::time::{sleep, Duration}; use std::fs; +use std::sync::Arc; use tokio::io::{ AsyncWriteExt, AsyncReadExt}; use noxis_cli::Cli; +use super::structs::Processes; + +use super::preboot::PrebootParams; + +type ConfigGateway = tokio::sync::oneshot::Sender; +type ProcessedConfigGateway = Arc>>; /// # Fn `init_cli_pipeline` /// ## for catching all input requests from CLI @@ -18,19 +26,35 @@ use noxis_cli::Cli; /// /// *depends on* : - /// -pub async fn init_cli_pipeline() -> anyhow::Result<()> { - let socket_path = "noxis.sock"; +pub async fn init_cli_pipeline( + params: Arc, + config : Arc, + config_gateway : ConfigGateway, +) -> anyhow::Result<()> { + let socket_path = ¶ms.self_socket; let _ = fs::remove_file(socket_path); + let config_gateway = Arc::new( + Mutex::new( + OnceCell::new_with(Some(config_gateway)) + ) + ); + match UnixListener::bind(socket_path) { Ok(list) => { // TODO: remove `unwrap`s - info!("Listening on {}", socket_path); + info!("Listening on {}", socket_path.display()); + std::env::set_var("NOXIS_SOCKET_PATH", socket_path); loop { match list.accept().await { Ok((socket, _)) => { - // tokio::spawn(); - process_connection(socket).await; + // ??? 
maybe errors on async work with data transfering between modules + let params = params.clone(); + let config = config.clone(); + let config_gateway = config_gateway.clone(); + tokio::spawn(async move { + process_connection(socket, params.clone(), config.clone(), config_gateway.clone()).await; + }); }, Err(er) => { error!("Cannot poll connection to CLI due to {}", er); @@ -60,7 +84,7 @@ pub async fn init_cli_pipeline() -> anyhow::Result<()> { /// /// *depends on* : `tokio::net::TcpStream` /// -async fn process_connection(mut stream: UnixStream) { +async fn process_connection(mut stream: UnixStream, params: Arc, config : Arc, cfg_gateway : ProcessedConfigGateway) { let mut buf = vec![0; 1024]; match stream.read(&mut buf).await { Ok(0) => { @@ -72,7 +96,16 @@ async fn process_connection(mut stream: UnixStream) { match serde_json::from_slice::(&buf) { Ok(cli) => { info!("Received CLI request: {:?}", cli); - let response = "OK"; + let response = match process_cli_cmd(cli, params.clone(), config, cfg_gateway.clone()).await { + Ok(response) => { + response + }, + Err(er) => { + let error_msg = format!("Error: {}", er); + error!("{}", &error_msg); + error_msg + }, + }; if let Err(e) = stream.write_all(response.as_bytes()).await { error!("Failed to send response: {}", e); } @@ -86,3 +119,59 @@ async fn process_connection(mut stream: UnixStream) { } let _ = stream.shutdown().await; } + + +async fn process_cli_cmd(cli : Cli, params: Arc, global_config : Arc, cfg_gateway: ProcessedConfigGateway) -> anyhow::Result { + use noxis_cli::{Commands, ConfigAction}; + return match cli.command { + Commands::Config(config) => { + match config.action { + ConfigAction::Show(env ) => { + if env.is_env { + Ok(serde_json::to_string_pretty(params.as_ref())?) + } else { + /* */ + Ok(serde_json::to_string_pretty(global_config.as_ref())?) 
+ } + }, + ConfigAction::Reset => { + Err(anyhow::Error::msg("It's temporarily forbidden to reset current config using CLI-util")) + }, + ConfigAction::Local(cfg) => { + if cfg.is_json { + /* */ + let new_config = serde_json::from_str::(&cfg.config)?; + let new_version = new_config.get_version().to_string(); + + use super::{config::config_comparing, structs::ConfigActuality}; + + return match config_comparing(&global_config, &new_config) { + ConfigActuality::Remote => { + let cfg_gateway = cfg_gateway.clone(); + tokio::spawn(async move { + let mut lock = cfg_gateway.lock().await; + match lock.take() { + Some(channel) => { + let _ = channel.send(new_config); + }, + None => error!("Cannot update config due to channel sender loss"), + } + }); + Ok(format!("Ok. Saving and reloading with version {}", new_version)) + }, + _ => Err(anyhow::Error::msg(format!("Local config (version: {}) is more actual", global_config.get_version()))), + } + } else { + Err(anyhow::Error::msg("It's temporarily forbidden to set config in non-json mode")) + } + }, + ConfigAction::Remote => {Ok(params.remote_server_url.clone())}, + /* */ + // _ => Err(anyhow::Error::msg("Unrecognized command from CLI")) + } + }, + /* */ + Commands::Status => Ok(String::from("Ok")), + _ => Ok(String::from("Ok")) + } +} diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index 00e3e73..54bafce 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -189,7 +189,7 @@ pub mod v2 { }, Ok(_) => { info!("Successfully subscribed to {} pubsub channel", channel_name); - let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3))); + let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(1))); loop { if let Ok(msg) = pub_sub.get_message() { // dbg!("ok on get message"); @@ -325,10 +325,6 @@ pub mod v2 { if !events.is_empty() { warn!("Local config file was overwritten. 
Discarding changes ..."); need_to_export_config = true; - // events - // .iter() - // .any(|event| *event == EventMask::DELETE_SELF) - // .then(|| need_to_recreate_watcher = true); } } } @@ -418,259 +414,6 @@ fn load_processes(json_filename: &str) -> Option { None } -/// # Fn `get_actual_config` -/// ## for getting actual Monitor's config from local and remote storages -/// -/// *input* : - -/// -/// *output* : `None` on fatal error in mechanisms | `Some(conf)` on finish reading and parsing -/// -/// *initiator* : main thread -/// -/// *managing* : - -/// -/// *depends on* : struct `Processes` -/// -pub async fn get_actual_config(params : Arc) -> Option { - // * if no local conf -> loop and +inf getting conf from redis server - // * if local conf -> once getting conf from redis server - let config_path = params.config.to_str().unwrap_or_else(|| { - error!("Invalid character in config file. Config path was set to default"); - "settings.json" - }); - info!("Configurating config module with params: no-sub={}, local config path={:?}, remote server={}", params.no_sub, params.config, params.remote_server_url); - match load_processes(config_path) { - Some(local_conf) => { - info!( - "Found local configuration, version - {}", - &local_conf.date_of_creation - ); - if !params.no_sub { - if let Some(remote_conf) = - // TODO : rework with pubsub mech - once_get_remote_configuration(&format!("redis://{}/", ¶ms.remote_server_url)) - { - return match config_comparing(&local_conf, &remote_conf) { - ConfigActuality::Local => { - info!("Local config is actual"); - Some(local_conf) - } - ConfigActuality::Remote => { - info!("Pulled config is more actual. Saving changes!"); - if save_new_config(&remote_conf, config_path).is_err() { - error!("Saving changes process failed due to unexpected error...") - } - Some(remote_conf) - } - }; - } - } - Some(local_conf) - } - None => { - warn!("No local valid conf was found. 
Trying to pull remote one..."); - if !params.no_sub { - let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", ¶ms.remote_server_url))); - if let Some(conf) = get_remote_conf_watcher(&mut conn).await { - info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation); - let _ = save_new_config(&conf, config_path); - return Some(conf); - } - } - None - } - } -} - -/// # Fn `get_remote_conf_watcher` -/// ## for infinitive pulling remote config -/// -/// *input* : `&mut Connection` -/// -/// *output* : `None` on fatal error | `Some(conf)` on succesfull pulling -/// -/// *initiator* : fn `get_actual_config` -/// -/// *managing* : mut ref `Connection` object -/// -/// *depends on* : struct `Processes` -/// -async fn get_remote_conf_watcher(conn : &mut Connection) -> Option { - let mut conn = conn.as_pubsub(); - let cont = crate::utils::get_container_id(); - loop { - match cont { - Some(ref cont) => { - let cont = cont.trim(); - if conn.subscribe(cont).is_err() { - // todo : delay - continue; - } - match conn.get_message() { - Ok(msg) => { - let msg: Result = msg.get_payload(); - if let Ok(payload) = msg { - if let Some(remote) = parse_extern_config(&payload) { - return Some(remote) - } - else { - error!("Pulled invalid config, cannot start. Waiting for remote conf..."); - } - } else { - error!("Cannot get Redis message payload. Waiting for remote conf..."); - } - // todo : delay - continue; - }, - Err(_) => { - // todo : delay - continue; - }, - } - }, - None => { - error!("Cannot get container id. 
Returning"); - break - }, - } - } - None -} - -/// # Fn `get_remote_conf_watcher` -/// ## for trying to pull remote config -/// -/// > only for situation when local isn't None (no need to fck redis server) -/// -/// *input* : `&str` -/// -/// *output* : `None` on empty pubsub or error | `Some(conf)` on succesfull pulling -/// -/// *initiator* : fn `get_actual_config` -/// -/// *managing* : &str of Redis Server credentials -/// -/// *depends on* : struct `Processes` -/// -fn once_get_remote_configuration(serv_info: &str) -> Option { - let cont = crate::utils::get_container_id(); - match Client::open(serv_info) { - Ok(client) => { - match client.get_connection() { - Ok(mut conn) => { - let mut conn = conn.as_pubsub(); - match conn.subscribe(cont) { - Ok(_) => { - if conn.set_read_timeout(Some(Duration::from_millis(100))).is_err() { - error!("Cannot set reading pubsub timeout and pull remote config"); - return None; - } - match conn.get_message() { - Ok(msg) => { - info!("Pulled config from Redis Server"); - let get_payload: Result = msg.get_payload(); - match get_payload { - Ok(payload) => { - let remote = parse_extern_config(&payload); - if remote.is_none() { - error!("Pulled config is invalid. Check it in Redis Server"); - } - remote - }, - Err(_) => { - error!("Cannot extract payload from new message. Check Redis Server state"); - None - }, - } - }, - Err(_) => { - None - }, - } - }, - Err(_) => { - error!("Redis subscription process failed. Check Redis configuration!"); - None - } - } - } - Err(_) => { - error!("Redis connection attempt is failed. Check Redis configuration!"); - None - } - } - } - Err(_) => { - error!("Redis-Client opening attempt is failed. Check network configuration!"); - None - } - } -} - -// ! 
watchers - -/// # Fn `open_watcher` -/// ## for infinitive opening Redis client -/// -/// > only for situation when local isn't None (no need to fck redis server) -/// -/// *input* : `Option` -/// -/// *output* : redis::Client on successful opening client -/// -/// *initiator* : fn `get_actual_config` -/// -/// *managing* : &str of Redis Server credentials -/// -/// *depends on* : struct `redis::Client` -/// -fn open_watcher(serv_info: &str) -> Client { - loop { - match Client::open(serv_info) { - Ok(redis) => { - info!("Successfully opened Redis-Client"); - return redis; - } - Err(_) => { - error!("Redis-Client opening attempt is failed. Check network configuration! Retrying..."); - std::thread::sleep(Duration::from_secs(4)); - } - } - } -} - -/// # Fn `get_connection_watcher` -/// ## for infinitive establishing Redis connection on existing client -/// -/// > only for situation when local isn't None (no need to fck redis server) -/// -/// *input* : `&Client` -/// -/// *output* : `Connection` -/// -/// *initiator* : fn `get_actual_config` -/// -/// *managing* : &Client for opening connection -/// -/// *depends on* : struct `redis::Connection` -/// -fn get_connection_watcher(client: &Client) -> Connection { - loop { - match client.get_connection() { - Ok(conn) => { - info!("Successfully got Redis connection object"); - return conn; - } - Err(_) => { - error!( - "Redis connection attempt is failed. Check Redis configuration! Retrying..." 
- ); - std::thread::sleep(Duration::from_secs(4)); - } - } - } -} - /// # Fn `restart_main_thread` /// ## for restart monitor with new config /// @@ -686,87 +429,10 @@ fn get_connection_watcher(client: &Client) -> Connection { /// fn restart_main_thread() -> std::io::Result<()> { let current_exe = env::current_exe()?; - Command::new(current_exe).exec(); + let _ = Command::new(current_exe).exec(); Ok(()) } -/// # Fn `subscribe_config_stream` -/// ## for subscribe on changes, pulling to Redis pubsub to get more actual config -/// -/// *input* : `Arc` -/// -/// *output* : `Ok(())` on end of work | `Err(er)` on error with subscribing mechanism -/// -/// *initiator* : fn `subscribe_config_stream` -/// -/// *managing* : `Arc` to compare old config with new pulled -/// -/// *depends on* : `Processes` -/// -pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc) -> Result<(), CustomError> { - let config_path = params.config.to_str().unwrap_or_else(|| "settings.json"); - - if params.no_sub { - return Err(CustomError::Fatal); - } - if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) { - if let Ok(mut conn) = client.get_connection() { - match crate::utils::get_container_id() { - Some(channel_name) => { - let channel_name = channel_name.trim(); - let mut pubsub = conn.as_pubsub(); - if pubsub.subscribe(&channel_name).is_ok() { - info!("Runner subscribed on config update publishing in channel {}", &channel_name); - loop { - if let Ok(msg) = pubsub.get_message() { - let get_remote_config: Result = msg.get_payload(); - match get_remote_config { - Ok(payload) => { - if let Some(remote_config) = parse_extern_config(&payload) { - match config_comparing(&actual_prcs, &remote_config) { - ConfigActuality::Remote => { - warn!("Pulled config is actual. Saving and restarting..."); - if save_new_config(&remote_config, config_path).is_err() { - error!("Error with saving new config to {}. 
Stopping sub mechanism...", config_path); - return Err(CustomError::Fatal); - } - if restart_main_thread().is_err() { - error!("Error with restarting Runner. Stopping sub mechanism..."); - return Err(CustomError::Fatal); - } - } - _ => { - warn!("Pulled new config. Current config is more actual ..."); - continue - }, - } - } - else { - error!("Invalid conig was pulled"); - } - }, - Err(_) => { - error!("Cannot extract new config from message"); - break; - }, - } - } - sleep(Duration::from_secs(30)).await; - } - } else { - error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name); - } - }, - None => { - error!("Cannot get channel name"); - } - } - } - } - error!("Error with subscribing Redis stream on update. Working only with selected config..."); - Err(CustomError::Fatal) -} - /// # Fn `config_comparing` /// ## for compare old and new configs /// @@ -780,7 +446,7 @@ pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc ConfigActuality { +pub fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { if local.is_default() { return ConfigActuality::Remote; } @@ -793,14 +459,6 @@ fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { } } -// ! TEMPORARILY DEPRECATED ! 
-// fn native_date_from_millis(mls: &str) -> Option> { -// match mls.parse::(){ -// Ok(val) => return chrono::DateTime::from_timestamp_millis(val), -// Err(_) => return None, -// } -// } - /// # Fn `save_new_config` /// ## mechanism for saving new config in local storage /// @@ -889,11 +547,6 @@ mod config_unittests { assert_eq!(config_comparing(&a, &b), ConfigActuality::Remote); } - // TODO : strange output - // #[test] - // fn get_actual_config_mechanism() { - // assert!(get_actual_config().is_some()) - // } #[test] fn save_config() { let a = Processes { diff --git a/noxis-rs/src/options/logger.rs b/noxis-rs/src/options/logger.rs index 14cd92c..314cbf7 100644 --- a/noxis-rs/src/options/logger.rs +++ b/noxis-rs/src/options/logger.rs @@ -49,7 +49,7 @@ pub fn setup_logger() -> Result<(), crate::options::structs::CustomError> { record.args(), ) }) - .filter(None, LevelFilter::Info) + .filter(None, LevelFilter::from_env()) .target(env_logger::Target::Stdout) // temporary deprecated // .target(env_logger::Target::Pipe(log_target)) @@ -58,6 +58,29 @@ pub fn setup_logger() -> Result<(), crate::options::structs::CustomError> { Ok(()) } +trait FromEnv { + fn from_env() -> LevelFilter; +} + +impl FromEnv for LevelFilter { + fn from_env() -> LevelFilter { + return match std::env::var("NOXIS_MAX_LOG_LEVEL") { + Ok(var) => { + match var.to_ascii_lowercase().trim().as_ref() { + "trace" => LevelFilter::Trace, + "debug" => LevelFilter::Debug, + "info" => LevelFilter::Info, + "error" => LevelFilter::Error, + "warn" => LevelFilter::Warn, + "off" => LevelFilter::Off, + _ => LevelFilter::Info, + } + }, + Err(_) => LevelFilter::Info, + } + } +} + #[cfg(test)] mod logger_tests { use super::*; diff --git a/noxis-rs/src/options/preboot.rs b/noxis-rs/src/options/preboot.rs index 245db0c..df16a48 100644 --- a/noxis-rs/src/options/preboot.rs +++ b/noxis-rs/src/options/preboot.rs @@ -3,6 +3,7 @@ //! 
#[allow(unused_imports)] use anyhow::{Result, Error}; +use log::warn; use std::path::PathBuf; use std::env::var; use dotenv::dotenv; @@ -19,9 +20,9 @@ use dotenv::dotenv; /// noxis-rs ... --metrics none /// ``` /// -#[derive(clap::ValueEnum, Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum MetricsPrebootParams { - Full, + Full, System, Processes, Net, @@ -176,17 +177,18 @@ impl std::fmt::Display for MetricsPrebootParams { /// export NOXIS_METRICS_MODE "full" /// ``` /// -#[derive(Debug)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct PrebootParams { - pub no_hostagent : bool, + // pub no_hostagent : bool, pub no_logs: bool, pub refresh_logs : bool, pub no_sub : bool, - pub socket_path : PathBuf, + // pub socket_path : PathBuf, pub log_to : PathBuf, pub remote_server_url : String, pub config : PathBuf, pub metrics: MetricsPrebootParams, + pub self_socket : PathBuf, } /// # implementation for `MetricsPrebootParams` @@ -196,12 +198,12 @@ impl PrebootParams { dotenv().ok(); Self { // bool - no_hostagent : { - match var("NOXIS_NO_HAGENT") { - Ok(_) => true, - Err(_) => false, - } - }, + // no_hostagent : { + // match var("NOXIS_NO_HAGENT") { + // Ok(_) => true, + // Err(_) => false, + // } + // }, no_logs : { match var("NOXIS_NO_LOGS") { Ok(_) => true, @@ -221,12 +223,12 @@ impl PrebootParams { } }, // vals - socket_path : { - match var("NOXIS_HAGENT_SOCKET_PATH") { - Ok(val) => PathBuf::from(val), - Err(_) => PathBuf::from("/var/run/enode/hostagent.sock"), - } - }, + // socket_path : { + // match var("NOXIS_HAGENT_SOCKET_PATH") { + // Ok(val) => PathBuf::from(val), + // Err(_) => PathBuf::from("/var/run/enode/hostagent.sock"), + // } + // }, log_to : { match var("NOXIS_LOG_TO") { Ok(val) => PathBuf::from(val), @@ -251,6 +253,16 @@ impl PrebootParams { Err(_) => MetricsPrebootParams::Full, } }, + self_socket : { + match var("NOXIS_SOCKET_PATH") { + Ok(val) => PathBuf::from(val), + Err(_) => { + let default = 
std::env::current_dir().expect("Crashed on getting current_dir path. Check fs state!"); + warn!("$NOXIS_SOCKET_PATH wasn't set. Default value - {}", default.display()); + PathBuf::from(default) + }, + } + }, } } } diff --git a/noxis-rs/src/options/signals.rs b/noxis-rs/src/options/signals.rs index f840510..2a45850 100644 --- a/noxis-rs/src/options/signals.rs +++ b/noxis-rs/src/options/signals.rs @@ -1,4 +1,3 @@ -use super::structs::CustomError; use std::sync::Arc; use tokio::io; use tokio::sync::mpsc; diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index d37011d..51855d5 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use async_trait::async_trait; use std::sync::Arc; + #[derive(Debug)] pub enum DependencyType { File, @@ -93,7 +94,21 @@ pub enum ProcessState { Holding, Stopped, StoppedByCli, + HoldingByCli, } +impl std::fmt::Display for ProcessState { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + return match self { + ProcessState::Pending => write!(f, "Running"), + ProcessState::Holding => write!(f, "Holding"), + ProcessState::Stopped => write!(f, "Stopped"), + ProcessState::StoppedByCli => write!(f, "Forcibly stopped"), + ProcessState::HoldingByCli => write!(f, "Forcibly holding"), + } + } +} + + #[derive(Debug)] pub enum Events { Positive(Arc), @@ -158,6 +173,9 @@ impl Processes { pub fn is_default(&self) -> bool { self.date_of_creation.is_empty() } + pub fn get_version(&self) -> &str { + &self.date_of_creation + } } /// # Struct for the 2nd level in json conf file diff --git a/noxis-rs/src/utils.rs b/noxis-rs/src/utils.rs index b1e315a..2ef0d08 100644 --- a/noxis-rs/src/utils.rs +++ b/noxis-rs/src/utils.rs @@ -4,31 +4,23 @@ pub mod metrics; pub mod prcs; pub mod services; -// TODO : saving current flags state - -use crate::options::structs::{CustomError, TrackingProcess, Processes}; -// use files::create_watcher; -// use 
files::file_handler; -// use inotify::Inotify; -use log::{error, warn, info}; -use prcs::{ - freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process, - unfreeze_process, -}; -// use services::service_handler; +use crate::options::structs::Processes; +use log::{error, info}; use std::process::Command; use std::sync::Arc; -// use tokio::join; use tokio::sync::mpsc; use tokio::time::Duration; -// use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender}; -// controllers import use prcs::v2::ProcessesController; use files::v2::FilesController; use services::v2::ServicesController; use async_trait::async_trait; +use lazy_static::lazy_static; -const GET_ID_CMD: &str = "hostname"; +lazy_static! { + static ref GET_ID_CMD : &'static str = "hostname"; +} + +// const GET_ID_CMD: &str = "hostname"; pub mod v2 { use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque}; @@ -47,14 +39,16 @@ pub mod v2 { prcs : LinkedList, files : LinkedList, services : LinkedList, + config : Arc, } impl Supervisor { pub fn new() -> Supervisor { - Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()} + Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new(), config: Arc::new(Processes::default()) } } - pub async fn with_config(mut self, config: &Processes) -> Supervisor { - let _ = config.processes.iter() + pub async fn with_config(mut self, config: Processes) -> Supervisor { + self.config = Arc::from(config); + let _ = self.config.processes.iter() .for_each(|prc| { let (rx, tx) = mpsc::channel::(10); let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path); @@ -127,6 +121,9 @@ pub mod v2 { async fn process(&mut self) { info!("Initializing monitoring ..."); loop { + // + // todo: CHANNEL check and reaction + // // dbg!(&self); let mut tasks: Vec> = vec![]; // let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), 
self.files.pop_front().unwrap(), self.services.pop_front().unwrap()); @@ -177,242 +174,13 @@ pub mod v2 { pub async fn init_monitoring( config: Processes ) -> anyhow::Result<()> { - let mut supervisor = Supervisor::new().with_config(&config).await; + let mut supervisor = Supervisor::new().with_config(config).await; info!("Monitoring: {} ", &supervisor.get_stats()); supervisor.process().await; Ok(()) } - - - // async fn generate_controllers<'a>(config: Processes) -> (HashSet>, HashSet>, HashSet>) { - // let mut prcs: HashSet> = HashSet::new(); - // let mut files: HashSet> = HashSet::new(); - // let mut services: HashSet> = HashSet::new(); - // for prc in config.processes { - // let (rx, tx) = mpsc::channel::>(10); - // // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path); - // let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path); - // let a = new_prc.process().await; - - // } - // (prcs, files, services) - // } - // spawn prc check with semaphore check - async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) } - - // spawn file check with semaphore check - async fn files_monitoriing() -> anyhow::Result<()> { Ok(()) } - - // spawn service check with semaphore check - async fn services_monitoriing() -> anyhow::Result<()> { Ok(()) } } -/// # Fn `run_daemons` -/// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel -/// -/// *input* : `Arc`, `Arc>`, `&mut mpsc::Receiver`, -/// -/// *output* : () -/// -/// *initiator* : main thread -/// -/// *managing* : Arc to current process struct, Arc to managing channel writer, mut ref to managing channel reader -/// -/// *depends on* : all module `prcs`'s functions, fn `running_handler`, fn `utils::files::create_watcher` -/// -/// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process -/// -// pub async fn run_daemons( -// proc: Arc, -// tx: Arc>, -// rx: &mut 
mpsc::Receiver, -// ) { -// // creating watchers + ---buffers--- -// let mut watchers: Vec = vec![]; -// for file in proc.dependencies.files.clone().into_iter() { -// if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { -// watchers.push(watcher); -// } else { -// let _ = tx.send(121).await; -// } -// // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); -// } -// let watchers_clone: Arc>> = -// Arc::new(tokio::sync::Mutex::new(watchers)); - -// loop { -// let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); -// tokio::select! { -// _ = run_hand => continue, -// _val = rx.recv() => { -// if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { -// return; -// } -// }, -// } -// tokio::task::yield_now().await; -// } -// } - -async fn process_protocol_symbol(proc: Arc, val: u8) -> Result<(), CustomError>{ - match val { - // 1 - File-dependency handling error -> terminating (after waiting) - 1 => { - if is_active(&proc.name).await { - error!("File-dependency handling error: Terminating {} process ..." , &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(500)).await; - } - // return; - }, - // 2 - File-dependency handling error -> holding (after waiting) - 2 => { - if !is_frozen(&proc.name).await { - error!("File-dependency handling error: Freezing {} process ..." , &proc.name); - freeze_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - } - }, - // 3 - Running process error - 3 => { - error!("Error due to starting {} process", &proc.name); - return Err(CustomError::Fatal) - }, - // 4 - Timeout of waiting service-dependency -> staying (after waiting) - 4 => { - // warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." 
, &proc.name); - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // 5 - Timeout of waiting service-dependency -> terminating (after waiting) - 5 => { - if is_active(&proc.name).await { - error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(500)).await; - } - }, - // 6 - Timeout of waiting service-dependency -> holding (after waiting) - 6 => { - // println!("holding {}-{}", proc.name, is_active(&proc.name).await); - if !is_frozen(&proc.name).await { - error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); - freeze_process(&proc.name).await; - tokio::time::sleep(Duration::from_secs(1)).await; - } - }, - // // 7 - File-dependency change -> terminating (after check) - 7 => { - error!("File-dependency warning (file changed). Terminating {} process...", &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - return Err(CustomError::Fatal) - }, - // // 8 - File-dependency change -> restarting (after check) - 8 => { - warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name); - let _ = restart_process(&proc.name, &proc.path).await; - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // // 9 - File-dependency change -> staying (after check) - 9 => { - warn!("File-dependency warning (file changed). 
Ignoring event on {} process...", &proc.name); - tokio::time::sleep(Duration::from_millis(100)).await; - }, - - // 10 - Process unfreaze call via file handler (or service handler) - 10 | 11 => { - if is_frozen(&proc.name).await { - warn!("Unfreezing process {} call...", &proc.name); - unfreeze_process(&proc.name).await; - } - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // 11 - Process unfreaze call via service handler - // 11 => { - // if is_frozen(&proc.name).await { - // warn!("Unfreezing process {} call...", &proc.name); - // unfreeze_process(&proc.name).await; - // } - // tokio::time::sleep(Duration::from_millis(100)).await; - // }, - // 101 - Impermissible trigger values in JSON - 101 => { - error!("Impermissible trigger values in JSON in {}'s block. Killing thread...", &proc.name); - if is_active(&proc.name).await { - terminate_process(&proc.name).await; - } - return Err(CustomError::Fatal) - }, - // - // 121 - Cannot create valid watcher for file dependency - // todo : think about valid situation - 121 => { - error!("Cannot create valid watcher for file dependency. 
Terminating {} process...", &proc.name); - let _ = terminate_process(&proc.name).await; - return Err(CustomError::Fatal) - }, - // 111 - global thread termination with killing current child in a face - // of a current process - 111 => { - warn!("Terminating {}'s child processes...", &proc.name); - match is_active(&proc.name).await { - true => { - terminate_process(&proc.name).await; - }, - false => { - log::info!("Process {} is already terminated!", proc.name); - }, - } - }, - _ => {}, - } - Ok(()) -} -// check process status daemon -/// # Fn `run_daemons` -/// ## func to async exec subjobs of checking process, services and files states -/// -/// *input* : `Arc`, `Arc>`, `Arc>>` -/// -/// *output* : () -/// -/// *initiator* : fn `run_daemons` -/// -/// *managing* : Arc to current process struct, Arc to Mutex to list of file watchers -/// -/// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}` -/// -// pub async fn running_handler( -// prc: Arc, -// tx: Arc>, -// watchers: Arc>>, -// ) { -// // services and files check (once) -// let files_check = file_handler( -// &prc.name, -// &prc.dependencies.files, -// tx.clone(), -// watchers.clone(), -// ); -// let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); - -// let res = join!(files_check, services_check); -// // if inactive -> spawn checks -> active is true -// if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() { -// if start_process(&prc.name, &prc.path).await.is_err() { -// tx.send(3).await.unwrap(); -// return; -// } -// } -// // if frozen -> spawn checks -> unfreeze is true -// else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() { -// tx.send(10).await.unwrap(); -// return; -// } -// // tokio::time::sleep(Duration::from_millis(100)).await; -// tokio::task::yield_now().await; -// } - // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" 
| head -1 | awk -F '/' '{print $5}' /// # Fn `get_container_id` /// ## for getting container id used in logs @@ -428,7 +196,7 @@ async fn process_protocol_symbol(proc: Arc, val: u8) -> Result< /// *depends on* : - /// pub fn get_container_id() -> Option { - match Command::new(GET_ID_CMD).output() { + match Command::new(*GET_ID_CMD).output() { Ok(output) => { if !output.status.success() { return None; diff --git a/noxis-rs/src/utils/files.rs b/noxis-rs/src/utils/files.rs index da87b0f..ad3b713 100644 --- a/noxis-rs/src/utils/files.rs +++ b/noxis-rs/src/utils/files.rs @@ -1,16 +1,12 @@ - use crate::options::structs::{CustomError, Files}; - use super::prcs::{is_active, is_frozen}; - use inotify::{EventMask, Inotify, WatchMask}; - use std::borrow::BorrowMut; - use std::path::Path; - use std::sync::Arc; - use tokio::sync::mpsc; - use tokio::sync::mpsc::Sender as Sender; - use tokio::time::Duration; - use crate::options::structs::Events; - use async_trait::async_trait; +use crate::options::structs::CustomError; +use inotify::{EventMask, Inotify, WatchMask}; +use std::path::Path; +use std::sync::Arc; +use tokio::sync::mpsc::Sender as Sender; +use crate::options::structs::Events; +use async_trait::async_trait; - pub mod v2 { +pub mod v2 { use log::{error, info, warn}; use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit}; use super::*; @@ -42,6 +38,7 @@ } impl FilesController { + #[inline(always)] pub fn new(name: &str, triggers: EventHandlers) -> FilesController { let name: Arc = Arc::from(name); Self { @@ -53,6 +50,7 @@ code_name : name.clone(), } } + #[inline(always)] pub fn with_path(mut self, path: impl AsRef) -> anyhow::Result { self.path = path.as_ref().to_string_lossy().into_owned(); self.watcher = { @@ -90,26 +88,23 @@ #[async_trait] impl ProcessUnit for FilesController { async fn process(&mut self) { - // polling file check - // 1) existing check - // dbg!(&self); if let Ok(_) = check_file(&self.name, &self.path).await 
{ if let FileState::NotFound = self.state { info!("File {} ({}) was found in determined scope. Notifying ...", self.name, self.code_name); self.state = FileState::Ok; - // reseting negative outcome in prc self.trigger_on(None).await; } match &mut self.watcher { Some(notify) => { - let mut buffer = [0; 1024]; - if let Ok(mut notif_events) = notify.read_events(&mut buffer) { - // notif_events.into_iter().for_each(|mask| {dbg!(&mask.mask);}); - // todo!(); - if let (recreate_watcher, true) = ( - notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF), - notif_events.any(|mask| mask.mask == EventMask::MODIFY) - ) { + let mut buffer = [0; 128]; + if let Ok(notif_events) = notify.read_events(&mut buffer) { + let (need_to_recreate, was_modifired) = notif_events.fold((false, false), |(a, b), mask| { + ( + a || mask.mask == EventMask::DELETE_SELF, + b || mask.mask == EventMask::MODIFY, + ) + }); + if let (recreate_watcher, true) = (need_to_recreate, was_modifired) { warn!("File {} ({}) was changed", self.name, &self.path); if recreate_watcher { self.watcher = match create_watcher(&self.name, &self.path) { @@ -140,10 +135,9 @@ return; } self.trigger_on(None).await; - // 2) change check } } - } +} /// # Fn `create_watcher` /// ## for creating watcher on file's delete | update events @@ -165,105 +159,6 @@ Ok(inotify) } - /// # Fn `create_watcher` - /// ## for managing processes by checking dep files' states - /// - /// *input* : `&str`, `&[Files]`, `Arc>`, `Arc>>` - /// - /// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check - /// - /// *initiator* : fn `utils::running_handler` - /// - /// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc>`, mut list of file watchers`Arc>>` - /// - /// *depends on* : Files - /// - pub async fn file_handler( - name: &str, - files: &[Files], - tx: Arc>, - watchers: Arc>>, - ) -> anyhow::Result<()> { - for 
(i, file) in files.iter().enumerate() { - // let src = format!("{}{}", file.src, file.filename); - if check_file(&file.filename, &file.src).await.is_err() { - if !is_active(name).await || is_frozen(name).await { - return Err(anyhow::Error::msg("Process is frozen or stopped")); - } - match file.triggers.on_delete.as_str() { - "stay" => { - tx.send(9).await.unwrap(); - continue; - } - "stop" => { - if is_active(name).await { - tx.send(1).await.unwrap(); - } - return Err(anyhow::Error::msg("Process was stopped")); - } - "hold" => { - if is_active(name).await { - tx.send(2).await.unwrap(); - return Err(anyhow::Error::msg("Process was frozen")); - } - } - _ => { - tokio::time::sleep(Duration::from_millis(50)).await; - tx.send(101).await.unwrap(); - return Err(anyhow::Error::msg("Impermissible character or word in file trigger")); - } - } - } else if is_active(name).await && !is_frozen(name).await { - let watchers = watchers.clone(); - // println!("mutex: {:?}", watchers); - let mut buffer = [0; 128]; - let mut mutex_guard = watchers.lock().await; - if let Some(notify) = mutex_guard.get_mut(i) { - let events = notify.read_events(&mut buffer); - // println!("{:?}", events); - if events.is_ok() { - let events: Vec = events - .unwrap() - .map(|mask| mask.mask) - .filter(|mask| { - *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF - }) - .collect(); - for event in events { - if let EventMask::DELETE_SELF = event { - // ! warning (DELETE_SELF event) ! - // println!("! 
warning (DELETE_SELF event) !"); - // * watcher recreation after dealing with file recreation mechanism in text editors - let mutex = notify.borrow_mut(); - - // *mutex = create_watcher(&file.filename, &file.src).await.unwrap(); - if let Ok(watcher) = create_watcher(&file.filename, &file.src) { - *mutex = watcher; - } - } - match file.triggers.on_change.as_str() { - "stop" => { - let _ = tx.send(7).await; - } - "restart" => { - let _ = tx.send(8).await; - } - "stay" => { - let _ = tx.send(9).await; - } - _ => { - let _ = tx.send(101).await; - } - } - } - } - } - } - } - tokio::task::yield_now().await; - Ok(()) - } - /// # Fn `check_file` /// ## for checking existance of current file /// diff --git a/noxis-rs/src/utils/metrics.rs b/noxis-rs/src/utils/metrics.rs index 756e44b..aa7774d 100644 --- a/noxis-rs/src/utils/metrics.rs +++ b/noxis-rs/src/utils/metrics.rs @@ -2,18 +2,136 @@ // cpu load, ram/rom load and net activity // use std::sync::Mutex; -use std::sync::Arc; -use crate::options::structs::TrackingProcess; -use sysinfo::{Process, System}; -use tokio::join; -use crate::options::structs::{ProcessMetrics, ContainerMetrics}; -use super::get_container_id; +use std::{collections::BTreeMap, sync::Arc}; +use crate::options::structs::{ProcessState, TrackingProcess}; +use sysinfo::{System, Disks as DisksList, Networks}; +use crate::options::structs::Dependencies; +use serde::Serialize; +use super::prcs::v2::Pid; // use pcap::{Device, Capture, Active}; // use std::net::Ipv4Addr; // use anyhow::{Result, Ok}; // type PacketBuffer = Arc>>; +type CoreUsage = BTreeMap; +type Disks = Vec; +type Ifaces = Vec; +pub type MetricProcesses = Vec; + + +#[derive(Serialize, Debug)] +struct FullMetrics { + hostname : String, + os : String, + kernel : String, + cpu : Cpu, + ram : Ram, + disks : Disks, + networks : Ifaces, + processes : MetricProcesses, +} + +#[derive(Debug)] +struct HostInfo { + hostname : String, + os : String, + kernel : String, +} + + +#[derive(Serialize, Debug)] 
+struct Cpu { + global_usage : f32, + usage : CoreUsage, +} + +#[derive(Serialize, Debug)] +struct CoreInfo { + name: String, + brand : String, + frequency : u64, + vendor_id : String, + usage : f32, +} + +#[derive(Serialize, Debug)] +struct Ram { + free_mem : u64, + free_swap : u64, + total_mem : u64, + total_swap : u64 +} + +#[derive(Serialize, Debug)] +struct Disk { + name : String, + kind : String, + fs : String, + mount_point : String, + total_space : u64, + available_space : u64, + is_removable : bool, + is_readonly : bool, +} + + // vec +#[derive(Serialize, Debug)] +struct Network { + iname : String, + mac : String, + recieved : u64, + transmitted : u64, + total_recieved_bytes : u64, + total_transmitted_bytes : u64, + total_recieved_packets : u64, + total_transmitted_packets : u64, + errors_on_recieved : u64, + errors_on_transmitted : u64, +} + +#[derive(Serialize, Debug)] +pub struct ProcessesExtended { + name : String, + status : String, + pid : Pid, + dependencies : Dependencies, + cpu_usage : f32, + ram_usage : f32, + virtual_mem_usage : u64, + disks_usage_read_bytes: u64, + disks_usage_write_bytes: u64, +} + +impl ProcessesExtended { + pub fn from_old_with_params( + old : Arc, + pid : Pid, + status : ProcessState, + ) -> Self { + Self { + name : old.name.clone(), + status : status.to_string(), + pid, + dependencies : old.dependencies.clone(), + cpu_usage : 0.0, + ram_usage : 0.0, + virtual_mem_usage : 0, + disks_usage_read_bytes: 0, + disks_usage_write_bytes: 0, + } + } + fn add_metrics(&mut self, system : &mut System) { + if let Some(prc) = system.process(self.pid.new_sysinfo_pid()) { + self.cpu_usage = prc.cpu_usage() / system.cpus().len() as f32; + self.ram_usage = (system.total_memory() as f32) / (prc.memory() as f32); + self.disks_usage_read_bytes = prc.disk_usage().total_read_bytes; + self.disks_usage_write_bytes = prc.disk_usage().total_written_bytes; + self.virtual_mem_usage = prc.virtual_memory(); + } + } +} + /// # Fn `init_metrics_grubber` 
/// ## for initializing process of unstoppable grubbing metrics. /// @@ -28,204 +146,117 @@ use super::get_container_id; /// *depends on* : - /// #[allow(dead_code)] -pub async fn init_metrics_grubber() { +pub async fn init_metrics_grubber( + /* BROADCSAT LISTENER TO GET `PROCESSES` OBJ */ +) { let mut system = System::new(); - // let mut buffer: Vec = vec![]; - // let shared_buf: PacketBuffer = Arc::new(Mutex::new(buffer)); + get_all_metrics(&mut system).await; +} +async fn get_all_metrics(system: &mut System) { system.refresh_all(); - // let temp = String::from_utf8(get_pid("systemd").await.unwrap().stdout).unwrap(); - // let prc = system.process(Pid::from_str(&temp).unwrap()).unwrap(); - // prc. - // let _ = capture_packets(shared_buf.clone()).await; + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + dbg!(get_host_info().await); + dbg!(get_cpu_metrics(system).await); + dbg!(get_ram_metrics(system).await); + dbg!(get_all_disks_metrics().await); + dbg!(get_all_ifaces_metrics().await); } -#[allow(dead_code)] -#[allow(unused_variables)] -async fn gather_metrics(proc: Arc) { - +async fn get_host_info() -> HostInfo { + HostInfo { + hostname : System::host_name().unwrap_or_default(), + os : System::long_os_version().unwrap_or_default(), + kernel : System::kernel_version().unwrap_or_default(), + } } -// DEPRECATED : for net monitoring -// async fn capture_packets(buffer: PacketBuffer) -> Result<()> { -// let mut cap = Capture::from_device(Device::lookup()?.unwrap())? 
-// .promisc(true) -// .open()?; +async fn get_cpu_metrics(system: &mut System) -> Cpu { + system.refresh_cpu_all(); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; -// cap.filter("not broadcast and not multicast", true)?; + let mut buffer = CoreUsage::new(); + let global_usage = system.global_cpu_usage(); -// while let core::result::Result::Ok(packet) = cap.next_packet() { -// if let Some((src, dst, prot)) = get_packet_info(&packet.data).await { -// let packet_info = PacketInfo::new(String::from(prot), dst, src, packet.header.len as usize); -// let mut locked_buffer = buffer.lock().unwrap(); -// println!("{:?}", &packet_info); -// locked_buffer.push(packet_info); -// } -// } -// Ok(()) -// } -// async fn get_packet_info(data: &[u8]) -> Option<(Ipv4Addr, Ipv4Addr, &str)> { -// if data.len() >= 20 { -// let src_ip = Ipv4Addr::new(data[12], data[13], data[14], data[15]); -// let dst_ip = Ipv4Addr::new(data[16], data[17], data[18], data[19]); -// let protocol = match data[9] { -// 1 => "ICMP", -// 6 => "TCP", -// 17 => "UDP", -// _ => "Unknown", -// }; - -// Some((src_ip, dst_ip, protocol)) -// } else { -// None -// } -// } + system.cpus() + .iter() + .enumerate() + .for_each(|(id, cpu)| { + let core_info = CoreInfo { + // id, + brand : cpu.brand().to_string(), + name : cpu.name().to_string(), + frequency : cpu.frequency(), + vendor_id : cpu.vendor_id().to_string(), + usage : cpu.cpu_usage(), + }; + // buffer.push(core_info); + buffer.entry(id).or_insert(core_info); + }); - -/// # Fn `get_all_container_metrics` -/// ## for gathering all container (whole system metrics) -/// -/// *input* : `Arc`, `Arc>` -/// -/// *output* : `ContainerMetrics` -/// -/// *initiator* : main thread ?? 
-/// -/// *managing* : ref counter to `System` object, ref counter to list of processes -/// -/// *depends on* : `TrackingProcess` -/// -#[allow(dead_code)] -async fn get_all_container_metrics(sys: Arc, prcs: Arc>) -> ContainerMetrics { - let metrics = join!( - get_cpu_metrics_container(sys.clone()), - get_ram_metrics_container(sys.clone()), - get_subsystems(prcs.clone()) - ); - ContainerMetrics::new( - &get_container_id().unwrap_or(String::from("unknown")), - metrics.0, - metrics.1, - metrics.2 - ) + Cpu { + global_usage, + usage: buffer + } } -/// # Fn `get_cpu_metrics_container` -/// ## for gathering container cpu metrics -/// -/// *input* : `Arc` -/// -/// *output* : `f32` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to `System` object -/// -/// *depends on* : - -/// -#[allow(dead_code)] -async fn get_cpu_metrics_container(sys: Arc) -> f32 { - sys.global_cpu_usage() +async fn get_ram_metrics(system: &mut System) -> Ram { + system.refresh_memory(); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + Ram { + free_mem : system.free_memory(), + free_swap : system.free_swap(), + total_mem : system.total_memory(), + total_swap : system.total_swap(), + } } -/// # Fn `get_ram_metrics_container` -/// ## for gathering container ram metrics -/// -/// *input* : `Arc` -/// -/// *output* : `f32` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to `System` object -/// -/// *depends on* : - -/// -#[allow(dead_code)] -async fn get_ram_metrics_container(sys: Arc) -> f32 { - (sys.used_memory() / sys.total_memory()) as f32 * 100.0 -} -// async fn get_mem_metrics_container(sys: Arc) -> f32 { -// sys. -// } - -/// # Fn `get_subsystems` -/// ## for gathering info about container subsystems (processes) -/// -/// *input* : `Arc>` -/// -/// *output* : `Vec` -/// -/// *initiator* : main thread ?? 
-/// -/// *managing* : ref counter to list of `TrackingProcess` -/// -/// *depends on* : `TrackingProcess` -/// -#[allow(dead_code)] -async fn get_subsystems(prcs: Arc>) -> Vec { - prcs.iter().map(|process| process.name.clone()).collect() +async fn get_all_disks_metrics() -> Disks { + let disks = DisksList::new_with_refreshed_list(); + let mut buffer = Disks::new(); + disks.list() + .iter() + .for_each(|disk| { + let disk = Disk { + name : disk.name().to_string_lossy().into_owned(), + kind: disk.kind().to_string(), + fs : disk.file_system().to_string_lossy().into_owned(), + mount_point : disk.mount_point().to_string_lossy().into_owned(), + total_space : disk.total_space(), + available_space : disk.available_space(), + is_removable : disk.is_removable(), + is_readonly : disk.is_read_only() + }; + buffer.push(disk); + }); + buffer } -/// # Fn `get_all_metrics_process` -/// ## for gathering all process' metrics -/// -/// *input* : `Arc`, `Arc` -/// -/// *output* : `ProcessMetrics` -/// -/// *initiator* : main thread ?? 
-/// -/// *managing* : two ref counters to `Process` and `System` -/// -/// *depends on* : - -/// -#[allow(dead_code)] -async fn get_all_metrics_process(proc: Arc, sys: Arc) -> ProcessMetrics { - let metrics = join!( - get_cpu_metrics_process(proc.clone()), - get_ram_metrics_process(proc.clone(), sys.clone()) - ); - ProcessMetrics::new( - proc.name().to_str().unwrap_or("unknown"), - metrics.0, - metrics.1 - ) +async fn get_all_ifaces_metrics() -> Ifaces { + let mut ifaces = Ifaces::new(); + let networks = Networks::new_with_refreshed_list(); + networks.iter() + .for_each(|(iface_name, data)| { + let mac = data.mac_address().to_string(); + let iface = Network { + iname : iface_name.to_owned(), + mac : mac, + recieved : data.received(), + transmitted : data.transmitted(), + total_recieved_bytes : data.total_received(), + total_transmitted_bytes : data.total_transmitted(), + total_recieved_packets : data.total_packets_received(), + total_transmitted_packets : data.total_packets_transmitted(), + errors_on_recieved : data.errors_on_received(), + errors_on_transmitted : data.errors_on_transmitted(), + }; + ifaces.push(iface); + }); + ifaces } -/// # Fn `get_cpu_metrics_process` -/// ## for gathering process cpu metrics -/// -/// *input* : `Arc` -/// -/// *output* : `f32` -/// -/// *initiator* : main thread ?? -/// -/// *managing* : ref counter to `Process` object -/// -/// *depends on* : - -/// -async fn get_cpu_metrics_process(proc: Arc) -> f32 { - proc.cpu_usage() -} - -/// # Fn `get_ram_metrics_process` -/// ## for gathering process ram metrics -/// -/// *input* : `Arc` -/// -/// *output* : `f32` -/// -/// *initiator* : main thread ?? 
-/// -/// *managing* : ref counter to `Process` object -/// -/// *depends on* : - -/// -async fn get_ram_metrics_process(proc: Arc, sys: Arc) -> f32 { - (proc.memory() as f64 / sys.total_memory() as f64) as f32 * 100.0 as f32 -} +async fn get_all_processes_metrics(system: &mut System) {} #[cfg(test)] mod metrics_unittets { diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index 3f10903..dfb6c7e 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -6,17 +6,39 @@ use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUni use std::collections::HashSet; use tokio::sync::mpsc::Receiver as MpscReciever; use async_trait::async_trait; +use serde::Serialize; pub mod v2 { use log::info; + use tokio::time::sleep; use crate::options::structs::DependencyType; use std::path::Path; use super::*; + + #[derive(Debug, Serialize, Clone, Copy)] + pub struct Pid(u32); + + impl std::fmt::Display for Pid { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + return write!(f, "{}", self.0); + } +} + + impl Pid { + fn new() -> Self { + Pid(0) + } + #[allow(unused)] + pub fn new_sysinfo_pid(&self) -> sysinfo::Pid { + sysinfo::Pid::from_u32(self.0 as u32) + } + } #[derive(Debug)] pub struct ProcessesController { - name: Arc, + pub name: Arc, + pub pid : Pid, bin: String, // obj: Arc, state: ProcessState, @@ -31,19 +53,26 @@ pub mod v2 { } impl ProcessesController { + #[inline(always)] pub fn new(name: &str, event_reader: MpscReciever) -> ProcessesController { ProcessesController { name : Arc::from(name), + pid : Pid::new(), bin : String::new(), state : ProcessState::Stopped, event_reader, negative_events : HashSet::new(), } } + #[inline(always)] pub fn with_exe(mut self, bin: impl AsRef) -> ProcessesController { self.bin = bin.as_ref().to_string_lossy().into_owned(); self } + #[allow(unused)] + pub fn get_pid(&self) -> Pid { + self.pid + } async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: 
DependencyType) { match trigger { @@ -53,46 +82,158 @@ pub mod v2 { "stop" => { if is_active(&self.name).await { info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name); - terminate_process(&self.name).await; - self.state = ProcessState::Stopped; + match terminate_process(&self.name).await { + Ok(_) => { + info!("Process {} was stopped ...", &self.name); + self.state = ProcessState::Stopped; + self.pid = Pid::new(); + }, + Err(er) => { + error!("Cannot stop process {} : {}", self.name, er); + }, + } + } + }, + "user-stop" => { + if is_active(&self.name).await { + info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Stop Call", self.name); + match terminate_process(&self.name).await { + Ok(_) => { + info!("Process {} was forcefully stopped ...", &self.name); + self.state = ProcessState::StoppedByCli; + self.pid = Pid::new(); + }, + Err(er) => { + error!("Cannot forcefully stop process {} : {}", self.name, er); + }, + } + } + }, + "user-hold" => { + if is_active(&self.name).await { + info!("Event on {} `{}` for {}. Stopping ...", dep_type, "User Hold Call", self.name); + match freeze_process(&self.name).await { + Ok(_) => { + info!("Process {} was forcefully frozen ...", &self.name); + self.state = ProcessState::HoldingByCli; + // self.pid = Pid::new(); + }, + Err(er) => { + error!("Cannot forcefully freeze process {} : {}", self.name, er); + }, + } } }, "hold" => { if !is_frozen(&self.name).await { info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name); - freeze_process(&self.name).await; - self.state = ProcessState::Holding; + match freeze_process(&self.name).await { + Ok(_) => { + info!("Process {} was frozen ...", &self.name); + self.state = ProcessState::Holding; + }, + Err(er) => { + error!("Cannot freeze process {} : {}", self.name, er); + }, + } } }, "restart" => { info!("Event on {} `{}` for {}. 
Restarting ...", dep_type, dep_name, self.name); - let _ = restart_process(&self.name, &self.bin).await; + let pid = restart_process(&self.name, &self.bin).await; + sleep(Duration::from_millis(100)).await; + if let Ok(pid) = pid { + self.pid = Pid(pid); + info!("{}: New PID - {}", self.name, self.pid); + } }, _ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name), } tokio::time::sleep(Duration::from_micros(100)).await; } + #[allow(unused)] + pub async fn stop_by_user_call(&mut self) -> anyhow::Result<()> { + terminate_process(&self.name).await?; + self.state = ProcessState::StoppedByCli; + self.pid = Pid::new(); + Ok(()) + } + #[allow(unused)] + pub async fn freeze_by_user_call(&mut self) -> anyhow::Result<()> { + freeze_process(&self.name).await?; + self.state = ProcessState::HoldingByCli; + Ok(()) + } + #[allow(unused)] + pub async fn start_by_user_call(&mut self) -> anyhow::Result<()> { + let pid = start_process(&self.name, &self.bin).await?; + self.state = ProcessState::Pending; + self.pid = Pid(pid); + Ok(()) + } + #[allow(unused)] + pub async fn unfreeze_by_user_call(&mut self) -> anyhow::Result<()> { + unfreeze_process(&self.name).await?; + self.state = ProcessState::Pending; + Ok(()) + } + #[allow(unused)] + pub async fn restart_by_user_call(&mut self) -> anyhow::Result<()> { + let pid = restart_process(&self.name, &self.bin).await?; + self.pid = Pid(pid); + Ok(()) + } } #[async_trait] impl ProcessUnit for ProcessesController { async fn process(&mut self) { if self.negative_events.len() == 0 { - match self.state { - ProcessState::Holding => { + let conditions = (is_active(&self.name).await, is_frozen(&self.name).await); + let state = &self.state; + match (state, conditions) { + (ProcessState::Holding, (_, _)) => { + info!("No negative dependecies events on {} frozen process. 
Unfreezing ...", self.name); + if let Err(er) = unfreeze_process(&self.name).await { + error!("Cannot unfreeze process {} : {}", self.name, er); + } else { + self.state = ProcessState::Pending; + info!("Process {} was unfreezed", &self.name); + } + }, + (ProcessState::Stopped, (_, _)) => { + info!("No negative dependecies events on stopped {} process. Starting ...", self.name); + match start_process(&self.name, &self.bin).await { + Ok(pid) => { + self.state = ProcessState::Pending; + self.pid = Pid(pid); + info!("{}: New PID - {}", self.name, self.pid); + }, + Err(er) => { + error!("Cannot start process {} : {}", self.name, er); + }, + } + }, + (ProcessState::Pending, (false, false)) => { + info!("{} process was impermissibly stopped. Starting ...", self.name); + match start_process(&self.name, &self.bin).await { + Ok(pid) => { + self.state = ProcessState::Pending; + self.pid = Pid(pid); + info!("{}: New PID - {}", self.name, self.pid); + }, + Err(er) => { + error!("Cannot start process {} : {}", self.name, er); + }, + } + }, + (ProcessState::Pending, (true, true)) => { info!("No negative dependecies events on {} process. Unfreezing ...", self.name); if let Err(er) = unfreeze_process(&self.name).await { error!("Cannot unfreeze process {} : {}", self.name, er); } else { self.state = ProcessState::Pending; - } - }, - ProcessState::Stopped => { - info!("No negative dependecies events on {} process. 
Starting ...", self.name); - if let Err(er) = start_process(&self.name, &self.bin).await { - error!("Cannot start process {} : {}", self.name, er); - } else { - self.state = ProcessState::Pending; + info!("Process {} was unfreezed", &self.name); } }, _ => {}, @@ -240,14 +381,11 @@ pub async fn is_frozen(name: &str) -> bool { /// /// *depends on* : - /// -pub async fn terminate_process(name: &str) { +pub async fn terminate_process(name: &str) -> anyhow::Result<()> { let _ = Command::new("pkill") - .arg(name) - .output() - .unwrap_or_else(|_| { - error!("Failed to execute command 'pkill'"); - std::process::exit(101); - }); + .arg(name) + .output()?; + Ok(()) } /// # Fn `terminate_process` @@ -263,14 +401,11 @@ pub async fn terminate_process(name: &str) { /// /// *depends on* : - /// -pub async fn freeze_process(name: &str) { +pub async fn freeze_process(name: &str) -> anyhow::Result<()> { let _ = Command::new("pkill") .args(["-STOP", name]) - .output() - .unwrap_or_else(|_| { - error!("Failed to freeze process"); - std::process::exit(101); - }); + .output()?; + Ok(()) } /// # Fn `unfreeze_process` @@ -306,8 +441,8 @@ pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> { /// /// *depends on* : fn `start_process`, fn `terminate_process` /// -pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> { - terminate_process(name).await; +pub async fn restart_process(name: &str, path: &str) -> anyhow::Result { + terminate_process(name).await?; tokio::time::sleep(Duration::from_millis(100)).await; start_process(name, path).await } @@ -325,18 +460,19 @@ pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> { /// /// *depends on* : - /// -pub async fn start_process(name: &str, path: &str) -> anyhow::Result<()> { +pub async fn start_process(name: &str, path: &str) -> anyhow::Result { // let runsh = format!("{} {}", "exec", path); let mut command = Command::new(path); // command.arg(path); match command.spawn() { - Ok(_) => { + 
Ok(child) => { + let pid = child.id(); warn!("Process {} is running now!", name); - Ok(()) + Ok(pid) } Err(er) => { - Err(anyhow::Error::msg(format!("Cannot start process {} due to {}", name, er))) + Err(anyhow::Error::msg(format!("Cannot start process {} : {}", name, er))) } } } @@ -384,6 +520,7 @@ mod process_unittests { let res1 = start_process("freeze-check", "./tests/examples/freeze-check").await; assert!(res1.is_ok()); assert!(!is_frozen("freeze-check").await); + let _ = terminate_process("freeze-check").await; } #[tokio::test] async fn pidof_active_process() { diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index a381cb1..0d90921 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -1,12 +1,14 @@ -use crate::options::structs::CustomError; use log::{error, warn}; -use std::net::{TcpStream, ToSocketAddrs}; +use std::net::ToSocketAddrs; use std::sync::Arc; use tokio::time::Duration; use tokio::sync::mpsc::Sender as Sender; use async_trait::async_trait; +use std::pin::Pin; +use futures::future::Future; pub mod v2 { + use futures::FutureExt; use log::info; use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState}; @@ -42,6 +44,7 @@ pub mod v2 { } impl ServicesController { + #[inline(always)] pub fn new() -> ServicesController { ServicesController { name : String::new(), @@ -51,6 +54,7 @@ pub mod v2 { event_registrator : EventHandlers::new(), } } + #[inline(always)] pub fn with_access_name( mut self, hostname: &str, @@ -60,7 +64,7 @@ pub mod v2 { self.access_url = Arc::from(access_url); self } - + #[inline(always)] pub fn with_params( mut self, conn_queue: ConnectionQueue, @@ -95,20 +99,52 @@ pub mod v2 { self.event_registrator.entry(proc_name).or_insert((trigger, sender)); } async fn check_state(&self) -> anyhow::Result<()> { - let mut addrs = self.access_url.to_socket_addrs()?; - if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) { - return 
Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url))) + let url = self.access_url.clone(); + let resolve_future = tokio::task::spawn_blocking(move || { + url.to_socket_addrs() + }); + let addrs: Vec<_> = match tokio::time::timeout(Duration::from_secs(1), resolve_future).await { + Ok(Ok(addrs)) => addrs?.collect(), + Ok(Err(er)) => return Err(er.into()), + Err(_) => return Err(anyhow::Error::msg("DNS resolution timeout")), + }; + + if addrs.is_empty() { + return Err(anyhow::Error::msg("No addresses resolved")); } + + let tasks: Vec<_> = addrs.into_iter().map(|addr| async move { + match tokio::time::timeout(Duration::from_secs(2), tokio::net::TcpStream::connect(&addr)).await { + Ok(Ok(_)) => Some(addr), + _ => None, + } + }).collect(); + let mut any_success = false; + for task in futures::future::join_all(tasks).await { + if task.is_some() { + any_success = true; + break; + } + } + if !any_success { + return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url))); + } + Ok(()) } async fn trigger_on(&mut self) { match self.state { ServiceState::Ok => { - let _ = self.event_registrator - .iter() - .map(|(_, (_, el))| async { - let _ = el.send(Events::Positive(self.access_url.clone())).await; - }); + let futures : Vec + Send>>> = self.event_registrator.iter() + .map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt))) + .map(|(prc, (serv, sender_opt))| async move { + info!("Notifying process {} ...", prc); + let _ = sender_opt.send(Events::Positive(serv.clone())).await; + }) + .map(|fut| fut.boxed()) + .collect(); + + futures::future::join_all(futures).await; }, ServiceState::Unavailable => { // looped check and notifying @@ -123,7 +159,6 @@ pub mod v2 { let timer = tokio::time::Instant::now(); let mut attempt: u32 = 1; let access_url = Arc::new(self.access_url.clone()); - // let event_registrator = &mut self.event_registrator; if let Err(_) = 
tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async { // let access_url = access_url.clone(); @@ -137,11 +172,22 @@ pub mod v2 { if state_check_result.is_ok() { info!("Connection to {} is `OK` now", &access_url); self.state = ServiceState::Ok; + let futures : Vec + Send>>> = self.event_registrator.iter() + .map(|(prc, (_, sender_opt))| (prc, (self.access_url.clone(), sender_opt))) + .map(|(prc, (serv, sender_opt))| async move { + info!("Notifying process {} ...", prc); + let _ = sender_opt.send(Events::Positive(serv.clone())); + }) + .map(|fut| fut.boxed()) + .collect(); + + futures::future::join_all(futures).await; break; } else { let now = timer.elapsed(); + let iterator = self.config.iter() - .filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now) + .filter(|(&wait, _)| tokio::time::Duration::from_secs(wait as u64) <= now) .flat_map(|(_, a)| a.iter().cloned()) .collect::>>(); @@ -150,7 +196,7 @@ pub mod v2 { info!("Trying to notify process `{}` ...", &proc_name); let sender_opt = self.event_registrator.get(&name) .map(|(trigger, sender)| - (trigger.to_service_negative_event(name.clone()), sender) + (trigger.to_service_negative_event(self.access_url.clone()), sender) ); if let Some((tr, tx)) = sender_opt { @@ -178,7 +224,7 @@ pub mod v2 { self.trigger_on().await; }, (ServiceState::Ok, Err(_)) => { - warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len()); + warn!("Unreachable for connection service `{}`. 
Initializing reconnect mechanism ...", &self.access_url); self.state = ServiceState::Unavailable; self.trigger_on().await; }, @@ -189,168 +235,15 @@ pub mod v2 { } } -/// # Fn `service_handler` -/// ## function to realize mechanism of current process' dep services monitoring -/// -/// *input* : `&str`, `&Vec`, `Arc>` -/// -/// *output* : () -/// -/// *initiator* : fn `utils::running_handler` -/// -/// *managing* : process name, ref of vec of dep services, ref counter to managing channel writer -/// -/// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting` -/// -// pub async fn service_handler( -// name: &str, -// services: &Vec, -// tx: Arc>, -// ) -> Result<(), CustomError> { -// // println!("service daemon on {}", name); -// for serv in services { -// if check_service(&serv.hostname, &serv.port).await.is_err() { -// if !is_active(name).await || is_frozen(name).await { -// return Err(CustomError::Fatal); -// } -// error!( -// "Service {}:{} is unreachable for process {}", -// &serv.hostname, &serv.port, &name -// ); -// match serv.triggers.on_lost.as_str() { -// "stay" => { -// tx.send(4).await.unwrap(); -// continue; -// } -// "stop" => { -// if looped_service_connecting(name, serv).await.is_err() { -// tx.send(5).await.unwrap(); -// tokio::task::yield_now().await; -// return Err(CustomError::Fatal); -// } -// } -// "hold" => { -// // if is_frozen(name).await { -// // return Err(CustomError::Fatal); -// // } -// if looped_service_connecting(name, serv).await.is_err() { -// tx.send(6).await.unwrap(); -// tokio::task::yield_now().await; -// return Err(CustomError::Fatal); -// } -// } -// _ => { -// tx.send(101).await.unwrap(); -// return Err(CustomError::Fatal); -// } -// } -// } -// } -// tokio::time::sleep(Duration::from_millis(100)).await; -// Ok(()) -// } - -/// # Fn `looped_service_connecting` -/// ## for service's state check in loop (with delay and restriction of attempts) -/// -/// 
*input* : `&str`, `&Services` -/// -/// *output* : Ok(()) if service now available | Err(er) if still not -/// -/// *initiator* : fn `service_handler` -/// -/// *managing* : process name, current service struct -/// -/// *depends on* : fn `check_service` -/// -// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { -// if serv.triggers.wait == 0 { -// loop { -// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; -// warn!( -// "Attempting to connect from {} process to {}:{}", -// &name, &serv.hostname, &serv.port -// ); -// match check_service(&serv.hostname, &serv.port).await { -// Ok(_) => { -// log::info!( -// "Successfully connected to {} from {} process!", -// &serv.hostname, -// &name -// ); -// break; -// } -// Err(_) => { -// tokio::task::yield_now().await; -// } -// } -// } -// Ok(()) -// } else { -// let start = Instant::now(); -// while start.elapsed().as_secs() < serv.triggers.wait.into() { -// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; -// warn!( -// "Attempting to connect from {} process to {}:{}", -// &name, &serv.hostname, &serv.port -// ); -// match check_service(&serv.hostname, &serv.port).await { -// Ok(_) => { -// log::info!( -// "Successfully connected to {} from {} process!", -// &serv.hostname, -// &name -// ); -// return Ok(()); -// } -// Err(_) => { -// tokio::task::yield_now().await; -// } -// } -// } -// Err(CustomError::Fatal) -// } -// } - -/// # Fn `check_service` -/// ## for check current service's availiability -/// -/// *input* : `&str`, `&u32` -/// -/// *output* : Ok(()) if service now available | Err(er) if still not -/// -/// *initiator* : fn `service_handler`, fn `looped_service_connecting` -/// -/// *managing* : hostname, port -/// -/// *depends on* : - -/// -// ! 
have to be rewritten -// todo: rewrite use -async fn check_service(hostname: &str, port: &u32) -> Result<(), CustomError> { - let addr = format!("{}:{}", hostname, port); - - match addr.to_socket_addrs() { - Ok(mut addrs) => { - if addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) { - Ok(()) - } else { - Err(CustomError::Fatal) - } - } - Err(_) => Err(CustomError::Fatal), - } -} - #[cfg(test)] mod service_unittests { - use super::check_service; - #[tokio::test] - async fn check_available_service() { - assert!(check_service("ya.ru", &443).await.is_ok()); - } - #[tokio::test] - async fn check_unavailable_service() { - assert!(check_service("unavailable.service", &1111).await.is_err()); - } + // use super::check_service; + // #[tokio::test] + // async fn check_available_service() { + // assert!(check_service("ya.ru", &443).await.is_ok()); + // } + // #[tokio::test] + // async fn check_unavailable_service() { + // assert!(check_service("unavailable.service", &1111).await.is_err()); + // } }