From 928c13ae11d70891e8940f1b34de9accc45b8aed Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 8 Jul 2024 04:49:31 -0400 Subject: [PATCH 1/4] 123 --- src/main.rs | 63 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/src/main.rs b/src/main.rs index ac9a455..08f7bbd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,7 +66,7 @@ struct FIleTriggers { on_change : String, } -#[tokio::main] +#[tokio::main(flavor = "multi_thread")] async fn main() { let processes = load_processes("settings.json"); // let mut error_counter = 0; @@ -247,11 +247,13 @@ async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { } // check process status daemon async fn running_handler(name: &str, path: &str, tx: mpsc::Sender){ - println!("running daemon on {}", name); + // println!("running daemon on {}", name); let _ = Command::new("pidof") .arg(name) .output() .expect("Failed to execute command 'pidof'"); + // services and files check (once) + // if inactive -> if !is_active(name) && !is_frozen(name){ if start_process(name, path).await.is_err() { @@ -264,7 +266,7 @@ async fn running_handler(name: &str, path: &str, tx: mpsc::Sender){ async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) { loop { - println!("file daemon on {}", name); + // println!("file daemon on {}", name); if is_active(name) && !is_frozen(name){ for file in files { if check_file(&file.filename, &file.src).is_err() { @@ -297,6 +299,7 @@ async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) { // } } tokio::time::sleep(Duration::from_millis(100)).await; + tokio::task::yield_now().await; } } fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { @@ -311,23 +314,22 @@ fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { // ?? async fn service_handler(name: &str, services: &Vec, tx: mpsc::Sender) { - loop { - println!("service daemon on {}", name); + // println!("service daemon on {}", name); if is_active(name) && !is_frozen(name) { for serv in services { - if check_service(&serv.hostname, &serv.port).is_err() { + if check_service(&serv.hostname, &serv.port).await.is_err() { // println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); match serv.triggers.on_lost.as_str() { "stay" => { }, "stop" => { - if looped_service_connecting(&name, serv).await.is_err() { + if looped_service_connecting(name, serv).await.is_err() { tx.send(5).await.unwrap(); return; } }, "hold" => { - if looped_service_connecting(&name, serv).await.is_err() { + if looped_service_connecting(name, serv).await.is_err() { tx.send(6).await.unwrap(); } }, @@ -343,7 +345,9 @@ async fn service_handler(name: &str, services: &Vec, tx: mpsc::Sender< // } } tokio::time::sleep(Duration::from_millis(100)).await; - } + tokio::task::yield_now().await; + + } async fn looped_service_connecting( @@ -351,11 +355,12 @@ async fn looped_service_connecting( serv: &Services ) -> Result<(), CustomError> { + if serv.triggers.wait == 0 { loop { tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); - match check_service(&serv.hostname, &serv.port) { + match check_service(&serv.hostname, &serv.port).await { Ok(_) => { break; }, @@ -370,7 +375,7 @@ async fn looped_service_connecting( while start.elapsed().as_secs() < serv.triggers.wait.into() { tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); - match check_service(&serv.hostname, &serv.port) { + match check_service(&serv.hostname, &serv.port).await { Ok(_) => { return Ok(()); }, @@ -382,20 +387,26 @@ async fn looped_service_connecting( return Err(CustomError::Fatal); } } -fn check_service(host: &str, port: &u32) -> Result<(), CustomError> { - let mut command = Command::new("bash"); - command.args(["service-checker.sh", host, &port.to_string()]); +async fn check_service(host: &str, port: &u32) -> Result<(), CustomError> { + let host = host.to_string(); + let port = *port; + + let result = tokio::task::spawn_blocking(move || { + let mut command = Command::new("bash"); + command.args(["service-checker.sh", &host, &port.to_string()]); - match command.output() { - Ok(output) => { - if output.status.success() { - return Ok(()); - } else { - return Err(CustomError::Fatal); - } - }, - Err(_) => { - return Err(CustomError::Fatal); - }, - }; + match command.output() { + Ok(output) => { + if output.status.success() { + Ok(()) + } else { + Err(CustomError::Fatal) + } + }, + Err(_) => { + Err(CustomError::Fatal) + }, + } + }).await; + result.unwrap_or(Err(CustomError::Fatal)) } \ No newline at end of file From 759ff645320732ff0fd682826425d3532f64588c Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 8 Jul 2024 05:01:03 -0400 Subject: [PATCH 2/4] fixed blocking bug --- settings.json | 4 ++-- src/main.rs | 37 +++++++++++++++++++++++++------------ 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/settings.json b/settings.json index 86ff13d..8d1b56d 100644 --- a/settings.json +++ b/settings.json @@ -57,8 +57,8 @@ "hostname" : "localhost", "port" : 8080, "triggers" : { - "wait" : 20, - "delay" : 5, + "wait" : 10, + "delay" : 2, "onLost" : "stop" } }] diff --git a/src/main.rs b/src/main.rs index 08f7bbd..4f940f8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ use serde::{Deserialize, Serialize}; use serde_json; use tokio::io::Join; +use tokio::join; use std::env::join_paths; use std::fmt::Debug; use std::fs; @@ -108,14 +109,14 @@ async fn run_daemons( ) { loop { - let run_hand = running_handler(&proc.name, &proc.path, tx.clone()); - let file_hand = file_handler(&proc.name,&proc.dependencies.files, tx.clone()); - let serv_hand = service_handler(&proc.name, &proc.dependencies.services, tx.clone()); + let run_hand = running_handler(&proc, tx.clone()); + // let file_hand = file_handler(&proc.name,&proc.dependencies.files, tx.clone()); + // let serv_hand = service_handler(&proc.name, &proc.dependencies.services, tx.clone()); tokio::select! { _ = run_hand => {}, - _ = file_hand => {}, - _ = serv_hand => {}, + // _ = file_hand => {}, + // _ = serv_hand => {}, _val = rx.recv() => { match _val.unwrap() { // 1 - File-dependency handling error -> terminating (after waiting) @@ -246,26 +247,39 @@ async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { } } // check process status daemon -async fn running_handler(name: &str, path: &str, tx: mpsc::Sender){ +async fn running_handler(prc: &TrackingProcess, tx: mpsc::Sender){ // println!("running daemon on {}", name); + loop { let _ = Command::new("pidof") - .arg(name) + .arg(&prc.name) .output() .expect("Failed to execute command 'pidof'"); // services and files check (once) - // if inactive -> + let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone()); + let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); + + join!(files_check, services_check); - if !is_active(name) && !is_frozen(name){ - if start_process(name, path).await.is_err() { + // if inactive -> spawn checks -> active is true + + if !is_active(&prc.name){ + if start_process(&prc.name, &prc.path).await.is_err() { tx.send(3).await.unwrap(); // return; } } + + // if frozen -> spawn checks -> unfreeze is true + + else if is_frozen(&prc.name){ + + } tokio::time::sleep(Duration::from_millis(100)).await; + tokio::task::yield_now().await; + } } async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) { - loop { // println!("file daemon on {}", name); if is_active(name) && !is_frozen(name){ for file in files { @@ -300,7 +314,6 @@ async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) { } tokio::time::sleep(Duration::from_millis(100)).await; tokio::task::yield_now().await; - } } fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { let fileconcat = format!("{}{}", path, filename); From 73daa8320b4aced24760f2f5b2cc52bf891ed3ed Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 8 Jul 2024 05:41:17 -0400 Subject: [PATCH 3/4] all fixed but... --- settings.json | 2 +- src/main.rs | 106 +++++++++++++++++++++++++++----------------------- 2 files changed, 58 insertions(+), 50 deletions(-) diff --git a/settings.json b/settings.json index 8d1b56d..b877d66 100644 --- a/settings.json +++ b/settings.json @@ -59,7 +59,7 @@ "triggers" : { "wait" : 10, "delay" : 2, - "onLost" : "stop" + "onLost" : "hold" } }] } diff --git a/src/main.rs b/src/main.rs index 4f940f8..3a2984a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -121,13 +121,18 @@ async fn run_daemons( match _val.unwrap() { // 1 - File-dependency handling error -> terminating (after waiting) 1 => { - println!("Dependency handling error: Terminating {} process ..." , &proc.name); - terminate_process(&proc.name).await; - break; + if is_active(&proc.name) { + println!("Dependency handling error: Terminating {} process ..." , &proc.name); + terminate_process(&proc.name).await; + } + // break; }, // 2 - File-dependency handling error -> holding (after waiting) 2 => { - println!("Error due to starting {} process", &proc.name); + if !is_frozen(&proc.name) { + println!("Dependency handling error: Freezing {} process ..." , &proc.name); + freeze_process(&proc.name).await; + } }, // 3 - Running process error 3 => { @@ -139,14 +144,18 @@ async fn run_daemons( }, // 5 - Timeout of waiting service-dependency -> terminating (after waiting) 5 => { - println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); - terminate_process(&proc.name).await; - break; + if is_active(&proc.name) { + println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); + terminate_process(&proc.name).await; + } + // break; }, // 6 - Timeout of waiting service-dependency -> holding (after waiting) 6 => { - println!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); - freeze_process(&proc.name).await; + if !is_frozen(&proc.name) { + println!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); + freeze_process(&proc.name).await; + } }, // // 7 - File-dependency change -> terminating (after check) // 7 => {}, @@ -157,13 +166,17 @@ async fn run_daemons( // 10 - Process unfreaze call via file handler 10 => { - println!("Unfreezing process {} call via file handler...", &proc.name); - unfreeze_process(&proc.name).await; + if is_frozen(&proc.name) { + println!("Unfreezing process {} call...", &proc.name); + unfreeze_process(&proc.name).await; + } }, // 11 - Process unfreaze call via service handler 11 => { - println!("Unfreezing process {} call via service handler...", &proc.name); - unfreeze_process(&proc.name).await; + if is_frozen(&proc.name) { + println!("Unfreezing process {} call...", &proc.name); + unfreeze_process(&proc.name).await; + } }, // 101 - Impermissible trigger values in JSON 101 => { @@ -248,8 +261,8 @@ async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { } // check process status daemon async fn running_handler(prc: &TrackingProcess, tx: mpsc::Sender){ - // println!("running daemon on {}", name); - loop { + // println!("running daemon on {}", prc.name); + // loop { let _ = Command::new("pidof") .arg(&prc.name) .output() @@ -257,31 +270,27 @@ async fn running_handler(prc: &TrackingProcess, tx: mpsc::Sender){ // services and files check (once) let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone()); let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); - - join!(files_check, services_check); + let res = join!(files_check, services_check); // if inactive -> spawn checks -> active is true - - if !is_active(&prc.name){ + if !is_active(&prc.name) && res.0.is_ok() && res.1.is_ok(){ if start_process(&prc.name, &prc.path).await.is_err() { tx.send(3).await.unwrap(); - // return; + return; } } - // if frozen -> spawn checks -> unfreeze is true - - else if is_frozen(&prc.name){ - + else if is_frozen(&prc.name) && res.0.is_ok() && res.1.is_ok(){ + tx.send(10).await.unwrap(); + return; } tokio::time::sleep(Duration::from_millis(100)).await; tokio::task::yield_now().await; - } + // } } -async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) { +async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) -> Result<(), CustomError>{ // println!("file daemon on {}", name); - if is_active(name) && !is_frozen(name){ for file in files { if check_file(&file.filename, &file.src).is_err() { match file.triggers.on_delete.as_str() { @@ -289,31 +298,30 @@ async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) { continue; }, "stop" => { - tx.send(1).await.unwrap(); - return; + if is_active(name) { + tx.send(1).await.unwrap(); + } + tokio::time::sleep(Duration::from_millis(50)).await; + return Err(CustomError::Fatal); }, "hold" => { - tx.send(2).await.unwrap(); - // continue; - // while check_file(&file.filename, &file.src).is_err() { - // println!("zov"); - // tokio::time::sleep(Duration::from_millis(100)).await; - // } - // tx.send(10).await.unwrap(); + if is_active(name) { + tokio::time::sleep(Duration::from_millis(50)).await; + tx.send(2).await.unwrap(); + return Err(CustomError::Fatal); + } }, _ => { + tokio::time::sleep(Duration::from_millis(50)).await; tx.send(101).await.unwrap(); - return; + return Err(CustomError::Fatal); }, } } } - // if is_frozen(name) { - // tx.send(10).await.unwrap(); - // } - } tokio::time::sleep(Duration::from_millis(100)).await; tokio::task::yield_now().await; + Ok(()) } fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { let fileconcat = format!("{}{}", path, filename); @@ -326,9 +334,8 @@ fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { } // ?? -async fn service_handler(name: &str, services: &Vec, tx: mpsc::Sender) { +async fn service_handler(name: &str, services: &Vec, tx: mpsc::Sender) -> Result<(), CustomError> { // println!("service daemon on {}", name); - if is_active(name) && !is_frozen(name) { for serv in services { if check_service(&serv.hostname, &serv.port).await.is_err() { // println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); @@ -338,27 +345,28 @@ async fn service_handler(name: &str, services: &Vec, tx: mpsc::Sender< "stop" => { if looped_service_connecting(name, serv).await.is_err() { tx.send(5).await.unwrap(); - return; + return Err(CustomError::Fatal); } }, "hold" => { + if is_frozen(name) { + return Err(CustomError::Fatal); + } if looped_service_connecting(name, serv).await.is_err() { tx.send(6).await.unwrap(); + return Err(CustomError::Fatal); } }, _ => { tx.send(101).await.unwrap(); - return; + return Err(CustomError::Fatal); }, } } } - // if is_frozen(name) { - // tx.send(11).await.unwrap(); - // } - } tokio::time::sleep(Duration::from_millis(100)).await; tokio::task::yield_now().await; + Ok(()) } From f9cb2c7cf8a8e58a1b063b0de0e4594d4d1d6d38 Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 8 Jul 2024 06:17:48 -0400 Subject: [PATCH 4/4] fixed delay (almost) + repeating output --- settings.json | 2 +- src/main.rs | 60 +++++++++++++++++++++++++++------------------------ 2 files changed, 33 insertions(+), 29 deletions(-) diff --git a/settings.json b/settings.json index b877d66..4c3d125 100644 --- a/settings.json +++ b/settings.json @@ -41,7 +41,7 @@ "filename" : "control-file", "src" : "/home/vladislav/web/", "triggers" : { - "onDelete" : "stop", + "onDelete" : "hold", "onChange" : "hold" } }], diff --git a/src/main.rs b/src/main.rs index 3a2984a..7f662df 100644 --- a/src/main.rs +++ b/src/main.rs @@ -79,7 +79,7 @@ async fn main() { let mut handler: Vec> = vec![]; for proc in processes.processes.iter() { - println!("\nProcess '{}' on stage:\n{}\nDepends on {} file(s), {} service(s)\n", + println!("Process '{}' on stage:\n{}\nDepends on {} file(s), {} service(s)\n", proc.name, proc.path, proc.dependencies.files.len(), @@ -263,10 +263,10 @@ async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { async fn running_handler(prc: &TrackingProcess, tx: mpsc::Sender){ // println!("running daemon on {}", prc.name); // loop { - let _ = Command::new("pidof") - .arg(&prc.name) - .output() - .expect("Failed to execute command 'pidof'"); + // let _ = Command::new("pidof") + // .arg(&prc.name) + // .output() + // .expect("Failed to execute command 'pidof'"); // services and files check (once) let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone()); let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); @@ -293,6 +293,9 @@ async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) -> R // println!("file daemon on {}", name); for file in files { if check_file(&file.filename, &file.src).is_err() { + if !is_active(name) || is_frozen(name) { + return Err(CustomError::Fatal); + } match file.triggers.on_delete.as_str() { "stay" => { continue; @@ -301,12 +304,10 @@ async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) -> R if is_active(name) { tx.send(1).await.unwrap(); } - tokio::time::sleep(Duration::from_millis(50)).await; return Err(CustomError::Fatal); }, "hold" => { if is_active(name) { - tokio::time::sleep(Duration::from_millis(50)).await; tx.send(2).await.unwrap(); return Err(CustomError::Fatal); } @@ -336,15 +337,21 @@ fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { // ?? async fn service_handler(name: &str, services: &Vec, tx: mpsc::Sender) -> Result<(), CustomError> { // println!("service daemon on {}", name); + // let state = is_active(name); + // let condition = is_frozen(name); for serv in services { if check_service(&serv.hostname, &serv.port).await.is_err() { - // println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); + if !is_active(name) || is_frozen(name) { + return Err(CustomError::Fatal); + } + println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); match serv.triggers.on_lost.as_str() { "stay" => { }, "stop" => { if looped_service_connecting(name, serv).await.is_err() { tx.send(5).await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; return Err(CustomError::Fatal); } }, @@ -354,6 +361,7 @@ async fn service_handler(name: &str, services: &Vec, tx: mpsc::Sender< } if looped_service_connecting(name, serv).await.is_err() { tx.send(6).await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; return Err(CustomError::Fatal); } }, @@ -383,6 +391,7 @@ async fn looped_service_connecting( println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); match check_service(&serv.hostname, &serv.port).await { Ok(_) => { + println!("SUCCESS!"); break; }, Err(_) => { @@ -398,6 +407,7 @@ async fn looped_service_connecting( println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); match check_service(&serv.hostname, &serv.port).await { Ok(_) => { + println!("SUCCESS!"); return Ok(()); }, Err(_) => { @@ -409,25 +419,19 @@ async fn looped_service_connecting( } } async fn check_service(host: &str, port: &u32) -> Result<(), CustomError> { - let host = host.to_string(); - let port = *port; - - let result = tokio::task::spawn_blocking(move || { - let mut command = Command::new("bash"); - command.args(["service-checker.sh", &host, &port.to_string()]); + let mut command = Command::new("bash"); + command.args(["service-checker.sh", host, &port.to_string()]); - match command.output() { - Ok(output) => { - if output.status.success() { - Ok(()) - } else { - Err(CustomError::Fatal) - } - }, - Err(_) => { - Err(CustomError::Fatal) - }, - } - }).await; - result.unwrap_or(Err(CustomError::Fatal)) + match command.output() { + Ok(output) => { + if output.status.success() { + return Ok(()); + } else { + return Err(CustomError::Fatal); + } + }, + Err(_) => { + return Err(CustomError::Fatal); + }, + }; } \ No newline at end of file