use super::structs::*; use log::{error, info, warn}; use redis::{Client, Connection}; use std::fs::OpenOptions; use std::io::Write; use std::os::unix::process::CommandExt; use std::process::Command; use std::sync::Arc; use std::{env, fs}; use super::preboot::PrebootParams; use tokio::time::{Duration, sleep}; // use redis::PubSub; use tokio::sync::{ oneshot, oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender }, broadcast::Sender as BroadcastSender, broadcast::Receiver as BroadcastReceiver }; use crate::utils::files::create_watcher; use std::fs::File; use inotify::EventMask; // const CONFIG_PATH: &str = "settings.json"; pub mod v2 { use std::path::PathBuf; use crate::utils::get_container_id; use super::*; pub async fn init_config_mechanism( // to handle cli config changes cli_oneshot: OneShotReciever, // to share local config with PRCS, CLI_PIPELINE and CONFIG modules brd_tx : BroadcastSender, // preboot params (args) params : Arc /*...*/ ) { // channel for pubsub to handle local config pulling let local_config_brd_reciever = brd_tx.subscribe(); // channel between pub-sub mech and local config mech let (tx_pb_lc, rx_pb_lc) = oneshot::channel::(); // channel between cli mech and local config mech let (tx_cli_lc, rx_cli_lc) = oneshot::channel::(); // dbg!("before lc"); let params_clone = params.clone(); let for_lc_path = params.clone(); let lc_path = for_lc_path .config .to_str() .unwrap_or("settings.json"); // future to init work with local config let lc_future = tokio::spawn( // let params = params.clone(); local_config_reciever( params_clone, rx_pb_lc, rx_cli_lc, Arc::new(brd_tx) ) ); // dbg!("before pb"); // future to init work with pub sub mechanism let pubsub_future = tokio::spawn( pubsub_config_reciever( tx_pb_lc, params.clone(), local_config_brd_reciever ) ); // dbg!("before cli"); // future to catch new configs from cli pipeline let cli_future = tokio::spawn( from_cli_config_reciever( cli_oneshot, tx_cli_lc ) ); // let _ = lc_future.await; // dbg!("before select"); tokio::select! { lc_result = lc_future => { // dbg!("end of lc"); match lc_result { Ok(res) => { if res.is_ok() { info!("Local config warding mechanism stopped, waiting for others ..."); sleep(Duration::from_millis(500)).await; let _ = restart_main_thread(); } else { error!("Local config warding mechanism crushed, restarting ..."); let _ = restart_main_thread(); } }, Err(_) => { error!("Local config warding mechanism crushed, restarting ..."); let _ = restart_main_thread(); }, } }, pb_result = pubsub_future => { match pb_result { Ok(res) => { if res.is_ok() { info!("New config was saved locally, restarting ..."); } else { error!("Pubsub mechanism crushed, restarting ..."); } }, Err(_) => { error!("Pubsub mechanism crushed, restarting ..."); }, } let _ = restart_main_thread(); }, cli_config_option = cli_future => { match cli_config_option { Err(_) => error!("CLI pulling new config mechanism crushed, restarting ..."), Ok(option_config) => { match option_config { None => error!("CLI pulling new config mechanism crushed, restarting ..."), Some(config) => { info!("New config was pulled from CLI, saving and restarting ..."); let _ = save_new_config(&config, lc_path); }, } }, } let _ = restart_main_thread(); }, } // dbg!("after select"); // TODO! futures + select! [OK] // TODO! tests config } pub async fn get_redis_connection(params: &str) -> Option { for i in 1..=3 { let redis_url = format!("redis://{}/", params); info!("Trying to connect Redis pubsub `{}`. Attempt {}", &redis_url, i); if let Ok(client) = Client::open(redis_url) { if let Ok(conn) = client.get_connection() { info!("Successfully opened Redis connection"); return Some(conn); } } error!("Error with subscribing Redis stream on update. Retrying in 5 secs..."); sleep(Duration::from_secs(5)).await; } None } // loop checking redis pubsub async fn pubsub_config_reciever( // to stop checking local config local_conf_tx : OneShotSender, params : Arc, tx_brd_local : BroadcastReceiver, ) -> anyhow::Result<()>{ /*...*/ // dbg!("start of pb"); let mut tx_brd_local = tx_brd_local; let local_config = if !tx_brd_local.is_empty() { tx_brd_local.recv().await? } else { // Processes::default() let mut tick = tokio::time::interval(Duration::from_millis(500)); loop { tick.tick().await; break match tx_brd_local.recv().await { Ok(conf) => conf, Err(_) => continue, }; } }; match get_redis_connection(¶ms.remote_server_url).await { Some(mut conn) => { let mut pub_sub = conn.as_pubsub(); let channel_name = get_container_id().unwrap_or(String::from("default")); let channel_name = channel_name.trim(); match pub_sub.subscribe(channel_name) { Err(er) => { error!("Cannot subscribe pubsub channel due to {}", &er); return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er))) }, Ok(_) => { info!("Successfully subscribed to {} pubsub channel", channel_name); let _ = pub_sub.set_read_timeout(Some(Duration::from_secs(3))); loop { if let Ok(msg) = pub_sub.get_message() { // dbg!("ok on get message"); let payload : Result = msg.get_payload(); match payload { Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "), Ok(payload) => { if let Some(remote) = parse_extern_config(&payload) { match config_comparing(&local_config, &remote) { ConfigActuality::Local => { warn!("Pulled new config from Redis channel, it's outdated. Ignoring ..."); }, ConfigActuality::Remote => { info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation); // to stop watching local config file mechanism let _ = local_conf_tx.send(true); let config_path = params.config.to_str().unwrap_or("settings.json"); if save_new_config(&remote, &config_path).is_err() { error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path); return Err(anyhow::Error::msg( format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path) )) } return Ok(()); }, } } else { warn!("Invalid config was pulled from Redis channel") } }, } } // delay tokio::task::yield_now().await; } }, } }, None => { sleep(Duration::from_secs(20)).await; } } Ok(()) } // async fn local_config_reciever( params : Arc, pubsub_oneshot : OneShotReciever, cli_oneshot : OneShotReciever, brd_tx : Arc>, /*...*/ ) -> anyhow::Result<()> { /*...*/ // shadowing as mut let mut pubsub_oneshot = pubsub_oneshot; let mut cli_oneshot = cli_oneshot; // fill with default empty config, mut to change later let mut _current_config = Processes::default(); // PathBuf to &str to work with local config path as slice let local_config_path = params .config .to_str() .unwrap_or("settings.json"); match load_processes(local_config_path) { // if local exists Some(conf) => { info!("Local config `{}` was found.", &conf.date_of_creation); _current_config = conf; if let Err(er) = brd_tx.send(_current_config.clone()) { error!("Cannot share local config with broadcast due to {}", er); } }, // if local is not exist None => { warn!("Local config wasn't found. Waiting for new ..."); return Err(anyhow::Error::msg("No local config")); // ... }, } // 100% local exists here // create watcher on local config file match create_watcher("", local_config_path) { Ok(mut watcher) => { loop { let mut need_to_export_config = false; // let mut need_to_recreate_watcher = false; // return situations here // 1) oneshot signal // 2) if config was deleted -> recreate and fill with current config that is held here // 3) if config was changed -> fill with current config that is held here // catching signal from pubsub // it's because pubsub mech pulled new valid and actual config and now it's time to ... // ... overwrite local config file and restart main thread if let Ok(_) = pubsub_oneshot.try_recv() { sleep(Duration::from_secs(1)).await; return Ok(()); } // catching signal from cli // it's because cli mech pulled new valid and actual config and now it's time to ... // ... overwrite local config file and restart main thread (like in previous mechanism) if let Ok(_) = cli_oneshot.try_recv() { sleep(Duration::from_secs(1)).await; return Ok(()); } // ! IF NOXIS NEEDS TO RECREATE OR CHANGE LOCAL CONFIG NEED TO DRAIN THIS ACTIVITY ... // ! ... FROM WATCHER"S BUFFER // existing check if !params.config.exists() { warn!("Local config file was deleted or moved. Recreating new one with saved data ..."); need_to_export_config = true; // need_to_recreate_watcher = true; } else { // changes check let mut buffer = [0; 128]; let events = watcher.read_events(&mut buffer); if events.is_ok() { let events: Vec = events .unwrap() .map(|mask| mask.mask) .filter(|mask| { *mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF }) .collect(); if !events.is_empty() { warn!("Local config file was overwritten. Discarding changes ..."); need_to_export_config = true; // events // .iter() // .any(|event| *event == EventMask::DELETE_SELF) // .then(|| need_to_recreate_watcher = true); } } } // exporting data if need_to_export_config { if let Err(er) = export_saved_config_data_locally(¶ms.config, &_current_config).await { error!("Cannot save actual imported config due to {}", er); } else { // recreation watcher (draining activity buffer mechanism) // if local config file was deleted and recreated // if local config file was modified locally match create_watcher("", local_config_path) { Ok(new) => watcher = new, Err(er) => error!("Cannot create new watcher due to {}", er), } } } sleep(Duration::from_millis(300)).await; // tokio::task::yield_now().await; } }, Err(_) => { error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path); return Err(anyhow::Error::msg("Cannot create watcher on local config file")); }, } } // [:IN-TEST] async fn from_cli_config_reciever( cli_oneshot: OneShotReciever, to_local_tx: OneShotSender ) -> Option { /* match awaits til channel*/ // dbg!("start of cli"); loop { if !cli_oneshot.is_empty() { match cli_oneshot.await { Ok(config_from_cli) => { info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation); let _ = to_local_tx.send(true); return Some(config_from_cli) }, _ => return None, } } sleep(Duration::from_millis(300)).await; } } async fn export_saved_config_data_locally( config_file_path: &PathBuf, current_config: &Processes ) -> anyhow::Result<()> { let mut file = File::create(config_file_path)?; file.write_all( serde_json::to_string_pretty(current_config)?.as_bytes() )?; Ok(()) // Ok(()) } } /// # Fn `load_processes` /// ## for reading and parsing *local* storing config /// /// *input* : `&str` /// /// *output* : `None` if local conf file doesn't exist or invalid | `Some(conf)` on finish reading and parsing /// /// *initiator* : func `get_actual_config` /// /// *managing* : conf file name in `&str` format /// /// *depends on* : struct `Processes` /// fn load_processes(json_filename: &str) -> Option { if let Ok(res) = fs::read_to_string(json_filename) { if let Ok(conf) = serde_json::from_str::(&res) { return Some(conf); } } None } /// # Fn `get_actual_config` /// ## for getting actual Monitor's config from local and remote storages /// /// *input* : - /// /// *output* : `None` on fatal error in mechanisms | `Some(conf)` on finish reading and parsing /// /// *initiator* : main thread /// /// *managing* : - /// /// *depends on* : struct `Processes` /// pub async fn get_actual_config(params : Arc) -> Option { // * if no local conf -> loop and +inf getting conf from redis server // * if local conf -> once getting conf from redis server let config_path = params.config.to_str().unwrap_or_else(|| { error!("Invalid character in config file. Config path was set to default"); "settings.json" }); info!("Configurating config module with params: no-sub={}, local config path={:?}, remote server={}", params.no_sub, params.config, params.remote_server_url); match load_processes(config_path) { Some(local_conf) => { info!( "Found local configuration, version - {}", &local_conf.date_of_creation ); if !params.no_sub { if let Some(remote_conf) = // TODO : rework with pubsub mech once_get_remote_configuration(&format!("redis://{}/", ¶ms.remote_server_url)) { return match config_comparing(&local_conf, &remote_conf) { ConfigActuality::Local => { info!("Local config is actual"); Some(local_conf) } ConfigActuality::Remote => { info!("Pulled config is more actual. Saving changes!"); if save_new_config(&remote_conf, config_path).is_err() { error!("Saving changes process failed due to unexpected error...") } Some(remote_conf) } }; } } Some(local_conf) } None => { warn!("No local valid conf was found. Trying to pull remote one..."); if !params.no_sub { let mut conn = get_connection_watcher(&open_watcher(&format!("redis://{}/", ¶ms.remote_server_url))); if let Some(conf) = get_remote_conf_watcher(&mut conn).await { info!("Config {} was pulled from Redis-Server. Starting...", &conf.date_of_creation); let _ = save_new_config(&conf, config_path); return Some(conf); } } None } } } /// # Fn `get_remote_conf_watcher` /// ## for infinitive pulling remote config /// /// *input* : `&mut Connection` /// /// *output* : `None` on fatal error | `Some(conf)` on succesfull pulling /// /// *initiator* : fn `get_actual_config` /// /// *managing* : mut ref `Connection` object /// /// *depends on* : struct `Processes` /// async fn get_remote_conf_watcher(conn : &mut Connection) -> Option { let mut conn = conn.as_pubsub(); let cont = crate::utils::get_container_id(); loop { match cont { Some(ref cont) => { let cont = cont.trim(); if conn.subscribe(cont).is_err() { // todo : delay continue; } match conn.get_message() { Ok(msg) => { let msg: Result = msg.get_payload(); if let Ok(payload) = msg { if let Some(remote) = parse_extern_config(&payload) { return Some(remote) } else { error!("Pulled invalid config, cannot start. Waiting for remote conf..."); } } else { error!("Cannot get Redis message payload. Waiting for remote conf..."); } // todo : delay continue; }, Err(_) => { // todo : delay continue; }, } }, None => { error!("Cannot get container id. Returning"); break }, } } None } /// # Fn `get_remote_conf_watcher` /// ## for trying to pull remote config /// /// > only for situation when local isn't None (no need to fck redis server) /// /// *input* : `&str` /// /// *output* : `None` on empty pubsub or error | `Some(conf)` on succesfull pulling /// /// *initiator* : fn `get_actual_config` /// /// *managing* : &str of Redis Server credentials /// /// *depends on* : struct `Processes` /// fn once_get_remote_configuration(serv_info: &str) -> Option { let cont = crate::utils::get_container_id(); match Client::open(serv_info) { Ok(client) => { match client.get_connection() { Ok(mut conn) => { let mut conn = conn.as_pubsub(); match conn.subscribe(cont) { Ok(_) => { if conn.set_read_timeout(Some(Duration::from_millis(100))).is_err() { error!("Cannot set reading pubsub timeout and pull remote config"); return None; } match conn.get_message() { Ok(msg) => { info!("Pulled config from Redis Server"); let get_payload: Result = msg.get_payload(); match get_payload { Ok(payload) => { let remote = parse_extern_config(&payload); if remote.is_none() { error!("Pulled config is invalid. Check it in Redis Server"); } remote }, Err(_) => { error!("Cannot extract payload from new message. Check Redis Server state"); None }, } }, Err(_) => { None }, } }, Err(_) => { error!("Redis subscription process failed. Check Redis configuration!"); None } } } Err(_) => { error!("Redis connection attempt is failed. Check Redis configuration!"); None } } } Err(_) => { error!("Redis-Client opening attempt is failed. Check network configuration!"); None } } } // ! watchers /// # Fn `open_watcher` /// ## for infinitive opening Redis client /// /// > only for situation when local isn't None (no need to fck redis server) /// /// *input* : `Option` /// /// *output* : redis::Client on successful opening client /// /// *initiator* : fn `get_actual_config` /// /// *managing* : &str of Redis Server credentials /// /// *depends on* : struct `redis::Client` /// fn open_watcher(serv_info: &str) -> Client { loop { match Client::open(serv_info) { Ok(redis) => { info!("Successfully opened Redis-Client"); return redis; } Err(_) => { error!("Redis-Client opening attempt is failed. Check network configuration! Retrying..."); std::thread::sleep(Duration::from_secs(4)); } } } } /// # Fn `get_connection_watcher` /// ## for infinitive establishing Redis connection on existing client /// /// > only for situation when local isn't None (no need to fck redis server) /// /// *input* : `&Client` /// /// *output* : `Connection` /// /// *initiator* : fn `get_actual_config` /// /// *managing* : &Client for opening connection /// /// *depends on* : struct `redis::Connection` /// fn get_connection_watcher(client: &Client) -> Connection { loop { match client.get_connection() { Ok(conn) => { info!("Successfully got Redis connection object"); return conn; } Err(_) => { error!( "Redis connection attempt is failed. Check Redis configuration! Retrying..." ); std::thread::sleep(Duration::from_secs(4)); } } } } /// # Fn `restart_main_thread` /// ## for restart monitor with new config /// /// *input* : - /// /// *output* : `Ok(())` on valid restart | `Err(er)` on error /// /// *initiator* : fn `subscribe_config_stream` /// /// *managing* : - /// /// *depends on* : - /// fn restart_main_thread() -> std::io::Result<()> { let current_exe = env::current_exe()?; Command::new(current_exe).exec(); Ok(()) } /// # Fn `subscribe_config_stream` /// ## for subscribe on changes, pulling to Redis pubsub to get more actual config /// /// *input* : `Arc` /// /// *output* : `Ok(())` on end of work | `Err(er)` on error with subscribing mechanism /// /// *initiator* : fn `subscribe_config_stream` /// /// *managing* : `Arc` to compare old config with new pulled /// /// *depends on* : `Processes` /// pub async fn subscribe_config_stream(actual_prcs: Arc, params: Arc) -> Result<(), CustomError> { let config_path = params.config.to_str().unwrap_or_else(|| "settings.json"); if params.no_sub { return Err(CustomError::Fatal); } if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) { if let Ok(mut conn) = client.get_connection() { match crate::utils::get_container_id() { Some(channel_name) => { let channel_name = channel_name.trim(); let mut pubsub = conn.as_pubsub(); if pubsub.subscribe(&channel_name).is_ok() { info!("Runner subscribed on config update publishing in channel {}", &channel_name); loop { if let Ok(msg) = pubsub.get_message() { let get_remote_config: Result = msg.get_payload(); match get_remote_config { Ok(payload) => { if let Some(remote_config) = parse_extern_config(&payload) { match config_comparing(&actual_prcs, &remote_config) { ConfigActuality::Remote => { warn!("Pulled config is actual. Saving and restarting..."); if save_new_config(&remote_config, config_path).is_err() { error!("Error with saving new config to {}. Stopping sub mechanism...", config_path); return Err(CustomError::Fatal); } if restart_main_thread().is_err() { error!("Error with restarting Runner. Stopping sub mechanism..."); return Err(CustomError::Fatal); } } _ => { warn!("Pulled new config. Current config is more actual ..."); continue }, } } else { error!("Invalid conig was pulled"); } }, Err(_) => { error!("Cannot extract new config from message"); break; }, } } sleep(Duration::from_secs(30)).await; } } else { error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name); } }, None => { error!("Cannot get channel name"); } } } } error!("Error with subscribing Redis stream on update. Working only with selected config..."); Err(CustomError::Fatal) } /// # Fn `config_comparing` /// ## for compare old and new configs /// /// *input* : local: `&Processes`, remote: `&Processes` /// /// *output* : `ConfigActuality::Local` or `ConfigActuality::Remote` /// /// *initiator* : fn `subscribe_config_stream`, fn `get_actual_config` /// /// *managing* : two objects `&Processes` /// /// *depends on* : `Processes`, `ConfigActuality` /// pub fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { if local.is_default() { return ConfigActuality::Remote; } let local_date: u64 = local.date_of_creation.parse().unwrap(); let remote_date: u64 = remote.date_of_creation.parse().unwrap(); match local_date.cmp(&remote_date) { std::cmp::Ordering::Equal | std::cmp::Ordering::Greater => ConfigActuality::Local, std::cmp::Ordering::Less => ConfigActuality::Remote, } } // ! TEMPORARILY DEPRECATED ! // fn native_date_from_millis(mls: &str) -> Option> { // match mls.parse::(){ // Ok(val) => return chrono::DateTime::from_timestamp_millis(val), // Err(_) => return None, // } // } /// # Fn `save_new_config` /// ## mechanism for saving new config in local storage /// /// *input* : `&Processes`, `&str` /// /// *output* : `Ok(())` on succesfull saving | Err(er) on fs error /// /// *initiator* : fn `subscribe_config_stream`, fn `get_actual_config` /// /// *managing* : new config object: `&Processes` and config file name: `&str` /// /// *depends on* : `Processes` /// fn save_new_config(config: &Processes, config_file: &str) -> Result<(), CustomError> { match serde_json::to_string_pretty(&config) { // Ok(st) => match fs::write(config_file, st) { // Ok(_) => Ok(()), // Err(_) => Err(CustomError::Fatal), // }, Ok(st) => { let file = OpenOptions::new() .read(true) .write(true) .create(true) .truncate(false) .open(config_file); match file { Ok(fs) => { let mut writer = fs; match writeln!(writer, "{}", st) { Ok(_) => Ok(()), Err(_) => Err(CustomError::Fatal), } } Err(_) => Err(CustomError::Fatal), } } Err(_) => Err(CustomError::Fatal), } } /// # Fn `parse_extern_config` /// ## for parsing &str to Processes /// /// *input* : `&str` /// /// *output* : parsed config in Some(Processes) | None on error with parsing /// /// *initiator* : fn `subscribe_config_stream`, fn `once_get_remote_configuration`, fn `get_remote_conf` /// /// *managing* : unparsed config `&str` /// /// *depends on* : `Processes` /// fn parse_extern_config(json_string: &str) -> Option { if let Ok(des) = serde_json::from_str::(json_string) { return Some(des); } None } // unit tests #[cfg(test)] mod config_unittests { use super::*; #[test] fn parsing_valid_conf() { assert!(load_processes("tests/examples/settings.json").is_some()); } #[test] fn parsing_invalid_conf() { assert!(load_processes("tests/examples/invalid_config.json").is_none()); } #[test] fn configuration_comparing() { // old one (kinda local) let a = Processes { date_of_creation: String::from("1"), processes: vec![], }; // new one (kinda remote) let b = Processes { date_of_creation: String::from("2"), processes: vec![], }; assert_eq!(config_comparing(&a, &b), ConfigActuality::Remote); } // TODO : strange output // #[test] // fn get_actual_config_mechanism() { // assert!(get_actual_config().is_some()) // } #[test] fn save_config() { let a = Processes { date_of_creation: String::from("1"), processes: vec![], }; assert!(save_new_config(&a, "tests/examples/save-conf.json").is_ok()); } #[test] fn save_to_zero_file() { let a = Processes { date_of_creation: String::from("1"), processes: vec![], }; assert!(save_new_config(&a, "tests/examples/none.json").is_ok()); } }