From b6ecb10a77e8fc558a4cea61f65dd715371aa6ad Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 10 Dec 2024 18:22:16 +0300 Subject: [PATCH 01/12] utils rework --- noxis-rs/src/options/preboot.rs | 1 + noxis-rs/src/utils.rs | 238 ++++++++++++++++---------------- 2 files changed, 123 insertions(+), 116 deletions(-) diff --git a/noxis-rs/src/options/preboot.rs b/noxis-rs/src/options/preboot.rs index af071ab..6ecb242 100644 --- a/noxis-rs/src/options/preboot.rs +++ b/noxis-rs/src/options/preboot.rs @@ -117,6 +117,7 @@ impl PrebootParams { eprintln!("Local config file {} doesn't exist", &self.config.display()); return Err(Error::msg("Local Config Not Found. Cannot start")); } + // redis server check Ok(self) } } diff --git a/noxis-rs/src/utils.rs b/noxis-rs/src/utils.rs index ac87f1e..86e63f5 100644 --- a/noxis-rs/src/utils.rs +++ b/noxis-rs/src/utils.rs @@ -4,8 +4,9 @@ pub mod metrics; pub mod prcs; pub mod services; -// +// TODO : saving current flags state +use crate::options::structs::CustomError; use crate::options::structs::TrackingProcess; use files::create_watcher; use files::file_handler; @@ -60,125 +61,130 @@ pub async fn run_daemons( loop { let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone()); tokio::select! { - _ = run_hand => {}, - _val = rx.recv() => { - match _val.unwrap() { - // 1 - File-dependency handling error -> terminating (after waiting) - 1 => { - if is_active(&proc.name).await { - error!("File-dependency handling error: Terminating {} process ..." , &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - } - return; - }, - // 2 - File-dependency handling error -> holding (after waiting) - 2 => { - if !is_frozen(&proc.name).await { - error!("File-dependency handling error: Freezing {} process ..." , &proc.name); - freeze_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - } - }, - // 3 - Running process error - 3 => { - error!("Error due to starting {} process", &proc.name); - break; - }, - // 4 - Timeout of waiting service-dependency -> staying (after waiting) - 4 => { - // warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name); - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // 5 - Timeout of waiting service-dependency -> terminating (after waiting) - 5 => { - if is_active(&proc.name).await { - error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(1000)).await; - } - }, - // 6 - Timeout of waiting service-dependency -> holding (after waiting) - 6 => { - // println!("holding {}-{}", proc.name, is_active(&proc.name).await); - if !is_frozen(&proc.name).await { - error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); - freeze_process(&proc.name).await; - tokio::time::sleep(Duration::from_secs(1)).await; - } - }, - // // 7 - File-dependency change -> terminating (after check) - 7 => { - error!("File-dependency warning (file changed). Terminating {} process...", &proc.name); - terminate_process(&proc.name).await; - tokio::time::sleep(Duration::from_millis(100)).await; - return; - }, - // // 8 - File-dependency change -> restarting (after check) - 8 => { - warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name); - let _ = restart_process(&proc.name, &proc.path).await; - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // // 9 - File-dependency change -> staying (after check) - 9 => { - // no need to trash logs - warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name); - tokio::time::sleep(Duration::from_millis(100)).await; - }, - - // 10 - Process unfreaze call via file handler (or service handler) - 10 | 11 => { - if is_frozen(&proc.name).await { - warn!("Unfreezing process {} call...", &proc.name); - unfreeze_process(&proc.name).await; - } - tokio::time::sleep(Duration::from_millis(100)).await; - }, - // 11 - Process unfreaze call via service handler - // 11 => { - // if is_frozen(&proc.name).await { - // warn!("Unfreezing process {} call...", &proc.name); - // unfreeze_process(&proc.name).await; - // } - // tokio::time::sleep(Duration::from_millis(100)).await; - // }, - // 101 - Impermissible trigger values in JSON - 101 => { - error!("Impermissible trigger values in JSON in {}'s block. Killing thread...", proc.name); - if is_active(&proc.name).await { - terminate_process(&proc.name).await; - } - break; - }, - // - // 121 - Cannot create valid watcher for file dependency - 121 => { - error!("Cannot create valid watcher for {}'s file dependency. Terminating thread...", proc.name); - let _ = terminate_process("runner-rs").await; - break; - }, - // 111 - global thread termination with killing current child in a face - // of a current process - 111 => { - warn!("Terminating {}'s child processes...", &proc.name); - match is_active(&proc.name).await { - true => { - terminate_process(&proc.name).await; - }, - false => { - log::info!("Process {} is already terminated!", proc.name); - }, - } - break; - }, - _ => {}, - } + _ = run_hand => continue, + _val = rx.recv() => { + if process_protocol_symbol(proc.clone(), _val.unwrap()).await.is_err() { + return; + } }, } tokio::task::yield_now().await; } - tokio::task::yield_now().await; +} + +async fn process_protocol_symbol(proc: Arc, val: u8) -> Result<(), CustomError>{ + match val { + // 1 - File-dependency handling error -> terminating (after waiting) + 1 => { + if is_active(&proc.name).await { + error!("File-dependency handling error: Terminating {} process ..." , &proc.name); + terminate_process(&proc.name).await; + tokio::time::sleep(Duration::from_millis(500)).await; + } + // return; + }, + // 2 - File-dependency handling error -> holding (after waiting) + 2 => { + if !is_frozen(&proc.name).await { + error!("File-dependency handling error: Freezing {} process ..." , &proc.name); + freeze_process(&proc.name).await; + tokio::time::sleep(Duration::from_millis(100)).await; + } + }, + // 3 - Running process error + 3 => { + error!("Error due to starting {} process", &proc.name); + return Err(CustomError::Fatal) + }, + // 4 - Timeout of waiting service-dependency -> staying (after waiting) + 4 => { + // warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name); + tokio::time::sleep(Duration::from_millis(100)).await; + }, + // 5 - Timeout of waiting service-dependency -> terminating (after waiting) + 5 => { + if is_active(&proc.name).await { + error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); + terminate_process(&proc.name).await; + tokio::time::sleep(Duration::from_millis(500)).await; + } + }, + // 6 - Timeout of waiting service-dependency -> holding (after waiting) + 6 => { + // println!("holding {}-{}", proc.name, is_active(&proc.name).await); + if !is_frozen(&proc.name).await { + error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); + freeze_process(&proc.name).await; + tokio::time::sleep(Duration::from_secs(1)).await; + } + }, + // // 7 - File-dependency change -> terminating (after check) + 7 => { + error!("File-dependency warning (file changed). Terminating {} process...", &proc.name); + terminate_process(&proc.name).await; + tokio::time::sleep(Duration::from_millis(100)).await; + return Err(CustomError::Fatal) + }, + // // 8 - File-dependency change -> restarting (after check) + 8 => { + warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name); + let _ = restart_process(&proc.name, &proc.path).await; + tokio::time::sleep(Duration::from_millis(100)).await; + }, + // // 9 - File-dependency change -> staying (after check) + 9 => { + // no need to trash logs + warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name); + tokio::time::sleep(Duration::from_millis(100)).await; + }, + + // 10 - Process unfreaze call via file handler (or service handler) + 10 | 11 => { + if is_frozen(&proc.name).await { + warn!("Unfreezing process {} call...", &proc.name); + unfreeze_process(&proc.name).await; + } + tokio::time::sleep(Duration::from_millis(100)).await; + }, + // 11 - Process unfreaze call via service handler + // 11 => { + // if is_frozen(&proc.name).await { + // warn!("Unfreezing process {} call...", &proc.name); + // unfreeze_process(&proc.name).await; + // } + // tokio::time::sleep(Duration::from_millis(100)).await; + // }, + // 101 - Impermissible trigger values in JSON + 101 => { + error!("Impermissible trigger values in JSON in {}'s block. Killing thread...", proc.name); + if is_active(&proc.name).await { + terminate_process(&proc.name).await; + } + return Err(CustomError::Fatal) + }, + // + // 121 - Cannot create valid watcher for file dependency + 121 => { + error!("Cannot create valid watcher for {}'s file dependency. Terminating thread...", proc.name); + let _ = terminate_process("runner-rs").await; + return Err(CustomError::Fatal) + }, + // 111 - global thread termination with killing current child in a face + // of a current process + 111 => { + warn!("Terminating {}'s child processes...", &proc.name); + match is_active(&proc.name).await { + true => { + terminate_process(&proc.name).await; + }, + false => { + log::info!("Process {} is already terminated!", proc.name); + }, + } + }, + _ => {}, + } + Ok(()) } // check process status daemon /// # Fn `run_daemons` -- 2.40.1 From 64691306620f554fcd04eae9a9782d6b9e5dec5e Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 10 Dec 2024 18:23:02 +0300 Subject: [PATCH 02/12] added lib to export structs + func to init listener in noxis-rs --- noxis-cli/src/lib.rs | 3 +++ noxis-rs/src/main.rs | 9 ++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) create mode 100644 noxis-cli/src/lib.rs diff --git a/noxis-cli/src/lib.rs b/noxis-cli/src/lib.rs new file mode 100644 index 0000000..163fc0a --- /dev/null +++ b/noxis-cli/src/lib.rs @@ -0,0 +1,3 @@ +mod cli; + +pub use cli::*; \ No newline at end of file diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 1c0288d..97d9d8b 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -3,14 +3,16 @@ mod utils; use clap::Parser; use log::{error, info}; -use options::{config::*, preboot}; +use options::config::*; use options::logger::setup_logger; use options::signals::set_valid_destructor; use options::structs::Processes; +use options::cli_pipeline::init_cli_pipeline; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use utils::*; + #[allow(unused_imports)] use options::preboot::PrebootParams; @@ -87,6 +89,11 @@ async fn main() { let _ = subscribe_config_stream(Arc::new(processes)).await; })); + // cli pipeline + handler.push(tokio::spawn(async move { + let _ = init_cli_pipeline().await; + })); + for i in handler { let _ = i.await; } -- 2.40.1 From 56769f54b97858ef62f4a1961dc186555ff7bd77 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 10 Dec 2024 18:23:17 +0300 Subject: [PATCH 03/12] new listener func --- noxis-rs/src/options.rs | 3 +- noxis-rs/src/options/cli_pipeline.rs | 61 ++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 noxis-rs/src/options/cli_pipeline.rs diff --git a/noxis-rs/src/options.rs b/noxis-rs/src/options.rs index 0b4b3cf..e53e2d0 100644 --- a/noxis-rs/src/options.rs +++ b/noxis-rs/src/options.rs @@ -4,4 +4,5 @@ pub mod config; pub mod logger; pub mod signals; pub mod structs; -pub mod preboot; \ No newline at end of file +pub mod preboot; +pub mod cli_pipeline; \ No newline at end of file diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs new file mode 100644 index 0000000..fbeb74d --- /dev/null +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -0,0 +1,61 @@ +use log::{error, info, warn}; +use tokio::net::{TcpListener, TcpStream}; +use anyhow::{Result as DynResult, Error}; +use tokio::time::{sleep, Duration}; +use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}}; +// use std::io::BufReader; +use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt}; + + +pub async fn init_cli_pipeline() -> DynResult<()> { + return match init_listener().await { + Some(list) => { + loop { + if let Ok((socket, addr)) = list.accept().await { + // isolation + if IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) != addr.ip() { + warn!("Declined attempt to connect TCP-socket from {}", addr); + continue; + } + process_connection(socket).await; + } + sleep(Duration::from_millis(500)).await; + } + // Ok(()) + }, + None => Err(Error::msg("Addr 127.0.0.1:7753 is already in use")) + } +} + +async fn init_listener() -> Option { + return match TcpListener::bind("127.0.0.1:7753").await { + Ok(listener) => { + info!("Runner is listening localhost:7753"); + Some(listener) + }, + Err(_) => { + error!("Cannot create TCP listener for CLI"); + None + } + } +} + +async fn process_connection(mut stream: TcpStream) { + // loop{ + // stream. + // } + let buf_reader = BufReader::new(stream.borrow_mut()); + let mut rqst = buf_reader.lines(); + + while let Ok(Some(line)) = rqst.next_line().await { + if line.is_empty() { + break; + } + println!("{}", line); + } + // .map(|result| result.unwrap()) + // .take_while(|line| !line.is_empty()) + // .collect(); + let response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!"; + stream.write_all(response.as_bytes()).await.unwrap(); +} -- 2.40.1 From 2d225f4c09d28a78b3fdfbbba36fa60b6b82f543 Mon Sep 17 00:00:00 2001 From: prplV Date: Tue, 10 Dec 2024 18:50:30 +0300 Subject: [PATCH 04/12] versions fix --- noxis-cli/Cargo.toml | 2 +- noxis-rs/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/noxis-cli/Cargo.toml b/noxis-cli/Cargo.toml index 2b21f0a..fa4af59 100644 --- a/noxis-cli/Cargo.toml +++ b/noxis-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "noxis-cli" -version = "0.1.5" +version = "0.1.6" edition = "2021" [dependencies] diff --git a/noxis-rs/Cargo.toml b/noxis-rs/Cargo.toml index ed2f9c3..cf86d18 100644 --- a/noxis-rs/Cargo.toml +++ b/noxis-rs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "noxis-rs" -version = "0.10.11" +version = "0.11.3" edition = "2021" [dependencies] -- 2.40.1 From e7817a97b67f4eeab49d9f550525ff9fcea30d97 Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 11 Dec 2024 13:11:15 +0300 Subject: [PATCH 05/12] () -> anyhow::result<()> --- noxis-rs/src/main.rs | 17 +++++++++-------- noxis-rs/src/options/preboot.rs | 9 +++------ 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 97d9d8b..616802b 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -1,6 +1,7 @@ mod options; mod utils; +use anyhow::Error; use clap::Parser; use log::{error, info}; use options::config::*; @@ -17,12 +18,12 @@ use utils::*; use options::preboot::PrebootParams; #[tokio::main(flavor = "multi_thread")] -async fn main() { - let preboot = PrebootParams::parse().validate(); +async fn main() -> anyhow::Result<()>{ + let preboot = PrebootParams::parse().validate()?; - if let Err(_) = preboot { - return; - } + // if let Err(_) = preboot { + // return; + // } let _ = setup_logger(); @@ -32,7 +33,7 @@ async fn main() { // then conf checks to choose the most actual \ let processes: Processes = get_actual_config().await.unwrap_or_else(|| { error!("No actual configuration for runner. Stopping..."); - std::process::exit(101); + std::process::exit(1); }); info!( @@ -43,7 +44,7 @@ async fn main() { if processes.processes.is_empty() { error!("Processes list is null, runner-rs initialization is stopped"); - return; + return Err(Error::msg("Empty processes segment in config")); } let mut handler: Vec> = vec![]; // is in need to send to the signals handler thread @@ -97,7 +98,7 @@ async fn main() { for i in handler { let _ = i.await; } - return; + Ok(()) } // todo: integration tests diff --git a/noxis-rs/src/options/preboot.rs b/noxis-rs/src/options/preboot.rs index 6ecb242..c2241e0 100644 --- a/noxis-rs/src/options/preboot.rs +++ b/noxis-rs/src/options/preboot.rs @@ -104,18 +104,15 @@ pub struct PrebootParams { impl PrebootParams { pub fn validate(self) -> Result { if !self.socket_path.exists() { - eprintln!("Socket-file {} doesn't exist. Cannot start", &self.socket_path.display()); - return Err(Error::msg("Socket-file Not Found")); + return Err(Error::msg("Socket-file not found or Noxis can't read it. Cannot start")); } // existing log dir if !self.log_to.exists() { - eprintln!("Log directory {} doesn't exist", &self.log_to.display()); - return Err(Error::msg("Log Directory Not Found. Cannot start")); + return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start")); } // existing sock file if !self.config.exists() { - eprintln!("Local config file {} doesn't exist", &self.config.display()); - return Err(Error::msg("Local Config Not Found. Cannot start")); + return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start")); } // redis server check Ok(self) -- 2.40.1 From da3d8cd12912020d76ab07920dd9c79ab2092094 Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 11 Dec 2024 13:38:29 +0300 Subject: [PATCH 06/12] config preboot adj --- noxis-rs/src/main.rs | 7 ++-- noxis-rs/src/options/config.rs | 60 +++++++++++++++++++-------------- noxis-rs/src/options/preboot.rs | 14 ++++---- 3 files changed, 46 insertions(+), 35 deletions(-) diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 616802b..7ca43bc 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -19,11 +19,12 @@ use options::preboot::PrebootParams; #[tokio::main(flavor = "multi_thread")] async fn main() -> anyhow::Result<()>{ - let preboot = PrebootParams::parse().validate()?; + let preboot = Arc::new(PrebootParams::parse().validate()?); // if let Err(_) = preboot { // return; // } + // let preboot = Arc::new(preboot); let _ = setup_logger(); @@ -31,7 +32,7 @@ async fn main() -> anyhow::Result<()>{ // setting up redis connection \ // then conf checks to choose the most actual \ - let processes: Processes = get_actual_config().await.unwrap_or_else(|| { + let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| { error!("No actual configuration for runner. Stopping..."); std::process::exit(1); }); @@ -87,7 +88,7 @@ async fn main() -> anyhow::Result<()>{ // remote config update subscription handler.push(tokio::spawn(async move { - let _ = subscribe_config_stream(Arc::new(processes)).await; + let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await; })); // cli pipeline diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index 5f865ea..2088729 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -8,6 +8,7 @@ use std::process::Command; use std::sync::Arc; use std::{env, fs}; use tokio::time::Duration; +use super::preboot::PrebootParams; const CONFIG_PATH: &str = "settings.json"; @@ -46,43 +47,49 @@ fn load_processes(json_filename: &str) -> Option { /// /// *depends on* : struct `Processes` /// -pub async fn get_actual_config() -> Option { +pub async fn get_actual_config(params : Arc) -> Option { // * if no local conf -> loop and +inf getting conf from redis server // * if local conf -> once getting conf from redis server - match load_processes(CONFIG_PATH) { + let config_path = params.config.to_str()?; + info!("Configurating config module with params: no-remote-config={}, no-sub={}, local config path={:?}, remote server={}", params.no_remote_config, params.no_sub, params.config, params.remote_server_url); + match load_processes(config_path) { Some(local_conf) => { info!( "Found local configuration, version - {}", &local_conf.date_of_creation ); - if let Some(remote_conf) = - // TODO : rework with pubsub mech - once_get_remote_configuration(&format!("redis://{}/", local_conf.config_server)) - { - return match config_comparing(&local_conf, &remote_conf) { - ConfigActuality::Local => { - info!("Local config is actual"); - Some(local_conf) - } - ConfigActuality::Remote => { - info!("Pulled config is more actual. Saving changes!"); - if save_new_config(&remote_conf, CONFIG_PATH).is_err() { - error!("Saving changes process failed due to unexpected error...") + if !params.no_remote_config { + if let Some(remote_conf) = + // TODO : rework with pubsub mech + once_get_remote_configuration(&format!("redis://{}/", ¶ms.remote_server_url)) + { + return match config_comparing(&local_conf, &remote_conf) { + ConfigActuality::Local => { + info!("Local config is actual"); + Some(local_conf) } - Some(remote_conf) - } - }; + ConfigActuality::Remote => { + info!("Pulled config is more actual. Saving changes!"); + if save_new_config(&remote_conf, CONFIG_PATH).is_err() { + error!("Saving changes process failed due to unexpected error...") + } + Some(remote_conf) + } + }; + } } Some(local_conf) } None => { warn!("No local valid conf was found. Trying to pull remote one..."); - let mut conn = get_connection_watcher(&open_watcher("redis://localhost/")); - let remote_config = get_remote_conf_watcher(&mut conn).await; - if let Some(conf) = remote_config { - info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation); - let _ = save_new_config(&conf, CONFIG_PATH); - return Some(conf); + if !params.no_remote_config { + let mut conn = get_connection_watcher(&open_watcher("redis://localhost/")); + let remote_config = get_remote_conf_watcher(&mut conn).await; + if let Some(conf) = remote_config { + info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation); + let _ = save_new_config(&conf, CONFIG_PATH); + return Some(conf); + } } None } @@ -311,7 +318,10 @@ fn restart_main_thread() -> std::io::Result<()> { /// /// *depends on* : `Processes` /// -pub async fn subscribe_config_stream(actual_prcs: Arc) -> Result<(), CustomError> { +pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc) -> Result<(), CustomError> { + if !(params.no_sub && params.no_remote_config) { + return Err(CustomError::Fatal); + } if let Ok(client) = Client::open(format!("redis://{}/", &actual_prcs.config_server)) { if let Ok(mut conn) = client.get_connection() { match crate::utils::get_container_id() { diff --git a/noxis-rs/src/options/preboot.rs b/noxis-rs/src/options/preboot.rs index c2241e0..7c9f687 100644 --- a/noxis-rs/src/options/preboot.rs +++ b/noxis-rs/src/options/preboot.rs @@ -4,7 +4,7 @@ use clap::Parser; use std::path::PathBuf; #[derive(clap::ValueEnum, Debug, Clone)] -enum MetricsPrebootParams { +pub enum MetricsPrebootParams { Full, System, Processes, @@ -68,28 +68,28 @@ pub struct PrebootParams { conflicts_with="no_hostagent", help="To set .sock file's path used in communication with host-agent" )] - socket_path : PathBuf, + pub socket_path : PathBuf, #[arg( long = "log-to", default_value="./", conflicts_with="no_logs", help="To set a path to logs directory" )] - log_to : PathBuf, + pub log_to : PathBuf, #[arg( long = "remote-server-url", - default_value="redis://localhost", + default_value="localhost", conflicts_with="no_remote_config", help = "To set url of remote config server using in remote config pulling mechanism" )] - remote_server_url : String, + pub remote_server_url : String, #[arg( long = "config", short, default_value="settings.json", help="To set local config file path" )] - config : PathBuf, + pub config : PathBuf, // value enum params (metrics) #[arg( @@ -98,7 +98,7 @@ pub struct PrebootParams { default_value_t=MetricsPrebootParams::Full, help="To set metrics grubbing mode" )] - metrics: MetricsPrebootParams, + pub metrics: MetricsPrebootParams, } impl PrebootParams { -- 2.40.1 From 6e86dcbf09d40dfbad5809acae5776f82802f4b0 Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 11 Dec 2024 13:42:11 +0300 Subject: [PATCH 07/12] config adj fix --- noxis-rs/src/options/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index 2088729..d3cb106 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -319,7 +319,7 @@ fn restart_main_thread() -> std::io::Result<()> { /// *depends on* : `Processes` /// pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc) -> Result<(), CustomError> { - if !(params.no_sub && params.no_remote_config) { + if params.no_sub || params.no_remote_config { return Err(CustomError::Fatal); } if let Ok(client) = Client::open(format!("redis://{}/", &actual_prcs.config_server)) { -- 2.40.1 From a75160c3e2e8bf542fb5d9a3abd72a83f4c43827 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 12 Dec 2024 17:03:57 +0300 Subject: [PATCH 08/12] cli communication added --- noxis-cli/Cargo.toml | 2 ++ noxis-cli/src/cli.rs | 3 ++- noxis-cli/src/net.rs | 26 ++++++++++++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 noxis-cli/src/net.rs diff --git a/noxis-cli/Cargo.toml b/noxis-cli/Cargo.toml index fa4af59..22666eb 100644 --- a/noxis-cli/Cargo.toml +++ b/noxis-cli/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.6" edition = "2021" [dependencies] +anyhow = "1.0.94" clap = { version = "4.5.22", features = ["derive"] } serde = { version = "1.0.215", features = ["derive"] } serde_json = "1.0.133" +tokio = { version = "1.42.0", features = ["full", "net"] } diff --git a/noxis-cli/src/cli.rs b/noxis-cli/src/cli.rs index 7be94f6..677e721 100644 --- a/noxis-cli/src/cli.rs +++ b/noxis-cli/src/cli.rs @@ -7,7 +7,8 @@ pub struct Cli { help = "to manage Noxis work", )] command : Commands, -} +} + #[derive(Debug, Subcommand)] pub enum Commands { #[command( diff --git a/noxis-cli/src/net.rs b/noxis-cli/src/net.rs new file mode 100644 index 0000000..d607c8c --- /dev/null +++ b/noxis-cli/src/net.rs @@ -0,0 +1,26 @@ +use tokio::net::TcpStream; +use tokio::io::AsyncWriteExt; +use tokio::time::{Duration, sleep}; +use anyhow::Result; +use super::Cli; + + +pub async fn create_tcp_stream() -> Result { + let stream = TcpStream::connect("127.0.0.1:7753").await?; + Ok(stream) +} + +pub async fn try_send(stream: Result, params: Cli) -> Result<()> { + let mut stream = stream?; + loop { + if stream.writable().await.is_err() { + sleep(Duration::from_millis(100)).await; + continue; + } + let msg = format!("{:?}", params); + // let msg = r"HTTP/1.1 POST\r\nContent-Length: 14\r\nContent-Type: text/plain\r\n\r\nHello, World!@"; + stream.write_all(msg.as_bytes()).await?; + // ... + } + Ok(()) +} \ No newline at end of file -- 2.40.1 From b51a3fb0f003fb709364c7a5b979b2133f60b035 Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 12 Dec 2024 17:04:20 +0300 Subject: [PATCH 09/12] submodules adj --- noxis-cli/src/main.rs | 9 +++++++-- noxis-rs/src/main.rs | 2 -- noxis-rs/src/options/cli_pipeline.rs | 5 ++--- noxis-rs/src/options/config.rs | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/noxis-cli/src/main.rs b/noxis-cli/src/main.rs index 023c977..718ec2f 100644 --- a/noxis-cli/src/main.rs +++ b/noxis-cli/src/main.rs @@ -1,11 +1,16 @@ mod cli; +mod net; use clap::Parser; use cli::Cli; +use net::{create_tcp_stream, try_send}; +use anyhow::Result; -fn main() -> Result<(), std::io::Error>{ +#[tokio::main] +async fn main() -> Result<()>{ let cli = Cli::parse(); dbg!(&cli); - println!("{:?}", cli); + // println!("{:?}", cli); + try_send(create_tcp_stream().await, cli).await?; Ok(()) } diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 7ca43bc..556521e 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -13,8 +13,6 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use utils::*; - -#[allow(unused_imports)] use options::preboot::PrebootParams; #[tokio::main(flavor = "multi_thread")] diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index fbeb74d..b189a36 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -47,15 +47,14 @@ async fn process_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(stream.borrow_mut()); let mut rqst = buf_reader.lines(); + while let Ok(Some(line)) = rqst.next_line().await { if line.is_empty() { break; } println!("{}", line); } - // .map(|result| result.unwrap()) - // .take_while(|line| !line.is_empty()) - // .collect(); + let response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nContent-Type: text/plain\r\n\r\nHello, World!"; stream.write_all(response.as_bytes()).await.unwrap(); } diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index d3cb106..4d95aa1 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -322,7 +322,7 @@ pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc { -- 2.40.1 From 77a1e24a47734397338518acb923742d91b6c2dc Mon Sep 17 00:00:00 2001 From: prplV Date: Thu, 12 Dec 2024 17:55:41 +0300 Subject: [PATCH 10/12] break added --- noxis-cli/src/net.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/noxis-cli/src/net.rs b/noxis-cli/src/net.rs index d607c8c..1c2e470 100644 --- a/noxis-cli/src/net.rs +++ b/noxis-cli/src/net.rs @@ -21,6 +21,7 @@ pub async fn try_send(stream: Result, params: Cli) -> Result<()> { // let msg = r"HTTP/1.1 POST\r\nContent-Length: 14\r\nContent-Type: text/plain\r\n\r\nHello, World!@"; stream.write_all(msg.as_bytes()).await?; // ... + break; } Ok(()) } \ No newline at end of file -- 2.40.1 From 2dbfb4a93afc9910856348ec2446a5d636584c17 Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 16 Dec 2024 11:34:09 +0300 Subject: [PATCH 11/12] preboot fix work with params --- noxis-rs/src/options/preboot.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/noxis-rs/src/options/preboot.rs b/noxis-rs/src/options/preboot.rs index 7c9f687..02e013f 100644 --- a/noxis-rs/src/options/preboot.rs +++ b/noxis-rs/src/options/preboot.rs @@ -103,11 +103,11 @@ pub struct PrebootParams { impl PrebootParams { pub fn validate(self) -> Result { - if !self.socket_path.exists() { + if !self.socket_path.exists() && !self.no_hostagent { return Err(Error::msg("Socket-file not found or Noxis can't read it. Cannot start")); } // existing log dir - if !self.log_to.exists() { + if !self.log_to.exists() && !self.no_logs { return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start")); } // existing sock file -- 2.40.1 From 56a20eb65c7c2b0163b0324e49ef3be912e1f1e4 Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 16 Dec 2024 11:44:34 +0300 Subject: [PATCH 12/12] preboot fix work with config params + refactor --- noxis-rs/src/options/config.rs | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index 4d95aa1..f7b5f26 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -7,10 +7,11 @@ use std::os::unix::process::CommandExt; use std::process::Command; use std::sync::Arc; use std::{env, fs}; -use tokio::time::Duration; +use std::fmt::format; use super::preboot::PrebootParams; +use tokio::time::{Duration, sleep}; -const CONFIG_PATH: &str = "settings.json"; +// const CONFIG_PATH: &str = "settings.json"; /// # Fn `load_processes` /// ## for reading and parsing *local* storing config @@ -70,7 +71,7 @@ pub async fn get_actual_config(params : Arc) -> Option } ConfigActuality::Remote => { info!("Pulled config is more actual. Saving changes!"); - if save_new_config(&remote_conf, CONFIG_PATH).is_err() { + if save_new_config(&remote_conf, config_path).is_err() { error!("Saving changes process failed due to unexpected error...") } Some(remote_conf) @@ -83,11 +84,10 @@ pub async fn get_actual_config(params : Arc) -> Option None => { warn!("No local valid conf was found. Trying to pull remote one..."); if !params.no_remote_config { - let mut conn = get_connection_watcher(&open_watcher("redis://localhost/")); - let remote_config = get_remote_conf_watcher(&mut conn).await; - if let Some(conf) = remote_config { + let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", ¶ms.remote_server_url))); + if let Some(conf) = get_remote_conf_watcher(&mut conn).await { info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation); - let _ = save_new_config(&conf, CONFIG_PATH); + let _ = save_new_config(&conf, config_path); return Some(conf); } } @@ -189,23 +189,23 @@ fn once_get_remote_configuration(serv_info: &str) -> Option { if remote.is_none() { error!("Pulled config is invalid. Check it in Redis Server"); } - return remote; + remote }, Err(_) => { error!("Cannot extract payload from new message. Check Redis Server state"); - return None; + None }, } }, Err(_) => { warn!("Cannot get config from Redis Server. Empty channel"); - return None; + None }, } }, Err(_) => { error!("Redis subscription process failed. Check Redis configuration!"); - return None; + None } } } @@ -319,6 +319,8 @@ fn restart_main_thread() -> std::io::Result<()> { /// *depends on* : `Processes` /// pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc) -> Result<(), CustomError> { + let config_path = params.config.to_str().unwrap_or_else(|| "settings.json"); + if params.no_sub || params.no_remote_config { return Err(CustomError::Fatal); } @@ -340,8 +342,8 @@ pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc { warn!("Pulled config is actual. Saving and restarting..."); - if save_new_config(&remote_config, CONFIG_PATH).is_err() { - error!("Error with saving new config to {}. Stopping sub mechanism...", &CONFIG_PATH); + if save_new_config(&remote_config, config_path).is_err() { + error!("Error with saving new config to {}. Stopping sub mechanism...", config_path); return Err(CustomError::Fatal); } if restart_main_thread().is_err() { @@ -362,7 +364,7 @@ pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc Result<(), CustomEr Err(_) => Err(CustomError::Fatal), } } - Err(_) => return Err(CustomError::Fatal), + Err(_) => Err(CustomError::Fatal), } } Err(_) => Err(CustomError::Fatal), -- 2.40.1