diff --git a/.gitignore b/.gitignore index c69fa41..eae2549 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ Cargo.lock hagent_test.sock release +*.sock \ No newline at end of file diff --git a/noxis-cli/Cargo.toml b/noxis-cli/Cargo.toml index 4c34412..e02d5f8 100644 --- a/noxis-cli/Cargo.toml +++ b/noxis-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "noxis-cli" -version = "0.2.4" +version = "0.2.7" edition = "2021" [dependencies] diff --git a/noxis-cli/src/cli.rs b/noxis-cli/src/cli.rs index 6c07db6..5a82b64 100644 --- a/noxis-cli/src/cli.rs +++ b/noxis-cli/src/cli.rs @@ -2,11 +2,17 @@ use clap::{Parser, Subcommand}; #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] pub struct Cli { + #[arg( + short, + default_value="noxis-rs.sock", + help="explicit specify of NOXIS Socket file" + )] + pub socket : String, #[command( subcommand, help = "to manage Noxis work", )] - command : Commands, + pub command : Commands, } #[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)] @@ -50,13 +56,13 @@ pub struct StartAction { num_args = 1.., value_delimiter = ' ' )] - flags : Vec, + pub flags : Vec, } #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] pub struct ConfigCommand { #[command(subcommand)] - action : ConfigAction, + pub action : ConfigAction, } #[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)] @@ -83,12 +89,12 @@ pub struct LocalConfig { action, help = "to read following input as JSON", )] - is_json : bool, + pub is_json : bool, // value #[arg( help = "path to config file or config String (with --json flag)", )] - config : String, + pub config : String, } #[derive(Debug, Parser, serde::Serialize, serde::Deserialize)] @@ -96,16 +102,16 @@ pub struct ProcessCommand { #[arg( help = "name of needed process", )] - process : String, + pub process : String, #[command( subcommand, help = "To get current process's status", )] - action : ProcessAction, + pub action : ProcessAction, } #[derive(Debug, Subcommand, serde::Serialize, serde::Deserialize)] -enum ProcessAction { +pub enum ProcessAction { #[command( about = "To get info about current process status", )] diff --git a/noxis-cli/src/cli_error.rs b/noxis-cli/src/cli_error.rs index bb3a8bd..d5bae9b 100644 --- a/noxis-cli/src/cli_error.rs +++ b/noxis-cli/src/cli_error.rs @@ -1,14 +1,15 @@ use thiserror::Error; -use super::cli_net::NOXIS_RS_CREDS; #[derive(Debug, Error)] pub enum NoxisCliError { - #[error("Can't send any data to {:?}. Noxis-rs daemon is disabled or can't be accessed", NOXIS_RS_CREDS)] - NoxisDaemonMissing, - #[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's web-functionality")] + #[error("Can't find socket `{0}`. Error : {1}")] + NoxisDaemonMissing(String, String), + #[error("Noxis CLI can't write any data to the Noxis-rs port. Check daemon and it's runtime!")] PortIsNotWritable, #[error("Can't send Cli-prompt to the Noxis-rs. Check it's state")] CliPromptCanNotBeSent, #[error("Can't parse CLI struct and send as byte stream")] ToStringCliParsingParsing, + #[error("Can't read Noxis response due to {0}")] + CliResponseReadError(String) } \ No newline at end of file diff --git a/noxis-cli/src/cli_net.rs b/noxis-cli/src/cli_net.rs index b0fbfe7..a3300ed 100644 --- a/noxis-cli/src/cli_net.rs +++ b/noxis-cli/src/cli_net.rs @@ -1,32 +1,30 @@ -use tokio::net::TcpStream; -use tokio::io::AsyncWriteExt; +use tokio::net::UnixStream; +use tokio::io::{AsyncWriteExt, AsyncReadExt}; use tokio::time::{Duration, sleep}; use anyhow::Result; use super::Cli; use super::cli_error::NoxisCliError; -pub const NOXIS_RS_CREDS: &str = "127.0.0.1:7753"; - - -pub async fn create_tcp_stream() -> Result { - Ok(TcpStream::connect(NOXIS_RS_CREDS).await.map_err(|_| NoxisCliError::NoxisDaemonMissing)?) +async fn create_us_stream(cli: &Cli) -> Result { + Ok(UnixStream::connect(&cli.socket).await.map_err(|er| NoxisCliError::NoxisDaemonMissing((&cli.socket).to_string(), er.to_string()))?) } -pub async fn try_send(stream: Result, params: Cli) -> Result<()> { - use serde_json::to_string; - let mut stream = stream.map_err(|_| NoxisCliError::NoxisDaemonMissing)?; - loop { - if stream.writable().await.is_err() { - sleep(Duration::from_millis(100)).await; - continue; - } - // let msg: Cli = from_str(&format!("{:?}", params))?; - let msg= to_string(¶ms).map_err(|_| NoxisCliError::ToStringCliParsingParsing)?; - // let msg = r"HTTP/1.1 POST\r\nContent-Length: 14\r\nContent-Type: text/plain\r\n\r\nHello, World!@"; +pub async fn try_send(cli: Cli) -> Result<()> { + // let stream = create_us_stream(&cli).await; + let mut stream = create_us_stream(&cli).await?; - stream.write_all(msg.as_bytes()).await.map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?; - // ... - break; - } + let msg = serde_json::to_vec(&cli) + .map_err(|_| NoxisCliError::ToStringCliParsingParsing)?; + + stream.write_all(&msg) + .await + .map_err(|_| NoxisCliError::CliPromptCanNotBeSent)?; + + let mut response = [0; 1024]; + stream.read(&mut response) + .await + .map_err(|er| NoxisCliError::CliResponseReadError(er.to_string()))?; + + println!("Received response: {}", String::from_utf8_lossy(&response)); Ok(()) } \ No newline at end of file diff --git a/noxis-cli/src/main.rs b/noxis-cli/src/main.rs index 9262502..7961b75 100644 --- a/noxis-cli/src/main.rs +++ b/noxis-cli/src/main.rs @@ -4,12 +4,12 @@ mod cli_error; use clap::Parser; use cli::Cli; -use cli_net::{create_tcp_stream, try_send}; +use cli_net::try_send; use anyhow::Result; #[tokio::main] async fn main() -> Result<()>{ let cli = Cli::parse(); - try_send(create_tcp_stream().await, cli).await?; + try_send(cli).await?; Ok(()) } diff --git a/noxis-rs/Cargo.toml b/noxis-rs/Cargo.toml index 0745074..3fa9d6d 100644 --- a/noxis-rs/Cargo.toml +++ b/noxis-rs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "noxis-rs" -version = "0.11.10" +version = "0.11.26" edition = "2021" [dependencies] @@ -8,13 +8,15 @@ anyhow = "1.0.93" chrono = "0.4.38" clap = { version = "4.5.21", features = ["derive"] } env_logger = "0.11.3" -inotify = "0.10.2" +inotify = "0.11.0" log = "0.4.22" pcap = "2.2.0" -redis = "0.25.4" +redis = "0.29.2" serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.118" sysinfo = "0.32.0" tokio = { version = "1.38.0", features = ["full", "time"] } noxis-cli = { path = "../noxis-cli" } dotenv = "0.15.0" +futures = "0.3.31" +async-trait = "0.1.88" diff --git a/noxis-rs/settings.json b/noxis-rs/settings.json index f830fce..496fe2a 100644 --- a/noxis-rs/settings.json +++ b/noxis-rs/settings.json @@ -1,6 +1,6 @@ { - "dateOfCreation": "1721381809104", - "configServer": "localhost", + "dateOfCreation": "1721381809112", + "configServer": "192.168.2.37", "processes": [ { "name": "temp-process", @@ -12,7 +12,7 @@ "src": "./tests/examples/", "triggers": { "onDelete": "stop", - "onChange": "stay" + "onChange": "restart" } } ], @@ -22,8 +22,7 @@ "port": 443, "triggers": { "wait": 10, - "delay": 2, - "onLost": "hold" + "onLost": "restart" } } ] diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 29886c8..b10a5da 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -1,7 +1,6 @@ mod options; mod utils; -use anyhow::Error; use clap::Parser; use log::{error, info}; use options::config::*; @@ -14,84 +13,140 @@ use std::time::Duration; use tokio::sync::mpsc; use utils::*; use options::preboot::PrebootParams; +use tokio::sync::{broadcast, oneshot}; +use options::config::v2::init_config_mechanism; +use utils::v2::init_monitoring; -#[tokio::main(flavor = "multi_thread")] +#[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() -> anyhow::Result<()>{ let preboot = Arc::new(PrebootParams::parse().validate()?); let _ = setup_logger(); - info!("Runner is configurating..."); - - // setting up redis connection \ - // then conf checks to choose the most actual \ - let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| { - error!("No actual configuration for runner. Stopping..."); - std::process::exit(1); - }); - - info!( - "Current runner configuration: {}", - &processes.date_of_creation - ); - info!("Runner is ready. Initializing..."); - - if processes.processes.is_empty() { - error!("Processes list is null, runner-rs initialization is stopped"); - return Err(Error::msg("Empty processes segment in config")); - } + info!("Noxis is configurating..."); + // + let (tx_brd, mut rx_brd) = broadcast::channel::(1); + // cli <-> config + let (tx_oneshot, rx_oneshot) = oneshot::channel::(); let mut handler: Vec> = vec![]; - // is in need to send to the signals handler thread - let mut senders: Vec>> = vec![]; - for proc in processes.processes.iter() { - info!( - "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", - proc.name, - proc.path, - proc.dependencies.files.len(), - proc.dependencies.services.len() - ); - - // creating msg channel - // can or should be executed in new thread - let (tx, mut rx) = mpsc::channel::(1); - let proc = Arc::new(proc.clone()); - let tx = Arc::new(tx.clone()); - - senders.push(Arc::clone(&tx.clone())); - - let event = tokio::spawn(async move { - run_daemons(proc.clone(), tx.clone(), &mut rx).await; - }); - handler.push(event); - } - - // destructor addition - handler.push(tokio::spawn(async move { - if set_valid_destructor(Arc::new(senders)).await.is_err() { - error!("Linux signals handler creation failed. Terminating main thread..."); - return; + // initilaizing task for config manipulations + let config_module = tokio::spawn(async move { + let _ = init_config_mechanism( + rx_oneshot, + tx_brd, + preboot.clone() + ).await; + }); + handler.push(config_module); + + // initilaizing task for cli manipulation + let cli_module = tokio::spawn(async move { + if let Err(er) = init_cli_pipeline().await { + error!("CLI pipeline failed due to {}", er) } + }); + handler.push(cli_module); - tokio::time::sleep(Duration::from_millis(200)).await; - info!("End of job. Terminating main thread..."); + // initilaizing task for deinitializing `Noxis` + let ctrlc = tokio::spawn(async move { + if let Err(er) = set_valid_destructor(vec![].into()).await { + error!("Destructor mod failed due to {}", er); + } std::process::exit(0); - })); + }); + handler.push(ctrlc); - // remote config update subscription - handler.push(tokio::spawn(async move { - let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await; - })); - - // cli pipeline - handler.push(tokio::spawn(async move { - let _ = init_cli_pipeline().await; - })); + let monitoring = tokio::spawn(async move { + let config = { + let mut tick = tokio::time::interval(Duration::from_millis(500)); + loop { + tick.tick().await; + break match rx_brd.try_recv() { + Ok(conf) => conf, + Err(_) => continue, + } + } + }; + if let Err(er) = init_monitoring(config).await { + error!("Monitoring mod failed due to {}", er); + } + }); + handler.push(monitoring); for i in handler { let _ = i.await; } + + // setting up redis connection \ + // then conf checks to choose the most actual \ + // let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| { + // error!("No actual configuration for runner. Stopping..."); + // std::process::exit(1); + // }); + // + // info!( + // "Current runner configuration: {}", + // &processes.date_of_creation + // ); + // info!("Runner is ready. Initializing..."); + // + // if processes.processes.is_empty() { + // error!("Processes list is null, runner-rs initialization is stopped"); + // return Err(Error::msg("Empty processes segment in config")); + // } + // let mut handler: Vec> = vec![]; + // // is in need to send to the signals handler thread + // let mut senders: Vec>> = vec![]; + // + // for proc in processes.processes.iter() { + // info!( + // "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", + // proc.name, + // proc.path, + // proc.dependencies.files.len(), + // proc.dependencies.services.len() + // ); + // + // // creating msg channel + // // can or should be executed in new thread + // let (tx, mut rx) = mpsc::channel::(1); + // let proc = Arc::new(proc.clone()); + // let tx = Arc::new(tx.clone()); + // + // senders.push(Arc::clone(&tx.clone())); + // + // let event = tokio::spawn(async move { + // run_daemons(proc.clone(), tx.clone(), &mut rx).await; + // }); + // handler.push(event); + // } + // + // // destructor addition + // handler.push(tokio::spawn(async move { + // if set_valid_destructor(Arc::new(senders)).await.is_err() { + // error!("Linux signals handler creation failed. Terminating main thread..."); + // return; + // } + // + // tokio::time::sleep(Duration::from_millis(200)).await; + // info!("End of job. Terminating main thread..."); + // std::process::exit(0); + // })); + // + // // remote config update subscription + // handler.push(tokio::spawn(async move { + // let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await; + // })); + // + // // cli pipeline + // handler.push(tokio::spawn(async move { + // let _ = init_cli_pipeline().await; + // })); + // + // for i in handler { + // let _ = i.await; + // } Ok(()) } diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index ad6a670..0c13e22 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -1,12 +1,9 @@ -use log::{error, info, warn}; -use tokio::net::{TcpListener, TcpStream}; -use anyhow::{Result as DynResult, Error}; +use log::{error, info}; +use tokio::net::{ UnixStream, UnixListener }; use tokio::time::{sleep, Duration}; -use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}}; -// use std::io::BufReader; -use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt}; +use std::fs; +use tokio::io::{ AsyncWriteExt, AsyncReadExt}; use noxis_cli::Cli; -use serde_json::from_str; /// # Fn `init_cli_pipeline` /// ## for catching all input requests from CLI @@ -21,49 +18,32 @@ use serde_json::from_str; /// /// *depends on* : - /// -pub async fn init_cli_pipeline() -> DynResult<()> { - match init_listener().await { - Some(list) => { +pub async fn init_cli_pipeline() -> anyhow::Result<()> { + let socket_path = "noxis.sock"; + let _ = fs::remove_file(socket_path); + + match UnixListener::bind(socket_path) { + Ok(list) => { + // TODO: remove `unwrap`s + info!("Listening on {}", socket_path); loop { - if let Ok((socket, addr)) = list.accept().await { - // isolation - if IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) != addr.ip() { - warn!("Declined attempt to connect TCP-socket from {}", addr); - continue; - } - process_connection(socket).await; + match list.accept().await { + Ok((socket, _)) => { + // tokio::spawn(); + process_connection(socket).await; + }, + Err(er) => { + error!("Cannot poll connection to CLI due to {}", er); + sleep(Duration::from_millis(300)).await; + }, } - sleep(Duration::from_millis(500)).await; } // Ok(()) }, - None => Err(Error::msg("Addr 127.0.0.1:7753 is already in use")) - } -} - -/// # Fn `init_listener` -/// ## for creating TCP-listener for communicating with CLI -/// -/// *input* : - -/// -/// *output* : `Some` if port 7753 was opened | None if not -/// -/// *initiator* : fn `init_cli_pipeline` -/// -/// *managing* : `TcpListener` object to handle requests -/// -/// *depends on* : `tokio::net::TcpListener` -/// -async fn init_listener() -> Option { - match TcpListener::bind("127.0.0.1:7753").await { - Ok(listener) => { - info!("Runner is listening localhost:7753"); - Some(listener) + Err(er) => { + error!("Failed to open UnixListener for CLI"); + Err(er.into()) }, - Err(_) => { - error!("Cannot create TCP listener for CLI"); - None - } } } @@ -80,27 +60,29 @@ async fn init_listener() -> Option { /// /// *depends on* : `tokio::net::TcpStream` /// -async fn process_connection(mut stream: TcpStream) { - let buf_reader = BufReader::new(stream.borrow_mut()); - let mut rqst = buf_reader.lines(); - - - while let Ok(Some(line)) = rqst.next_line().await { - if line.is_empty() { - break - } - match from_str::(&line) { - Ok(req) => { - // TODO: func wrapper - dbg!(req); - }, - Err(_) => { - break - }, - } - println!("{}", line); +async fn process_connection(mut stream: UnixStream) { + let mut buf = vec![0; 1024]; + match stream.read(&mut buf).await { + Ok(0) => { + info!("Client disconnected "); + }, + Ok(n) => { + buf.truncate(n); + info!("CLI have sent {} bytes", n); + match serde_json::from_slice::(&buf) { + Ok(cli) => { + info!("Received CLI request: {:?}", cli); + let response = "OK"; + if let Err(e) = stream.write_all(response.as_bytes()).await { + error!("Failed to send response: {}", e); + } + } + Err(e) => { + error!("Failed to parse CLI request: {}", e); + } + } + }, + Err(e) => error!("Failed to read from socket: {}", e), } - - let response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!"; - stream.write_all(response.as_bytes()).await.unwrap(); + let _ = stream.shutdown().await; } diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index d9451b9..9e21042 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -9,9 +9,393 @@ use std::sync::Arc; use std::{env, fs}; use super::preboot::PrebootParams; use tokio::time::{Duration, sleep}; +// use redis::PubSub; +use tokio::sync::{ + oneshot, + oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender }, + broadcast::Sender as BroadcastSender, broadcast::Receiver as BroadcastReceiver }; +use crate::utils::files::create_watcher; +use std::fs::File; +use inotify::EventMask; // const CONFIG_PATH: &str = "settings.json"; +pub mod v2 { + use std::path::PathBuf; + use crate::utils::get_container_id; + + use super::*; + + pub async fn init_config_mechanism( + // to handle cli config changes + cli_oneshot: OneShotReciever, + // to share local config with PRCS, CLI_PIPELINE and CONFIG modules + brd_tx : BroadcastSender, + // preboot params (args) + params : Arc + /*...*/ + ) { + // channel for pubsub to handle local config pulling + let local_config_brd_reciever = brd_tx.subscribe(); + // channel between pub-sub mech and local config mech + let (tx_pb_lc, rx_pb_lc) = oneshot::channel::(); + // channel between cli mech and local config mech + let (tx_cli_lc, rx_cli_lc) = oneshot::channel::(); + + // dbg!("before lc"); + let params_clone = params.clone(); + let for_lc_path = params.clone(); + let lc_path = for_lc_path + .config + .to_str() + .unwrap_or("settings.json"); + + // future to init work with local config + let lc_future = tokio::spawn( + // let params = params.clone(); + local_config_reciever( + params_clone, + rx_pb_lc, + rx_cli_lc, + Arc::new(brd_tx) + ) + ); + // dbg!("before pb"); + // future to init work with pub sub mechanism + let pubsub_future = tokio::spawn( + pubsub_config_reciever( + tx_pb_lc, + params.clone(), + local_config_brd_reciever + ) + ); + + // dbg!("before cli"); + // future to catch new configs from cli pipeline + let cli_future = tokio::spawn( + from_cli_config_reciever( + cli_oneshot, + tx_cli_lc + ) + + ); + // let _ = lc_future.await; + // dbg!("before select"); + tokio::select! { + lc_result = lc_future => { + // dbg!("end of lc"); + match lc_result { + Ok(res) => { + if res.is_ok() { + info!("Local config warding mechanism stopped, waiting for others ..."); + sleep(Duration::from_millis(500)).await; + let _ = restart_main_thread(); + } + else { + error!("Local config warding mechanism crushed, restarting ..."); + let _ = restart_main_thread(); + } + }, + Err(_) => { + error!("Local config warding mechanism crushed, restarting ..."); + let _ = restart_main_thread(); + }, + } + }, + pb_result = pubsub_future => { + match pb_result { + Ok(res) => { + if res.is_ok() { + info!("New config was saved locally, restarting ..."); + } + else { + error!("Pubsub mechanism crushed, restarting ..."); + } + }, + Err(_) => { + error!("Pubsub mechanism crushed, restarting ..."); + }, + } + let _ = restart_main_thread(); + }, + cli_config_option = cli_future => { + match cli_config_option { + Err(_) => error!("CLI pulling new config mechanism crushed, restarting ..."), + Ok(option_config) => { + match option_config { + None => error!("CLI pulling new config mechanism crushed, restarting ..."), + Some(config) => { + info!("New config was pulled from CLI, saving and restarting ..."); + let _ = save_new_config(&config, lc_path); + }, + } + }, + } + let _ = restart_main_thread(); + }, + } + // dbg!("after select"); + // TODO! futures + select! [OK] + // TODO! tests config + } + pub async fn get_redis_connection(params: &str) -> Option { + for i in 1..=3 { + let redis_url = format!("redis://{}/", params); + info!("Trying to connect Redis pubsub `{}`. Attempt {}", &redis_url, i); + if let Ok(client) = Client::open(redis_url) { + if let Ok(conn) = client.get_connection() { + info!("Successfully opened Redis connection"); + return Some(conn); + } + } + error!("Error with subscribing Redis stream on update. Retrying in 5 secs..."); + sleep(Duration::from_secs(5)).await; + } + None + } + + // loop checking redis pubsub + async fn pubsub_config_reciever( + // to stop checking local config + local_conf_tx : OneShotSender, + params : Arc, + tx_brd_local : BroadcastReceiver, + ) -> anyhow::Result<()>{ + /*...*/ + // dbg!("start of pb"); + let mut tx_brd_local = tx_brd_local; + let local_config = if !tx_brd_local.is_empty() { + tx_brd_local.recv().await? + } else { + // Processes::default() + let mut tick = tokio::time::interval(Duration::from_millis(500)); + loop { + tick.tick().await; + break match tx_brd_local.recv().await { + Ok(conf) => conf, + Err(_) => continue, + }; + } + }; + match get_redis_connection(&local_config.config_server).await { + Some(mut conn) => { + let mut pub_sub = conn.as_pubsub(); + let channel_name = get_container_id().unwrap_or(String::from("default")); + let channel_name = channel_name.trim(); + match pub_sub.subscribe(channel_name) { + Err(er) => { + error!("Cannot subscribe pubsub channel due to {}", &er); + return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er))) + }, + Ok(_) => { + info!("Successfully subscribed to {} pubsub channel", channel_name); + let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3))); + loop { + if let Ok(msg) = pub_sub.get_message() { + // dbg!("ok on get message"); + let payload : Result = msg.get_payload(); + match payload { + Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "), + Ok(payload) => { + if let Some(remote) = parse_extern_config(&payload) { + match config_comparing(&local_config, &remote) { + ConfigActuality::Local => { + warn!("Pulled new config from Redis channel, it's outdated. Ignoring ..."); + }, + ConfigActuality::Remote => { + info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation); + // to stop watching local config file mechanism + let _ = local_conf_tx.send(true); + let config_path = params.config.to_str().unwrap_or("settings.json"); + + if save_new_config(&remote, &config_path).is_err() { + error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path); + return Err(anyhow::Error::msg( + format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path) + )) + } + return Ok(()); + }, + } + } + else { + warn!("Invalid config was pulled from Redis channel") + } + }, + } + } + // delay + tokio::task::yield_now().await; + } + }, + } + }, + None => { + sleep(Duration::from_secs(20)).await; + } + } + Ok(()) + } + + // + async fn local_config_reciever( + params : Arc, + pubsub_oneshot : OneShotReciever, + cli_oneshot : OneShotReciever, + brd_tx : Arc>, + /*...*/ + ) -> anyhow::Result<()> { + /*...*/ + // shadowing as mut + let mut pubsub_oneshot = pubsub_oneshot; + let mut cli_oneshot = cli_oneshot; + // fill with default empty config, mut to change later + let mut _current_config = Processes::default(); + // PathBuf to &str to work with local config path as slice + let local_config_path = params + .config + .to_str() + .unwrap_or("settings.json"); + + match load_processes(local_config_path) { + // if local exists + Some(conf) => { + info!("Local config `{}` was found.", &conf.date_of_creation); + _current_config = conf; + if let Err(er) = brd_tx.send(_current_config.clone()) { + error!("Cannot share local config with broadcast due to {}", er); + } + }, + // if local is not exist + None => { + warn!("Local config wasn't found. Waiting for new ..."); + return Err(anyhow::Error::msg("No local config")); + // ... + }, + } + + // 100% local exists here + // create watcher on local config file + match create_watcher("", local_config_path) { + Ok(mut watcher) => { + loop { + let mut need_to_export_config = false; + // let mut need_to_recreate_watcher = false; + // return situations here + // 1) oneshot signal + // 2) if config was deleted -> recreate and fill with current config that is held here + // 3) if config was changed -> fill with current config that is held here + + // catching signal from pubsub + // it's because pubsub mech pulled new valid and actual config and now it's time to ... + // ... overwrite local config file and restart main thread + if let Ok(_) = pubsub_oneshot.try_recv() { + sleep(Duration::from_secs(1)).await; + return Ok(()); + } + + // catching signal from cli + // it's because cli mech pulled new valid and actual config and now it's time to ... + // ... overwrite local config file and restart main thread (like in previous mechanism) + if let Ok(_) = cli_oneshot.try_recv() { + sleep(Duration::from_secs(1)).await; + return Ok(()); + } + + // ! IF NOXIS NEEDS TO RECREATE OR CHANGE LOCAL CONFIG NEED TO DRAIN THIS ACTIVITY ... + // ! ... FROM WATCHER"S BUFFER + + // existing check + if !params.config.exists() { + warn!("Local config file was deleted or moved. Recreating new one with saved data ..."); + need_to_export_config = true; + // need_to_recreate_watcher = true; + } else { + // changes check + let mut buffer = [0; 128]; + let events = watcher.read_events(&mut buffer); + if events.is_ok() { + let events: Vec = events + .unwrap() + .map(|mask| mask.mask) + .filter(|mask| { + *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF + }) + .collect(); + if !events.is_empty() { + warn!("Local config file was overwritten. Discarding changes ..."); + need_to_export_config = true; + // events + // .iter() + // .any(|event| *event == EventMask::DELETE_SELF) + // .then(|| need_to_recreate_watcher = true); + } + } + } + // exporting data + if need_to_export_config { + if let Err(er) = export_saved_config_data_locally(¶ms.config, &_current_config).await { + error!("Cannot save actual imported config due to {}", er); + } else { + // recreation watcher (draining activity buffer mechanism) + // if local config file was deleted and recreated + // if local config file was modified locally + match create_watcher("", local_config_path) { + Ok(new) => watcher = new, + Err(er) => error!("Cannot create new watcher due to {}", er), + } + } + } + sleep(Duration::from_millis(300)).await; + // tokio::task::yield_now().await; + } + }, + Err(_) => { + error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path); + return Err(anyhow::Error::msg("Cannot create watcher on local config file")); + }, + } + + } + + // [:IN-TEST] + async fn from_cli_config_reciever( + cli_oneshot: OneShotReciever, + to_local_tx: OneShotSender + ) -> Option { + /* match awaits til channel*/ + // dbg!("start of cli"); + loop { + if !cli_oneshot.is_empty() { + match cli_oneshot.await { + Ok(config_from_cli) => { + info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation); + let _ = to_local_tx.send(true); + return Some(config_from_cli) + }, + _ => return None, + } + } + sleep(Duration::from_millis(300)).await; + } + + } + + async fn export_saved_config_data_locally( + config_file_path: &PathBuf, + current_config: &Processes + ) -> anyhow::Result<()> { + + let mut file = File::create(config_file_path)?; + file.write_all( + serde_json::to_string_pretty(current_config)?.as_bytes() + )?; + Ok(()) + // Ok(()) + } +} + + /// # Fn `load_processes` /// ## for reading and parsing *local* storing config /// @@ -54,14 +438,14 @@ pub async fn get_actual_config(params : Arc) -> Option error!("Invalid character in config file. Config path was set to default"); "settings.json" }); - info!("Configurating config module with params: no-remote-config={}, no-sub={}, local config path={:?}, remote server={}", params.no_remote_config, params.no_sub, params.config, params.remote_server_url); + info!("Configurating config module with params: no-sub={}, local config path={:?}, remote server={}", params.no_sub, params.config, params.remote_server_url); match load_processes(config_path) { Some(local_conf) => { info!( "Found local configuration, version - {}", &local_conf.date_of_creation ); - if !params.no_remote_config { + if !params.no_sub { if let Some(remote_conf) = // TODO : rework with pubsub mech once_get_remote_configuration(&format!("redis://{}/", ¶ms.remote_server_url)) @@ -85,7 +469,7 @@ pub async fn get_actual_config(params : Arc) -> Option } None => { warn!("No local valid conf was found. Trying to pull remote one..."); - if !params.no_remote_config { + if !params.no_sub { let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", ¶ms.remote_server_url))); if let Some(conf) = get_remote_conf_watcher(&mut conn).await { info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation); @@ -322,7 +706,7 @@ fn restart_main_thread() -> std::io::Result<()> { pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc) -> Result<(), CustomError> { let config_path = params.config.to_str().unwrap_or_else(|| "settings.json"); - if params.no_sub || params.no_remote_config { + if params.no_sub { return Err(CustomError::Fatal); } if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) { @@ -397,6 +781,9 @@ pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc ConfigActuality { + if local.is_default() { + return ConfigActuality::Remote; + } let local_date: u64 = local.date_of_creation.parse().unwrap(); let remote_date: u64 = remote.date_of_creation.parse().unwrap(); diff --git a/noxis-rs/src/options/preboot.rs b/noxis-rs/src/options/preboot.rs index 8293f8a..d21cd57 100644 --- a/noxis-rs/src/options/preboot.rs +++ b/noxis-rs/src/options/preboot.rs @@ -13,7 +13,7 @@ enum EnvVars { NoxisNoHagent, NoxisNoLogs, NoxisRefreshLogs, - NoxisNoRemoteConfig, + // NoxisNoRemoteConfig, NoxisNoConfigSub, NoxisSocketPath, NoxisLogTo, @@ -29,7 +29,7 @@ impl std::fmt::Display for EnvVars { EnvVars::NoxisNoHagent => write!(f, "NOXIS_NO_HAGENT"), EnvVars::NoxisNoLogs => write!(f, "NOXIS_NO_LOGS"), EnvVars::NoxisRefreshLogs => write!(f, "NOXIS_REFRESH_LOGS"), - EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"), + // EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"), EnvVars::NoxisNoConfigSub => write!(f, "NOXIS_NO_CONFIG_SUB"), EnvVars::NoxisSocketPath => write!(f, "NOXIS_SOCKET_PATH"), EnvVars::NoxisLogTo => write!(f, "NOXIS_LOG_TO"), @@ -48,7 +48,7 @@ impl<'a> EnvVars { EnvVars::NoxisNoHagent => "false", EnvVars::NoxisNoLogs => "false", EnvVars::NoxisRefreshLogs => "false", - EnvVars::NoxisNoRemoteConfig => "false", + // EnvVars::NoxisNoRemoteConfig => "false", EnvVars::NoxisNoConfigSub => "false", EnvVars::NoxisSocketPath => "/var/run/enode/hostagent.sock", EnvVars::NoxisLogTo => "./", @@ -77,7 +77,7 @@ impl<'a> EnvVars { Self::NoxisNoHagent.process_env_var(&preboot.no_hostagent.to_string()); Self::NoxisNoLogs.process_env_var(&preboot.no_logs.to_string()); Self::NoxisRefreshLogs.process_env_var(&preboot.refresh_logs.to_string()); - Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string()); + // Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string()); Self::NoxisNoConfigSub.process_env_var(&preboot.no_sub.to_string()); Self::NoxisSocketPath.process_env_var(preboot.socket_path.to_str().unwrap()); Self::NoxisLogTo.process_env_var(preboot.log_to.to_str().unwrap()); @@ -147,12 +147,6 @@ impl std::fmt::Display for MetricsPrebootParams { /// noxis-rs ... --refresh-logs ... /// ``` /// -/// `--no-remote-config` - to disable work with Redis as config producer -/// ### usage : -/// ``` bash -/// noxis-rs ... --no-remote-config ... -/// ``` -/// /// `--no-sub` - to disable Redis subscribtion mechanism /// ### usage : /// ``` bash @@ -212,17 +206,18 @@ pub struct PrebootParams { help="To clear logs directory" )] pub refresh_logs : bool, - #[arg( - long = "no-remote-config", - action, - help="To disable work with remote config server", - conflicts_with="no_sub")] - pub no_remote_config : bool, + // #[arg( + // long = "no-remote-config", + // action, + // help="To disable work with remote config server", + // conflicts_with="no_sub")] + // pub no_remote_config : bool, #[arg( long = "no-sub", action, - help="To disable subscription mechanism", - conflicts_with="no_remote_config")] + help="To disable Redis subscription mechanism", + )] + // conflicts_with="no_remote_config" pub no_sub : bool, // params (socket_path, log_to, remote_server_url, config) @@ -243,7 +238,7 @@ pub struct PrebootParams { #[arg( long = "remote-server-url", default_value="localhost", - conflicts_with="no_remote_config", + conflicts_with="no_sub", help = "To set url of remote config server using in remote config pulling mechanism" )] pub remote_server_url : String, @@ -288,15 +283,17 @@ impl PrebootParams { // existing log dir if !self.log_to.exists() && !self.no_logs { eprintln!("Error: Log-Dir not found or Noxis can't read it. LogDir was set to default"); + self.refresh_logs = false; self.log_to = PathBuf::from("./"); // return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start")); } // existing sock file if !self.config.exists() { eprintln!("Error: Invalid character in config file. Config path was set to default"); - let config = PathBuf::from("/etc/settings.json"); - if !config.exists() && self.no_remote_config { - return Err(Error::msg("Noxis cannot run without config. Create local config or enable remote-config mechanism")); + // TODO : ??? wtf is going with 2 paths + let config = PathBuf::from("/etc/enode/noxis/settings.json"); + if !config.exists() && self.no_sub { + return Err(Error::msg("Noxis cannot run without config. Create local config or enable pubsub mechanism")); } self.config = PathBuf::from("settings.json"); // return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start")); @@ -353,20 +350,20 @@ mod preboot_unitests{ "runner-rs", "--no-sub", "--remote-server-url", "redis://127.0.0.1" - ]).is_ok()) - } - #[test] - fn parsing_config_invalid_args_noremote_nosub() { - assert!(PrebootParams::try_parse_from(vec![ - "runner-rs", - "--no-remote-config", "--no-sub" ]).is_err()) } + // #[test] + // fn parsing_config_invalid_args_noremote_nosub() { + // assert!(PrebootParams::try_parse_from(vec![ + // "runner-rs", + // "--no-remote-config", "--no-sub" + // ]).is_err()) + // } #[test] fn parsing_config_invalid_args_noremote_remoteurl() { assert!(PrebootParams::try_parse_from(vec![ "runner-rs", - "--no-remote-config", + "--no-sub", "--remote-server-url", "redis://127.0.0.1" ]).is_err()) } diff --git a/noxis-rs/src/options/signals.rs b/noxis-rs/src/options/signals.rs index 7604bde..f840510 100644 --- a/noxis-rs/src/options/signals.rs +++ b/noxis-rs/src/options/signals.rs @@ -22,7 +22,7 @@ type SendersVec = Arc>>>; /// /// *depends on* : Sig, Signals /// -pub async fn set_valid_destructor(senders: SendersVec) -> Result<(), CustomError> { +pub async fn set_valid_destructor(senders: SendersVec) -> anyhow::Result<()> { let (mut int, mut term, mut stop) = ( Sig::new(Signals::Sigint, senders.clone()), Sig::new(Signals::Sigterm, senders.clone()), diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index 65c1a19..56be896 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -2,7 +2,114 @@ use std::net::Ipv4Addr; use serde::{Deserialize, Serialize}; +use async_trait::async_trait; +use std::sync::Arc; +#[derive(Debug)] +pub enum DependencyType { + File, + Service, +} + +#[derive(Debug)] +pub enum ServiceState { + Ok, + Unavailable +} +pub struct ServiceWaitConfig(u32); + +impl Default for ServiceWaitConfig { + fn default() -> Self { + Self(5) + } +} + +pub enum FileTriggerType { + OnChange, + OnDelete, +} + +impl std::fmt::Display for FileTriggerType { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + return match self { + FileTriggerType::OnChange => write!(f, "File was changed"), + FileTriggerType::OnDelete => write!(f, "File was moved or deleted"), + } + } +} + +impl<'a> FileTriggerType { + pub fn event(&self, file_name: Arc, trigger: Arc) -> Events { + return match self { + FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger)), + FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger)), + } + } + pub fn event_from_file_trigger_controller(&self, file_name: Arc, trigger: &FileTriggersForController) -> Events { + return match self { + FileTriggerType::OnChange => Events::Negative(NegativeOutcomes::FileWasChanged(file_name, DependencyType::File, trigger.on_change.clone())), + FileTriggerType::OnDelete => Events::Negative(NegativeOutcomes::FileWasMovedOrDeleted(file_name, DependencyType::File, trigger.on_delete.clone())), + } + } +} + +#[derive(Debug)] +pub enum Triggers { + File { on_change: Arc, on_delete: Arc }, + Service {on_lost: Arc, wait: u32}, +} + +impl Triggers { + pub fn new_file(on_change: Arc, on_delete: Arc) -> Triggers { + Triggers::File { on_change, on_delete } + } + pub fn new_service(on_lost: Arc, wait_time: u32) -> Triggers { + Triggers::Service{on_lost, wait: wait_time} + } + pub fn to_service_negative_event(&self, service_name: Arc) -> Option { + if let Triggers::Service { on_lost, .. } = self { + return Some(Events::Negative(NegativeOutcomes::ServiceIsUnreachable(service_name, DependencyType::Service, on_lost.clone()))) + } + None + } +} + +#[derive(Debug)] +pub struct FileTriggersForController{ pub on_change: Arc, pub on_delete: Arc } +pub struct ServiceTriggersForController(Arc); + +impl std::fmt::Display for DependencyType { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + return match self { + DependencyType::File => write!(f, "File"), + DependencyType::Service => write!(f, "Service"), + } + } +} + +#[derive(Debug)] +pub enum ProcessState { + Pending, + Holding, + Stopped, + StoppedByCli, +} +#[derive(Debug)] +pub enum Events { + Positive(Arc), + Negative(NegativeOutcomes) +} +#[derive(Debug)] +pub enum NegativeOutcomes { + FileWasChanged(Arc, DependencyType, Arc), + FileWasMovedOrDeleted(Arc, DependencyType, Arc), + ServiceIsUnreachable(Arc, DependencyType, Arc), +} + +#[async_trait] +pub trait ProcessUnit { + async fn process(&mut self); +} /// # an Error enum (next will be deleted and replaced) pub enum CustomError { Fatal, @@ -40,6 +147,22 @@ pub struct Processes { pub processes: Vec, } +impl Default for Processes { + fn default() -> Self { + Self { + date_of_creation : String::new(), + config_server : String::from("default"), + processes : Vec::new(), + } + } +} + +impl Processes { + pub fn is_default(&self) -> bool { + self.date_of_creation.is_empty() + } +} + /// # Struct for the 2nd level in json conf file /// ## for each process to contain info, such as name, path and dependencies /// @@ -135,7 +258,7 @@ pub struct Files { #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Services { pub hostname: String, - pub port: u32, + pub port: Option, pub triggers: ServiceTriggers, } @@ -159,7 +282,6 @@ pub struct Services { #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ServiceTriggers { pub wait: u32, - pub delay: u32, #[serde(rename = "onLost")] pub on_lost: String, } diff --git a/noxis-rs/src/utils.rs b/noxis-rs/src/utils.rs index 9ead34d..b1e315a 100644 --- a/noxis-rs/src/utils.rs +++ b/noxis-rs/src/utils.rs @@ -6,25 +6,207 @@ pub mod services; // TODO : saving current flags state -use crate::options::structs::CustomError; -use crate::options::structs::TrackingProcess; -use files::create_watcher; -use files::file_handler; -use inotify::Inotify; -use log::{error, warn}; +use crate::options::structs::{CustomError, TrackingProcess, Processes}; +// use files::create_watcher; +// use files::file_handler; +// use inotify::Inotify; +use log::{error, warn, info}; use prcs::{ freeze_process, is_active, is_frozen, restart_process, start_process, terminate_process, unfreeze_process, }; -use services::service_handler; +// use services::service_handler; use std::process::Command; use std::sync::Arc; -use tokio::join; +// use tokio::join; use tokio::sync::mpsc; use tokio::time::Duration; +// use tokio::sync::mpsc::{Receiver as MpscReciever, Sender as MpscSender}; +// controllers import +use prcs::v2::ProcessesController; +use files::v2::FilesController; +use services::v2::ServicesController; +use async_trait::async_trait; const GET_ID_CMD: &str = "hostname"; +pub mod v2 { + use std::collections::{BTreeMap, HashMap, LinkedList, VecDeque}; + use crate::options::structs::{Events, FileTriggersForController, ProcessUnit, Triggers}; + use super::*; + + #[derive(Debug)] + enum ControllerResult { + Process(Option), + File(Option), + Service(Option), + } + + #[derive(Debug)] + struct Supervisor { + prcs : LinkedList, + files : LinkedList, + services : LinkedList, + } + + impl Supervisor { + pub fn new() -> Supervisor { + Supervisor { prcs: LinkedList::new(), files: LinkedList::new(), services: LinkedList::new()} + } + pub async fn with_config(mut self, config: &Processes) -> Supervisor { + let _ = config.processes.iter() + .for_each(|prc| { + let (rx, tx) = mpsc::channel::(10); + let temp = ProcessesController::new(&prc.name, tx).with_exe(&prc.path); + if !self.prcs.contains(&temp) { + self.prcs.push_back(temp); + } + let rx = Arc::new(rx); + let proc_name: Arc = Arc::from(prc.name.clone()); + + let _ = prc.dependencies.files.iter() + .for_each(|file| { + let mut hm = HashMap::new(); + let triggers = FileTriggersForController { on_change: Arc::from(file.triggers.on_change.clone()), on_delete: Arc::from(file.triggers.on_delete.clone())}; + hm.insert(proc_name.clone(), (triggers, rx.clone())); + + let tempfile = FilesController::new(&file.filename.as_str(), hm) + .with_path(&file.src); + + + if let Ok(file) = tempfile { + if let Some(current_file) = self.files.iter_mut().find(|a| &&file == a) { + current_file.add_event(file); + } else { + self.files.push_back(file); + } + } + }); + + // servs + let _ = prc.dependencies.services.iter() + .for_each(|serv| { + let access_url = ServicesController::get_access_url(&serv.hostname, serv.port.as_ref()); + // preparations + let rx = rx.clone(); + let serv_cont = ServicesController::new().with_access_name( + &serv.hostname, + &access_url + ); + // triggers + let arc: Arc = Arc::from(serv.triggers.on_lost.clone()); + let triggers = Triggers::new_service(arc, serv.triggers.wait); + + if let Some(proc) = self.services.iter_mut().find(|a| &&serv_cont == a) { + proc.add_process(&prc.name, triggers, rx); + } else { + // vecdeque for queue + let mut vec: VecDeque> = VecDeque::new(); + vec.push_back(proc_name.clone()); + // connection_queue + let mut connection_queue: BTreeMap>> = BTreeMap::new(); + connection_queue.insert(serv.triggers.wait, vec); + // event_reg + let mut hm = HashMap::new(); + hm.insert(proc_name.clone(), (triggers, rx)); + + let serv_cont = serv_cont.with_params(connection_queue, hm); + self.services.push_back(serv_cont); + } + }); + }); + self + } + pub fn get_stats(&self) -> String { + format!("processes: {}, files: {}, services: {}", self.prcs.len(),self.files.len(), self.services.len()) + } + } + + #[async_trait] + impl ProcessUnit for Supervisor { + async fn process(&mut self) { + info!("Initializing monitoring ..."); + loop { + // dbg!(&self); + let mut tasks: Vec> = vec![]; + // let (mut prc, mut file, mut serv) = (self.prcs.pop_front().unwrap(), self.files.pop_front().unwrap(), self.services.pop_front().unwrap()); + // let res = tokio::join!(prc.process(), file.process(), serv.process()); + if let Some(mut val) = self.prcs.pop_front() { + tasks.push( + tokio::spawn( async move { + val.process().await; + ControllerResult::Process(Some(val)) + }) + ); + } + if let Some(mut val) = self.files.pop_front() { + tasks.push( + tokio::spawn( async move { + val.process().await; + ControllerResult::File(Some(val)) + }) + ); + } + if let Some(mut val) = self.services.pop_front() { + tasks.push( + tokio::spawn( async move { + val.process().await; + ControllerResult::Service(Some(val)) + }) + ); + } + for task in tasks { + match task.await { + Ok(ControllerResult::Process(Some(val))) => self.prcs.push_back(val), + Ok(ControllerResult::File(Some(val))) => self.files.push_back(val), + Ok(ControllerResult::Service(Some(val))) => self.services.push_back(val), + Err(er) => error!("Controller task crushed : {er}. Cannot push back to the exec queue ..."), + _ => { /* DEAD END (CAN NOT BE EXECUTED) */}, + } + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } + + // spawn tasks + // spawn prc + // spawn files + // spawn services + // ## for ... i.await in loop + pub async fn init_monitoring( + config: Processes + ) -> anyhow::Result<()> { + let mut supervisor = Supervisor::new().with_config(&config).await; + info!("Monitoring: {} ", &supervisor.get_stats()); + supervisor.process().await; + Ok(()) + } + + + // async fn generate_controllers<'a>(config: Processes) -> (HashSet>, HashSet>, HashSet>) { + // let mut prcs: HashSet> = HashSet::new(); + // let mut files: HashSet> = HashSet::new(); + // let mut services: HashSet> = HashSet::new(); + // for prc in config.processes { + // let (rx, tx) = mpsc::channel::>(10); + // // let new_prc = ProcessesController::new(&prc.name, tx).with_exe(prc.path); + // let mut new_prc = ProcessesController::new("&prc.name", tx).with_exe(prc.path); + // let a = new_prc.process().await; + + // } + // (prcs, files, services) + // } + // spawn prc check with semaphore check + async fn prcs_monitoriing() -> anyhow::Result<()> { Ok(()) } + + // spawn file check with semaphore check + async fn files_monitoriing() -> anyhow::Result<()> { Ok(()) } + + // spawn service check with semaphore check + async fn services_monitoriing() -> anyhow::Result<()> { Ok(()) } +} + /// # Fn `run_daemons` /// ## async func to run 3 main daemons: process, service and file monitors and manage process state according to given messages into channel /// @@ -40,37 +222,37 @@ const GET_ID_CMD: &str = "hostname"; /// /// > *hint* : give mpsc with capacity 1 to jump over potential errors during running process /// -pub async fn run_daemons( - proc: Arc, - tx: Arc>, - rx: &mut mpsc::Receiver, -) { - // creating watchers + ---buffers--- - let mut watchers: Vec = vec![]; - for file in proc.dependencies.files.clone().into_iter() { - if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { - watchers.push(watcher); - } else { - let _ = tx.send(121).await; - } - // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); - } - let watchers_clone: Arc>> = - Arc::new(tokio::sync::Mutex::new(watchers)); +// pub async fn run_daemons( +// proc: Arc, +// tx: Arc>, +// rx: &mut mpsc::Receiver, +// ) { +// // creating watchers + ---buffers--- +// let mut watchers: Vec = vec![]; +// for file in proc.dependencies.files.clone().into_iter() { +// if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { +// watchers.push(watcher); +// } else { +// let _ = tx.send(121).await; +// } +// // watchers.push(create_watcher(&file.filename, &file.src).await.unwrap()); +// } +// let watchers_clone: Arc>> = +// Arc::new(tokio::sync::Mutex::new(watchers)); - loop { - let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); - tokio::select! { - _ = run_hand => continue, - _val = rx.recv() => { - if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { - return; - } - }, - } - tokio::task::yield_now().await; - } -} +// loop { +// let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); +// tokio::select! { +// _ = run_hand => continue, +// _val = rx.recv() => { +// if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { +// return; +// } +// }, +// } +// tokio::task::yield_now().await; +// } +// } async fn process_protocol_symbol(proc: Arc, val: u8) -> Result<(), CustomError>{ match val { @@ -133,7 +315,6 @@ async fn process_protocol_symbol(proc: Arc, val: u8) -> Result< }, // // 9 - File-dependency change -> staying (after check) 9 => { - // no need to trash logs warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name); tokio::time::sleep(Duration::from_millis(100)).await; }, @@ -201,36 +382,36 @@ async fn process_protocol_symbol(proc: Arc, val: u8) -> Result< /// /// *depends on* : fn `utils::files::file_handler`, fn `utils::services::service_handler`, fn `utils::prcs::{is_active, is_frozen, start_process}` /// -pub async fn running_handler( - prc: Arc, - tx: Arc>, - watchers: Arc>>, -) { - // services and files check (once) - let files_check = file_handler( - &prc.name, - &prc.dependencies.files, - tx.clone(), - watchers.clone(), - ); - let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); +// pub async fn running_handler( +// prc: Arc, +// tx: Arc>, +// watchers: Arc>>, +// ) { +// // services and files check (once) +// let files_check = file_handler( +// &prc.name, +// &prc.dependencies.files, +// tx.clone(), +// watchers.clone(), +// ); +// let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); - let res = join!(files_check, services_check); - // if inactive -> spawn checks -> active is true - if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() { - if start_process(&prc.name, &prc.path).await.is_err() { - tx.send(3).await.unwrap(); - return; - } - } - // if frozen -> spawn checks -> unfreeze is true - else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() { - tx.send(10).await.unwrap(); - return; - } - // tokio::time::sleep(Duration::from_millis(100)).await; - tokio::task::yield_now().await; -} +// let res = join!(files_check, services_check); +// // if inactive -> spawn checks -> active is true +// if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok() { +// if start_process(&prc.name, &prc.path).await.is_err() { +// tx.send(3).await.unwrap(); +// return; +// } +// } +// // if frozen -> spawn checks -> unfreeze is true +// else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok() { +// tx.send(10).await.unwrap(); +// return; +// } +// // tokio::time::sleep(Duration::from_millis(100)).await; +// tokio::task::yield_now().await; +// } // todo: cmd across cat /proc/self/mountinfo | grep "/docker/containers/" | head -1 | awk -F '/' '{print $5}' /// # Fn `get_container_id` diff --git a/noxis-rs/src/utils/files.rs b/noxis-rs/src/utils/files.rs index 639ced2..da87b0f 100644 --- a/noxis-rs/src/utils/files.rs +++ b/noxis-rs/src/utils/files.rs @@ -1,183 +1,321 @@ -use crate::options::structs::{CustomError, Files}; -use super::prcs::{is_active, is_frozen}; -use inotify::{EventMask, Inotify, WatchMask}; -use std::borrow::BorrowMut; -use std::path::Path; -use std::sync::Arc; -use tokio::sync::mpsc; -use tokio::time::Duration; + use crate::options::structs::{CustomError, Files}; + use super::prcs::{is_active, is_frozen}; + use inotify::{EventMask, Inotify, WatchMask}; + use std::borrow::BorrowMut; + use std::path::Path; + use std::sync::Arc; + use tokio::sync::mpsc; + use tokio::sync::mpsc::Sender as Sender; + use tokio::time::Duration; + use crate::options::structs::Events; + use async_trait::async_trait; -/// # Fn `create_watcher` -/// ## for creating watcher on file's delete | update events -/// -/// *input* : `&str`, `&str` -/// -/// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction -/// -/// *initiator* : fn `file_handler`, fn `utils::run_daemons` -/// -/// *managing* : current file's name: &str, path in local storage to current file: &str -/// -/// *depends on* : - -/// -pub async fn create_watcher(filename: &str, path: &str) -> Result { - let src = format!("{}{}", path, filename); - let inotify: Inotify = Inotify::init()?; - inotify.watches().add(&src, WatchMask::ALL_EVENTS)?; - Ok(inotify) -} + pub mod v2 { + use log::{error, info, warn}; + use crate::options::structs::{FileTriggerType, FileTriggersForController as Triggers, ProcessUnit}; + use super::*; + use std::{collections::HashMap, path::Path}; -/// # Fn `create_watcher` -/// ## for managing processes by checking dep files' states -/// -/// *input* : `&str`, `&[Files]`, `Arc>`, `Arc>>` -/// -/// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check -/// -/// *initiator* : fn `utils::running_handler` -/// -/// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc>`, mut list of file watchers`Arc>>` -/// -/// *depends on* : Files -/// -pub async fn file_handler( - name: &str, - files: &[Files], - tx: Arc>, - watchers: Arc>>, -) -> Result<(), CustomError> { - for (i, file) in files.iter().enumerate() { - // let src = format!("{}{}", file.src, file.filename); - if check_file(&file.filename, &file.src).await.is_err() { - if !is_active(name).await || is_frozen(name).await { - return Err(CustomError::Fatal); + type MpscSender = Arc>; + type EventHandlers = HashMap, (Triggers, MpscSender)>; + + #[derive(Debug)] + enum FileState { + Ok, + NotFound, + } + + #[derive(Debug)] + pub struct FilesController { + name : Arc, + path : String, + code_name : Arc, + state : FileState, + watcher : Option, + triggers : EventHandlers, + } + + impl PartialEq for FilesController { + fn eq(&self, other: &Self) -> bool { + self.code_name == other.code_name } - match file.triggers.on_delete.as_str() { - "stay" => { - tx.send(9).await.unwrap(); - continue; - } - "stop" => { - if is_active(name).await { - tx.send(1).await.unwrap(); - } - return Err(CustomError::Fatal); - } - "hold" => { - if is_active(name).await { - tx.send(2).await.unwrap(); - return Err(CustomError::Fatal); - } - } - _ => { - tokio::time::sleep(Duration::from_millis(50)).await; - tx.send(101).await.unwrap(); - return Err(CustomError::Fatal); + } + + impl FilesController { + pub fn new(name: &str, triggers: EventHandlers) -> FilesController { + let name: Arc = Arc::from(name); + Self { + name : name.clone(), + path : String::new(), + state : FileState::Ok, + watcher : None, + triggers, + code_name : name.clone(), } } - } else if is_active(name).await && !is_frozen(name).await { - let watchers = watchers.clone(); - // println!("mutex: {:?}", watchers); - let mut buffer = [0; 128]; - let mut mutex_guard = watchers.lock().await; - if let Some(notify) = mutex_guard.get_mut(i) { - let events = notify.read_events(&mut buffer); - // println!("{:?}", events); - if events.is_ok() { - let events: Vec = events - .unwrap() - .map(|mask| mask.mask) - .filter(|mask| { - *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF - }) - .collect(); - for event in events { - if let EventMask::DELETE_SELF = event { - // ! warning (DELETE_SELF event) ! - // println!("! warning (DELETE_SELF event) !"); - // * watcher recreation after dealing with file recreation mechanism in text editors - let mutex = notify.borrow_mut(); - - // *mutex = create_watcher(&file.filename, &file.src).await.unwrap(); - if let Ok(watcher) = create_watcher(&file.filename, &file.src).await { - *mutex = watcher; - } + pub fn with_path(mut self, path: impl AsRef) -> anyhow::Result { + self.path = path.as_ref().to_string_lossy().into_owned(); + self.watcher = { + match create_watcher(&self.name, &self.path) { + Ok(val) => Some(val), + Err(er) => { + error!("Cannot create watcher for {} ({}) due to {}", self.name, &self.path, er); + return Err(er) } - match file.triggers.on_change.as_str() { - "stop" => { - let _ = tx.send(7).await; + } + }; + self.code_name = Arc::from(format!("{}{}", &self.path, &self.code_name)); + Ok(self) + } + pub fn add_event(&mut self, file_controller : FilesController) { + for (k, v) in file_controller.triggers { + self.triggers.entry(k).or_insert(v); + } + } + async fn trigger_on(&mut self, trigger_type: Option) { + for (prc_name, (triggers, channel)) in &self.triggers { + let msg = match &trigger_type { + None => { + Events::Positive(self.code_name.clone()) + }, + Some(event) => { + info!("Event on file {} ({}) : {}. Notifying `{}` ...", &self.name, &self.path, event, &prc_name); + event.event_from_file_trigger_controller(self.code_name.clone(), &triggers) + }, + }; + let _ = channel.send(msg).await; + } + } + } + #[async_trait] + impl ProcessUnit for FilesController { + async fn process(&mut self) { + // polling file check + // 1) existing check + // dbg!(&self); + if let Ok(_) = check_file(&self.name, &self.path).await { + if let FileState::NotFound = self.state { + info!("File {} ({}) was found in determined scope. Notifying ...", self.name, self.code_name); + self.state = FileState::Ok; + // reseting negative outcome in prc + self.trigger_on(None).await; + } + match &mut self.watcher { + Some(notify) => { + let mut buffer = [0; 1024]; + if let Ok(mut notif_events) = notify.read_events(&mut buffer) { + // notif_events.into_iter().for_each(|mask| {dbg!(&mask.mask);}); + // todo!(); + if let (recreate_watcher, true) = ( + notif_events.any(|mask| mask.mask == EventMask::DELETE_SELF), + notif_events.any(|mask| mask.mask == EventMask::MODIFY) + ) { + warn!("File {} ({}) was changed", self.name, &self.path); + if recreate_watcher { + self.watcher = match create_watcher(&self.name, &self.path) { + Ok(notifier) => Some(notifier), + Err(er) => { + error!("Failed to recreate watcher for {} ({}) due to {}", + self.name, + &self.path, + er + ); + None + }, + } + } + self.trigger_on(Some(FileTriggerType::OnChange)).await; + return; + } } - "restart" => { - let _ = tx.send(8).await; + }, + None => { /* DEAD END */}, + } + } else { + if let FileState::Ok = self.state { + warn!("File {} ({}) was not found in determined scope", self.name, &self.path); + self.state = FileState::NotFound; + self.trigger_on(Some(FileTriggerType::OnDelete)).await; + } + return; + } + self.trigger_on(None).await; + // 2) change check + } + } + } + + /// # Fn `create_watcher` + /// ## for creating watcher on file's delete | update events + /// + /// *input* : `&str`, `&str` + /// + /// *output* : `Err` if it cant create file watcher | `Ok(watcher)` on successfull construction + /// + /// *initiator* : fn `file_handler`, fn `utils::run_daemons` + /// + /// *managing* : current file's name: &str, path in local storage to current file: &str + /// + /// *depends on* : - + /// + pub fn create_watcher(filename: &str, path: &str) -> anyhow::Result { + let src = format!("{}{}", path, filename); + let inotify: Inotify = Inotify::init()?; + inotify.watches().add(&src, WatchMask::ALL_EVENTS)?; + Ok(inotify) + } + + /// # Fn `create_watcher` + /// ## for managing processes by checking dep files' states + /// + /// *input* : `&str`, `&[Files]`, `Arc>`, `Arc>>` + /// + /// *output* : `Err` if something with dep file is wrong | `Ok(())` on successfull dep file check + /// + /// *initiator* : fn `utils::running_handler` + /// + /// *managing* : current process's name: &str, list of dep files : `&[Files]`, atomic ref counter on sender main channel for current process `Arc>`, mut list of file watchers`Arc>>` + /// + /// *depends on* : Files + /// + pub async fn file_handler( + name: &str, + files: &[Files], + tx: Arc>, + watchers: Arc>>, + ) -> anyhow::Result<()> { + for (i, file) in files.iter().enumerate() { + // let src = format!("{}{}", file.src, file.filename); + if check_file(&file.filename, &file.src).await.is_err() { + if !is_active(name).await || is_frozen(name).await { + return Err(anyhow::Error::msg("Process is frozen or stopped")); + } + match file.triggers.on_delete.as_str() { + "stay" => { + tx.send(9).await.unwrap(); + continue; + } + "stop" => { + if is_active(name).await { + tx.send(1).await.unwrap(); + } + return Err(anyhow::Error::msg("Process was stopped")); + } + "hold" => { + if is_active(name).await { + tx.send(2).await.unwrap(); + return Err(anyhow::Error::msg("Process was frozen")); + } + } + _ => { + tokio::time::sleep(Duration::from_millis(50)).await; + tx.send(101).await.unwrap(); + return Err(anyhow::Error::msg("Impermissible character or word in file trigger")); + } + } + } else if is_active(name).await && !is_frozen(name).await { + let watchers = watchers.clone(); + // println!("mutex: {:?}", watchers); + let mut buffer = [0; 128]; + let mut mutex_guard = watchers.lock().await; + if let Some(notify) = mutex_guard.get_mut(i) { + let events = notify.read_events(&mut buffer); + // println!("{:?}", events); + if events.is_ok() { + let events: Vec = events + .unwrap() + .map(|mask| mask.mask) + .filter(|mask| { + *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF + }) + .collect(); + for event in events { + if let EventMask::DELETE_SELF = event { + // ! warning (DELETE_SELF event) ! + // println!("! warning (DELETE_SELF event) !"); + // * watcher recreation after dealing with file recreation mechanism in text editors + let mutex = notify.borrow_mut(); + + // *mutex = create_watcher(&file.filename, &file.src).await.unwrap(); + if let Ok(watcher) = create_watcher(&file.filename, &file.src) { + *mutex = watcher; + } } - "stay" => { - let _ = tx.send(9).await; - } - _ => { - let _ = tx.send(101).await; + match file.triggers.on_change.as_str() { + "stop" => { + let _ = tx.send(7).await; + } + "restart" => { + let _ = tx.send(8).await; + } + "stay" => { + let _ = tx.send(9).await; + } + _ => { + let _ = tx.send(101).await; + } } } } } } } + tokio::task::yield_now().await; + Ok(()) } - tokio::task::yield_now().await; - Ok(()) -} -/// # Fn `check_file` -/// ## for checking existance of current file -/// -/// *input* : `&str`, `&str` -/// -/// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error -/// -/// *initiator* : fn `file_handler` -/// -/// *managing* : current file's name: `&str` and current file's path in local storage: `&str` -/// -/// *depends on* : network activity -/// -pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { - let arc_name = Arc::new(filename.to_string()); - let arc_path = Arc::new(path.to_string()); - tokio::task::spawn_blocking(move || { - let file_concat = format!("{}{}", arc_path, arc_name); - let path = Path::new(&file_concat); - if path.exists() { - Ok(()) - } else { - Err(CustomError::Fatal) + /// # Fn `check_file` + /// ## for checking existance of current file + /// + /// *input* : `&str`, `&str` + /// + /// *output* : `Ok(())` if file exists | `Err(_)` if not | panic on fs error + /// + /// *initiator* : fn `file_handler` + /// + /// *managing* : current file's name: `&str` and current file's path in local storage: `&str` + /// + /// *depends on* : network activity + /// + pub async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { + let arc_name = Arc::new(filename.to_string()); + let arc_path = Arc::new(path.to_string()); + tokio::task::spawn_blocking(move || { + let file_concat = format!("{}{}", arc_path, arc_name); + let path = Path::new(&file_concat); + if path.exists() { + Ok(()) + } else { + Err(CustomError::Fatal) + } + }) + .await + .unwrap_or_else(|_| { + panic!("Corrupted while file check process"); + }) + } + + #[cfg(test)] + mod files_unittests { + use super::*; + #[tokio::test] + async fn try_to_create_watcher() { + let res = create_watcher("dep-file", "./tests/examples/"); + assert!(res.is_ok()); + } + #[tokio::test] + async fn try_to_create_invalid_watcher() { + let res = create_watcher("invalid-file", "/path/to/the/no/dir"); + assert!(res.is_err()); + } + #[tokio::test] + async fn check_existing_file() { + let res = check_file("dep-file", "./tests/examples/").await; + assert!(res.is_ok()); + } + #[tokio::test] + async fn check_non_existing_file() { + let res = check_file("invalid-file", "/path/to/the/no/dir").await; + assert!(res.is_err()); } - }) - .await - .unwrap_or_else(|_| { - panic!("Corrupted while file check process"); - }) -} - -#[cfg(test)] -mod files_unittests { - use super::*; - #[tokio::test] - async fn try_to_create_watcher() { - let res = create_watcher("dep-file", "./tests/examples/").await; - assert!(res.is_ok()); } - #[tokio::test] - async fn try_to_create_invalid_watcher() { - let res = create_watcher("invalid-file", "/path/to/the/no/dir").await; - assert!(res.is_err()); - } - #[tokio::test] - async fn check_existing_file() { - let res = check_file("dep-file", "./tests/examples/").await; - assert!(res.is_ok()); - } - #[tokio::test] - async fn check_non_existing_file() { - let res = check_file("invalid-file", "/path/to/the/no/dir").await; - assert!(res.is_err()); - } -} diff --git a/noxis-rs/src/utils/prcs.rs b/noxis-rs/src/utils/prcs.rs index a434c43..3f10903 100644 --- a/noxis-rs/src/utils/prcs.rs +++ b/noxis-rs/src/utils/prcs.rs @@ -1,8 +1,132 @@ -use crate::options::structs::CustomError; use log::{error, warn}; use std::process::{Command, Output}; use std::sync::Arc; use tokio::time::Duration; +use crate::options::structs::{ProcessState, Events, NegativeOutcomes, ProcessUnit}; +use std::collections::HashSet; +use tokio::sync::mpsc::Receiver as MpscReciever; +use async_trait::async_trait; + +pub mod v2 { + use log::info; + use crate::options::structs::DependencyType; + use std::path::Path; + + use super::*; + + #[derive(Debug)] + pub struct ProcessesController { + name: Arc, + bin: String, + // obj: Arc, + state: ProcessState, + event_reader: MpscReciever, + negative_events: HashSet>, + } + + impl PartialEq for ProcessesController { + fn eq(&self, other: &Self) -> bool { + self.bin == other.bin + } + } + + impl ProcessesController { + pub fn new(name: &str, event_reader: MpscReciever) -> ProcessesController { + ProcessesController { + name : Arc::from(name), + bin : String::new(), + state : ProcessState::Stopped, + event_reader, + negative_events : HashSet::new(), + } + } + pub fn with_exe(mut self, bin: impl AsRef) -> ProcessesController { + self.bin = bin.as_ref().to_string_lossy().into_owned(); + self + } + + async fn trigger_on(&mut self, dep_name: &str, trigger: &str, dep_type: DependencyType) { + match trigger { + "stay" => { + info!("Event on {} `{}` for {}. Ignoring ...", dep_type, dep_name, self.name); + }, + "stop" => { + if is_active(&self.name).await { + info!("Event on {} `{}` for {}. Stopping ...", dep_type, dep_name, self.name); + terminate_process(&self.name).await; + self.state = ProcessState::Stopped; + } + }, + "hold" => { + if !is_frozen(&self.name).await { + info!("Event on {} `{}` for {}. Freezing ...", dep_type, dep_name, self.name); + freeze_process(&self.name).await; + self.state = ProcessState::Holding; + } + }, + "restart" => { + info!("Event on {} `{}` for {}. Restarting ...", dep_type, dep_name, self.name); + let _ = restart_process(&self.name, &self.bin).await; + }, + _ => error!("Impermissible trigger in file-trigger for {}. Ignoring event ...", self.name), + } + tokio::time::sleep(Duration::from_micros(100)).await; + } + } + + #[async_trait] + impl ProcessUnit for ProcessesController { + async fn process(&mut self) { + if self.negative_events.len() == 0 { + match self.state { + ProcessState::Holding => { + info!("No negative dependecies events on {} process. Unfreezing ...", self.name); + if let Err(er) = unfreeze_process(&self.name).await { + error!("Cannot unfreeze process {} : {}", self.name, er); + } else { + self.state = ProcessState::Pending; + } + }, + ProcessState::Stopped => { + info!("No negative dependecies events on {} process. Starting ...", self.name); + if let Err(er) = start_process(&self.name, &self.bin).await { + error!("Cannot start process {} : {}", self.name, er); + } else { + self.state = ProcessState::Pending; + } + }, + _ => {}, + } + } + while let Ok(event) = self.event_reader.try_recv() { + match event { + Events::Positive(target) => { + if self.negative_events.contains(&target) { + self.negative_events.remove(&target); + } + }, + Events::Negative(event) => { + match event { + NegativeOutcomes::FileWasChanged(target, dep_type, trigger) | + NegativeOutcomes::FileWasMovedOrDeleted(target, dep_type, trigger) | + NegativeOutcomes::ServiceIsUnreachable(target, dep_type, trigger) => { + if !self.negative_events.contains(&target) { + self.negative_events.insert(target.clone()); + + self.trigger_on( + &target, + &trigger, + dep_type + ).await; + } + }, + } + }, + } + } + } + } +} /// # Fn `get_pid` /// ## for initializing process of unstoppable grubbing metrics. @@ -162,14 +286,11 @@ pub async fn freeze_process(name: &str) { /// /// *depends on* : - /// -pub async fn unfreeze_process(name: &str) { +pub async fn unfreeze_process(name: &str) -> anyhow::Result<()> { let _ = Command::new("pkill") .args(["-CONT", name]) - .output() - .unwrap_or_else(|_| { - error!("Failed to unfreeze process"); - std::process::exit(101); - }); + .output()?; + Ok(()) } /// # Fn `restart_process` @@ -185,7 +306,7 @@ pub async fn unfreeze_process(name: &str) { /// /// *depends on* : fn `start_process`, fn `terminate_process` /// -pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> { +pub async fn restart_process(name: &str, path: &str) -> anyhow::Result<()> { terminate_process(name).await; tokio::time::sleep(Duration::from_millis(100)).await; start_process(name, path).await @@ -204,7 +325,7 @@ pub async fn restart_process(name: &str, path: &str) -> Result<(), CustomError> /// /// *depends on* : - /// -pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { +pub async fn start_process(name: &str, path: &str) -> anyhow::Result<()> { // let runsh = format!("{} {}", "exec", path); let mut command = Command::new(path); // command.arg(path); @@ -215,8 +336,7 @@ pub async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { Ok(()) } Err(er) => { - println!("{:?}", er); - Err(CustomError::Fatal) + Err(anyhow::Error::msg(format!("Cannot start process {} due to {}", name, er))) } } } diff --git a/noxis-rs/src/utils/services.rs b/noxis-rs/src/utils/services.rs index fb51f7d..a381cb1 100644 --- a/noxis-rs/src/utils/services.rs +++ b/noxis-rs/src/utils/services.rs @@ -1,10 +1,193 @@ -use crate::options::structs::{CustomError, Services}; -use super::prcs::{is_active, is_frozen}; +use crate::options::structs::CustomError; use log::{error, warn}; use std::net::{TcpStream, ToSocketAddrs}; use std::sync::Arc; -use tokio::sync::mpsc; -use tokio::time::{Duration, Instant}; +use tokio::time::Duration; +use tokio::sync::mpsc::Sender as Sender; +use async_trait::async_trait; + +pub mod v2 { + use log::info; + + use crate::options::structs::{Triggers, ProcessUnit, Events, ServiceState}; + + use super::*; + use std::collections::{HashMap, BTreeMap, VecDeque}; + + type MpscSender = Arc>; + // type EventHandlers<'a> = Vec>>; + type EventHandlers = HashMap, (Triggers, MpscSender)>; + // type wrapper for service wait queue + type ConnectionQueue = BTreeMap>>; + + #[derive(Debug)] + pub struct ServicesController { + // i.e. yandex.ru + #[allow(unused)] + name : String, + // i.e. yandex.ru:443 + access_url : Arc, + // "OK" or "Unavailable" + state: ServiceState, + // btree map with key as max wait time and it's key to hashmap + config: ConnectionQueue, + // Map of processes with their (trigger and mpsc sender) + event_registrator : EventHandlers, + } + + impl PartialEq for ServicesController { + fn eq(&self, other: &Self) -> bool { + self.access_url == other.access_url + } + } + + impl ServicesController { + pub fn new() -> ServicesController { + ServicesController { + name : String::new(), + access_url : Arc::from(String::new()), + state : ServiceState::Unavailable, + config: ConnectionQueue::new(), + event_registrator : EventHandlers::new(), + } + } + pub fn with_access_name( + mut self, + hostname: &str, + access_url: &str, + ) -> ServicesController { + self.name = hostname.to_string(); + self.access_url = Arc::from(access_url); + self + } + + pub fn with_params( + mut self, + conn_queue: ConnectionQueue, + event_reg: EventHandlers, + ) -> ServicesController { + self.config = conn_queue; + self.event_registrator = event_reg; + self + } + + pub fn get_access_url(hostname: &str, port: Option<&u32>) -> String { + format!("{}{}", hostname, port.map_or_else(|| "".to_string(), |p| format!(":{}", p))) + } + pub fn add_process( + &mut self, + proc_name: &str, + trigger: Triggers, + sender: MpscSender, + ) { + let proc_name: Arc = Arc::from(proc_name); + // queue add + if let Triggers::Service { wait, .. } = trigger { + self.config.entry(wait) + .and_modify(|el| el.push_back(proc_name.clone())) + .or_insert({ + let mut temp = VecDeque::new(); + temp.push_back(proc_name.clone()); + temp + }); + } + // event add + self.event_registrator.entry(proc_name).or_insert((trigger, sender)); + } + async fn check_state(&self) -> anyhow::Result<()> { + let mut addrs = self.access_url.to_socket_addrs()?; + if !addrs.any(|a| TcpStream::connect_timeout(&a, Duration::new(1, 0)).is_ok()) { + return Err(anyhow::Error::msg(format!("No access to service `{}`", &self.access_url))) + } + Ok(()) + } + async fn trigger_on(&mut self) { + match self.state { + ServiceState::Ok => { + let _ = self.event_registrator + .iter() + .map(|(_, (_, el))| async { + let _ = el.send(Events::Positive(self.access_url.clone())).await; + }); + }, + ServiceState::Unavailable => { + // looped check and notifying + self.looped_check().await; + }, + } + } + async fn looped_check(self: &mut Self) { + let longest = self.config.last_entry().unwrap(); + let longest = longest.key(); + let mut interapter = tokio::time::interval(tokio::time::Duration::from_secs(1)); + let timer = tokio::time::Instant::now(); + let mut attempt: u32 = 1; + let access_url = Arc::new(self.access_url.clone()); + // let event_registrator = &mut self.event_registrator; + + if let Err(_) = tokio::time::timeout(tokio::time::Duration::from_secs((longest + 1) as u64), async { + // let access_url = access_url.clone(); + loop { + interapter.tick().await; + info!("Trying to connect to {} (attempt: {}) ...", &access_url, attempt); + attempt += 1; + + let state_check_result = self.check_state().await; + + if state_check_result.is_ok() { + info!("Connection to {} is `OK` now", &access_url); + self.state = ServiceState::Ok; + break; + } else { + let now = timer.elapsed(); + let iterator = self.config.iter() + .filter(|(&a, _)| tokio::time::Duration::from_secs(a as u64) <= now) + .flat_map(|(_, a)| a.iter().cloned()) + .collect::>>(); + + for name in iterator { + let proc_name = name.to_string(); + info!("Trying to notify process `{}` ...", &proc_name); + let sender_opt = self.event_registrator.get(&name) + .map(|(trigger, sender)| + (trigger.to_service_negative_event(name.clone()), sender) + ); + + if let Some((tr, tx)) = sender_opt { + let _ = tx.send(tr.unwrap()).await; + } else { + error!("Cannot find {} channel sender in {} service", name.clone(), &self.access_url) + } + } + } + } + }).await { + info!("Timeout of establishing connection to {}. ", &access_url); + } + } + } + #[async_trait] + impl ProcessUnit for ServicesController { + async fn process(&mut self) { + // check_service(hostname, port) + let current_state = self.check_state().await; + match (&self.state, current_state) { + (ServiceState::Unavailable, Ok(_)) => { + warn!("Connection with `{}` service was established. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len()); + self.state = ServiceState::Ok; + self.trigger_on().await; + }, + (ServiceState::Ok, Err(_)) => { + warn!("Unreachable for connection service `{}`. Notifying {} process(es) ...", &self.access_url, &self.event_registrator.len()); + self.state = ServiceState::Unavailable; + self.trigger_on().await; + }, + (ServiceState::Unavailable, Err(_)) => warn!("Service {} is still unreachable", &self.access_url), + _ => { /* DEAD END WITH NO INTEREST */ }, + } + } + } +} /// # Fn `service_handler` /// ## function to realize mechanism of current process' dep services monitoring @@ -19,53 +202,53 @@ use tokio::time::{Duration, Instant}; /// /// *depends on* : fn `check_service`, fn `utils::prcs::is_active`, fn `utils::prcs::is_frozen`, fn `looped_service_connecting` /// -pub async fn service_handler( - name: &str, - services: &Vec, - tx: Arc>, -) -> Result<(), CustomError> { - // println!("service daemon on {}", name); - for serv in services { - if check_service(&serv.hostname, &serv.port).await.is_err() { - if !is_active(name).await || is_frozen(name).await { - return Err(CustomError::Fatal); - } - error!( - "Service {}:{} is unreachable for process {}", - &serv.hostname, &serv.port, &name - ); - match serv.triggers.on_lost.as_str() { - "stay" => { - tx.send(4).await.unwrap(); - continue; - } - "stop" => { - if looped_service_connecting(name, serv).await.is_err() { - tx.send(5).await.unwrap(); - tokio::task::yield_now().await; - return Err(CustomError::Fatal); - } - } - "hold" => { - // if is_frozen(name).await { - // return Err(CustomError::Fatal); - // } - if looped_service_connecting(name, serv).await.is_err() { - tx.send(6).await.unwrap(); - tokio::task::yield_now().await; - return Err(CustomError::Fatal); - } - } - _ => { - tx.send(101).await.unwrap(); - return Err(CustomError::Fatal); - } - } - } - } - tokio::time::sleep(Duration::from_millis(100)).await; - Ok(()) -} +// pub async fn service_handler( +// name: &str, +// services: &Vec, +// tx: Arc>, +// ) -> Result<(), CustomError> { +// // println!("service daemon on {}", name); +// for serv in services { +// if check_service(&serv.hostname, &serv.port).await.is_err() { +// if !is_active(name).await || is_frozen(name).await { +// return Err(CustomError::Fatal); +// } +// error!( +// "Service {}:{} is unreachable for process {}", +// &serv.hostname, &serv.port, &name +// ); +// match serv.triggers.on_lost.as_str() { +// "stay" => { +// tx.send(4).await.unwrap(); +// continue; +// } +// "stop" => { +// if looped_service_connecting(name, serv).await.is_err() { +// tx.send(5).await.unwrap(); +// tokio::task::yield_now().await; +// return Err(CustomError::Fatal); +// } +// } +// "hold" => { +// // if is_frozen(name).await { +// // return Err(CustomError::Fatal); +// // } +// if looped_service_connecting(name, serv).await.is_err() { +// tx.send(6).await.unwrap(); +// tokio::task::yield_now().await; +// return Err(CustomError::Fatal); +// } +// } +// _ => { +// tx.send(101).await.unwrap(); +// return Err(CustomError::Fatal); +// } +// } +// } +// } +// tokio::time::sleep(Duration::from_millis(100)).await; +// Ok(()) +// } /// # Fn `looped_service_connecting` /// ## for service's state check in loop (with delay and restriction of attempts) @@ -80,54 +263,54 @@ pub async fn service_handler( /// /// *depends on* : fn `check_service` /// -async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { - if serv.triggers.wait == 0 { - loop { - tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; - warn!( - "Attempting to connect from {} process to {}:{}", - &name, &serv.hostname, &serv.port - ); - match check_service(&serv.hostname, &serv.port).await { - Ok(_) => { - log::info!( - "Successfully connected to {} from {} process!", - &serv.hostname, - &name - ); - break; - } - Err(_) => { - tokio::task::yield_now().await; - } - } - } - Ok(()) - } else { - let start = Instant::now(); - while start.elapsed().as_secs() < serv.triggers.wait.into() { - tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; - warn!( - "Attempting to connect from {} process to {}:{}", - &name, &serv.hostname, &serv.port - ); - match check_service(&serv.hostname, &serv.port).await { - Ok(_) => { - log::info!( - "Successfully connected to {} from {} process!", - &serv.hostname, - &name - ); - return Ok(()); - } - Err(_) => { - tokio::task::yield_now().await; - } - } - } - Err(CustomError::Fatal) - } -} +// async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { +// if serv.triggers.wait == 0 { +// loop { +// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; +// warn!( +// "Attempting to connect from {} process to {}:{}", +// &name, &serv.hostname, &serv.port +// ); +// match check_service(&serv.hostname, &serv.port).await { +// Ok(_) => { +// log::info!( +// "Successfully connected to {} from {} process!", +// &serv.hostname, +// &name +// ); +// break; +// } +// Err(_) => { +// tokio::task::yield_now().await; +// } +// } +// } +// Ok(()) +// } else { +// let start = Instant::now(); +// while start.elapsed().as_secs() < serv.triggers.wait.into() { +// tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; +// warn!( +// "Attempting to connect from {} process to {}:{}", +// &name, &serv.hostname, &serv.port +// ); +// match check_service(&serv.hostname, &serv.port).await { +// Ok(_) => { +// log::info!( +// "Successfully connected to {} from {} process!", +// &serv.hostname, +// &name +// ); +// return Ok(()); +// } +// Err(_) => { +// tokio::task::yield_now().await; +// } +// } +// } +// Err(CustomError::Fatal) +// } +// } /// # Fn `check_service` /// ## for check current service's availiability