From 244508509cf8d96ba977a20b977b65cbdc62fcd6 Mon Sep 17 00:00:00 2001 From: prplV Date: Mon, 8 Jul 2024 03:42:42 -0400 Subject: [PATCH] zero state --- settings.json | 10 +-- src/main.rs | 181 ++++++++++++++++++++++---------------------------- 2 files changed, 83 insertions(+), 108 deletions(-) diff --git a/settings.json b/settings.json index cf5fcc0..86ff13d 100644 --- a/settings.json +++ b/settings.json @@ -8,7 +8,7 @@ "filename" : "control-file", "src" : "/home/vladislav/web/", "triggers" : { - "onDelete" : "hold", + "onDelete" : "stop", "onChange" : "hold" } }, @@ -16,7 +16,7 @@ "filename" : "config-file", "src" : "/home/vladislav/web/", "triggers" : { - "onDelete" : "hold", + "onDelete" : "stop", "onChange" : "hold" } } @@ -27,7 +27,7 @@ "triggers" : { "wait" : 6, "delay" : 1, - "onLost" : "hold" + "onLost" : "stop" } }] } @@ -41,7 +41,7 @@ "filename" : "control-file", "src" : "/home/vladislav/web/", "triggers" : { - "onDelete" : "hold", + "onDelete" : "stop", "onChange" : "hold" } }], @@ -51,7 +51,7 @@ "triggers" : { "wait" : 14, "delay" : 1, - "onLost" : "hold" + "onLost" : "stop" } }, { "hostname" : "localhost", diff --git a/src/main.rs b/src/main.rs index 6283d81..ac9a455 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,7 @@ use serde::{Deserialize, Serialize}; use serde_json; +use tokio::io::Join; +use std::env::join_paths; use std::fmt::Debug; use std::fs; use std::path::Path; @@ -7,7 +9,7 @@ use std::process::{Command, Output}; // to use in time-trigger use tokio::time::{Duration, Instant}; // to store condition between asynchronous tasks -use tokio::sync::mpsc; +use tokio::sync::{futures, mpsc}; enum CustomError { @@ -49,7 +51,6 @@ struct Services { } // policy - #[derive(Debug, Serialize, Deserialize, Clone)] struct ServiceTriggers { wait : u32, @@ -85,24 +86,6 @@ async fn main() { // creating msg channel let (tx, mut rx) = mpsc::channel::(1); - - - // for file in proc.dependencies.files.iter() { - // if let Err(_) = check_file(&file.filename, &file.src) { - // eprintln!("Error: Process {} cannot run without file {}{}", proc.name, file.src, file.filename); - // error_counter += 1; - // } - // } - // for ser in proc.dependencies.services.iter() { - // if let Err(_) = check_service(&ser.hostname, &ser.port) { - // eprintln!("Error: Process {} cannot run while service {}:{} is down", proc.name, ser.hostname, ser.port); - // error_counter += 1; - // } - // } - - // if error_counter > 0 { - // continue; - // } let proc_clone = proc.clone(); let tx_clone = tx.clone(); @@ -114,6 +97,7 @@ async fn main() { for i in handler { i.await.unwrap(); } + return; } @@ -189,6 +173,7 @@ async fn run_daemons( } }, } + // tokio::task::yield_now().await; } } @@ -262,57 +247,57 @@ async fn start_process(name: &str, path: &str) -> Result<(), CustomError> { } // check process status daemon async fn running_handler(name: &str, path: &str, tx: mpsc::Sender){ - // println!("running daemon on {}", name); - let _ = Command::new("pidof") - .arg(name) - .output() - .expect("Failed to execute command 'pidof'"); + println!("running daemon on {}", name); + let _ = Command::new("pidof") + .arg(name) + .output() + .expect("Failed to execute command 'pidof'"); - if !is_active(name) && !is_frozen(name){ - match start_process(name, path).await { - Ok(_) => {}, - Err(_) => { - tx.send(3).await.unwrap(); - }, - } - } - tokio::time::sleep(Duration::from_millis(100)).await; + if !is_active(name) && !is_frozen(name){ + if start_process(name, path).await.is_err() { + tx.send(3).await.unwrap(); + // return; + } + } + tokio::time::sleep(Duration::from_millis(100)).await; } async fn file_handler(name: &str, files: &Vec, tx: mpsc::Sender) { - // println!("file daemon on {}", name); - if is_active(name) { - for file in files { - match check_file(&file.filename, &file.src) { - Ok(_) => { - continue; - }, - Err(_) => { - match file.triggers.on_delete.as_str() { - "stay" => { - continue; - }, - "stop" => { - tx.send(1).await.unwrap(); - return; - }, - "hold" => { - tx.send(2).await.unwrap(); - return; - }, - _ => { - tx.send(101).await.unwrap(); - return; - }, + loop { + println!("file daemon on {}", name); + if is_active(name) && !is_frozen(name){ + for file in files { + if check_file(&file.filename, &file.src).is_err() { + match file.triggers.on_delete.as_str() { + "stay" => { + continue; + }, + "stop" => { + tx.send(1).await.unwrap(); + return; + }, + "hold" => { + tx.send(2).await.unwrap(); + // continue; + // while check_file(&file.filename, &file.src).is_err() { + // println!("zov"); + // tokio::time::sleep(Duration::from_millis(100)).await; + // } + // tx.send(10).await.unwrap(); + }, + _ => { + tx.send(101).await.unwrap(); + return; + }, } - }, + } } + // if is_frozen(name) { + // tx.send(10).await.unwrap(); + // } } - if is_frozen(name) { - tx.send(10).await.unwrap(); - } + tokio::time::sleep(Duration::from_millis(100)).await; } - tokio::time::sleep(Duration::from_millis(100)).await; } fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { let fileconcat = format!("{}{}", path, filename); @@ -326,49 +311,39 @@ fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { // ?? async fn service_handler(name: &str, services: &Vec, tx: mpsc::Sender) { - // println!("service daemon on {}", name); - if is_active(name) { - for serv in services { - match check_service(&serv.hostname, &serv.port) { - Ok(_) => { - continue; - }, - Err(_) => { - println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); - match serv.triggers.on_lost.as_str() { - "stay" => { - }, - "stop" => { - match looped_service_connecting(&name, serv).await { - Ok(_) => {}, - Err(_) => { - tx.send(5).await.unwrap(); - break; - }, - } - }, - "hold" => { - match looped_service_connecting(&name, serv).await { - Ok(_) => {}, - Err(_) => { - tx.send(6).await.unwrap(); - break; - }, - } - }, - _ => { - tx.send(101).await.unwrap(); - return; - }, + loop { + println!("service daemon on {}", name); + if is_active(name) && !is_frozen(name) { + for serv in services { + if check_service(&serv.hostname, &serv.port).is_err() { + // println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); + match serv.triggers.on_lost.as_str() { + "stay" => { + }, + "stop" => { + if looped_service_connecting(&name, serv).await.is_err() { + tx.send(5).await.unwrap(); + return; + } + }, + "hold" => { + if looped_service_connecting(&name, serv).await.is_err() { + tx.send(6).await.unwrap(); + } + }, + _ => { + tx.send(101).await.unwrap(); + return; + }, } - }, - } - } - if is_frozen(name) { - tx.send(11).await.unwrap(); + } + } + // if is_frozen(name) { + // tx.send(11).await.unwrap(); + // } } + tokio::time::sleep(Duration::from_millis(100)).await; } - tokio::time::sleep(Duration::from_millis(100)).await; } async fn looped_service_connecting(