diff --git a/Cargo.lock b/Cargo.lock index 311ef9a..a1de0a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -199,7 +199,7 @@ dependencies = [ [[package]] name = "runner-rs" -version = "0.1.0" +version = "0.1.10" dependencies = [ "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 77fcacd..271b552 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "runner-rs" -version = "0.1.0" +version = "0.1.10" edition = "2021" [dependencies] diff --git a/src/main.rs b/src/main.rs index 04fe493..dec8264 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,13 @@ use serde::{Deserialize, Serialize}; use serde_json; use tokio::join; +use core::panic; use std::fmt::Debug; use std::fs; -use std::ops::RangeInclusive; +use std::future::IntoFuture; use std::path::Path; use std::process::{Command, Output}; +use std::rc::Rc; use std::sync::Arc; use tokio::time::{Duration, Instant}; use tokio::sync::mpsc; @@ -99,6 +101,7 @@ async fn main() { proc.dependencies.services.len()); // creating msg channel + // can or should be executed in new thread (with Arc -> Rc) let (tx, mut rx) = mpsc::channel::(1); let proc = Arc::new(proc.clone()); @@ -124,16 +127,17 @@ async fn run_daemons( rx: &mut mpsc::Receiver ) { + // clone name + // clone path loop { - let run_hand = running_handler(&proc, tx.clone()); - + let run_hand = running_handler(proc.clone(), tx.clone()); tokio::select! { _ = run_hand => {}, _val = rx.recv() => { match _val.unwrap() { // 1 - File-dependency handling error -> terminating (after waiting) 1 => { - if is_active(&proc.name) { + if is_active(&proc.name).await { println!("Dependency handling error: Terminating {} process ..." , &proc.name); terminate_process(&proc.name).await; } @@ -141,7 +145,7 @@ async fn run_daemons( }, // 2 - File-dependency handling error -> holding (after waiting) 2 => { - if !is_frozen(&proc.name) { + if !is_frozen(&proc.name).await { println!("Dependency handling error: Freezing {} process ..." , &proc.name); freeze_process(&proc.name).await; } @@ -156,7 +160,7 @@ async fn run_daemons( }, // 5 - Timeout of waiting service-dependency -> terminating (after waiting) 5 => { - if is_active(&proc.name) { + if is_active(&proc.name).await { println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); terminate_process(&proc.name).await; } @@ -164,7 +168,7 @@ async fn run_daemons( }, // 6 - Timeout of waiting service-dependency -> holding (after waiting) 6 => { - if !is_frozen(&proc.name) { + if !is_frozen(&proc.name).await { println!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); freeze_process(&proc.name).await; } @@ -178,14 +182,14 @@ async fn run_daemons( // 10 - Process unfreaze call via file handler 10 => { - if is_frozen(&proc.name) { + if is_frozen(&proc.name).await { println!("Unfreezing process {} call...", &proc.name); unfreeze_process(&proc.name).await; } }, // 11 - Process unfreaze call via service handler 11 => { - if is_frozen(&proc.name) { + if is_frozen(&proc.name).await { println!("Unfreezing process {} call...", &proc.name); unfreeze_process(&proc.name).await; } @@ -201,38 +205,54 @@ async fn run_daemons( } // tokio::task::yield_now().await; } - } +// 4ever sync fn load_processes(json_filename: &str) -> Processes{ let read = fs::read_to_string(json_filename).expect(format!("Missing '{}' file. Cannot start runner", json_filename).as_str()); serde_json::from_str::(&read).expect(format!("Parsing error in '{}' file. Cannot start runner", json_filename).as_str()) } -fn get_pid(name: &str) -> Output { - Command::new("pidof") - .arg(name) +async fn get_pid(name: &str) -> Output { + let name = Arc::new(name.to_string()); + tokio::task::spawn_blocking(move || { + Command::new("pidof") + .arg(&*name) .output() .expect("Failed to execute command 'pidof'") + }) + .await + .unwrap() } -fn is_active(name: &str)-> bool { - let output = Command::new("pidof") - .arg(name) +async fn is_active(name: &str)-> bool { + let arc_name = Arc::new(name.to_string()); + tokio::task::spawn_blocking(move || { + let output = Command::new("pidof") + .arg(&*arc_name) .output() .expect("Failed to execute command 'pidof'"); !String::from_utf8_lossy(&output.stdout).trim().is_empty() + }) + .await + .unwrap() } -fn is_frozen(name: &str) -> bool { - let temp = get_pid(name); +async fn is_frozen(name: &str) -> bool { + let temp = get_pid(name).await; let pid = String::from_utf8_lossy(&temp.stdout); - if pid.trim().is_empty(){ + let pid = pid.trim(); + let arc_pid = Arc::new(pid.to_string()); + if pid.is_empty(){ return false; } else { - let cmd = Command::new("ps") - .args(["-o", "stat=", "-p", pid.trim()]) - .output() - .expect("Failed to execute ps command"); - return !(String::from_utf8_lossy(&cmd.stdout) == "Sl+\n"); + tokio::task::spawn_blocking(move || { + let cmd = Command::new("ps") + .args(["-o", "stat=", "-p", &arc_pid]) + .output() + .expect("Failed to execute ps command"); + !(String::from_utf8_lossy(&cmd.stdout) == "Sl+\n") + }) + .await + .unwrap() } } async fn terminate_process (name: &str) { @@ -272,7 +292,7 @@ async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { } } // check process status daemon -async fn running_handler(prc: &TrackingProcess, tx: Arc>){ +async fn running_handler(prc: Arc, tx: Arc>){ // println!("running daemon on {}", prc.name); // services and files check (once) let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone()); @@ -280,14 +300,14 @@ async fn running_handler(prc: &TrackingProcess, tx: Arc>){ let res = join!(files_check, services_check); // if inactive -> spawn checks -> active is true - if !is_active(&prc.name) && res.0.is_ok() && res.1.is_ok(){ + if !is_active(&prc.name).await && res.0.is_ok() && res.1.is_ok(){ if start_process(&prc.name, &prc.path).await.is_err() { tx.send(3).await.unwrap(); return; } } // if frozen -> spawn checks -> unfreeze is true - else if is_frozen(&prc.name) && res.0.is_ok() && res.1.is_ok(){ + else if is_frozen(&prc.name).await && res.0.is_ok() && res.1.is_ok(){ tx.send(10).await.unwrap(); return; } @@ -298,8 +318,8 @@ async fn running_handler(prc: &TrackingProcess, tx: Arc>){ async fn file_handler(name: &str, files: &Vec, tx: Arc>) -> Result<(), CustomError>{ // println!("file daemon on {}", name); for file in files { - if check_file(&file.filename, &file.src).is_err() { - if !is_active(name) || is_frozen(name) { + if check_file(&file.filename, &file.src).await.is_err() { + if !is_active(name).await || is_frozen(name).await { return Err(CustomError::Fatal); } match file.triggers.on_delete.as_str() { @@ -307,13 +327,13 @@ async fn file_handler(name: &str, files: &Vec, tx: Arc>) continue; }, "stop" => { - if is_active(name) { + if is_active(name).await { tx.send(1).await.unwrap(); } return Err(CustomError::Fatal); }, "hold" => { - if is_active(name) { + if is_active(name).await { tx.send(2).await.unwrap(); return Err(CustomError::Fatal); } @@ -330,21 +350,29 @@ async fn file_handler(name: &str, files: &Vec, tx: Arc>) tokio::task::yield_now().await; Ok(()) } -fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { - let fileconcat = format!("{}{}", path, filename); - let path = Path::new(&fileconcat); - if path.exists() { - Ok(()) - } else { - Err(CustomError::Fatal) - } +async fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { + let arc_name = Arc::new(filename.to_string()); + let arc_path = Arc::new(path.to_string()); + tokio::task::spawn_blocking(move || { + let fileconcat = format!("{}{}", arc_path, arc_name); + let path = Path::new(&fileconcat); + if path.exists() { + Ok(()) + } else { + Err(CustomError::Fatal) + } + }) + .await + .unwrap_or_else(|_| { + panic!("Corrupted while file check process"); + }) } async fn service_handler(name: &str, services: &Vec, tx: Arc>) -> Result<(), CustomError> { // println!("service daemon on {}", name); for serv in services { if check_service(&serv.hostname, &serv.port).await.is_err() { - if !is_active(name) || is_frozen(name) { + if !is_active(name).await || is_frozen(name).await { return Err(CustomError::Fatal); } println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); @@ -359,7 +387,7 @@ async fn service_handler(name: &str, services: &Vec, tx: Arc { - if is_frozen(name) { + if is_frozen(name).await { return Err(CustomError::Fatal); } if looped_service_connecting(name, serv).await.is_err() { @@ -391,7 +419,7 @@ async fn looped_service_connecting( println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); match check_service(&serv.hostname, &serv.port).await { Ok(_) => { - println!("SUCCESS!"); + println!("Successfully connected to {} from {} process!", &serv.hostname, &name); break; }, Err(_) => {