remote conf update sub + reworked signals handling (need to fix)

pull/9/head
prplV 2024-09-16 15:32:50 +03:00
parent 16c08fd8b4
commit 621f2d46b7
3 changed files with 61 additions and 9 deletions

View File

@ -1,5 +1,5 @@
{ {
"dateOfCreation": "1721381809090", "dateOfCreation": "1721381809101",
"processes": [ "processes": [
{ {
"name": "web-server", "name": "web-server",

View File

@ -1,7 +1,10 @@
use crate::structs::*; use crate::structs::*;
use log::{error, info, warn}; use log::{error, info, warn};
use redis::{Client, Commands, Connection, RedisResult}; use redis::{Client, Commands, Connection, RedisResult};
use std::fs; use std::{env, fs};
use std::os::unix::process::CommandExt;
use std::process::Command;
use std::sync::Arc;
use tokio::time::Duration; use tokio::time::Duration;
static CONFIG_PATH: &str = "settings.json"; static CONFIG_PATH: &str = "settings.json";
@ -164,6 +167,8 @@ fn invalid_config_watcher(conn: &mut Connection) -> Processes {
} }
} }
// ! end of watchers
fn get_remote_config(conn: &mut Connection) -> Option<Processes> { fn get_remote_config(conn: &mut Connection) -> Option<Processes> {
let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1); let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1);
if res.is_ok() { if res.is_ok() {
@ -175,7 +180,41 @@ fn get_remote_config(conn: &mut Connection) -> Option<Processes> {
None None
} }
// ! end of watchers fn restart_main_thread() -> std::io::Result<()>{
let current_exe = env::current_exe()?;
Command::new(current_exe)
.exec();
Ok(())
}
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(), CustomError> {
if let Ok(client) = Client::open("redis://localhost") {
if let Ok(mut conn) = client.get_connection() {
info!("Runner subscribed on config update");
loop {
tokio::time::sleep(Duration::from_secs(30)).await;
if let Some(prcs) = get_remote_config(&mut conn) {
match config_comparing(&actual_prcs, &prcs) {
ConfigActuality::Remote => {
info!("New config was pulled. Saving and restarting...");
if save_new_config(&prcs, CONFIG_PATH).is_err() {
error!("Error with saving new config to {}", &CONFIG_PATH);
return Err(CustomError::Fatal)
}
if restart_main_thread().is_err() {
error!("Error with restarting Runner. Stopping sub mechanism...");
return Err(CustomError::Fatal)
}
},
_ => continue,
}
return Ok(());
}
}
}
}
error!("Error with subscribing Redis stream on update. Working only with selected config...");
Err(CustomError::Fatal)
}
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
let local_date: u64 = local.date_of_creation.parse().unwrap(); let local_date: u64 = local.date_of_creation.parse().unwrap();

View File

@ -12,6 +12,7 @@ use log::{error, info};
use logger::setup_logger; use logger::setup_logger;
use signals::set_valid_destructor; use signals::set_valid_destructor;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use structs::*; use structs::*;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use utils::*; use utils::*;
@ -20,7 +21,7 @@ use utils::*;
async fn main() { async fn main() {
let _ = setup_logger(); let _ = setup_logger();
log::info!("Runner is configurating..."); info!("Runner is configurating...");
// setting up redis connection \ // setting up redis connection \
// then conf checks to choose the most actual \ // then conf checks to choose the most actual \
@ -29,11 +30,11 @@ async fn main() {
std::process::exit(101); std::process::exit(101);
}); });
log::info!( info!(
"Current runner configuration: {}", "Current runner configuration: {}",
&processes.date_of_creation &processes.date_of_creation
); );
log::info!("Runner is ready. Initializing..."); info!("Runner is ready. Initializing...");
if processes.processes.is_empty() { if processes.processes.is_empty() {
error!("Processes list is null, runner-rs initialization is stopped"); error!("Processes list is null, runner-rs initialization is stopped");
@ -44,7 +45,7 @@ async fn main() {
let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![]; let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
for proc in processes.processes.iter() { for proc in processes.processes.iter() {
log::info!( info!(
"Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
proc.name, proc.name,
proc.path, proc.path,
@ -65,16 +66,28 @@ async fn main() {
}); });
handler.push(event); handler.push(event);
} }
// destructor addition // destructor addition
handler.push(tokio::spawn(async move { handler.push(tokio::spawn(async move {
if let Err(_) = set_valid_destructor(Arc::new(senders)).await { if let Err(_) = set_valid_destructor(Arc::new(senders)).await {
error!("Linux signals handler creation failed. Returning..."); error!("Linux signals handler creation failed. Terminating main thread...");
return;
}
// todo: rework this temp construction, use async/await in signals mod
tokio::time::sleep(Duration::from_millis(200)).await;
info!("End of job. Terminating main thread...");
std::process::exit(0);
}));
// remote config update subscription
handler.push(tokio::spawn(async move {
if let Err(_) = subscribe_config_stream(Arc::new(processes)).await {
return; return;
} }
})); }));
for i in handler { for i in handler {
i.await.unwrap(); i.await.unwrap();
} }
info!("End of job. Terminating main thread...");
return; return;
} }