diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 68a179d..3f1c6fd 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -1,28 +1,20 @@ mod options; mod utils; -use clap::Parser; use log::{error, info}; -use options::config::*; use options::logger::setup_logger; use options::signals::set_valid_destructor; use options::structs::Processes; use options::cli_pipeline::init_cli_pipeline; use std::sync::Arc; use std::time::Duration; -use tokio::sync::mpsc; -use utils::*; use options::preboot::PrebootParams; use tokio::sync::{broadcast, oneshot}; use options::config::v2::init_config_mechanism; use utils::v2::init_monitoring; -use metrics::init_metrics_grubber; #[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() -> anyhow::Result<()>{ - // init_metrics_grubber().await; - // todo!(); - let preboot = Arc::new(PrebootParams::validate()); let _ = setup_logger(); @@ -59,7 +51,11 @@ async fn main() -> anyhow::Result<()>{ } } }; - if let Err(er) = init_cli_pipeline(preboot_cli, Arc::new(config)).await { + if let Err(er) = init_cli_pipeline( + preboot_cli, + Arc::new(config), + tx_oneshot + ).await { error!("CLI pipeline failed due to {}", er) } }); diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index f1fbf7b..f711ac9 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -1,5 +1,6 @@ use log::{error, info, warn}; use tokio::net::{ UnixStream, UnixListener }; +use tokio::sync::{Mutex, OnceCell}; use tokio::time::{sleep, Duration}; use std::fs; use std::sync::Arc; @@ -9,6 +10,9 @@ use super::structs::Processes; use super::preboot::PrebootParams; +type ConfigGateway = tokio::sync::oneshot::Sender; +type ProcessedConfigGateway = Arc>>; + /// # Fn `init_cli_pipeline` /// ## for catching all input requests from CLI /// @@ -22,10 +26,20 @@ use super::preboot::PrebootParams; /// /// *depends on* : - /// -pub async fn init_cli_pipeline(params: Arc, config : Arc) -> anyhow::Result<()> { +pub async fn init_cli_pipeline( + params: Arc, + config : Arc, + config_gateway : ConfigGateway, +) -> anyhow::Result<()> { let socket_path = ¶ms.self_socket; let _ = fs::remove_file(socket_path); + let config_gateway = Arc::new( + Mutex::new( + OnceCell::new_with(Some(config_gateway)) + ) + ); + match UnixListener::bind(socket_path) { Ok(list) => { // TODO: remove `unwrap`s @@ -34,7 +48,13 @@ pub async fn init_cli_pipeline(params: Arc, config : Arc { - process_connection(socket, params.clone(), config.clone()).await; + // ??? maybe errors on async work with data transfering between modules + let params = params.clone(); + let config = config.clone(); + let config_gateway = config_gateway.clone(); + tokio::spawn(async move { + process_connection(socket, params.clone(), config.clone(), config_gateway.clone()).await; + }); }, Err(er) => { error!("Cannot poll connection to CLI due to {}", er); @@ -64,7 +84,7 @@ pub async fn init_cli_pipeline(params: Arc, config : Arc, config : Arc) { +async fn process_connection(mut stream: UnixStream, params: Arc, config : Arc, cfg_gateway : ProcessedConfigGateway) { let mut buf = vec![0; 1024]; match stream.read(&mut buf).await { Ok(0) => { @@ -76,15 +96,18 @@ async fn process_connection(mut stream: UnixStream, params: Arc, match serde_json::from_slice::(&buf) { Ok(cli) => { info!("Received CLI request: {:?}", cli); - match process_cli_cmd(cli, params.clone(), config).await { + let response = match process_cli_cmd(cli, params.clone(), config, cfg_gateway.clone()).await { Ok(response) => { - if let Err(e) = stream.write_all(response.as_bytes()).await { - error!("Failed to send response: {}", e); - } + response }, Err(er) => { - error!("Can't send response from cli_pipeline: {}", er); + let error_msg = format!("Error: {}", er); + error!("{}", &error_msg); + error_msg }, + }; + if let Err(e) = stream.write_all(response.as_bytes()).await { + error!("Failed to send response: {}", e); } } Err(e) => { @@ -98,7 +121,7 @@ async fn process_connection(mut stream: UnixStream, params: Arc, } -async fn process_cli_cmd(cli : Cli, params: Arc, global_config : Arc) -> anyhow::Result { +async fn process_cli_cmd(cli : Cli, params: Arc, global_config : Arc, cfg_gateway: ProcessedConfigGateway) -> anyhow::Result { use noxis_cli::{Commands, ConfigAction}; return match cli.command { Commands::Config(config) => { @@ -111,8 +134,40 @@ async fn process_cli_cmd(cli : Cli, params: Arc, global_config : Ok(serde_json::to_string_pretty(global_config.as_ref())?) } }, + ConfigAction::Reset => { + Err(anyhow::Error::msg("It's temporarly forbidden to reset current config using CLI-util")) + }, + ConfigAction::Local(cfg) => { + if cfg.is_json { + /* */ + let new_config = serde_json::from_str::(&cfg.config)?; + let new_version = new_config.get_version().to_string(); + + use super::{config::config_comparing, structs::ConfigActuality}; + + return match config_comparing(&global_config, &new_config) { + ConfigActuality::Remote => { + let cfg_gateway = cfg_gateway.clone(); + tokio::spawn(async move { + let mut lock = cfg_gateway.lock().await; + match lock.take() { + Some(channel) => { + let _ = channel.send(new_config); + }, + None => error!("Cannot update confif due to channel sender loss"), + } + }); + Ok(format!("Ok. Saving and reloading with version {}", new_version)) + }, + _ => Err(anyhow::Error::msg(format!("Local config (version: {}) is more actual", global_config.get_version()))), + } + } else { + Err(anyhow::Error::msg("It's temporarly forbidden to set config in non-json mode")) + } + }, + ConfigAction::Remote => {Ok(params.remote_server_url.clone())}, /* */ - _ => Ok(String::from("Ok")) + _ => Err(anyhow::Error::msg("Unrecognized command from CLI")) } }, /* */ diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index 00e3e73..11350cc 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -780,7 +780,7 @@ pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc ConfigActuality { +pub fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { if local.is_default() { return ConfigActuality::Remote; } diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index 3423d25..51855d5 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -173,6 +173,9 @@ impl Processes { pub fn is_default(&self) -> bool { self.date_of_creation.is_empty() } + pub fn get_version(&self) -> &str { + &self.date_of_creation + } } /// # Struct for the 2nd level in json conf file