pubsub_config_reciever + cli-local adj

feature/configv2
prplV 2025-02-05 17:38:41 +03:00
parent 8c1998c93f
commit 3d88967281
2 changed files with 137 additions and 21 deletions

View File

@ -14,6 +14,8 @@ use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use utils::*; use utils::*;
use options::preboot::PrebootParams; use options::preboot::PrebootParams;
use tokio::sync::{broadcast, oneshot};
use options::config::v2::init_config_mechanism;
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()>{ async fn main() -> anyhow::Result<()>{
@ -21,7 +23,25 @@ async fn main() -> anyhow::Result<()>{
let _ = setup_logger(); let _ = setup_logger();
info!("Runner is configurating..."); info!("Noxis is configurating...");
let (tx_brd, mut _rx_brd) = broadcast::channel::<Processes>(1);
let (_tx_oneshot, rx_oneshot) = oneshot::channel::<Processes>();
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
let config_module = tokio::spawn(async move {
let _ = init_config_mechanism(
rx_oneshot,
tx_brd,
preboot.clone()
).await;
});
handler.push(config_module);
for i in handler {
let _ = i.await;
}
// setting up redis connection \ // setting up redis connection \
// then conf checks to choose the most actual \ // then conf checks to choose the most actual \

View File

@ -12,7 +12,7 @@ use tokio::time::{Duration, sleep};
// use redis::PubSub; // use redis::PubSub;
use tokio::sync::{ use tokio::sync::{
oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender }, oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender },
broadcast::Sender as BroadcastSender }; broadcast::Sender as BroadcastSender, broadcast::Receiver as BroadcastReceiver };
use crate::utils::files::create_watcher; use crate::utils::files::create_watcher;
use std::fs::File; use std::fs::File;
use inotify::EventMask; use inotify::EventMask;
@ -20,18 +20,22 @@ use inotify::EventMask;
// const CONFIG_PATH: &str = "settings.json"; // const CONFIG_PATH: &str = "settings.json";
pub mod v2 { pub mod v2 {
use core::error; use std::{fmt::format, path::PathBuf};
use std::path::PathBuf; use crate::utils::get_container_id;
use super::*; use super::*;
pub async fn init_config_mechanism( pub async fn init_config_mechanism(
// to handle cli config changes // to handle cli config changes
_cli_oneshot: OneShotReciever<Processes>, _cli_oneshot: OneShotReciever<Processes>,
// to share local config with PRCS and CLI_PIPELINE modules // to share local config with PRCS, CLI_PIPELINE and CONFIG modules
_brd_tx : Arc<BroadcastSender<Processes>> brd_tx : BroadcastSender<Processes>,
// preboot params (args)
params : Arc<PrebootParams>
/*...*/ /*...*/
) { ) {
// channel for pubsub to handle local config pulling
let _local_config_brd_reciever = brd_tx.subscribe();
/* local + pubsub + cli oneshot check */ /* local + pubsub + cli oneshot check */
} }
pub async fn get_redis_connection(params: Arc<PrebootParams>) -> Option<Connection> { pub async fn get_redis_connection(params: Arc<PrebootParams>) -> Option<Connection> {
@ -42,6 +46,7 @@ pub mod v2 {
loop { loop {
if let Ok(client) = Client::open(format!("redis://{}/", &params.remote_server_url)) { if let Ok(client) = Client::open(format!("redis://{}/", &params.remote_server_url)) {
if let Ok(conn) = client.get_connection() { if let Ok(conn) = client.get_connection() {
info!("Successfully opened Redis connection");
return Some(conn); return Some(conn);
} }
} }
@ -53,27 +58,98 @@ pub mod v2 {
// loop checking redis pubsub // loop checking redis pubsub
async fn pubsub_config_reciever( async fn pubsub_config_reciever(
// to subscribe redis pubsub channel and recieve configs
_redis_connection : Option<Connection>,
// to stop checking local config // to stop checking local config
_local_conf_tx: Arc<OneShotSender<bool>> local_conf_tx : OneShotSender<bool>,
) { params : Arc<PrebootParams>,
tx_brd_local : BroadcastReceiver<Processes>,
) -> anyhow::Result<()>{
/*...*/ /*...*/
let mut tx_brd_local = tx_brd_local;
let mut _local_config = Processes::default();
return match get_redis_connection(params.clone()).await {
Some(mut conn) => {
//
let mut pub_sub = conn.as_pubsub();
match pub_sub.subscribe(get_container_id().unwrap_or(String::from("default"))) {
Err(er) => {
error!("Cannot subscribe pubsub channel due to {}", &er);
Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
},
Ok(_) => {
loop {
// brd check
// if let Ok(new_lc) = tx_brd_local.recv().await {
// }
if !tx_brd_local.is_empty() {
match tx_brd_local.recv().await {
Ok(lc) => _local_config = lc,
Err(er) => {
error!("Cannot get imported local config due to {}", &er);
return Err(anyhow::Error::msg(
format!("Cannot get imported local config due to {}", er))
)
}
}
}
// pubsub check
if let Ok(msg) = pub_sub.get_message() {
let payload : Result<String, _> = msg.get_payload();
match payload {
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
Ok(payload) => {
if let Some(remote) = parse_extern_config(&payload) {
match config_comparing(&_local_config, &remote) {
ConfigActuality::Local => {
warn!("Pulled new config from Redis channel. Current config is more actual ...");
},
ConfigActuality::Remote => {
info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation);
// to stop watching local config file mechanism
let _ = local_conf_tx.send(true);
let config_path = params.config.to_str().unwrap_or("settings.json");
if save_new_config(&remote, &config_path).is_err() {
error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path);
return Err(anyhow::Error::msg(
format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path)
))
}
return Ok(());
},
}
}
else {
warn!("Invalid config was pulled from Redis channel")
}
},
}
}
// delay
sleep(Duration::from_millis(500)).await;
}
},
}
},
None => Err(anyhow::Error::msg("Cannot create Redis connection"))
}
} }
// //
async fn local_config_reciever( async fn local_config_reciever(
params : Arc<PrebootParams>, params : Arc<PrebootParams>,
pubsub_oneshot : OneShotReciever<bool>, pubsub_oneshot : OneShotReciever<bool>,
cli_oneshot : OneShotReciever<bool>,
brd_tx : Arc<BroadcastSender<Processes>>, brd_tx : Arc<BroadcastSender<Processes>>,
/*...*/ /*...*/
) { ) -> anyhow::Result<()> {
/*...*/ /*...*/
// borrowing as mut // borrowing as mut
let mut pubsub_oneshot = pubsub_oneshot; let mut pubsub_oneshot = pubsub_oneshot;
let mut cli_oneshot = cli_oneshot;
// fill with default empty config, mut to change later // fill with default empty config, mut to change later
let mut current_config = Processes::default(); let mut _current_config = Processes::default();
// PathBuf to &str to work with local config path as slice // PathBuf to &str to work with local config path as slice
let local_config_path = params let local_config_path = params
.config .config
@ -84,15 +160,15 @@ pub mod v2 {
// if local exists // if local exists
Some(conf) => { Some(conf) => {
info!("Local config `{}` was found.", &conf.date_of_creation); info!("Local config `{}` was found.", &conf.date_of_creation);
current_config = conf; _current_config = conf;
if let Err(er) = brd_tx.send(current_config.clone()) { if let Err(er) = brd_tx.send(_current_config.clone()) {
error!("Cannot share local config with broadcast due to {}", er); error!("Cannot share local config with broadcast due to {}", er);
} }
}, },
// if local is not exist // if local is not exist
None => { None => {
warn!("Local config wasn't found. Waiting for new ..."); warn!("Local config wasn't found. Waiting for new ...");
return; return Err(anyhow::Error::msg("No local config"));
// ... // ...
}, },
} }
@ -112,7 +188,16 @@ pub mod v2 {
// it's because pubsub mech pulled new valid and actual config and now it's time to ... // it's because pubsub mech pulled new valid and actual config and now it's time to ...
// ... overwrite local config file and restart main thread // ... overwrite local config file and restart main thread
if let Ok(_) = pubsub_oneshot.try_recv() { if let Ok(_) = pubsub_oneshot.try_recv() {
return; sleep(Duration::from_secs(1)).await;
return Ok(());
}
// catching signal from cli
// it's because cli mech pulled new valid and actual config and now it's time to ...
// ... overwrite local config file and restart main thread (like in previous mechanism)
if let Ok(_) = cli_oneshot.try_recv() {
sleep(Duration::from_secs(1)).await;
return Ok(());
} }
// ! IF NOXIS NEEDS TO RECREATE OR CHANGE LOCAL CONFIG NEED TO DRAIN THIS ACTIVITY ... // ! IF NOXIS NEEDS TO RECREATE OR CHANGE LOCAL CONFIG NEED TO DRAIN THIS ACTIVITY ...
@ -142,10 +227,10 @@ pub mod v2 {
} }
// exporting data // exporting data
if need_to_export_config { if need_to_export_config {
if let Err(er) = export_saved_config_data_locally(&params.config, &current_config).await { if let Err(er) = export_saved_config_data_locally(&params.config, &_current_config).await {
error!("Cannot save actual imported config due to {}", er); error!("Cannot save actual imported config due to {}", er);
} else { } else {
// recreation watcher // recreation watcher (draining activity buffer mechanism)
// if local config file was deleted and recreated // if local config file was deleted and recreated
// if local config file was modified locally // if local config file was modified locally
match create_watcher("", local_config_path).await { match create_watcher("", local_config_path).await {
@ -159,16 +244,24 @@ pub mod v2 {
}, },
Err(_) => { Err(_) => {
error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path); error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path);
return Err(anyhow::Error::msg("Cannot create watcher on local config file"));
}, },
} }
} }
// [:IN-TEST] // [:IN-TEST]
async fn from_cli_config_reciever(cli_oneshot: OneShotReciever<Processes>) -> Option<Processes> { async fn from_cli_config_reciever(
cli_oneshot: OneShotReciever<Processes>,
to_local_tx: OneShotSender<bool>
) -> Option<Processes> {
/* match awaits til channel*/ /* match awaits til channel*/
match cli_oneshot.await { match cli_oneshot.await {
Ok(config_from_cli) => return Some(config_from_cli), Ok(config_from_cli) => {
info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);
let _ = to_local_tx.send(true);
Some(config_from_cli)
},
_ => None, _ => None,
} }
} }
@ -178,7 +271,7 @@ pub mod v2 {
current_config: &Processes current_config: &Processes
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut file = File::create_new(config_file_path)?; let mut file = File::create(config_file_path)?;
Ok( Ok(
file.write_all( file.write_all(
serde_json::to_string_pretty(current_config)?.as_bytes() serde_json::to_string_pretty(current_config)?.as_bytes()
@ -574,6 +667,9 @@ pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>, params: Arc<Pr
/// *depends on* : `Processes`, `ConfigActuality` /// *depends on* : `Processes`, `ConfigActuality`
/// ///
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
if local.is_default() {
return ConfigActuality::Remote;
}
let local_date: u64 = local.date_of_creation.parse().unwrap(); let local_date: u64 = local.date_of_creation.parse().unwrap();
let remote_date: u64 = remote.date_of_creation.parse().unwrap(); let remote_date: u64 = remote.date_of_creation.parse().unwrap();