From 3d88967281615c3ece0979fd415565d1acb5048a Mon Sep 17 00:00:00 2001 From: prplV Date: Wed, 5 Feb 2025 17:38:41 +0300 Subject: [PATCH] pubsub_config_reciever + cli-local adj --- noxis-rs/src/main.rs | 22 +++++- noxis-rs/src/options/config.rs | 136 ++++++++++++++++++++++++++++----- 2 files changed, 137 insertions(+), 21 deletions(-) diff --git a/noxis-rs/src/main.rs b/noxis-rs/src/main.rs index f29d2c0..232d37f 100644 --- a/noxis-rs/src/main.rs +++ b/noxis-rs/src/main.rs @@ -14,6 +14,8 @@ use std::time::Duration; use tokio::sync::mpsc; use utils::*; use options::preboot::PrebootParams; +use tokio::sync::{broadcast, oneshot}; +use options::config::v2::init_config_mechanism; #[tokio::main(flavor = "multi_thread")] async fn main() -> anyhow::Result<()>{ @@ -21,7 +23,25 @@ async fn main() -> anyhow::Result<()>{ let _ = setup_logger(); - info!("Runner is configurating..."); + info!("Noxis is configurating..."); + + let (tx_brd, mut _rx_brd) = broadcast::channel::(1); + let (_tx_oneshot, rx_oneshot) = oneshot::channel::(); + let mut handler: Vec> = vec![]; + + let config_module = tokio::spawn(async move { + let _ = init_config_mechanism( + rx_oneshot, + tx_brd, + preboot.clone() + ).await; + }); + + handler.push(config_module); + + for i in handler { + let _ = i.await; + } // setting up redis connection \ // then conf checks to choose the most actual \ diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index 3deeb80..95f59ee 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -12,7 +12,7 @@ use tokio::time::{Duration, sleep}; // use redis::PubSub; use tokio::sync::{ oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender }, - broadcast::Sender as BroadcastSender }; + broadcast::Sender as BroadcastSender, broadcast::Receiver as BroadcastReceiver }; use crate::utils::files::create_watcher; use std::fs::File; use inotify::EventMask; @@ -20,18 +20,22 @@ use inotify::EventMask; // const CONFIG_PATH: &str = "settings.json"; pub mod v2 { - use core::error; - use std::path::PathBuf; + use std::{fmt::format, path::PathBuf}; + use crate::utils::get_container_id; use super::*; pub async fn init_config_mechanism( // to handle cli config changes _cli_oneshot: OneShotReciever, - // to share local config with PRCS and CLI_PIPELINE modules - _brd_tx : Arc> + // to share local config with PRCS, CLI_PIPELINE and CONFIG modules + brd_tx : BroadcastSender, + // preboot params (args) + params : Arc /*...*/ ) { + // channel for pubsub to handle local config pulling + let _local_config_brd_reciever = brd_tx.subscribe(); /* local + pubsub + cli oneshot check */ } pub async fn get_redis_connection(params: Arc) -> Option { @@ -42,6 +46,7 @@ pub mod v2 { loop { if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) { if let Ok(conn) = client.get_connection() { + info!("Successfully opened Redis connection"); return Some(conn); } } @@ -53,27 +58,98 @@ pub mod v2 { // loop checking redis pubsub async fn pubsub_config_reciever( - // to subscribe redis pubsub channel and recieve configs - _redis_connection : Option, // to stop checking local config - _local_conf_tx: Arc> - ) { + local_conf_tx : OneShotSender, + params : Arc, + tx_brd_local : BroadcastReceiver, + ) -> anyhow::Result<()>{ /*...*/ + let mut tx_brd_local = tx_brd_local; + let mut _local_config = Processes::default(); + return match get_redis_connection(params.clone()).await { + Some(mut conn) => { + // + let mut pub_sub = conn.as_pubsub(); + match pub_sub.subscribe(get_container_id().unwrap_or(String::from("default"))) { + Err(er) => { + error!("Cannot subscribe pubsub channel due to {}", &er); + Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er))) + }, + Ok(_) => { + loop { + // brd check + // if let Ok(new_lc) = tx_brd_local.recv().await { + + // } + if !tx_brd_local.is_empty() { + match tx_brd_local.recv().await { + Ok(lc) => _local_config = lc, + Err(er) => { + error!("Cannot get imported local config due to {}", &er); + return Err(anyhow::Error::msg( + format!("Cannot get imported local config due to {}", er)) + ) + } + } + } + // pubsub check + if let Ok(msg) = pub_sub.get_message() { + let payload : Result = msg.get_payload(); + match payload { + Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "), + Ok(payload) => { + if let Some(remote) = parse_extern_config(&payload) { + match config_comparing(&_local_config, &remote) { + ConfigActuality::Local => { + warn!("Pulled new config from Redis channel. Current config is more actual ..."); + }, + ConfigActuality::Remote => { + info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation); + // to stop watching local config file mechanism + let _ = local_conf_tx.send(true); + let config_path = params.config.to_str().unwrap_or("settings.json"); + + if save_new_config(&remote, &config_path).is_err() { + error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path); + return Err(anyhow::Error::msg( + format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path) + )) + } + return Ok(()); + }, + } + } + else { + warn!("Invalid config was pulled from Redis channel") + } + }, + } + } + // delay + sleep(Duration::from_millis(500)).await; + } + }, + } + }, + None => Err(anyhow::Error::msg("Cannot create Redis connection")) + } } // async fn local_config_reciever( params : Arc, pubsub_oneshot : OneShotReciever, + cli_oneshot : OneShotReciever, brd_tx : Arc>, /*...*/ - ) { + ) -> anyhow::Result<()> { /*...*/ // borrowing as mut let mut pubsub_oneshot = pubsub_oneshot; + let mut cli_oneshot = cli_oneshot; // fill with default empty config, mut to change later - let mut current_config = Processes::default(); + let mut _current_config = Processes::default(); // PathBuf to &str to work with local config path as slice let local_config_path = params .config @@ -84,15 +160,15 @@ pub mod v2 { // if local exists Some(conf) => { info!("Local config `{}` was found.", &conf.date_of_creation); - current_config = conf; - if let Err(er) = brd_tx.send(current_config.clone()) { + _current_config = conf; + if let Err(er) = brd_tx.send(_current_config.clone()) { error!("Cannot share local config with broadcast due to {}", er); } }, // if local is not exist None => { warn!("Local config wasn't found. Waiting for new ..."); - return; + return Err(anyhow::Error::msg("No local config")); // ... }, } @@ -112,7 +188,16 @@ pub mod v2 { // it's because pubsub mech pulled new valid and actual config and now it's time to ... // ... overwrite local config file and restart main thread if let Ok(_) = pubsub_oneshot.try_recv() { - return; + sleep(Duration::from_secs(1)).await; + return Ok(()); + } + + // catching signal from cli + // it's because cli mech pulled new valid and actual config and now it's time to ... + // ... overwrite local config file and restart main thread (like in previous mechanism) + if let Ok(_) = cli_oneshot.try_recv() { + sleep(Duration::from_secs(1)).await; + return Ok(()); } // ! IF NOXIS NEEDS TO RECREATE OR CHANGE LOCAL CONFIG NEED TO DRAIN THIS ACTIVITY ... @@ -142,10 +227,10 @@ pub mod v2 { } // exporting data if need_to_export_config { - if let Err(er) = export_saved_config_data_locally(¶ms.config, ¤t_config).await { + if let Err(er) = export_saved_config_data_locally(¶ms.config, &_current_config).await { error!("Cannot save actual imported config due to {}", er); } else { - // recreation watcher + // recreation watcher (draining activity buffer mechanism) // if local config file was deleted and recreated // if local config file was modified locally match create_watcher("", local_config_path).await { @@ -159,16 +244,24 @@ pub mod v2 { }, Err(_) => { error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path); + return Err(anyhow::Error::msg("Cannot create watcher on local config file")); }, } } // [:IN-TEST] - async fn from_cli_config_reciever(cli_oneshot: OneShotReciever) -> Option { + async fn from_cli_config_reciever( + cli_oneshot: OneShotReciever, + to_local_tx: OneShotSender + ) -> Option { /* match awaits til channel*/ match cli_oneshot.await { - Ok(config_from_cli) => return Some(config_from_cli), + Ok(config_from_cli) => { + info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation); + let _ = to_local_tx.send(true); + Some(config_from_cli) + }, _ => None, } } @@ -178,7 +271,7 @@ pub mod v2 { current_config: &Processes ) -> anyhow::Result<()> { - let mut file = File::create_new(config_file_path)?; + let mut file = File::create(config_file_path)?; Ok( file.write_all( serde_json::to_string_pretty(current_config)?.as_bytes() @@ -574,6 +667,9 @@ pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc ConfigActuality { + if local.is_default() { + return ConfigActuality::Remote; + } let local_date: u64 = local.date_of_creation.parse().unwrap(); let remote_date: u64 = remote.date_of_creation.parse().unwrap();