diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index dd53611..3deeb80 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -1,6 +1,6 @@ use super::structs::*; use log::{error, info, warn}; -use redis::{Client, Connection, PubSub}; +use redis::{Client, Connection}; use std::fs::OpenOptions; use std::io::Write; use std::os::unix::process::CommandExt; @@ -10,12 +10,19 @@ use std::{env, fs}; use super::preboot::PrebootParams; use tokio::time::{Duration, sleep}; // use redis::PubSub; -use tokio::sync::oneshot::Receiver as OneShotReciever; -use tokio::sync::broadcast::Sender as BroadcastSender; +use tokio::sync::{ + oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender }, + broadcast::Sender as BroadcastSender }; +use crate::utils::files::create_watcher; +use std::fs::File; +use inotify::EventMask; // const CONFIG_PATH: &str = "settings.json"; pub mod v2 { + use core::error; + use std::path::PathBuf; + use super::*; pub async fn init_config_mechanism( @@ -34,62 +41,151 @@ pub mod v2 { let mut connection_delay: u64 = 1; loop { if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) { - if let Ok(mut conn) = client.get_connection() { - match crate::utils::get_container_id() { - Some(channel_name) => { - // let channel_name = channel_name.trim(); - let mut pubsub = conn.as_pubsub(); - if pubsub.subscribe(&channel_name.trim()).is_ok() { - - todo!() - - } else { - error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name); - } - }, - None => { - error!("Cannot get channel name"); - } - } + if let Ok(conn) = client.get_connection() { + return Some(conn); } } error!("Error with subscribing Redis stream on update. Retrying in {} secs...", connection_delay); sleep(Duration::from_secs(connection_delay)).await; connection_delay *= 2; } + } - None + // loop checking redis pubsub + async fn pubsub_config_reciever( + // to subscribe redis pubsub channel and recieve configs + _redis_connection : Option, + // to stop checking local config + _local_conf_tx: Arc> + ) { + /*...*/ } // - pub async fn local_config_reciever( + async fn local_config_reciever( params : Arc, pubsub_oneshot : OneShotReciever, brd_tx : Arc>, /*...*/ ) { - /*...*/ - - // {:1} if local config is not exist -> cannot create watcher -> None - // {:2} if local config exists -> load_processes - // | - // | [Ok(Processes)] - // -> 1) broadcast sending parsed config to PRCS and CLI_PIPELINE - // 2) watcher in loop to deny local changes - // | - // | Err(_) - // -> - // ???? + /*...*/ + + // borrowing as mut + let mut pubsub_oneshot = pubsub_oneshot; + // fill with default empty config, mut to change later + let mut current_config = Processes::default(); + // PathBuf to &str to work with local config path as slice + let local_config_path = params + .config + .to_str() + .unwrap_or("settings.json"); + + match load_processes(local_config_path) { + // if local exists + Some(conf) => { + info!("Local config `{}` was found.", &conf.date_of_creation); + current_config = conf; + if let Err(er) = brd_tx.send(current_config.clone()) { + error!("Cannot share local config with broadcast due to {}", er); + } + }, + // if local is not exist + None => { + warn!("Local config wasn't found. Waiting for new ..."); + return; + // ... + }, + } + + // 100% local exists here + // create watcher on local config file + match create_watcher("", local_config_path).await { + Ok(mut watcher) => { + loop { + let mut need_to_export_config = false; + // return situations here + // 1) oneshot signal + // 2) if config was deleted -> recreate and fill with current config that is held here + // 3) if config was changed -> fill with current config that is held here + + // catching signal from pubsub + // it's because pubsub mech pulled new valid and actual config and now it's time to ... + // ... overwrite local config file and restart main thread + if let Ok(_) = pubsub_oneshot.try_recv() { + return; + } + + // ! IF NOXIS NEEDS TO RECREATE OR CHANGE LOCAL CONFIG NEED TO DRAIN THIS ACTIVITY ... + // ! ... FROM WATCHER"S BUFFER + + // existing check + if !params.config.exists() { + warn!("Local config file was deleted or moved. Recreating new one with saved data ..."); + need_to_export_config = true; + } else { + // changes check + let mut buffer = [0; 128]; + let events = watcher.read_events(&mut buffer); + if events.is_ok() { + let events: Vec = events + .unwrap() + .map(|mask| mask.mask) + .filter(|mask| { + *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF + }) + .collect(); + if !events.is_empty() { + warn!("Local config file was overwritten. Discarding changes ..."); + need_to_export_config = true; + } + } + } + // exporting data + if need_to_export_config { + if let Err(er) = export_saved_config_data_locally(¶ms.config, ¤t_config).await { + error!("Cannot save actual imported config due to {}", er); + } else { + // recreation watcher + // if local config file was deleted and recreated + // if local config file was modified locally + match create_watcher("", local_config_path).await { + Ok(new) => watcher = new, + Err(er) => error!("Cannot create new watcher due to {}", er), + } + } + } + sleep(Duration::from_millis(500)).await; + } + }, + Err(_) => { + error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path); + }, + } + } // [:IN-TEST] - pub async fn cli_config_reciever(cli_oneshot: OneShotReciever) -> Option { + async fn from_cli_config_reciever(cli_oneshot: OneShotReciever) -> Option { /* match awaits til channel*/ match cli_oneshot.await { Ok(config_from_cli) => return Some(config_from_cli), _ => None, } } + + async fn export_saved_config_data_locally( + config_file_path: &PathBuf, + current_config: &Processes + ) -> anyhow::Result<()> { + + let mut file = File::create_new(config_file_path)?; + Ok( + file.write_all( + serde_json::to_string_pretty(current_config)?.as_bytes() + )? + ) + // Ok(()) + } }