diff --git a/noxis-rs/settings.json b/noxis-rs/settings.json index ba3eabf..10485c8 100644 --- a/noxis-rs/settings.json +++ b/noxis-rs/settings.json @@ -1,6 +1,6 @@ { - "dateOfCreation": "1721381809110", - "configServer": "localhost", + "dateOfCreation": "1721381809112", + "configServer": "192.168.2.37", "processes": [ { "name": "temp-process", diff --git a/noxis-rs/src/options/cli_pipeline.rs b/noxis-rs/src/options/cli_pipeline.rs index ad6a670..5456d83 100644 --- a/noxis-rs/src/options/cli_pipeline.rs +++ b/noxis-rs/src/options/cli_pipeline.rs @@ -5,6 +5,7 @@ use tokio::time::{sleep, Duration}; use std::{borrow::BorrowMut, net::{IpAddr, Ipv4Addr}}; // use std::io::BufReader; use tokio::io::{BufReader, AsyncWriteExt, AsyncBufReadExt}; +use tokio::{io::AsyncReadExt, net::UnixListener}; use noxis_cli::Cli; use serde_json::from_str; diff --git a/noxis-rs/src/options/config.rs b/noxis-rs/src/options/config.rs index 1bd106e..a1f3e56 100644 --- a/noxis-rs/src/options/config.rs +++ b/noxis-rs/src/options/config.rs @@ -119,12 +119,6 @@ pub mod v2 { let _ = restart_main_thread(); }, cli_config_option = cli_future => { - // match cli_config_option { - // Some(config) => {}, - // None => { - // error!("Cli pulling new config mechanism crushed, restarting ...") - // }, - // } match cli_config_option { Err(_) => error!("Cli pulling new config mechanism crushed, restarting ..."), Ok(option_config) => { @@ -144,22 +138,20 @@ pub mod v2 { // TODO! futures + select! [OK] // TODO! tests config } - pub async fn get_redis_connection(params: Arc) -> Option { - if params.no_sub { - return None; - } - let mut connection_delay: u64 = 1; - loop { - if let Ok(client) = Client::open(format!("redis://{}/", ¶ms.remote_server_url)) { + pub async fn get_redis_connection(params: &str) -> Option { + for i in 1..=3 { + let redis_url = format!("redis://{}/", params); + info!("Trying to connect Redis pubsub `{}`. Attempt {}", &redis_url, i); + if let Ok(client) = Client::open(redis_url) { if let Ok(conn) = client.get_connection() { info!("Successfully opened Redis connection"); return Some(conn); } } - error!("Error with subscribing Redis stream on update. Retrying in {} secs...", connection_delay); - sleep(Duration::from_secs(connection_delay)).await; - connection_delay *= 2; + error!("Error with subscribing Redis stream on update. Retrying in 5 secs..."); + sleep(Duration::from_secs(5)).await; } + None } // loop checking redis pubsub @@ -171,80 +163,88 @@ pub mod v2 { ) -> anyhow::Result<()>{ /*...*/ // dbg!("start of pb"); - let mut tx_brd_local = tx_brd_local; - let mut _local_config = Processes::default(); - return match get_redis_connection(params.clone()).await { - Some(mut conn) => { - // - let mut pub_sub = conn.as_pubsub(); - let channel_name = get_container_id().unwrap_or(String::from("default")); - let channel_name = channel_name.trim(); - match pub_sub.subscribe(channel_name) { - Err(er) => { - error!("Cannot subscribe pubsub channel due to {}", &er); - Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er))) - }, - Ok(_) => { - info!("Successfully subscribed to {} pubsub channel", channel_name); - loop { - // brd check - // if let Ok(new_lc) = tx_brd_local.recv().await { + sleep(Duration::from_secs(1)).await; - // } - if !tx_brd_local.is_empty() { - match tx_brd_local.recv().await { - Ok(lc) => _local_config = lc, - Err(er) => { - error!("Cannot get imported local config due to {}", &er); - return Err(anyhow::Error::msg( - format!("Cannot get imported local config due to {}", er)) - ) + let mut tx_brd_local = tx_brd_local; + let mut local_config = Processes::default(); + + for retry in 1..=5 { + if !tx_brd_local.is_empty() { + match tx_brd_local.recv().await { + Ok(lc) => local_config = lc, + Err(er) => { + error!("Cannot get imported local config due to {}", &er); + return Err(anyhow::Error::msg( + format!("Cannot get imported local config due to {}", er)) + ) + } + } + } + match get_redis_connection(&local_config.config_server).await { + Some(mut conn) => { + // + let mut pub_sub = conn.as_pubsub(); + let channel_name = get_container_id().unwrap_or(String::from("default")); + let channel_name = channel_name.trim(); + match pub_sub.subscribe(channel_name) { + Err(er) => { + error!("Cannot subscribe pubsub channel due to {}", &er); + return Err(anyhow::Error::msg(format!("Cannot subscribe pubsub channel due to {}", er))) + }, + Ok(_) => { + info!("Successfully subscribed to {} pubsub channel", channel_name); + loop { + // pubsub check + if let Ok(msg) = pub_sub.get_message() { + // dbg!("ok on get message"); + let payload : Result = msg.get_payload(); + match payload { + Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "), + Ok(payload) => { + if let Some(remote) = parse_extern_config(&payload) { + match config_comparing(&local_config, &remote) { + ConfigActuality::Local => { + warn!("Pulled new config from Redis channel, it's outdated. Ignoring ..."); + }, + ConfigActuality::Remote => { + info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation); + // to stop watching local config file mechanism + let _ = local_conf_tx.send(true); + let config_path = params.config.to_str().unwrap_or("settings.json"); + + if save_new_config(&remote, &config_path).is_err() { + error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path); + return Err(anyhow::Error::msg( + format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path) + )) + } + return Ok(()); + }, + } + } + else { + warn!("Invalid config was pulled from Redis channel") + } + }, } } + // delay + // dbg!("before sleep pubsub"); + sleep(Duration::from_millis(500)).await; } - // pubsub check - if let Ok(msg) = pub_sub.get_message() { - // dbg!("ok on get message"); - let payload : Result = msg.get_payload(); - match payload { - Err(_) => error!("Cannot read new config from Redis channel. Check network or Redis configuration "), - Ok(payload) => { - if let Some(remote) = parse_extern_config(&payload) { - match config_comparing(&_local_config, &remote) { - ConfigActuality::Local => { - warn!("Pulled new config from Redis channel, it's outdated. Ignoring ..."); - }, - ConfigActuality::Remote => { - info!("Pulled new actual config from Redis channel, version - `{}`", remote.date_of_creation); - // to stop watching local config file mechanism - let _ = local_conf_tx.send(true); - let config_path = params.config.to_str().unwrap_or("settings.json"); - - if save_new_config(&remote, &config_path).is_err() { - error!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path); - return Err(anyhow::Error::msg( - format!("Error with saving new config to {}. Stopping pubsub mechanism...", config_path) - )) - } - return Ok(()); - }, - } - } - else { - warn!("Invalid config was pulled from Redis channel") - } - }, - } - } - // delay - // dbg!("before sleep pubsub"); - sleep(Duration::from_millis(500)).await; - } - }, + }, + } + }, + None => { + warn!("Cannot validly connect Redis connection. Blocking task for 20 secs and restarting tries (attempt {})", retry); + sleep(Duration::from_secs(20)).await; } - }, - None => Err(anyhow::Error::msg("Cannot create Redis connection")) + } } + error!("End of retries. Stopping pubsub..."); + return Err(anyhow::Error::msg( + format!("End of retries. Stopping pubsub...") + )) } // @@ -373,7 +373,7 @@ pub mod v2 { to_local_tx: OneShotSender ) -> Option { /* match awaits til channel*/ - dbg!("start of cli"); + // dbg!("start of cli"); match cli_oneshot.await { Ok(config_from_cli) => { info!("New actual config `{}` from CLI was pulled. Saving and restaring ...", &config_from_cli.date_of_creation);