diff --git a/noxis-rs/settings.json b/noxis-rs/settings.json index f830fce..f5fab28 100644 --- a/noxis-rs/settings.json +++ b/noxis-rs/settings.json @@ -1,5 +1,5 @@ { - "dateOfCreation": "1721381809104", + "dateOfCreation": "1721381809106", "configServer": "localhost", "processes": [ { @@ -30,4 +30,4 @@ } } ] -} +} \ No newline at end of file diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index b55ff33..44cebcd 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -21,14 +21,14 @@ use inotify::EventMask; // const CONFIG_PATH: &str = "settings.json"; pub mod v2 { - use std::{fmt::format, path::PathBuf}; + use std::path::PathBuf; use crate::utils::get_container_id; use super::*; pub async fn init_config_mechanism( // to handle cli config changes - _cli_oneshot: OneShotReciever, + cli_oneshot: OneShotReciever, // to share local config with PRCS, CLI_PIPELINE and CONFIG modules brd_tx : BroadcastSender, // preboot params (args) @@ -36,20 +36,53 @@ pub mod v2 { /*...*/ ) { // channel for pubsub to handle local config pulling - let _local_config_brd_reciever = brd_tx.subscribe(); + let local_config_brd_reciever = brd_tx.subscribe(); // channel between pub-sub mech and local config mech let (tx_pb_lc, rx_pb_lc) = oneshot::channel::(); // channel between cli mech and local config mech let (tx_cli_lc, rx_cli_lc) = oneshot::channel::(); + // dbg!("before lc"); + let params_clone = params.clone(); + // future to init work with local config - let lc_future = local_config_reciever( - params, - rx_pb_lc, - rx_cli_lc, - Arc::new(brd_tx) - ); - // TODO! futures + select! + let lc_future = tokio::spawn(async move { + // let params = params.clone(); + let _ = local_config_reciever( + params_clone, + rx_pb_lc, + rx_cli_lc, + Arc::new(brd_tx) + ).await; + }); + // dbg!("before pb"); + // future to init work with pub sub mechanism + let pubsub_future = tokio::spawn(async move { + let _ = pubsub_config_reciever( + tx_pb_lc, + params.clone(), + local_config_brd_reciever + ).await; + }); + + // dbg!("before cli"); + // future to catch new configs from cli pipeline + let cli_future = tokio::spawn(async move { + from_cli_config_reciever( + cli_oneshot, + tx_cli_lc + ).await; + + }); + // let _ = lc_future.await; + // dbg!("before select"); + tokio::select! { + lc_result = lc_future => {dbg!("end of lc");}, + pb_result = pubsub_future => {dbg!("end of pb");}, + cli_config_option = cli_future => {dbg!("end of cli");}, + } + // dbg!("after select"); + // TODO! futures + select! [OK] // TODO! tests config } pub async fn get_redis_connection(params: Arc) -> Option { @@ -78,18 +111,22 @@ pub mod v2 { tx_brd_local : BroadcastReceiver, ) -> anyhow::Result<()>{ /*...*/ + // dbg!("start of pb"); let mut tx_brd_local = tx_brd_local; let mut _local_config = Processes::default(); return match get_redis_connection(params.clone()).await { Some(mut conn) => { // let mut pub_sub = conn.as_pubsub(); - match pub_sub.subscribe(get_container_id().unwrap_or(String::from("default"))) { + let channel_name = get_container_id().unwrap_or(String::from("default")); + let channel_name = channel_name.trim(); + match pub_sub.subscribe(channel_name) { Err(er) => { error!("Cannot subscribe pubsub channel due to {}", &er); Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er))) }, Ok(_) => { + info!("Successfully subscribed to {} pubsub channel", channel_name); loop { // brd check // if let Ok(new_lc) = tx_brd_local.recv().await { @@ -108,6 +145,7 @@ pub mod v2 { } // pubsub check if let Ok(msg) = pub_sub.get_message() { + // dbg!("ok on get message"); let payload : Result = msg.get_payload(); match payload { Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "), @@ -115,7 +153,7 @@ pub mod v2 { if let Some(remote) = parse_extern_config(&payload) { match config_comparing(&_local_config, &remote) { ConfigActuality::Local => { - warn!("Pulled new config from Redis channel. Current config is more actual ..."); + warn!("Pulled new config from Redis channel, it's outdated. Ignoring ..."); }, ConfigActuality::Remote => { info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation); @@ -140,6 +178,7 @@ pub mod v2 { } } // delay + // dbg!("before sleep pubsub"); sleep(Duration::from_millis(500)).await; } }, @@ -158,7 +197,6 @@ pub mod v2 { /*...*/ ) -> anyhow::Result<()> { /*...*/ - // borrowing as mut let mut pubsub_oneshot = pubsub_oneshot; let mut cli_oneshot = cli_oneshot; @@ -193,6 +231,7 @@ pub mod v2 { Ok(mut watcher) => { loop { let mut need_to_export_config = false; + // let mut need_to_recreate_watcher = false; // return situations here // 1) oneshot signal // 2) if config was deleted -> recreate and fill with current config that is held here @@ -221,6 +260,7 @@ pub mod v2 { if !params.config.exists() { warn!("Local config file was deleted or moved. Recreating new one with saved data ..."); need_to_export_config = true; + // need_to_recreate_watcher = true; } else { // changes check let mut buffer = [0; 128]; @@ -236,6 +276,10 @@ pub mod v2 { if !events.is_empty() { warn!("Local config file was overwritten. Discarding changes ..."); need_to_export_config = true; + // events + // .iter() + // .any(|event| *event == EventMask::DELETE_SELF) + // .then(|| need_to_recreate_watcher = true); } } } @@ -253,7 +297,7 @@ pub mod v2 { } } } - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(300)).await; } }, Err(_) => { @@ -270,6 +314,7 @@ pub mod v2 { to_local_tx: OneShotSender ) -> Option { /* match awaits til channel*/ + dbg!("start of cli"); match cli_oneshot.await { Ok(config_from_cli) => { info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);