diff --git a/settings.json b/settings.json index 86ff13d..4c3d125 100644 --- a/settings.json +++ b/settings.json @@ -41,7 +41,7 @@ "filename" : "control-file", "src" : "/home/vladislav/web/", "triggers" : { - "onDelete" : "stop", + "onDelete" : "hold", "onChange" : "hold" } }], @@ -57,9 +57,9 @@ "hostname" : "localhost", "port" : 8080, "triggers" : { - "wait" : 20, - "delay" : 5, - "onLost" : "stop" + "wait" : 10, + "delay" : 2, + "onLost" : "hold" } }] } diff --git a/src/main.rs b/src/main.rs index ac9a455..7f662df 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ use serde::{Deserialize, Serialize}; use serde_json; use tokio::io::Join; +use tokio::join; use std::env::join_paths; use std::fmt::Debug; use std::fs; @@ -66,7 +67,7 @@ struct FIleTriggers { on_change : String, } -#[tokio::main] +#[tokio::main(flavor = "multi_thread")] async fn main() { let processes = load_processes("settings.json"); // let mut error_counter = 0; @@ -78,7 +79,7 @@ async fn main() { let mut handler: Vec> = vec![]; for proc in processes.processes.iter() { - println!("\nProcess '{}' on stage:\n{}\nDepends on {} file(s), {} service(s)\n", + println!("Process '{}' on stage:\n{}\nDepends on {} file(s), {} service(s)\n", proc.name, proc.path, proc.dependencies.files.len(), @@ -108,25 +109,30 @@ async fn run_daemons( ) { loop { - let run_hand = running_handler(&proc.name, &proc.path, tx.clone()); - let file_hand = file_handler(&proc.name,&proc.dependencies.files, tx.clone()); - let serv_hand = service_handler(&proc.name, &proc.dependencies.services, tx.clone()); + let run_hand = running_handler(&proc, tx.clone()); + // let file_hand = file_handler(&proc.name,&proc.dependencies.files, tx.clone()); + // let serv_hand = service_handler(&proc.name, &proc.dependencies.services, tx.clone()); tokio::select! { _ = run_hand => {}, - _ = file_hand => {}, - _ = serv_hand => {}, + // _ = file_hand => {}, + // _ = serv_hand => {}, _val = rx.recv() => { match _val.unwrap() { // 1 - File-dependency handling error -> terminating (after waiting) 1 => { - println!("Dependency handling error: Terminating {} process ..." , &proc.name); - terminate_process(&proc.name).await; - break; + if is_active(&proc.name) { + println!("Dependency handling error: Terminating {} process ..." , &proc.name); + terminate_process(&proc.name).await; + } + // break; }, // 2 - File-dependency handling error -> holding (after waiting) 2 => { - println!("Error due to starting {} process", &proc.name); + if !is_frozen(&proc.name) { + println!("Dependency handling error: Freezing {} process ..." , &proc.name); + freeze_process(&proc.name).await; + } }, // 3 - Running process error 3 => { @@ -138,14 +144,18 @@ async fn run_daemons( }, // 5 - Timeout of waiting service-dependency -> terminating (after waiting) 5 => { - println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); - terminate_process(&proc.name).await; - break; + if is_active(&proc.name) { + println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); + terminate_process(&proc.name).await; + } + // break; }, // 6 - Timeout of waiting service-dependency -> holding (after waiting) 6 => { - println!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); - freeze_process(&proc.name).await; + if !is_frozen(&proc.name) { + println!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); + freeze_process(&proc.name).await; + } }, // // 7 - File-dependency change -> terminating (after check) // 7 => {}, @@ -156,13 +166,17 @@ async fn run_daemons( // 10 - Process unfreaze call via file handler 10 => { - println!("Unfreezing process {} call via file handler...", &proc.name); - unfreeze_process(&proc.name).await; + if is_frozen(&proc.name) { + println!("Unfreezing process {} call...", &proc.name); + unfreeze_process(&proc.name).await; + } }, // 11 - Process unfreaze call via service handler 11 => { - println!("Unfreezing process {} call via service handler...", &proc.name); - unfreeze_process(&proc.name).await; + if is_frozen(&proc.name) { + println!("Unfreezing process {} call...", &proc.name); + unfreeze_process(&proc.name).await; + } }, // 101 - Impermissible trigger values in JSON 101 => { @@ -246,58 +260,69 @@ async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { } } // check process status daemon -async fn running_handler(name: &str, path: &str, tx: mpsc::Sender){ - println!("running daemon on {}", name); - let _ = Command::new("pidof") - .arg(name) - .output() - .expect("Failed to execute command 'pidof'"); +async fn running_handler(prc: &TrackingProcess, tx: mpsc::Sender){ + // println!("running daemon on {}", prc.name); + // loop { + // let _ = Command::new("pidof") + // .arg(&prc.name) + // .output() + // .expect("Failed to execute command 'pidof'"); + // services and files check (once) + let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone()); + let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); - if !is_active(name) && !is_frozen(name){ - if start_process(name, path).await.is_err() { + let res = join!(files_check, services_check); + // if inactive -> spawn checks -> active is true + if !is_active(&prc.name) && res.0.is_ok() && res.1.is_ok(){ + if start_process(&prc.name, &prc.path).await.is_err() { tx.send(3).await.unwrap(); - // return; + return; } } + // if frozen -> spawn checks -> unfreeze is true + else if is_frozen(&prc.name) && res.0.is_ok() && res.1.is_ok(){ + tx.send(10).await.unwrap(); + return; + } tokio::time::sleep(Duration::from_millis(100)).await; + tokio::task::yield_now().await; + // } } -async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) { - loop { - println!("file daemon on {}", name); - if is_active(name) && !is_frozen(name){ +async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) -> Result<(), CustomError>{ + // println!("file daemon on {}", name); for file in files { if check_file(&file.filename, &file.src).is_err() { + if !is_active(name) || is_frozen(name) { + return Err(CustomError::Fatal); + } match file.triggers.on_delete.as_str() { "stay" => { continue; }, "stop" => { - tx.send(1).await.unwrap(); - return; + if is_active(name) { + tx.send(1).await.unwrap(); + } + return Err(CustomError::Fatal); }, "hold" => { - tx.send(2).await.unwrap(); - // continue; - // while check_file(&file.filename, &file.src).is_err() { - // println!("zov"); - // tokio::time::sleep(Duration::from_millis(100)).await; - // } - // tx.send(10).await.unwrap(); + if is_active(name) { + tx.send(2).await.unwrap(); + return Err(CustomError::Fatal); + } }, _ => { + tokio::time::sleep(Duration::from_millis(50)).await; tx.send(101).await.unwrap(); - return; + return Err(CustomError::Fatal); }, } } } - // if is_frozen(name) { - // tx.send(10).await.unwrap(); - // } - } tokio::time::sleep(Duration::from_millis(100)).await; - } + tokio::task::yield_now().await; + Ok(()) } fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { let fileconcat = format!("{}{}", path, filename); @@ -310,40 +335,48 @@ fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { } // ?? -async fn service_handler(name: &str, services: &Vec, tx: mpsc::Sender) { - loop { - println!("service daemon on {}", name); - if is_active(name) && !is_frozen(name) { +async fn service_handler(name: &str, services: &Vec, tx: mpsc::Sender) -> Result<(), CustomError> { + // println!("service daemon on {}", name); + // let state = is_active(name); + // let condition = is_frozen(name); for serv in services { - if check_service(&serv.hostname, &serv.port).is_err() { - // println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); + if check_service(&serv.hostname, &serv.port).await.is_err() { + if !is_active(name) || is_frozen(name) { + return Err(CustomError::Fatal); + } + println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); match serv.triggers.on_lost.as_str() { "stay" => { }, "stop" => { - if looped_service_connecting(&name, serv).await.is_err() { + if looped_service_connecting(name, serv).await.is_err() { tx.send(5).await.unwrap(); - return; + tokio::time::sleep(Duration::from_millis(100)).await; + return Err(CustomError::Fatal); } }, "hold" => { - if looped_service_connecting(&name, serv).await.is_err() { + if is_frozen(name) { + return Err(CustomError::Fatal); + } + if looped_service_connecting(name, serv).await.is_err() { tx.send(6).await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + return Err(CustomError::Fatal); } }, _ => { tx.send(101).await.unwrap(); - return; + return Err(CustomError::Fatal); }, } } } - // if is_frozen(name) { - // tx.send(11).await.unwrap(); - // } - } tokio::time::sleep(Duration::from_millis(100)).await; - } + tokio::task::yield_now().await; + Ok(()) + + } async fn looped_service_connecting( @@ -351,12 +384,14 @@ async fn looped_service_connecting( serv: &Services ) -> Result<(), CustomError> { + if serv.triggers.wait == 0 { loop { tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); - match check_service(&serv.hostname, &serv.port) { + match check_service(&serv.hostname, &serv.port).await { Ok(_) => { + println!("SUCCESS!"); break; }, Err(_) => { @@ -370,8 +405,9 @@ async fn looped_service_connecting( while start.elapsed().as_secs() < serv.triggers.wait.into() { tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); - match check_service(&serv.hostname, &serv.port) { + match check_service(&serv.hostname, &serv.port).await { Ok(_) => { + println!("SUCCESS!"); return Ok(()); }, Err(_) => { @@ -382,7 +418,7 @@ async fn looped_service_connecting( return Err(CustomError::Fatal); } } -fn check_service(host: &str, port: &u32) -> Result<(), CustomError> { +async fn check_service(host: &str, port: &u32) -> Result<(), CustomError> { let mut command = Command::new("bash"); command.args(["service-checker.sh", host, &port.to_string()]);