diff --git a/src/main.rs b/src/main.rs index 7f662df..69175fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,28 +1,28 @@ use serde::{Deserialize, Serialize}; use serde_json; -use tokio::io::Join; use tokio::join; -use std::env::join_paths; use std::fmt::Debug; use std::fs; use std::path::Path; use std::process::{Command, Output}; -// to use in time-trigger use tokio::time::{Duration, Instant}; -// to store condition between asynchronous tasks -use tokio::sync::{futures, mpsc}; - +use tokio::sync::mpsc; +/// # an Error enum (nextly will be deleted and replaced) enum CustomError { Fatal, } +/// # struct for the 1st level in json conf file +/// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] struct Processes { #[serde(default)] processes : Vec, } +/// # struct for each process to contain info, such as name, path and dependencies +/// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] struct TrackingProcess { name : String, @@ -30,6 +30,8 @@ struct TrackingProcess { dependencies: Dependencies, } +/// # struct for processes' dependecies including files and services +/// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] struct Dependencies { #[serde(default)] @@ -37,6 +39,9 @@ struct Dependencies { #[serde(default)] services: Vec, } + +/// # struct for containing file object with its triggers to manipulate in daemons +/// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] struct Files { filename : String, @@ -44,6 +49,8 @@ struct Files { triggers : FIleTriggers, } +/// # struct for containing service object with its triggers to manipulate in daemons +/// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] struct Services { hostname : String, @@ -51,7 +58,8 @@ struct Services { triggers : ServiceTriggers, } -// policy +/// # struct for instancing each service's policies such as on lost or time to wait till reachable +/// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] struct ServiceTriggers { wait : u32, @@ -59,6 +67,9 @@ struct ServiceTriggers { #[serde(rename="onLost")] on_lost : String, } + +/// # struct for instancing each file's policies such as ondelete or onupdate events +/// > (needed in serialization and deserialization) #[derive(Debug, Serialize, Deserialize, Clone)] struct FIleTriggers { #[serde(rename="onDelete")] @@ -102,6 +113,9 @@ async fn main() { return; } +/// # async func to run 3 main daemons (now its more like tree-form than classiacl 0.1.0 form ) +/// > hint : give mpsc with capacity 1 to jump over potential errors during running process +/// ** in [developing](https://github.com/prplV/runner-rs "REPOSITORY") ** async fn run_daemons( proc: &TrackingProcess, tx: mpsc::Sender, @@ -110,13 +124,9 @@ async fn run_daemons( { loop { let run_hand = running_handler(&proc, tx.clone()); - // let file_hand = file_handler(&proc.name,&proc.dependencies.files, tx.clone()); - // let serv_hand = service_handler(&proc.name, &proc.dependencies.services, tx.clone()); tokio::select! { _ = run_hand => {}, - // _ = file_hand => {}, - // _ = serv_hand => {}, _val = rx.recv() => { match _val.unwrap() { // 1 - File-dependency handling error -> terminating (after waiting) @@ -262,67 +272,61 @@ async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { // check process status daemon async fn running_handler(prc: &TrackingProcess, tx: mpsc::Sender){ // println!("running daemon on {}", prc.name); - // loop { - // let _ = Command::new("pidof") - // .arg(&prc.name) - // .output() - // .expect("Failed to execute command 'pidof'"); - // services and files check (once) - let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone()); - let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); + // services and files check (once) + let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone()); + let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone()); - let res = join!(files_check, services_check); - // if inactive -> spawn checks -> active is true - if !is_active(&prc.name) && res.0.is_ok() && res.1.is_ok(){ - if start_process(&prc.name, &prc.path).await.is_err() { - tx.send(3).await.unwrap(); - return; - } - } - // if frozen -> spawn checks -> unfreeze is true - else if is_frozen(&prc.name) && res.0.is_ok() && res.1.is_ok(){ - tx.send(10).await.unwrap(); - return; + let res = join!(files_check, services_check); + // if inactive -> spawn checks -> active is true + if !is_active(&prc.name) && res.0.is_ok() && res.1.is_ok(){ + if start_process(&prc.name, &prc.path).await.is_err() { + tx.send(3).await.unwrap(); + return; } - tokio::time::sleep(Duration::from_millis(100)).await; - tokio::task::yield_now().await; - // } + } + // if frozen -> spawn checks -> unfreeze is true + else if is_frozen(&prc.name) && res.0.is_ok() && res.1.is_ok(){ + tx.send(10).await.unwrap(); + return; + } + tokio::time::sleep(Duration::from_millis(100)).await; + tokio::task::yield_now().await; } async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) -> Result<(), CustomError>{ // println!("file daemon on {}", name); - for file in files { - if check_file(&file.filename, &file.src).is_err() { - if !is_active(name) || is_frozen(name) { - return Err(CustomError::Fatal); - } - match file.triggers.on_delete.as_str() { - "stay" => { - continue; - }, - "stop" => { - if is_active(name) { - tx.send(1).await.unwrap(); - } - return Err(CustomError::Fatal); - }, - "hold" => { - if is_active(name) { - tx.send(2).await.unwrap(); - return Err(CustomError::Fatal); - } - }, - _ => { - tokio::time::sleep(Duration::from_millis(50)).await; - tx.send(101).await.unwrap(); - return Err(CustomError::Fatal); - }, - } + for file in files { + if check_file(&file.filename, &file.src).is_err() { + if !is_active(name) || is_frozen(name) { + return Err(CustomError::Fatal); + } + match file.triggers.on_delete.as_str() { + "stay" => { + continue; + }, + "stop" => { + if is_active(name) { + tx.send(1).await.unwrap(); } - } - tokio::time::sleep(Duration::from_millis(100)).await; - tokio::task::yield_now().await; - Ok(()) + return Err(CustomError::Fatal); + }, + "hold" => { + if is_active(name) { + tx.send(2).await.unwrap(); + return Err(CustomError::Fatal); + } + }, + _ => { + tokio::time::sleep(Duration::from_millis(50)).await; + tx.send(101).await.unwrap(); + return Err(CustomError::Fatal); + }, + } + } + } + tokio::time::sleep(Duration::from_millis(100)).await; + tokio::task::yield_now().await; + Ok(()) } fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { let fileconcat = format!("{}{}", path, filename); @@ -334,49 +338,44 @@ fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { } } -// ?? async fn service_handler(name: &str, services: &Vec, tx: mpsc::Sender) -> Result<(), CustomError> { // println!("service daemon on {}", name); - // let state = is_active(name); - // let condition = is_frozen(name); - for serv in services { - if check_service(&serv.hostname, &serv.port).await.is_err() { - if !is_active(name) || is_frozen(name) { - return Err(CustomError::Fatal); - } - println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); - match serv.triggers.on_lost.as_str() { - "stay" => { - }, - "stop" => { - if looped_service_connecting(name, serv).await.is_err() { - tx.send(5).await.unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - return Err(CustomError::Fatal); - } - }, - "hold" => { - if is_frozen(name) { - return Err(CustomError::Fatal); - } - if looped_service_connecting(name, serv).await.is_err() { - tx.send(6).await.unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - return Err(CustomError::Fatal); - } - }, - _ => { - tx.send(101).await.unwrap(); - return Err(CustomError::Fatal); - }, - } - } - } - tokio::time::sleep(Duration::from_millis(100)).await; - tokio::task::yield_now().await; - Ok(()) - - + for serv in services { + if check_service(&serv.hostname, &serv.port).await.is_err() { + if !is_active(name) || is_frozen(name) { + return Err(CustomError::Fatal); + } + println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); + match serv.triggers.on_lost.as_str() { + "stay" => { + }, + "stop" => { + if looped_service_connecting(name, serv).await.is_err() { + tx.send(5).await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + return Err(CustomError::Fatal); + } + }, + "hold" => { + if is_frozen(name) { + return Err(CustomError::Fatal); + } + if looped_service_connecting(name, serv).await.is_err() { + tx.send(6).await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + return Err(CustomError::Fatal); + } + }, + _ => { + tx.send(101).await.unwrap(); + return Err(CustomError::Fatal); + }, + } + } + } + tokio::time::sleep(Duration::from_millis(100)).await; + tokio::task::yield_now().await; + Ok(()) } async fn looped_service_connecting( @@ -384,7 +383,6 @@ async fn looped_service_connecting( serv: &Services ) -> Result<(), CustomError> { - if serv.triggers.wait == 0 { loop { tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;