local_config_reciever created with fn's helpers
parent
f560dfebc5
commit
8c1998c93f
|
|
@ -1,6 +1,6 @@
|
|||
use super::structs::*;
|
||||
use log::{error, info, warn};
|
||||
use redis::{Client, Connection, PubSub};
|
||||
use redis::{Client, Connection};
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::Write;
|
||||
use std::os::unix::process::CommandExt;
|
||||
|
|
@ -10,12 +10,19 @@ use std::{env, fs};
|
|||
use super::preboot::PrebootParams;
|
||||
use tokio::time::{Duration, sleep};
|
||||
// use redis::PubSub;
|
||||
use tokio::sync::oneshot::Receiver as OneShotReciever;
|
||||
use tokio::sync::broadcast::Sender as BroadcastSender;
|
||||
use tokio::sync::{
|
||||
oneshot::{ Receiver as OneShotReciever, Sender as OneShotSender },
|
||||
broadcast::Sender as BroadcastSender };
|
||||
use crate::utils::files::create_watcher;
|
||||
use std::fs::File;
|
||||
use inotify::EventMask;
|
||||
|
||||
// const CONFIG_PATH: &str = "settings.json";
|
||||
|
||||
pub mod v2 {
|
||||
use core::error;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use super::*;
|
||||
|
||||
pub async fn init_config_mechanism(
|
||||
|
|
@ -34,62 +41,151 @@ pub mod v2 {
|
|||
let mut connection_delay: u64 = 1;
|
||||
loop {
|
||||
if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) {
|
||||
if let Ok(mut conn) = client.get_connection() {
|
||||
match crate::utils::get_container_id() {
|
||||
Some(channel_name) => {
|
||||
// let channel_name = channel_name.trim();
|
||||
let mut pubsub = conn.as_pubsub();
|
||||
if pubsub.subscribe(&channel_name.trim()).is_ok() {
|
||||
|
||||
todo!()
|
||||
|
||||
} else {
|
||||
error!("Cannot subscribe channel {}. Check Redis Server status", &channel_name);
|
||||
}
|
||||
},
|
||||
None => {
|
||||
error!("Cannot get channel name");
|
||||
}
|
||||
}
|
||||
if let Ok(conn) = client.get_connection() {
|
||||
return Some(conn);
|
||||
}
|
||||
}
|
||||
error!("Error with subscribing Redis stream on update. Retrying in {} secs...", connection_delay);
|
||||
sleep(Duration::from_secs(connection_delay)).await;
|
||||
connection_delay *= 2;
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
// loop checking redis pubsub
|
||||
async fn pubsub_config_reciever(
|
||||
// to subscribe redis pubsub channel and recieve configs
|
||||
_redis_connection : Option<Connection>,
|
||||
// to stop checking local config
|
||||
_local_conf_tx: Arc<OneShotSender<bool>>
|
||||
) {
|
||||
/*...*/
|
||||
}
|
||||
|
||||
//
|
||||
pub async fn local_config_reciever(
|
||||
async fn local_config_reciever(
|
||||
params : Arc<PrebootParams>,
|
||||
pubsub_oneshot : OneShotReciever<bool>,
|
||||
brd_tx : Arc<BroadcastSender<Processes>>,
|
||||
/*...*/
|
||||
) {
|
||||
/*...*/
|
||||
|
||||
// {:1} if local config is not exist -> cannot create watcher -> None
|
||||
// {:2} if local config exists -> load_processes
|
||||
// |
|
||||
// | [Ok(Processes)]
|
||||
// -> 1) broadcast sending parsed config to PRCS and CLI_PIPELINE
|
||||
// 2) watcher in loop to deny local changes
|
||||
// |
|
||||
// | Err(_)
|
||||
// ->
|
||||
// ????
|
||||
/*...*/
|
||||
|
||||
// borrowing as mut
|
||||
let mut pubsub_oneshot = pubsub_oneshot;
|
||||
// fill with default empty config, mut to change later
|
||||
let mut current_config = Processes::default();
|
||||
// PathBuf to &str to work with local config path as slice
|
||||
let local_config_path = params
|
||||
.config
|
||||
.to_str()
|
||||
.unwrap_or("settings.json");
|
||||
|
||||
match load_processes(local_config_path) {
|
||||
// if local exists
|
||||
Some(conf) => {
|
||||
info!("Local config `{}` was found.", &conf.date_of_creation);
|
||||
current_config = conf;
|
||||
if let Err(er) = brd_tx.send(current_config.clone()) {
|
||||
error!("Cannot share local config with broadcast due to {}", er);
|
||||
}
|
||||
},
|
||||
// if local is not exist
|
||||
None => {
|
||||
warn!("Local config wasn't found. Waiting for new ...");
|
||||
return;
|
||||
// ...
|
||||
},
|
||||
}
|
||||
|
||||
// 100% local exists here
|
||||
// create watcher on local config file
|
||||
match create_watcher("", local_config_path).await {
|
||||
Ok(mut watcher) => {
|
||||
loop {
|
||||
let mut need_to_export_config = false;
|
||||
// return situations here
|
||||
// 1) oneshot signal
|
||||
// 2) if config was deleted -> recreate and fill with current config that is held here
|
||||
// 3) if config was changed -> fill with current config that is held here
|
||||
|
||||
// catching signal from pubsub
|
||||
// it's because pubsub mech pulled new valid and actual config and now it's time to ...
|
||||
// ... overwrite local config file and restart main thread
|
||||
if let Ok(_) = pubsub_oneshot.try_recv() {
|
||||
return;
|
||||
}
|
||||
|
||||
// ! IF NOXIS NEEDS TO RECREATE OR CHANGE LOCAL CONFIG NEED TO DRAIN THIS ACTIVITY ...
|
||||
// ! ... FROM WATCHER"S BUFFER
|
||||
|
||||
// existing check
|
||||
if !params.config.exists() {
|
||||
warn!("Local config file was deleted or moved. Recreating new one with saved data ...");
|
||||
need_to_export_config = true;
|
||||
} else {
|
||||
// changes check
|
||||
let mut buffer = [0; 128];
|
||||
let events = watcher.read_events(&mut buffer);
|
||||
if events.is_ok() {
|
||||
let events: Vec<EventMask> = events
|
||||
.unwrap()
|
||||
.map(|mask| mask.mask)
|
||||
.filter(|mask| {
|
||||
*mask == EventMask::MODIFY || *mask == EventMask::DELETE_SELF
|
||||
})
|
||||
.collect();
|
||||
if !events.is_empty() {
|
||||
warn!("Local config file was overwritten. Discarding changes ...");
|
||||
need_to_export_config = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
// exporting data
|
||||
if need_to_export_config {
|
||||
if let Err(er) = export_saved_config_data_locally(¶ms.config, ¤t_config).await {
|
||||
error!("Cannot save actual imported config due to {}", er);
|
||||
} else {
|
||||
// recreation watcher
|
||||
// if local config file was deleted and recreated
|
||||
// if local config file was modified locally
|
||||
match create_watcher("", local_config_path).await {
|
||||
Ok(new) => watcher = new,
|
||||
Err(er) => error!("Cannot create new watcher due to {}", er),
|
||||
}
|
||||
}
|
||||
}
|
||||
sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Cannot create watcher on local config file `{}`. Deinitializing warding local config mechanism...", local_config_path);
|
||||
},
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// [:IN-TEST]
|
||||
pub async fn cli_config_reciever(cli_oneshot: OneShotReciever<Processes>) -> Option<Processes> {
|
||||
async fn from_cli_config_reciever(cli_oneshot: OneShotReciever<Processes>) -> Option<Processes> {
|
||||
/* match awaits til channel*/
|
||||
match cli_oneshot.await {
|
||||
Ok(config_from_cli) => return Some(config_from_cli),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn export_saved_config_data_locally(
|
||||
config_file_path: &PathBuf,
|
||||
current_config: &Processes
|
||||
) -> anyhow::Result<()> {
|
||||
|
||||
let mut file = File::create_new(config_file_path)?;
|
||||
Ok(
|
||||
file.write_all(
|
||||
serde_json::to_string_pretty(current_config)?.as_bytes()
|
||||
)?
|
||||
)
|
||||
// Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue