zero state

pull/9/head
prplV 2024-07-08 03:42:42 -04:00
parent c90e1938ef
commit 244508509c
2 changed files with 83 additions and 108 deletions

View File

@ -8,7 +8,7 @@
"filename" : "control-file",
"src" : "/home/vladislav/web/",
"triggers" : {
"onDelete" : "hold",
"onDelete" : "stop",
"onChange" : "hold"
}
},
@ -16,7 +16,7 @@
"filename" : "config-file",
"src" : "/home/vladislav/web/",
"triggers" : {
"onDelete" : "hold",
"onDelete" : "stop",
"onChange" : "hold"
}
}
@ -27,7 +27,7 @@
"triggers" : {
"wait" : 6,
"delay" : 1,
"onLost" : "hold"
"onLost" : "stop"
}
}]
}
@ -41,7 +41,7 @@
"filename" : "control-file",
"src" : "/home/vladislav/web/",
"triggers" : {
"onDelete" : "hold",
"onDelete" : "stop",
"onChange" : "hold"
}
}],
@ -51,7 +51,7 @@
"triggers" : {
"wait" : 14,
"delay" : 1,
"onLost" : "hold"
"onLost" : "stop"
}
}, {
"hostname" : "localhost",

View File

@ -1,5 +1,7 @@
use serde::{Deserialize, Serialize};
use serde_json;
use tokio::io::Join;
use std::env::join_paths;
use std::fmt::Debug;
use std::fs;
use std::path::Path;
@ -7,7 +9,7 @@ use std::process::{Command, Output};
// to use in time-trigger
use tokio::time::{Duration, Instant};
// to store condition between asynchronous tasks
use tokio::sync::mpsc;
use tokio::sync::{futures, mpsc};
enum CustomError {
@ -49,7 +51,6 @@ struct Services {
}
// policy
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ServiceTriggers {
wait : u32,
@ -86,24 +87,6 @@ async fn main() {
// creating msg channel
let (tx, mut rx) = mpsc::channel::<u8>(1);
// for file in proc.dependencies.files.iter() {
// if let Err(_) = check_file(&file.filename, &file.src) {
// eprintln!("Error: Process {} cannot run without file {}{}", proc.name, file.src, file.filename);
// error_counter += 1;
// }
// }
// for ser in proc.dependencies.services.iter() {
// if let Err(_) = check_service(&ser.hostname, &ser.port) {
// eprintln!("Error: Process {} cannot run while service {}:{} is down", proc.name, ser.hostname, ser.port);
// error_counter += 1;
// }
// }
// if error_counter > 0 {
// continue;
// }
let proc_clone = proc.clone();
let tx_clone = tx.clone();
let event = tokio::spawn(async move {
@ -114,6 +97,7 @@ async fn main() {
for i in handler {
i.await.unwrap();
}
return;
}
@ -189,6 +173,7 @@ async fn run_daemons(
}
},
}
// tokio::task::yield_now().await;
}
}
@ -262,32 +247,27 @@ async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
}
// check process status daemon
async fn running_handler(name: &str, path: &str, tx: mpsc::Sender<u8>){
// println!("running daemon on {}", name);
println!("running daemon on {}", name);
let _ = Command::new("pidof")
.arg(name)
.output()
.expect("Failed to execute command 'pidof'");
if !is_active(name) && !is_frozen(name){
match start_process(name, path).await {
Ok(_) => {},
Err(_) => {
if start_process(name, path).await.is_err() {
tx.send(3).await.unwrap();
},
// return;
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
async fn file_handler(name: &str, files: &Vec<Files>, tx: mpsc::Sender<u8>) {
// println!("file daemon on {}", name);
if is_active(name) {
loop {
println!("file daemon on {}", name);
if is_active(name) && !is_frozen(name){
for file in files {
match check_file(&file.filename, &file.src) {
Ok(_) => {
continue;
},
Err(_) => {
if check_file(&file.filename, &file.src).is_err() {
match file.triggers.on_delete.as_str() {
"stay" => {
continue;
@ -298,22 +278,27 @@ async fn file_handler(name: &str, files: &Vec<Files>, tx: mpsc::Sender<u8>) {
},
"hold" => {
tx.send(2).await.unwrap();
return;
// continue;
// while check_file(&file.filename, &file.src).is_err() {
// println!("zov");
// tokio::time::sleep(Duration::from_millis(100)).await;
// }
// tx.send(10).await.unwrap();
},
_ => {
tx.send(101).await.unwrap();
return;
},
}
},
}
}
if is_frozen(name) {
tx.send(10).await.unwrap();
}
// if is_frozen(name) {
// tx.send(10).await.unwrap();
// }
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let fileconcat = format!("{}{}", path, filename);
let path = Path::new(&fileconcat);
@ -326,34 +311,24 @@ fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
// ??
async fn service_handler(name: &str, services: &Vec<Services>, tx: mpsc::Sender<u8>) {
// println!("service daemon on {}", name);
if is_active(name) {
loop {
println!("service daemon on {}", name);
if is_active(name) && !is_frozen(name) {
for serv in services {
match check_service(&serv.hostname, &serv.port) {
Ok(_) => {
continue;
},
Err(_) => {
println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name);
if check_service(&serv.hostname, &serv.port).is_err() {
// println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name);
match serv.triggers.on_lost.as_str() {
"stay" => {
},
"stop" => {
match looped_service_connecting(&name, serv).await {
Ok(_) => {},
Err(_) => {
if looped_service_connecting(&name, serv).await.is_err() {
tx.send(5).await.unwrap();
break;
},
return;
}
},
"hold" => {
match looped_service_connecting(&name, serv).await {
Ok(_) => {},
Err(_) => {
if looped_service_connecting(&name, serv).await.is_err() {
tx.send(6).await.unwrap();
break;
},
}
},
_ => {
@ -361,15 +336,15 @@ async fn service_handler(name: &str, services: &Vec<Services>, tx: mpsc::Sender<
return;
},
}
},
}
}
if is_frozen(name) {
tx.send(11).await.unwrap();
}
// if is_frozen(name) {
// tx.send(11).await.unwrap();
// }
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
async fn looped_service_connecting(
name: &str,