refactor + getting out redis conf pulling mech
parent
8c71453f29
commit
16c08fd8b4
|
|
@ -3,6 +3,9 @@ name = "runner-rs"
|
|||
version = "0.5.5"
|
||||
edition = "2021"
|
||||
|
||||
[profile.dev]
|
||||
debug = true
|
||||
|
||||
[dependencies]
|
||||
chrono = "0.4.38"
|
||||
env_logger = "0.11.3"
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@
|
|||
"hostname": "google.com",
|
||||
"port": 443,
|
||||
"triggers": {
|
||||
"wait": 14,
|
||||
"wait": 5,
|
||||
"delay": 1,
|
||||
"onLost": "stop"
|
||||
}
|
||||
|
|
@ -64,7 +64,7 @@
|
|||
"hostname": "localhost",
|
||||
"port": 8080,
|
||||
"triggers": {
|
||||
"wait": 10,
|
||||
"wait": 6,
|
||||
"delay": 2,
|
||||
"onLost": "hold"
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ pub fn get_actual_config() -> Option<Processes> {
|
|||
// let mut local = load_processes(&CONFIG_PATH);
|
||||
match load_processes(CONFIG_PATH) {
|
||||
Some(local_conf) => {
|
||||
info!("Found local configuration, version - {}", &local_conf.date_of_creation);
|
||||
if let Some(remote_conf) = once_get_remote_configuration("redis://localhost") {
|
||||
return match config_comparing(&local_conf, &remote_conf) {
|
||||
ConfigActuality::Local => {
|
||||
|
|
@ -75,7 +76,10 @@ fn once_get_remote_configuration(serv_info: &str) -> Option<Processes> {
|
|||
return None;
|
||||
}
|
||||
match parse_extern_config(&config[0].1[0].1) {
|
||||
Some(prcs) => Some(prcs),
|
||||
Some(prcs) => {
|
||||
info!("Config {} was pulled from Redis-Server", &prcs.date_of_creation);
|
||||
Some(prcs)
|
||||
},
|
||||
None => {
|
||||
error!("Invalid configuration was pulled (PARSING_ERROR). Check configs state!");
|
||||
None
|
||||
|
|
@ -151,20 +155,26 @@ fn get_stream_info_watcher(conn: &mut Connection) {
|
|||
}
|
||||
fn invalid_config_watcher(conn: &mut Connection) -> Processes {
|
||||
loop {
|
||||
let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1);
|
||||
if res.is_ok() {
|
||||
let config = &res.unwrap()[0];
|
||||
if !config.is_empty() {
|
||||
if let Some(conf) = parse_extern_config(&config[0].1[0].1) {
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
if let Some(prcs) = get_remote_config(conn) {
|
||||
info!("Got new config from Redis-Server, version - {}", &prcs.date_of_creation);
|
||||
return prcs;
|
||||
}
|
||||
error!("Got INVALID configuration. Update config! Retrying...");
|
||||
std::thread::sleep(Duration::from_secs(4));
|
||||
}
|
||||
}
|
||||
|
||||
fn get_remote_config(conn: &mut Connection) -> Option<Processes> {
|
||||
let res: Res = conn.xrevrange_count("config_stream", "+", "-", 1);
|
||||
if res.is_ok() {
|
||||
let config = &res.unwrap()[0];
|
||||
if !config.is_empty() {
|
||||
return parse_extern_config(&config[0].1[0].1);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
// ! end of watchers
|
||||
|
||||
fn config_comparing(local: &Processes, remote: &Processes) -> ConfigActuality {
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ pub async fn file_handler(
|
|||
}
|
||||
match file.triggers.on_delete.as_str() {
|
||||
"stay" => {
|
||||
tx.send(9).await.unwrap();
|
||||
continue;
|
||||
}
|
||||
"stop" => {
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ async fn main() {
|
|||
);
|
||||
log::info!("Runner is ready. Initializing...");
|
||||
|
||||
if processes.processes.len() == 0 {
|
||||
if processes.processes.is_empty() {
|
||||
error!("Processes list is null, runner-rs initialization is stopped");
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,9 +31,9 @@ pub async fn service_handler(
|
|||
}
|
||||
}
|
||||
"hold" => {
|
||||
if is_frozen(name).await {
|
||||
return Err(CustomError::Fatal);
|
||||
}
|
||||
// if is_frozen(name).await {
|
||||
// return Err(CustomError::Fatal);
|
||||
// }
|
||||
if looped_service_connecting(name, serv).await.is_err() {
|
||||
tx.send(6).await.unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(400)).await;
|
||||
|
|
@ -53,6 +53,7 @@ pub async fn service_handler(
|
|||
}
|
||||
|
||||
async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), CustomError> {
|
||||
let mut counter = 0;
|
||||
if serv.triggers.wait == 0 {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
||||
|
|
@ -79,8 +80,9 @@ async fn looped_service_connecting(name: &str, serv: &Services) -> Result<(), Cu
|
|||
let start = Instant::now();
|
||||
while start.elapsed().as_secs() < serv.triggers.wait.into() {
|
||||
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
|
||||
counter += 1;
|
||||
warn!(
|
||||
"Attempting to connect from {} process to {}:{}",
|
||||
"{counter} Attempting to connect from {} process to {}:{}",
|
||||
&name, &serv.hostname, &serv.port
|
||||
);
|
||||
match check_service(&serv.hostname, &serv.port).await {
|
||||
|
|
|
|||
|
|
@ -95,7 +95,7 @@ pub async fn run_daemons(
|
|||
},
|
||||
// // 9 - File-dependency change -> staying (after check)
|
||||
9 => {
|
||||
warn!("File-dependency warning (file changed). Ignoring on {} process...", &proc.name);
|
||||
warn!("File-dependency warning (file changed). Ignoring event on {} process...", &proc.name);
|
||||
},
|
||||
|
||||
// 10 - Process unfreaze call via file handler
|
||||
|
|
|
|||
Loading…
Reference in New Issue