lc check delay decreased

feature/configv2
prplV 2025-02-06 13:50:18 +03:00
parent a8a7fd8a72
commit 98da769dd3
2 changed files with 61 additions and 16 deletions

View File

@ -1,5 +1,5 @@
{
"dateOfCreation": "1721381809104",
"dateOfCreation": "1721381809106",
"configServer": "localhost",
"processes": [
{
@ -30,4 +30,4 @@
}
}
]
}
}

View File

@ -21,14 +21,14 @@ use inotify::EventMask;
// const CONFIG_PATH: &str = "settings.json";
pub mod v2 {
use std::{fmt::format, path::PathBuf};
use std::path::PathBuf;
use crate::utils::get_container_id;
use super::*;
pub async fn init_config_mechanism(
// to handle cli config changes
_cli_oneshot: OneShotReciever<Processes>,
cli_oneshot: OneShotReciever<Processes>,
// to share local config with PRCS, CLI_PIPELINE and CONFIG modules
brd_tx : BroadcastSender<Processes>,
// preboot params (args)
@ -36,20 +36,53 @@ pub mod v2 {
/*...*/
) {
// channel for pubsub to handle local config pulling
let _local_config_brd_reciever = brd_tx.subscribe();
let local_config_brd_reciever = brd_tx.subscribe();
// channel between pub-sub mech and local config mech
let (tx_pb_lc, rx_pb_lc) = oneshot::channel::<bool>();
// channel between cli mech and local config mech
let (tx_cli_lc, rx_cli_lc) = oneshot::channel::<bool>();
// dbg!("before lc");
let params_clone = params.clone();
// future to init work with local config
let lc_future = local_config_reciever(
params,
rx_pb_lc,
rx_cli_lc,
Arc::new(brd_tx)
);
// TODO! futures + select!
let lc_future = tokio::spawn(async move {
// let params = params.clone();
let _ = local_config_reciever(
params_clone,
rx_pb_lc,
rx_cli_lc,
Arc::new(brd_tx)
).await;
});
// dbg!("before pb");
// future to init work with pub sub mechanism
let pubsub_future = tokio::spawn(async move {
let _ = pubsub_config_reciever(
tx_pb_lc,
params.clone(),
local_config_brd_reciever
).await;
});
// dbg!("before cli");
// future to catch new configs from cli pipeline
let cli_future = tokio::spawn(async move {
from_cli_config_reciever(
cli_oneshot,
tx_cli_lc
).await;
});
// let _ = lc_future.await;
// dbg!("before select");
tokio::select! {
lc_result = lc_future => {dbg!("end of lc");},
pb_result = pubsub_future => {dbg!("end of pb");},
cli_config_option = cli_future => {dbg!("end of cli");},
}
// dbg!("after select");
// TODO! futures + select! [OK]
// TODO! tests config
}
pub async fn get_redis_connection(params: Arc<PrebootParams>) -> Option<Connection> {
@ -78,18 +111,22 @@ pub mod v2 {
tx_brd_local : BroadcastReceiver<Processes>,
) -> anyhow::Result<()>{
/*...*/
// dbg!("start of pb");
let mut tx_brd_local = tx_brd_local;
let mut _local_config = Processes::default();
return match get_redis_connection(params.clone()).await {
Some(mut conn) => {
//
let mut pub_sub = conn.as_pubsub();
match pub_sub.subscribe(get_container_id().unwrap_or(String::from("default"))) {
let channel_name = get_container_id().unwrap_or(String::from("default"));
let channel_name = channel_name.trim();
match pub_sub.subscribe(channel_name) {
Err(er) => {
error!("Cannot subscribe pubsub channel due to {}", &er);
Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
},
Ok(_) => {
info!("Successfully subscribed to {} pubsub channel", channel_name);
loop {
// brd check
// if let Ok(new_lc) = tx_brd_local.recv().await {
@ -108,6 +145,7 @@ pub mod v2 {
}
// pubsub check
if let Ok(msg) = pub_sub.get_message() {
// dbg!("ok on get message");
let payload : Result<String, _> = msg.get_payload();
match payload {
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
@ -115,7 +153,7 @@ pub mod v2 {
if let Some(remote) = parse_extern_config(&payload) {
match config_comparing(&_local_config, &remote) {
ConfigActuality::Local => {
warn!("Pulled new config from Redis channel. Current config is more actual ...");
warn!("Pulled new config from Redis channel, it's outdated. Ignoring ...");
},
ConfigActuality::Remote => {
info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation);
@ -140,6 +178,7 @@ pub mod v2 {
}
}
// delay
// dbg!("before sleep pubsub");
sleep(Duration::from_millis(500)).await;
}
},
@ -158,7 +197,6 @@ pub mod v2 {
/*...*/
) -> anyhow::Result<()> {
/*...*/
// borrowing as mut
let mut pubsub_oneshot = pubsub_oneshot;
let mut cli_oneshot = cli_oneshot;
@ -193,6 +231,7 @@ pub mod v2 {
Ok(mut watcher) => {
loop {
let mut need_to_export_config = false;
// let mut need_to_recreate_watcher = false;
// return situations here
// 1) oneshot signal
// 2) if config was deleted -> recreate and fill with current config that is held here
@ -221,6 +260,7 @@ pub mod v2 {
if !params.config.exists() {
warn!("Local config file was deleted or moved. Recreating new one with saved data ...");
need_to_export_config = true;
// need_to_recreate_watcher = true;
} else {
// changes check
let mut buffer = [0; 128];
@ -236,6 +276,10 @@ pub mod v2 {
if !events.is_empty() {
warn!("Local config file was overwritten. Discarding changes ...");
need_to_export_config = true;
// events
// .iter()
// .any(|event| *event == EventMask::DELETE_SELF)
// .then(|| need_to_recreate_watcher = true);
}
}
}
@ -253,7 +297,7 @@ pub mod v2 {
}
}
}
sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(300)).await;
}
},
Err(_) => {
@ -270,6 +314,7 @@ pub mod v2 {
to_local_tx: OneShotSender<bool>
) -> Option<Processes> {
/* match awaits til channel*/
dbg!("start of cli");
match cli_oneshot.await {
Ok(config_from_cli) => {
info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);