reding config lc+pubsub rework
parent
064611823a
commit
35a21da431
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"dateOfCreation": "1721381809110",
|
||||
"configServer": "localhost",
|
||||
"dateOfCreation": "1721381809112",
|
||||
"configServer": "192.168.2.37",
|
||||
"processes": [
|
||||
{
|
||||
"name": "temp-process",
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ use tokio::time::{sleep, Duration};
|
|||
use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}};
|
||||
// use std::io::BufReader;
|
||||
use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt};
|
||||
use tokio::{io::AsyncReadExt, net::UnixListener};
|
||||
use noxis_cli::Cli;
|
||||
use serde_json::from_str;
|
||||
|
||||
|
|
|
|||
|
|
@ -119,12 +119,6 @@ pub mod v2 {
|
|||
let _ = restart_main_thread();
|
||||
},
|
||||
cli_config_option = cli_future => {
|
||||
// match cli_config_option {
|
||||
// Some(config) => {},
|
||||
// None => {
|
||||
// error!("Cli pulling new config mechanism crushed, restarting ...")
|
||||
// },
|
||||
// }
|
||||
match cli_config_option {
|
||||
Err(_) => error!("Cli pulling new config mechanism crushed, restarting ..."),
|
||||
Ok(option_config) => {
|
||||
|
|
@ -144,22 +138,20 @@ pub mod v2 {
|
|||
// TODO! futures + select! [OK]
|
||||
// TODO! tests config
|
||||
}
|
||||
pub async fn get_redis_connection(params: Arc<PrebootParams>) -> Option<Connection> {
|
||||
if params.no_sub {
|
||||
return None;
|
||||
}
|
||||
let mut connection_delay: u64 = 1;
|
||||
loop {
|
||||
if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) {
|
||||
pub async fn get_redis_connection(params: &str) -> Option<Connection> {
|
||||
for i in 1..=3 {
|
||||
let redis_url = format!("redis://{}/", params);
|
||||
info!("Trying to connect Redis pubsub `{}`. Attempt {}", &redis_url, i);
|
||||
if let Ok(client) = Client::open(redis_url) {
|
||||
if let Ok(conn) = client.get_connection() {
|
||||
info!("Successfully opened Redis connection");
|
||||
return Some(conn);
|
||||
}
|
||||
}
|
||||
error!("Error with subscribing Redis stream on update. Retrying in {} secs...", connection_delay);
|
||||
sleep(Duration::from_secs(connection_delay)).await;
|
||||
connection_delay *= 2;
|
||||
error!("Error with subscribing Redis stream on update. Retrying in 5 secs...");
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
// loop checking redis pubsub
|
||||
|
|
@ -171,80 +163,88 @@ pub mod v2 {
|
|||
) -> anyhow::Result<()>{
|
||||
/*...*/
|
||||
// dbg!("start of pb");
|
||||
let mut tx_brd_local = tx_brd_local;
|
||||
let mut _local_config = Processes::default();
|
||||
return match get_redis_connection(params.clone()).await {
|
||||
Some(mut conn) => {
|
||||
//
|
||||
let mut pub_sub = conn.as_pubsub();
|
||||
let channel_name = get_container_id().unwrap_or(String::from("default"));
|
||||
let channel_name = channel_name.trim();
|
||||
match pub_sub.subscribe(channel_name) {
|
||||
Err(er) => {
|
||||
error!("Cannot subscribe pubsub channel due to {}", &er);
|
||||
Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
|
||||
},
|
||||
Ok(_) => {
|
||||
info!("Successfully subscribed to {} pubsub channel", channel_name);
|
||||
loop {
|
||||
// brd check
|
||||
// if let Ok(new_lc) = tx_brd_local.recv().await {
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
|
||||
// }
|
||||
if !tx_brd_local.is_empty() {
|
||||
match tx_brd_local.recv().await {
|
||||
Ok(lc) => _local_config = lc,
|
||||
Err(er) => {
|
||||
error!("Cannot get imported local config due to {}", &er);
|
||||
return Err(anyhow::Error::msg(
|
||||
format!("Cannot get imported local config due to {}", er))
|
||||
)
|
||||
let mut tx_brd_local = tx_brd_local;
|
||||
let mut local_config = Processes::default();
|
||||
|
||||
for retry in 1..=5 {
|
||||
if !tx_brd_local.is_empty() {
|
||||
match tx_brd_local.recv().await {
|
||||
Ok(lc) => local_config = lc,
|
||||
Err(er) => {
|
||||
error!("Cannot get imported local config due to {}", &er);
|
||||
return Err(anyhow::Error::msg(
|
||||
format!("Cannot get imported local config due to {}", er))
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
match get_redis_connection(&local_config.config_server).await {
|
||||
Some(mut conn) => {
|
||||
//
|
||||
let mut pub_sub = conn.as_pubsub();
|
||||
let channel_name = get_container_id().unwrap_or(String::from("default"));
|
||||
let channel_name = channel_name.trim();
|
||||
match pub_sub.subscribe(channel_name) {
|
||||
Err(er) => {
|
||||
error!("Cannot subscribe pubsub channel due to {}", &er);
|
||||
return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er)))
|
||||
},
|
||||
Ok(_) => {
|
||||
info!("Successfully subscribed to {} pubsub channel", channel_name);
|
||||
loop {
|
||||
// pubsub check
|
||||
if let Ok(msg) = pub_sub.get_message() {
|
||||
// dbg!("ok on get message");
|
||||
let payload : Result<String, _> = msg.get_payload();
|
||||
match payload {
|
||||
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
|
||||
Ok(payload) => {
|
||||
if let Some(remote) = parse_extern_config(&payload) {
|
||||
match config_comparing(&local_config, &remote) {
|
||||
ConfigActuality::Local => {
|
||||
warn!("Pulled new config from Redis channel, it's outdated. Ignoring ...");
|
||||
},
|
||||
ConfigActuality::Remote => {
|
||||
info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation);
|
||||
// to stop watching local config file mechanism
|
||||
let _ = local_conf_tx.send(true);
|
||||
let config_path = params.config.to_str().unwrap_or("settings.json");
|
||||
|
||||
if save_new_config(&remote, &config_path).is_err() {
|
||||
error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path);
|
||||
return Err(anyhow::Error::msg(
|
||||
format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path)
|
||||
))
|
||||
}
|
||||
return Ok(());
|
||||
},
|
||||
}
|
||||
}
|
||||
else {
|
||||
warn!("Invalid config was pulled from Redis channel")
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
// delay
|
||||
// dbg!("before sleep pubsub");
|
||||
sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
// pubsub check
|
||||
if let Ok(msg) = pub_sub.get_message() {
|
||||
// dbg!("ok on get message");
|
||||
let payload : Result<String, _> = msg.get_payload();
|
||||
match payload {
|
||||
Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "),
|
||||
Ok(payload) => {
|
||||
if let Some(remote) = parse_extern_config(&payload) {
|
||||
match config_comparing(&_local_config, &remote) {
|
||||
ConfigActuality::Local => {
|
||||
warn!("Pulled new config from Redis channel, it's outdated. Ignoring ...");
|
||||
},
|
||||
ConfigActuality::Remote => {
|
||||
info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation);
|
||||
// to stop watching local config file mechanism
|
||||
let _ = local_conf_tx.send(true);
|
||||
let config_path = params.config.to_str().unwrap_or("settings.json");
|
||||
|
||||
if save_new_config(&remote, &config_path).is_err() {
|
||||
error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path);
|
||||
return Err(anyhow::Error::msg(
|
||||
format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path)
|
||||
))
|
||||
}
|
||||
return Ok(());
|
||||
},
|
||||
}
|
||||
}
|
||||
else {
|
||||
warn!("Invalid config was pulled from Redis channel")
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
// delay
|
||||
// dbg!("before sleep pubsub");
|
||||
sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
},
|
||||
None => {
|
||||
warn!("Cannot validly connect Redis connection. Blocking task for 20 secs and restarting tries (attempt {})", retry);
|
||||
sleep(Duration::from_secs(20)).await;
|
||||
}
|
||||
},
|
||||
None => Err(anyhow::Error::msg("Cannot create Redis connection"))
|
||||
}
|
||||
}
|
||||
error!("End of retries. Stopping pubsub...");
|
||||
return Err(anyhow::Error::msg(
|
||||
format!("End of retries. Stopping pubsub...")
|
||||
))
|
||||
}
|
||||
|
||||
//
|
||||
|
|
@ -373,7 +373,7 @@ pub mod v2 {
|
|||
to_local_tx: OneShotSender<bool>
|
||||
) -> Option<Processes> {
|
||||
/* match awaits til channel*/
|
||||
dbg!("start of cli");
|
||||
// dbg!("start of cli");
|
||||
match cli_oneshot.await {
|
||||
Ok(config_from_cli) => {
|
||||
info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);
|
||||
|
|
|
|||
Loading…
Reference in New Issue