From 16c08fd8b473505e4bc358bc18feac39eb89bb80 Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 16 Sep 2024 12:41:01 +0300 Subject: [PATCH] refactor + getting out redis conf pulling mech --- Cargo.toml | 3 +++ settings.json | 4 ++-- src/config.rs | 28 +++++++++++++++++++--------- src/files.rs | 1 + src/main.rs | 2 +- src/services.rs | 10 ++++++---- src/utils.rs | 2 +- 7 files changed, 33 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 258c689..cac2a04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,9 @@ name = "runner-rs" version = "0.5.5" edition = "2021" +[profile.dev] +debug = true + [dependencies] chrono = "0.4.38" env_logger = "0.11.3" diff --git a/settings.json b/settings.json index 688a11f..1666ac9 100644 --- a/settings.json +++ b/settings.json @@ -55,7 +55,7 @@ "hostname": "google.com", "port": 443, "triggers": { - "wait": 14, + "wait": 5, "delay": 1, "onLost": "stop" } @@ -64,7 +64,7 @@ "hostname": "localhost", "port": 8080, "triggers": { - "wait": 10, + "wait": 6, "delay": 2, "onLost": "hold" } diff --git a/src/config.rs b/src/config.rs index e8701ae..813b84d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -22,6 +22,7 @@ pub fn get_actual_config() -> Option { // let mut local = load_processes(&CONFIG_PATH); match load_processes(CONFIG_PATH) { Some(local_conf) => { + info!("Found local configuration, version - {}", &local_conf.date_of_creation); if let Some(remote_conf) = once_get_remote_configuration("redis://localhost") { return match config_comparing(&local_conf, &remote_conf) { ConfigActuality::Local => { @@ -75,7 +76,10 @@ fn once_get_remote_configuration(serv_info: &str) -> Option { return None; } match parse_extern_config(&config[0].1[0].1) { - Some(prcs) => Some(prcs), + Some(prcs) => { + info!("Config {} was pulled from Redis-Server", &prcs.date_of_creation); + Some(prcs) + }, None => { error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!"); None @@ -151,20 +155,26 @@ fn get_stream_info_watcher(conn: &mut Connection) { } fn invalid_config_watcher(conn: &mut Connection) -> Processes { loop { - let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1); - if res.is_ok() { - let config = &res.unwrap()[0]; - if !config.is_empty() { - if let Some(conf) = parse_extern_config(&config[0].1[0].1) { - return conf; - } - } + if let Some(prcs) = get_remote_config(conn) { + info!("Got new config from Redis-Server, version - {}", &prcs.date_of_creation); + return prcs; } error!("Got INVALID configuration. Update config! Retrying..."); std::thread::sleep(Duration::from_secs(4)); } } +fn get_remote_config(conn: &mut Connection) -> Option { + let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1); + if res.is_ok() { + let config = &res.unwrap()[0]; + if !config.is_empty() { + return parse_extern_config(&config[0].1[0].1); + } + } + None +} + // ! end of watchers fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { diff --git a/src/files.rs b/src/files.rs index 88c5331..dfaae9e 100644 --- a/src/files.rs +++ b/src/files.rs @@ -33,6 +33,7 @@ pub async fn file_handler( } match file.triggers.on_delete.as_str() { "stay" => { + tx.send(9).await.unwrap(); continue; } "stop" => { diff --git a/src/main.rs b/src/main.rs index 21e8c72..a50b2ab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,7 +35,7 @@ async fn main() { ); log::info!("Runner is ready. Initializing..."); - if processes.processes.len() == 0 { + if processes.processes.is_empty() { error!("Processes list is null, runner-rs initialization is stopped"); return; } diff --git a/src/services.rs b/src/services.rs index b420ad7..e13d610 100644 --- a/src/services.rs +++ b/src/services.rs @@ -31,9 +31,9 @@ pub async fn service_handler( } } "hold" => { - if is_frozen(name).await { - return Err(CustomError::Fatal); - } + // if is_frozen(name).await { + // return Err(CustomError::Fatal); + // } if looped_service_connecting(name, serv).await.is_err() { tx.send(6).await.unwrap(); tokio::time::sleep(Duration::from_millis(400)).await; @@ -53,6 +53,7 @@ pub async fn service_handler( } async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { + let mut counter = 0; if serv.triggers.wait == 0 { loop { tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; @@ -79,8 +80,9 @@ async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), Cu let start = Instant::now(); while start.elapsed().as_secs() < serv.triggers.wait.into() { tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; + counter += 1; warn!( - "Attempting to connect from {} process to {}:{}", + "{counter} Attempting to connect from {} process to {}:{}", &name, &serv.hostname, &serv.port ); match check_service(&serv.hostname, &serv.port).await { diff --git a/src/utils.rs b/src/utils.rs index c68e6e2..9da20f0 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -95,7 +95,7 @@ pub async fn run_daemons( }, // // 9 - File-dependency change -> staying (after check) 9 => { - warn!("File-dependency warning (file changed). Ignoring on {} process...", &proc.name); + warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name); }, // 10 - Process unfreaze call via file handler