Merge pull request #2 from prplV/feature/main-running-handler

merging feature/main-running-handler with master
pull/9/head
Vladislav Drozdov 2024-07-08 06:36:34 -04:00 committed by GitHub
commit 379eb97769
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 106 additions and 70 deletions

View File

@ -41,7 +41,7 @@
"filename" : "control-file", "filename" : "control-file",
"src" : "/home/vladislav/web/", "src" : "/home/vladislav/web/",
"triggers" : { "triggers" : {
"onDelete" : "stop", "onDelete" : "hold",
"onChange" : "hold" "onChange" : "hold"
} }
}], }],
@ -57,9 +57,9 @@
"hostname" : "localhost", "hostname" : "localhost",
"port" : 8080, "port" : 8080,
"triggers" : { "triggers" : {
"wait" : 20, "wait" : 10,
"delay" : 5, "delay" : 2,
"onLost" : "stop" "onLost" : "hold"
} }
}] }]
} }

View File

@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json; use serde_json;
use tokio::io::Join; use tokio::io::Join;
use tokio::join;
use std::env::join_paths; use std::env::join_paths;
use std::fmt::Debug; use std::fmt::Debug;
use std::fs; use std::fs;
@ -66,7 +67,7 @@ struct FIleTriggers {
on_change : String, on_change : String,
} }
#[tokio::main] #[tokio::main(flavor = "multi_thread")]
async fn main() { async fn main() {
let processes = load_processes("settings.json"); let processes = load_processes("settings.json");
// let mut error_counter = 0; // let mut error_counter = 0;
@ -78,7 +79,7 @@ async fn main() {
let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![]; let mut handler: Vec<tokio::task::JoinHandle<()>> = vec![];
for proc in processes.processes.iter() { for proc in processes.processes.iter() {
println!("\nProcess '{}' on stage:\n{}\nDepends on {} file(s), {} service(s)\n", println!("Process '{}' on stage:\n{}\nDepends on {} file(s), {} service(s)\n",
proc.name, proc.name,
proc.path, proc.path,
proc.dependencies.files.len(), proc.dependencies.files.len(),
@ -108,25 +109,30 @@ async fn run_daemons(
) )
{ {
loop { loop {
let run_hand = running_handler(&proc.name, &proc.path, tx.clone()); let run_hand = running_handler(&proc, tx.clone());
let file_hand = file_handler(&proc.name,&proc.dependencies.files, tx.clone()); // let file_hand = file_handler(&proc.name,&proc.dependencies.files, tx.clone());
let serv_hand = service_handler(&proc.name, &proc.dependencies.services, tx.clone()); // let serv_hand = service_handler(&proc.name, &proc.dependencies.services, tx.clone());
tokio::select! { tokio::select! {
_ = run_hand => {}, _ = run_hand => {},
_ = file_hand => {}, // _ = file_hand => {},
_ = serv_hand => {}, // _ = serv_hand => {},
_val = rx.recv() => { _val = rx.recv() => {
match _val.unwrap() { match _val.unwrap() {
// 1 - File-dependency handling error -> terminating (after waiting) // 1 - File-dependency handling error -> terminating (after waiting)
1 => { 1 => {
println!("Dependency handling error: Terminating {} process ..." , &proc.name); if is_active(&proc.name) {
terminate_process(&proc.name).await; println!("Dependency handling error: Terminating {} process ..." , &proc.name);
break; terminate_process(&proc.name).await;
}
// break;
}, },
// 2 - File-dependency handling error -> holding (after waiting) // 2 - File-dependency handling error -> holding (after waiting)
2 => { 2 => {
println!("Error due to starting {} process", &proc.name); if !is_frozen(&proc.name) {
println!("Dependency handling error: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await;
}
}, },
// 3 - Running process error // 3 - Running process error
3 => { 3 => {
@ -138,14 +144,18 @@ async fn run_daemons(
}, },
// 5 - Timeout of waiting service-dependency -> terminating (after waiting) // 5 - Timeout of waiting service-dependency -> terminating (after waiting)
5 => { 5 => {
println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name); if is_active(&proc.name) {
terminate_process(&proc.name).await; println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
break; terminate_process(&proc.name).await;
}
// break;
}, },
// 6 - Timeout of waiting service-dependency -> holding (after waiting) // 6 - Timeout of waiting service-dependency -> holding (after waiting)
6 => { 6 => {
println!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name); if !is_frozen(&proc.name) {
freeze_process(&proc.name).await; println!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await;
}
}, },
// // 7 - File-dependency change -> terminating (after check) // // 7 - File-dependency change -> terminating (after check)
// 7 => {}, // 7 => {},
@ -156,13 +166,17 @@ async fn run_daemons(
// 10 - Process unfreaze call via file handler // 10 - Process unfreaze call via file handler
10 => { 10 => {
println!("Unfreezing process {} call via file handler...", &proc.name); if is_frozen(&proc.name) {
unfreeze_process(&proc.name).await; println!("Unfreezing process {} call...", &proc.name);
unfreeze_process(&proc.name).await;
}
}, },
// 11 - Process unfreaze call via service handler // 11 - Process unfreaze call via service handler
11 => { 11 => {
println!("Unfreezing process {} call via service handler...", &proc.name); if is_frozen(&proc.name) {
unfreeze_process(&proc.name).await; println!("Unfreezing process {} call...", &proc.name);
unfreeze_process(&proc.name).await;
}
}, },
// 101 - Impermissible trigger values in JSON // 101 - Impermissible trigger values in JSON
101 => { 101 => {
@ -246,58 +260,69 @@ async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
} }
} }
// check process status daemon // check process status daemon
async fn running_handler(name: &str, path: &str, tx: mpsc::Sender<u8>){ async fn running_handler(prc: &TrackingProcess, tx: mpsc::Sender<u8>){
println!("running daemon on {}", name); // println!("running daemon on {}", prc.name);
let _ = Command::new("pidof") // loop {
.arg(name) // let _ = Command::new("pidof")
.output() // .arg(&prc.name)
.expect("Failed to execute command 'pidof'"); // .output()
// .expect("Failed to execute command 'pidof'");
// services and files check (once)
let files_check = file_handler(&prc.name, &prc.dependencies.files, tx.clone());
let services_check = service_handler(&prc.name, &prc.dependencies.services, tx.clone());
if !is_active(name) && !is_frozen(name){ let res = join!(files_check, services_check);
if start_process(name, path).await.is_err() { // if inactive -> spawn checks -> active is true
if !is_active(&prc.name) && res.0.is_ok() && res.1.is_ok(){
if start_process(&prc.name, &prc.path).await.is_err() {
tx.send(3).await.unwrap(); tx.send(3).await.unwrap();
// return; return;
} }
} }
// if frozen -> spawn checks -> unfreeze is true
else if is_frozen(&prc.name) && res.0.is_ok() && res.1.is_ok(){
tx.send(10).await.unwrap();
return;
}
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
tokio::task::yield_now().await;
// }
} }
async fn file_handler(name: &str, files: &Vec<Files>, tx: mpsc::Sender<u8>) { async fn file_handler(name: &str, files: &Vec<Files>, tx: mpsc::Sender<u8>) -> Result<(), CustomError>{
loop { // println!("file daemon on {}", name);
println!("file daemon on {}", name);
if is_active(name) && !is_frozen(name){
for file in files { for file in files {
if check_file(&file.filename, &file.src).is_err() { if check_file(&file.filename, &file.src).is_err() {
if !is_active(name) || is_frozen(name) {
return Err(CustomError::Fatal);
}
match file.triggers.on_delete.as_str() { match file.triggers.on_delete.as_str() {
"stay" => { "stay" => {
continue; continue;
}, },
"stop" => { "stop" => {
tx.send(1).await.unwrap(); if is_active(name) {
return; tx.send(1).await.unwrap();
}
return Err(CustomError::Fatal);
}, },
"hold" => { "hold" => {
tx.send(2).await.unwrap(); if is_active(name) {
// continue; tx.send(2).await.unwrap();
// while check_file(&file.filename, &file.src).is_err() { return Err(CustomError::Fatal);
// println!("zov"); }
// tokio::time::sleep(Duration::from_millis(100)).await;
// }
// tx.send(10).await.unwrap();
}, },
_ => { _ => {
tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(101).await.unwrap(); tx.send(101).await.unwrap();
return; return Err(CustomError::Fatal);
}, },
} }
} }
} }
// if is_frozen(name) {
// tx.send(10).await.unwrap();
// }
}
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
} tokio::task::yield_now().await;
Ok(())
} }
fn check_file(filename: &str, path: &str) -> Result<(), CustomError> { fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
let fileconcat = format!("{}{}", path, filename); let fileconcat = format!("{}{}", path, filename);
@ -310,40 +335,48 @@ fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
} }
// ?? // ??
async fn service_handler(name: &str, services: &Vec<Services>, tx: mpsc::Sender<u8>) { async fn service_handler(name: &str, services: &Vec<Services>, tx: mpsc::Sender<u8>) -> Result<(), CustomError> {
loop { // println!("service daemon on {}", name);
println!("service daemon on {}", name); // let state = is_active(name);
if is_active(name) && !is_frozen(name) { // let condition = is_frozen(name);
for serv in services { for serv in services {
if check_service(&serv.hostname, &serv.port).is_err() { if check_service(&serv.hostname, &serv.port).await.is_err() {
// println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name); if !is_active(name) || is_frozen(name) {
return Err(CustomError::Fatal);
}
println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name);
match serv.triggers.on_lost.as_str() { match serv.triggers.on_lost.as_str() {
"stay" => { "stay" => {
}, },
"stop" => { "stop" => {
if looped_service_connecting(&name, serv).await.is_err() { if looped_service_connecting(name, serv).await.is_err() {
tx.send(5).await.unwrap(); tx.send(5).await.unwrap();
return; tokio::time::sleep(Duration::from_millis(100)).await;
return Err(CustomError::Fatal);
} }
}, },
"hold" => { "hold" => {
if looped_service_connecting(&name, serv).await.is_err() { if is_frozen(name) {
return Err(CustomError::Fatal);
}
if looped_service_connecting(name, serv).await.is_err() {
tx.send(6).await.unwrap(); tx.send(6).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
return Err(CustomError::Fatal);
} }
}, },
_ => { _ => {
tx.send(101).await.unwrap(); tx.send(101).await.unwrap();
return; return Err(CustomError::Fatal);
}, },
} }
} }
} }
// if is_frozen(name) {
// tx.send(11).await.unwrap();
// }
}
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
} tokio::task::yield_now().await;
Ok(())
} }
async fn looped_service_connecting( async fn looped_service_connecting(
@ -351,12 +384,14 @@ async fn looped_service_connecting(
serv: &Services serv: &Services
) -> Result<(), CustomError> ) -> Result<(), CustomError>
{ {
if serv.triggers.wait == 0 { if serv.triggers.wait == 0 {
loop { loop {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port );
match check_service(&serv.hostname, &serv.port) { match check_service(&serv.hostname, &serv.port).await {
Ok(_) => { Ok(_) => {
println!("SUCCESS!");
break; break;
}, },
Err(_) => { Err(_) => {
@ -370,8 +405,9 @@ async fn looped_service_connecting(
while start.elapsed().as_secs() < serv.triggers.wait.into() { while start.elapsed().as_secs() < serv.triggers.wait.into() {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await; tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port ); println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port );
match check_service(&serv.hostname, &serv.port) { match check_service(&serv.hostname, &serv.port).await {
Ok(_) => { Ok(_) => {
println!("SUCCESS!");
return Ok(()); return Ok(());
}, },
Err(_) => { Err(_) => {
@ -382,7 +418,7 @@ async fn looped_service_connecting(
return Err(CustomError::Fatal); return Err(CustomError::Fatal);
} }
} }
fn check_service(host: &str, port: &u32) -> Result<(), CustomError> { async fn check_service(host: &str, port: &u32) -> Result<(), CustomError> {
let mut command = Command::new("bash"); let mut command = Command::new("bash");
command.args(["service-checker.sh", host, &port.to_string()]); command.args(["service-checker.sh", host, &port.to_string()]);