diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index 29886c8..f29d2c0 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -25,73 +25,73 @@ async fn main() -> anyhow::Result<()>{ // setting up redis connection \ // then conf checks to choose the most actual \ - let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| { - error!("No actual configuration for runner. Stopping..."); - std::process::exit(1); - }); - - info!( - "Current runner configuration: {}", - &processes.date_of_creation - ); - info!("Runner is ready. Initializing..."); - - if processes.processes.is_empty() { - error!("Processes list is null, runner-rs initialization is stopped"); - return Err(Error::msg("Empty processes segment in config")); - } - let mut handler: Vec> = vec![]; - // is in need to send to the signals handler thread - let mut senders: Vec>> = vec![]; - - for proc in processes.processes.iter() { - info!( - "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", - proc.name, - proc.path, - proc.dependencies.files.len(), - proc.dependencies.services.len() - ); - - // creating msg channel - // can or should be executed in new thread - let (tx, mut rx) = mpsc::channel::(1); - let proc = Arc::new(proc.clone()); - let tx = Arc::new(tx.clone()); - - senders.push(Arc::clone(&tx.clone())); - - let event = tokio::spawn(async move { - run_daemons(proc.clone(), tx.clone(), &mut rx).await; - }); - handler.push(event); - } - - // destructor addition - handler.push(tokio::spawn(async move { - if set_valid_destructor(Arc::new(senders)).await.is_err() { - error!("Linux signals handler creation failed. Terminating main thread..."); - return; - } - - tokio::time::sleep(Duration::from_millis(200)).await; - info!("End of job. Terminating main thread..."); - std::process::exit(0); - })); - - // remote config update subscription - handler.push(tokio::spawn(async move { - let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await; - })); - - // cli pipeline - handler.push(tokio::spawn(async move { - let _ = init_cli_pipeline().await; - })); - - for i in handler { - let _ = i.await; - } + // let processes: Processes = get_actual_config(preboot.clone()).await.unwrap_or_else(|| { + // error!("No actual configuration for runner. Stopping..."); + // std::process::exit(1); + // }); + // + // info!( + // "Current runner configuration: {}", + // &processes.date_of_creation + // ); + // info!("Runner is ready. Initializing..."); + // + // if processes.processes.is_empty() { + // error!("Processes list is null, runner-rs initialization is stopped"); + // return Err(Error::msg("Empty processes segment in config")); + // } + // let mut handler: Vec> = vec![]; + // // is in need to send to the signals handler thread + // let mut senders: Vec>> = vec![]; + // + // for proc in processes.processes.iter() { + // info!( + // "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", + // proc.name, + // proc.path, + // proc.dependencies.files.len(), + // proc.dependencies.services.len() + // ); + // + // // creating msg channel + // // can or should be executed in new thread + // let (tx, mut rx) = mpsc::channel::(1); + // let proc = Arc::new(proc.clone()); + // let tx = Arc::new(tx.clone()); + // + // senders.push(Arc::clone(&tx.clone())); + // + // let event = tokio::spawn(async move { + // run_daemons(proc.clone(), tx.clone(), &mut rx).await; + // }); + // handler.push(event); + // } + // + // // destructor addition + // handler.push(tokio::spawn(async move { + // if set_valid_destructor(Arc::new(senders)).await.is_err() { + // error!("Linux signals handler creation failed. Terminating main thread..."); + // return; + // } + // + // tokio::time::sleep(Duration::from_millis(200)).await; + // info!("End of job. Terminating main thread..."); + // std::process::exit(0); + // })); + // + // // remote config update subscription + // handler.push(tokio::spawn(async move { + // let _ = subscribe_config_stream(Arc::new(processes), preboot.clone()).await; + // })); + // + // // cli pipeline + // handler.push(tokio::spawn(async move { + // let _ = init_cli_pipeline().await; + // })); + // + // for i in handler { + // let _ = i.await; + // } Ok(()) } diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index d9451b9..1439ae7 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -1,6 +1,6 @@ use super::structs::*; use log::{error, info, warn}; -use redis::{Client, Connection}; +use redis::{Client, Connection, PubSub}; use std::fs::OpenOptions; use std::io::Write; use std::os::unix::process::CommandExt; @@ -9,9 +9,47 @@ use std::sync::Arc; use std::{env, fs}; use super::preboot::PrebootParams; use tokio::time::{Duration, sleep}; +// use redis::PubSub; +use tokio::sync::oneshot::Receiver; // const CONFIG_PATH: &str = "settings.json"; +pub mod v2 { + use super::*; + pub async fn init_config_mechanism(cli_oneshot: Arc> /*...*/) { /* local + pubsub + cli oneshot check */ } + pub async fn get_pubsub<'a>(params: Arc) -> Option> { + let config_path = params.config.to_str().unwrap_or_else(|| "settings.json"); + + if params.no_sub || params.no_sub { + return None; + } + if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) { + if let Ok(mut conn) = client.get_connection() { + match crate::utils::get_container_id() { + Some(channel_name) => { + let channel_name = channel_name.trim(); + let mut pubsub = conn.as_pubsub(); + if pubsub.subscribe(&channel_name).is_ok() { + + } else { + error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name); + } + }, + None => { + error!("Cannot get channel name"); + } + } + } + } + error!("Error with subscribing Redis stream on update. Working only with selected config..."); + None + } + pub async fn get_local_config_watcher(/*...*/) { /*...*/ } + // + pub async fn cli_config_reciever(cli_oneshot: Arc>) { /*...*/ } +} + + /// # Fn `load_processes` /// ## for reading and parsing *local* storing config /// @@ -54,14 +92,14 @@ pub async fn get_actual_config(params : Arc) -> Option error!("Invalid character in config file. Config path was set to default"); "settings.json" }); - info!("Configurating config module with params: no-remote-config={}, no-sub={}, local config path={:?}, remote server={}", params.no_remote_config, params.no_sub, params.config, params.remote_server_url); + info!("Configurating config module with params: no-sub={}, local config path={:?}, remote server={}", params.no_sub, params.config, params.remote_server_url); match load_processes(config_path) { Some(local_conf) => { info!( "Found local configuration, version - {}", &local_conf.date_of_creation ); - if !params.no_remote_config { + if !params.no_sub { if let Some(remote_conf) = // TODO : rework with pubsub mech once_get_remote_configuration(&format!("redis://{}/", ¶ms.remote_server_url)) @@ -85,7 +123,7 @@ pub async fn get_actual_config(params : Arc) -> Option } None => { warn!("No local valid conf was found. Trying to pull remote one..."); - if !params.no_remote_config { + if !params.no_sub { let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", ¶ms.remote_server_url))); if let Some(conf) = get_remote_conf_watcher(&mut conn).await { info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation); @@ -322,7 +360,7 @@ fn restart_main_thread() -> std::io::Result<()> { pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc) -> Result<(), CustomError> { let config_path = params.config.to_str().unwrap_or_else(|| "settings.json"); - if params.no_sub || params.no_remote_config { + if params.no_sub || params.no_sub { return Err(CustomError::Fatal); } if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) { diff --git a/noxis-rs/src/options/preboot.rs b/noxis-rs/src/options/preboot.rs index 8293f8a..d21cd57 100644 --- a/noxis-rs/src/options/preboot.rs +++ b/noxis-rs/src/options/preboot.rs @@ -13,7 +13,7 @@ enum EnvVars { NoxisNoHagent, NoxisNoLogs, NoxisRefreshLogs, - NoxisNoRemoteConfig, + // NoxisNoRemoteConfig, NoxisNoConfigSub, NoxisSocketPath, NoxisLogTo, @@ -29,7 +29,7 @@ impl std::fmt::Display for EnvVars { EnvVars::NoxisNoHagent => write!(f, "NOXIS_NO_HAGENT"), EnvVars::NoxisNoLogs => write!(f, "NOXIS_NO_LOGS"), EnvVars::NoxisRefreshLogs => write!(f, "NOXIS_REFRESH_LOGS"), - EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"), + // EnvVars::NoxisNoRemoteConfig => write!(f, "NOXIS_NO_REMOTE_CONFIG"), EnvVars::NoxisNoConfigSub => write!(f, "NOXIS_NO_CONFIG_SUB"), EnvVars::NoxisSocketPath => write!(f, "NOXIS_SOCKET_PATH"), EnvVars::NoxisLogTo => write!(f, "NOXIS_LOG_TO"), @@ -48,7 +48,7 @@ impl<'a> EnvVars { EnvVars::NoxisNoHagent => "false", EnvVars::NoxisNoLogs => "false", EnvVars::NoxisRefreshLogs => "false", - EnvVars::NoxisNoRemoteConfig => "false", + // EnvVars::NoxisNoRemoteConfig => "false", EnvVars::NoxisNoConfigSub => "false", EnvVars::NoxisSocketPath => "/var/run/enode/hostagent.sock", EnvVars::NoxisLogTo => "./", @@ -77,7 +77,7 @@ impl<'a> EnvVars { Self::NoxisNoHagent.process_env_var(&preboot.no_hostagent.to_string()); Self::NoxisNoLogs.process_env_var(&preboot.no_logs.to_string()); Self::NoxisRefreshLogs.process_env_var(&preboot.refresh_logs.to_string()); - Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string()); + // Self::NoxisNoRemoteConfig.process_env_var(&preboot.no_remote_config.to_string()); Self::NoxisNoConfigSub.process_env_var(&preboot.no_sub.to_string()); Self::NoxisSocketPath.process_env_var(preboot.socket_path.to_str().unwrap()); Self::NoxisLogTo.process_env_var(preboot.log_to.to_str().unwrap()); @@ -147,12 +147,6 @@ impl std::fmt::Display for MetricsPrebootParams { /// noxis-rs ... --refresh-logs ... /// ``` /// -/// `--no-remote-config` - to disable work with Redis as config producer -/// ### usage : -/// ``` bash -/// noxis-rs ... --no-remote-config ... -/// ``` -/// /// `--no-sub` - to disable Redis subscribtion mechanism /// ### usage : /// ``` bash @@ -212,17 +206,18 @@ pub struct PrebootParams { help="To clear logs directory" )] pub refresh_logs : bool, - #[arg( - long = "no-remote-config", - action, - help="To disable work with remote config server", - conflicts_with="no_sub")] - pub no_remote_config : bool, + // #[arg( + // long = "no-remote-config", + // action, + // help="To disable work with remote config server", + // conflicts_with="no_sub")] + // pub no_remote_config : bool, #[arg( long = "no-sub", action, - help="To disable subscription mechanism", - conflicts_with="no_remote_config")] + help="To disable Redis subscription mechanism", + )] + // conflicts_with="no_remote_config" pub no_sub : bool, // params (socket_path, log_to, remote_server_url, config) @@ -243,7 +238,7 @@ pub struct PrebootParams { #[arg( long = "remote-server-url", default_value="localhost", - conflicts_with="no_remote_config", + conflicts_with="no_sub", help = "To set url of remote config server using in remote config pulling mechanism" )] pub remote_server_url : String, @@ -288,15 +283,17 @@ impl PrebootParams { // existing log dir if !self.log_to.exists() && !self.no_logs { eprintln!("Error: Log-Dir not found or Noxis can't read it. LogDir was set to default"); + self.refresh_logs = false; self.log_to = PathBuf::from("./"); // return Err(Error::msg("Log Directory Not Found or Noxis can't read it. Cannot start")); } // existing sock file if !self.config.exists() { eprintln!("Error: Invalid character in config file. Config path was set to default"); - let config = PathBuf::from("/etc/settings.json"); - if !config.exists() && self.no_remote_config { - return Err(Error::msg("Noxis cannot run without config. Create local config or enable remote-config mechanism")); + // TODO : ??? wtf is going with 2 paths + let config = PathBuf::from("/etc/enode/noxis/settings.json"); + if !config.exists() && self.no_sub { + return Err(Error::msg("Noxis cannot run without config. Create local config or enable pubsub mechanism")); } self.config = PathBuf::from("settings.json"); // return Err(Error::msg("Local Config Not Found or Noxis can't read it. Cannot start")); @@ -353,20 +350,20 @@ mod preboot_unitests{ "runner-rs", "--no-sub", "--remote-server-url", "redis://127.0.0.1" - ]).is_ok()) - } - #[test] - fn parsing_config_invalid_args_noremote_nosub() { - assert!(PrebootParams::try_parse_from(vec![ - "runner-rs", - "--no-remote-config", "--no-sub" ]).is_err()) } + // #[test] + // fn parsing_config_invalid_args_noremote_nosub() { + // assert!(PrebootParams::try_parse_from(vec![ + // "runner-rs", + // "--no-remote-config", "--no-sub" + // ]).is_err()) + // } #[test] fn parsing_config_invalid_args_noremote_remoteurl() { assert!(PrebootParams::try_parse_from(vec![ "runner-rs", - "--no-remote-config", + "--no-sub", "--remote-server-url", "redis://127.0.0.1" ]).is_err()) } diff --git a/noxis-rs/src/options/structs.rs b/noxis-rs/src/options/structs.rs index 65c1a19..12c6019 100644 --- a/noxis-rs/src/options/structs.rs +++ b/noxis-rs/src/options/structs.rs @@ -40,6 +40,8 @@ pub struct Processes { pub processes: Vec, } + + /// # Struct for the 2nd level in json conf file /// ## for each process to contain info, such as name, path and dependencies ///