From 621f2d46b7f5bb786172d80ed4ae461ec0d7123a Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 16 Sep 2024 15:32:50 +0300 Subject: [PATCH] remote conf update sub + reworked signals handling (need to fix) --- settings.json | 2 +- src/config.rs | 43 +++++++++++++++++++++++++++++++++++++++++-- src/main.rs | 25 +++++++++++++++++++------ 3 files changed, 61 insertions(+), 9 deletions(-) diff --git a/settings.json b/settings.json index 1666ac9..8b4d4b9 100644 --- a/settings.json +++ b/settings.json @@ -1,5 +1,5 @@ { - "dateOfCreation": "1721381809090", + "dateOfCreation": "1721381809101", "processes": [ { "name": "web-server", diff --git a/src/config.rs b/src/config.rs index 813b84d..efb8f3f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,7 +1,10 @@ use crate::structs::*; use log::{error, info, warn}; use redis::{Client, Commands, Connection, RedisResult}; -use std::fs; +use std::{env, fs}; +use std::os::unix::process::CommandExt; +use std::process::Command; +use std::sync::Arc; use tokio::time::Duration; static CONFIG_PATH: &str = "settings.json"; @@ -164,6 +167,8 @@ fn invalid_config_watcher(conn: &mut Connection) -> Processes { } } +// ! end of watchers + fn get_remote_config(conn: &mut Connection) -> Option { let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1); if res.is_ok() { @@ -175,7 +180,41 @@ fn get_remote_config(conn: &mut Connection) -> Option { None } -// ! end of watchers +fn restart_main_thread() -> std::io::Result<()>{ + let current_exe = env::current_exe()?; + Command::new(current_exe) + .exec(); + Ok(()) +} +pub async fn subscribe_config_stream(actual_prcs: Arc) -> Result<(), CustomError> { + if let Ok(client) = Client::open("redis://localhost") { + if let Ok(mut conn) = client.get_connection() { + info!("Runner subscribed on config update"); + loop { + tokio::time::sleep(Duration::from_secs(30)).await; + if let Some(prcs) = get_remote_config(&mut conn) { + match config_comparing(&actual_prcs, &prcs) { + ConfigActuality::Remote => { + info!("New config was pulled. Saving and restarting..."); + if save_new_config(&prcs, CONFIG_PATH).is_err() { + error!("Error with saving new config to {}", &CONFIG_PATH); + return Err(CustomError::Fatal) + } + if restart_main_thread().is_err() { + error!("Error with restarting Runner. Stopping sub mechanism..."); + return Err(CustomError::Fatal) + } + }, + _ => continue, + } + return Ok(()); + } + } + } + } + error!("Error with subscribing Redis stream on update. Working only with selected config..."); + Err(CustomError::Fatal) +} fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { let local_date: u64 = local.date_of_creation.parse().unwrap(); diff --git a/src/main.rs b/src/main.rs index a50b2ab..6c79294 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,7 @@ use log::{error, info}; use logger::setup_logger; use signals::set_valid_destructor; use std::sync::Arc; +use std::time::Duration; use structs::*; use tokio::sync::mpsc; use utils::*; @@ -20,7 +21,7 @@ use utils::*; async fn main() { let _ = setup_logger(); - log::info!("Runner is configurating..."); + info!("Runner is configurating..."); // setting up redis connection \ // then conf checks to choose the most actual \ @@ -29,11 +30,11 @@ async fn main() { std::process::exit(101); }); - log::info!( + info!( "Current runner configuration: {}", &processes.date_of_creation ); - log::info!("Runner is ready. Initializing..."); + info!("Runner is ready. Initializing..."); if processes.processes.is_empty() { error!("Processes list is null, runner-rs initialization is stopped"); @@ -44,7 +45,7 @@ async fn main() { let mut senders: Vec>> = vec![]; for proc in processes.processes.iter() { - log::info!( + info!( "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", proc.name, proc.path, @@ -65,16 +66,28 @@ async fn main() { }); handler.push(event); } + // destructor addition handler.push(tokio::spawn(async move { if let Err(_) = set_valid_destructor(Arc::new(senders)).await { - error!("Linux signals handler creation failed. Returning..."); + error!("Linux signals handler creation failed. Terminating main thread..."); + return; + } + // todo: rework this temp construction, use async/await in signals mod + tokio::time::sleep(Duration::from_millis(200)).await; + info!("End of job. Terminating main thread..."); + std::process::exit(0); + })); + + // remote config update subscription + handler.push(tokio::spawn(async move { + if let Err(_) = subscribe_config_stream(Arc::new(processes)).await { return; } })); + for i in handler { i.await.unwrap(); } - info!("End of job. Terminating main thread..."); return; }