diff --git a/settings.json b/settings.json index 3090586..cf5fc71 100644 --- a/settings.json +++ b/settings.json @@ -17,7 +17,7 @@ "hostname" : "ya.ru", "port" : 443, "triggers" : { - "wait" : 200, + "wait" : 12, "delay" : 3 } }] @@ -36,8 +36,8 @@ "hostname" : "localhost", "port" : 8080, "triggers" : { - "wait" : 200, - "delay" : 3 + "wait" : 21, + "delay" : 1 } }] } diff --git a/src/main.rs b/src/main.rs index c145216..75f6d8b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ use std::fs; use std::path::Path; use std::process::Command; // to use in time-trigger -use tokio::time::{sleep, Duration}; +use tokio::time::{Duration, Instant}; // to store condition between asynchronous tasks use tokio::sync::mpsc; @@ -72,25 +72,25 @@ async fn main() { proc.dependencies.services.len()); // creating msg channel - let (tx, mut rx) = mpsc::channel::(1); + let (tx, mut rx) = mpsc::channel::(1); - for file in proc.dependencies.files.iter() { - if let Err(_) = check_file(&file.filename, &file.src) { - eprintln!("Error: Process {} cannot run without file {}{}", proc.name, file.src, file.filename); - error_counter += 1; - } - } - for ser in proc.dependencies.services.iter() { - if let Err(_) = check_service(&ser.hostname, &ser.port) { - eprintln!("Error: Process {} cannot run while service {}:{} is down", proc.name, ser.hostname, ser.port); - error_counter += 1; - } - } + // for file in proc.dependencies.files.iter() { + // if let Err(_) = check_file(&file.filename, &file.src) { + // eprintln!("Error: Process {} cannot run without file {}{}", proc.name, file.src, file.filename); + // error_counter += 1; + // } + // } + // for ser in proc.dependencies.services.iter() { + // if let Err(_) = check_service(&ser.hostname, &ser.port) { + // eprintln!("Error: Process {} cannot run while service {}:{} is down", proc.name, ser.hostname, ser.port); + // error_counter += 1; + // } + // } - if error_counter > 0 { - return; - } + // if error_counter > 0 { + // continue; + // } let proc_clone = proc.clone(); let tx_clone = tx.clone(); @@ -107,8 +107,8 @@ async fn main() { async fn run_daemons( proc: &TrackingProcess, - tx: mpsc::Sender, - rx: &mut mpsc::Receiver + tx: mpsc::Sender, + rx: &mut mpsc::Receiver ) { let run_hand = running_handler(&proc.name, &proc.path, tx.clone()); @@ -119,9 +119,21 @@ async fn run_daemons( _ = run_hand => {}, _ = file_hand => {}, _ = serv_hand => {}, - _ = rx.recv() => { - terminate_process(&proc.name).await; - println!("Dependency handling error: Terminating {} process ..." , &proc.name); + _val = rx.recv() => { + match _val.unwrap() { + 1 => { + terminate_process(&proc.name).await; + println!("Dependency handling error: Terminating {} process ..." , &proc.name); + }, + 2 => { + println!("Error due to starting {} process", &proc.name); + }, + 3 => { + terminate_process(&proc.name).await; + println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); + }, + _ => {}, + } }, } } @@ -138,18 +150,18 @@ fn is_active(name: &str)-> bool { !String::from_utf8_lossy(&output.stdout).trim().is_empty() } async fn terminate_process (name: &str) { - let output = Command::new("pkill") + let _ = Command::new("pkill") .arg(name) .output() .expect("Failed to execute command 'pkill'"); } -async fn start_process(name: &str, path: &str, tx: mpsc::Sender) -> Result<(), CustomError> { +async fn start_process(name: &str, path: &str, tx: mpsc::Sender) -> Result<(), CustomError> { let runsh = format!("{}{}", path, "/run.sh"); let mut command = Command::new("bash"); command.arg(runsh); match command.spawn() { - Ok(mut child) => { + Ok(_) => { println!("Process {} is running now!", name); Ok(()) }, @@ -160,19 +172,19 @@ async fn start_process(name: &str, path: &str, tx: mpsc::Sender) -> Resul } } // check process status daemon -async fn running_handler(name: &str, path: &str, tx: mpsc::Sender){ +async fn running_handler(name: &str, path: &str, tx: mpsc::Sender){ loop { - let output = Command::new("pidof") + // println!("running daemon on {}", name); + let _ = Command::new("pidof") .arg(name) .output() .expect("Failed to execute command 'pidof'"); - // is down if !is_active(name) { match start_process(name, path, tx.clone()).await { Ok(_) => {}, Err(_) => { - tx.send(1).await.unwrap(); + tx.send(2).await.unwrap(); }, } } @@ -180,19 +192,17 @@ async fn running_handler(name: &str, path: &str, tx: mpsc::Sender){ } } -async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) { +async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) { loop { - for file in files { - if !is_active(name) { - break; - } - match check_file(&file.filename, &file.src) { - Ok(_) => { - // println!("{} is still in directory!", &file.filename); - }, - Err(_) => { - tx.send(1).await.unwrap(); - }, + // println!("file daemon on {}", name); + if is_active(name) { + for file in files { + match check_file(&file.filename, &file.src) { + Ok(_) => {}, + Err(_) => { + tx.send(1).await.unwrap(); + }, + } } } tokio::time::sleep(Duration::from_millis(100)).await; @@ -208,24 +218,70 @@ fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { } } -async fn service_handler(name: &str, services: &Vec, tx: mpsc::Sender) { - loop { - for serv in services { - if !is_active(name) { - break; +// ?? +async fn service_handler(name: &str, services: &Vec, tx: mpsc::Sender) { + 'main_loop: loop { + // println!("service daemon on {}", name); + if is_active(name) { + for serv in services { + match check_service(&serv.hostname, &serv.port) { + Ok(_) => { + // println!("{} is up!", &serv.hostname); + }, + Err(_) => { + match looped_service_connecting(&name, serv).await { + Ok(_) => { + continue; + }, + Err(_) => { + tx.send(3).await.unwrap(); + }, + } + // tx.send(3).await.unwrap(); + }, + } } - match check_service(&serv.hostname, &serv.port) { - Ok(_) => { - // println!("{} is up!", &serv.hostname); - }, - Err(_) => { - tx.send(1).await.unwrap(); - }, - } } tokio::time::sleep(Duration::from_millis(100)).await; } } + +async fn looped_service_connecting( + name: &str, + serv: &Services +) -> Result<(), CustomError> +{ + if serv.triggers.wait == 0 { + loop { + tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; + println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); + match check_service(&serv.hostname, &serv.port) { + Ok(_) => { + break; + }, + Err(_) => { + continue; + }, + } + } + return Ok(()); + } else { + let start = Instant::now(); + while start.elapsed().as_secs() < serv.triggers.wait.into() { + tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; + println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); + match check_service(&serv.hostname, &serv.port) { + Ok(_) => { + return Ok(()); + }, + Err(_) => { + continue; + }, + } + } + return Err(CustomError::Fatal); + } +} fn check_service(host: &str, port: &u32) -> Result<(), CustomError> { let mut command = Command::new("bash"); command.args(["service-checker.sh", host, &port.to_string()]);