Compare commits

..

No commits in common. "621f2d46b7f5bb786172d80ed4ae461ec0d7123a" and "8c71453f2930b1589fb0b930e8194e89176de0ea" have entirely different histories.

7 changed files with 25 additions and 93 deletions

View File

@ -3,9 +3,6 @@ name = "runner-rs"
version = "0.5.5" version = "0.5.5"
edition = "2021" edition = "2021"
[profile.dev]
debug = true
[dependencies] [dependencies]
chrono = "0.4.38" chrono = "0.4.38"
env_logger = "0.11.3" env_logger = "0.11.3"

View File

@ -1,5 +1,5 @@
{ {
"dateOfCreation": "1721381809101", "dateOfCreation": "1721381809090",
"processes": [ "processes": [
{ {
"name": "web-server", "name": "web-server",
@ -55,7 +55,7 @@
"hostname": "google.com", "hostname": "google.com",
"port": 443, "port": 443,
"triggers": { "triggers": {
"wait": 5, "wait": 14,
"delay": 1, "delay": 1,
"onLost": "stop" "onLost": "stop"
} }
@ -64,7 +64,7 @@
"hostname": "localhost", "hostname": "localhost",
"port": 8080, "port": 8080,
"triggers": { "triggers": {
"wait": 6, "wait": 10,
"delay": 2, "delay": 2,
"onLost": "hold" "onLost": "hold"
} }

View File

@ -1,10 +1,7 @@
use crate::structs::*; use crate::structs::*;
use log::{error, info, warn}; use log::{error, info, warn};
use redis::{Client, Commands, Connection, RedisResult}; use redis::{Client, Commands, Connection, RedisResult};
use std::{env, fs}; use std::fs;
use std::os::unix::process::CommandExt;
use std::process::Command;
use std::sync::Arc;
use tokio::time::Duration; use tokio::time::Duration;
static CONFIG_PATH: &str = "settings.json"; static CONFIG_PATH: &str = "settings.json";
@ -25,7 +22,6 @@ pub fn get_actual_config() -> Option<Processes> {
// let mut local = load_processes(&CONFIG_PATH); // let mut local = load_processes(&CONFIG_PATH);
match load_processes(CONFIG_PATH) { match load_processes(CONFIG_PATH) {
Some(local_conf) => { Some(local_conf) => {
info!("Found local configuration, version - {}", &local_conf.date_of_creation);
if let Some(remote_conf) = once_get_remote_configuration("redis://localhost") { if let Some(remote_conf) = once_get_remote_configuration("redis://localhost") {
return match config_comparing(&local_conf, &remote_conf) { return match config_comparing(&local_conf, &remote_conf) {
ConfigActuality::Local => { ConfigActuality::Local => {
@ -79,10 +75,7 @@ fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
return None; return None;
} }
match parse_extern_config(&config[0].1[0].1) { match parse_extern_config(&config[0].1[0].1) {
Some(prcs) => { Some(prcs) => Some(prcs),
info!("Config {} was pulled from Redis-Server", &prcs.date_of_creation);
Some(prcs)
},
None => { None => {
error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!"); error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!");
None None
@ -158,9 +151,14 @@ fn get_stream_info_watcher(conn: &mut Connection) {
} }
fn invalid_config_watcher(conn: &mut Connection) -> Processes { fn invalid_config_watcher(conn: &mut Connection) -> Processes {
loop { loop {
if let Some(prcs) = get_remote_config(conn) { let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1);
info!("Got new config from Redis-Server, version - {}", &prcs.date_of_creation); if res.is_ok() {
return prcs; let config = &res.unwrap()[0];
if !config.is_empty() {
if let Some(conf) = parse_extern_config(&config[0].1[0].1) {
return conf;
}
}
} }
error!("Got INVALID configuration. Update config! Retrying..."); error!("Got INVALID configuration. Update config! Retrying...");
std::thread::sleep(Duration::from_secs(4)); std::thread::sleep(Duration::from_secs(4));
@ -169,53 +167,6 @@ fn invalid_config_watcher(conn: &mut Connection) -> Processes {
// ! end of watchers // ! end of watchers
fn get_remote_config(conn: &mut Connection) -> Option<Processes> {
let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1);
if res.is_ok() {
let config = &res.unwrap()[0];
if !config.is_empty() {
return parse_extern_config(&config[0].1[0].1);
}
}
None
}
fn restart_main_thread() -> std::io::Result<()>{
let current_exe = env::current_exe()?;
Command::new(current_exe)
.exec();
Ok(())
}
pub async fn subscribe_config_stream(actual_prcs: Arc<Processes>) -> Result<(), CustomError> {
if let Ok(client) = Client::open("redis://localhost") {
if let Ok(mut conn) = client.get_connection() {
info!("Runner subscribed on config update");
loop {
tokio::time::sleep(Duration::from_secs(30)).await;
if let Some(prcs) = get_remote_config(&mut conn) {
match config_comparing(&actual_prcs, &prcs) {
ConfigActuality::Remote => {
info!("New config was pulled. Saving and restarting...");
if save_new_config(&prcs, CONFIG_PATH).is_err() {
error!("Error with saving new config to {}", &CONFIG_PATH);
return Err(CustomError::Fatal)
}
if restart_main_thread().is_err() {
error!("Error with restarting Runner. Stopping sub mechanism...");
return Err(CustomError::Fatal)
}
},
_ => continue,
}
return Ok(());
}
}
}
}
error!("Error with subscribing Redis stream on update. Working only with selected config...");
Err(CustomError::Fatal)
}
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality { fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
let local_date: u64 = local.date_of_creation.parse().unwrap(); let local_date: u64 = local.date_of_creation.parse().unwrap();
let remote_date: u64 = remote.date_of_creation.parse().unwrap(); let remote_date: u64 = remote.date_of_creation.parse().unwrap();

View File

@ -33,7 +33,6 @@ pub async fn file_handler(
} }
match file.triggers.on_delete.as_str() { match file.triggers.on_delete.as_str() {
"stay" => { "stay" => {
tx.send(9).await.unwrap();
continue; continue;
} }
"stop" => { "stop" => {

View File

@ -12,7 +12,6 @@ use log::{error, info};
use logger::setup_logger; use logger::setup_logger;
use signals::set_valid_destructor; use signals::set_valid_destructor;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use structs::*; use structs::*;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use utils::*; use utils::*;
@ -21,7 +20,7 @@ use utils::*;
async fn main() { async fn main() {
let _ = setup_logger(); let _ = setup_logger();
info!("Runner is configurating..."); log::info!("Runner is configurating...");
// setting up redis connection \ // setting up redis connection \
// then conf checks to choose the most actual \ // then conf checks to choose the most actual \
@ -30,13 +29,13 @@ async fn main() {
std::process::exit(101); std::process::exit(101);
}); });
info!( log::info!(
"Current runner configuration: {}", "Current runner configuration: {}",
&processes.date_of_creation &processes.date_of_creation
); );
info!("Runner is ready. Initializing..."); log::info!("Runner is ready. Initializing...");
if processes.processes.is_empty() { if processes.processes.len() == 0 {
error!("Processes list is null, runner-rs initialization is stopped"); error!("Processes list is null, runner-rs initialization is stopped");
return; return;
} }
@ -45,7 +44,7 @@ async fn main() {
let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![]; let mut senders: Vec<Arc<mpsc::Sender<u8>>> = vec![];
for proc in processes.processes.iter() { for proc in processes.processes.iter() {
info!( log::info!(
"Process '{}' on stage: {}. Depends on {} file(s), {} service(s)", "Process '{}' on stage: {}. Depends on {} file(s), {} service(s)",
proc.name, proc.name,
proc.path, proc.path,
@ -66,28 +65,16 @@ async fn main() {
}); });
handler.push(event); handler.push(event);
} }
// destructor addition // destructor addition
handler.push(tokio::spawn(async move { handler.push(tokio::spawn(async move {
if let Err(_) = set_valid_destructor(Arc::new(senders)).await { if let Err(_) = set_valid_destructor(Arc::new(senders)).await {
error!("Linux signals handler creation failed. Terminating main thread..."); error!("Linux signals handler creation failed. Returning...");
return;
}
// todo: rework this temp construction, use async/await in signals mod
tokio::time::sleep(Duration::from_millis(200)).await;
info!("End of job. Terminating main thread...");
std::process::exit(0);
}));
// remote config update subscription
handler.push(tokio::spawn(async move {
if let Err(_) = subscribe_config_stream(Arc::new(processes)).await {
return; return;
} }
})); }));
for i in handler { for i in handler {
i.await.unwrap(); i.await.unwrap();
} }
info!("End of job. Terminating main thread...");
return; return;
} }

View File

@ -31,9 +31,9 @@ pub async fn service_handler(
} }
} }
"hold" => { "hold" => {
// if is_frozen(name).await { if is_frozen(name).await {
// return Err(CustomError::Fatal); return Err(CustomError::Fatal);
// } }
if looped_service_connecting(name, serv).await.is_err() { if looped_service_connecting(name, serv).await.is_err() {
tx.send(6).await.unwrap(); tx.send(6).await.unwrap();
tokio::time::sleep(Duration::from_millis(400)).await; tokio::time::sleep(Duration::from_millis(400)).await;
@ -53,7 +53,6 @@ pub async fn service_handler(
} }
async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> { async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
let mut counter = 0;
if serv.triggers.wait == 0 { if serv.triggers.wait == 0 {
loop { loop {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
@ -80,9 +79,8 @@ async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), Cu
let start = Instant::now(); let start = Instant::now();
while start.elapsed().as_secs() < serv.triggers.wait.into() { while start.elapsed().as_secs() < serv.triggers.wait.into() {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
counter += 1;
warn!( warn!(
"{counter} Attempting to connect from {} process to {}:{}", "Attempting to connect from {} process to {}:{}",
&name, &serv.hostname, &serv.port &name, &serv.hostname, &serv.port
); );
match check_service(&serv.hostname, &serv.port).await { match check_service(&serv.hostname, &serv.port).await {

View File

@ -95,7 +95,7 @@ pub async fn run_daemons(
}, },
// // 9 - File-dependency change -> staying (after check) // // 9 - File-dependency change -> staying (after check)
9 => { 9 => {
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name); warn!("File-dependency warning (file changed). Ignoring on {} process...", &proc.name);
}, },
// 10 - Process unfreaze call via file handler // 10 - Process unfreaze call via file handler