new methadology (looks like more correct)

pull/9/head
prplV 2024-07-04 05:03:52 -04:00
parent b6002a171f
commit 26f5a1f543
2 changed files with 113 additions and 57 deletions

View File

@ -17,7 +17,7 @@
"hostname" : "ya.ru", "hostname" : "ya.ru",
"port" : 443, "port" : 443,
"triggers" : { "triggers" : {
"wait" : 200, "wait" : 12,
"delay" : 3 "delay" : 3
} }
}] }]
@ -36,8 +36,8 @@
"hostname" : "localhost", "hostname" : "localhost",
"port" : 8080, "port" : 8080,
"triggers" : { "triggers" : {
"wait" : 200, "wait" : 21,
"delay" : 3 "delay" : 1
} }
}] }]
} }

View File

@ -5,7 +5,7 @@ use std::fs;
use std::path::Path; use std::path::Path;
use std::process::Command; use std::process::Command;
// to use in time-trigger // to use in time-trigger
use tokio::time::{sleep, Duration}; use tokio::time::{Duration, Instant};
// to store condition between asynchronous tasks // to store condition between asynchronous tasks
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -72,25 +72,25 @@ async fn main() {
proc.dependencies.services.len()); proc.dependencies.services.len());
// creating msg channel // creating msg channel
let (tx, mut rx) = mpsc::channel::<usize>(1); let (tx, mut rx) = mpsc::channel::<u8>(1);
for file in proc.dependencies.files.iter() { // for file in proc.dependencies.files.iter() {
if let Err(_) = check_file(&file.filename, &file.src) { // if let Err(_) = check_file(&file.filename, &file.src) {
eprintln!("Error: Process {} cannot run without file {}{}", proc.name, file.src, file.filename); // eprintln!("Error: Process {} cannot run without file {}{}", proc.name, file.src, file.filename);
error_counter += 1; // error_counter += 1;
} // }
} // }
for ser in proc.dependencies.services.iter() { // for ser in proc.dependencies.services.iter() {
if let Err(_) = check_service(&ser.hostname, &ser.port) { // if let Err(_) = check_service(&ser.hostname, &ser.port) {
eprintln!("Error: Process {} cannot run while service {}:{} is down", proc.name, ser.hostname, ser.port); // eprintln!("Error: Process {} cannot run while service {}:{} is down", proc.name, ser.hostname, ser.port);
error_counter += 1; // error_counter += 1;
} // }
} // }
if error_counter > 0 { // if error_counter > 0 {
return; // continue;
} // }
let proc_clone = proc.clone(); let proc_clone = proc.clone();
let tx_clone = tx.clone(); let tx_clone = tx.clone();
@ -107,8 +107,8 @@ async fn main() {
async fn run_daemons( async fn run_daemons(
proc: &TrackingProcess, proc: &TrackingProcess,
tx: mpsc::Sender<usize>, tx: mpsc::Sender<u8>,
rx: &mut mpsc::Receiver<usize> rx: &mut mpsc::Receiver<u8>
) { ) {
let run_hand = running_handler(&proc.name, &proc.path, tx.clone()); let run_hand = running_handler(&proc.name, &proc.path, tx.clone());
@ -119,9 +119,21 @@ async fn run_daemons(
_ = run_hand => {}, _ = run_hand => {},
_ = file_hand => {}, _ = file_hand => {},
_ = serv_hand => {}, _ = serv_hand => {},
_ = rx.recv() => { _val = rx.recv() => {
terminate_process(&proc.name).await; match _val.unwrap() {
println!("Dependency handling error: Terminating {} process ..." , &proc.name); 1 => {
terminate_process(&proc.name).await;
println!("Dependency handling error: Terminating {} process ..." , &proc.name);
},
2 => {
println!("Error due to starting {} process", &proc.name);
},
3 => {
terminate_process(&proc.name).await;
println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
},
_ => {},
}
}, },
} }
} }
@ -138,18 +150,18 @@ fn is_active(name: &str)-> bool {
!String::from_utf8_lossy(&output.stdout).trim().is_empty() !String::from_utf8_lossy(&output.stdout).trim().is_empty()
} }
async fn terminate_process (name: &str) { async fn terminate_process (name: &str) {
let output = Command::new("pkill") let _ = Command::new("pkill")
.arg(name) .arg(name)
.output() .output()
.expect("Failed to execute command 'pkill'"); .expect("Failed to execute command 'pkill'");
} }
async fn start_process(name: &str, path: &str, tx: mpsc::Sender<usize>) -> Result<(), CustomError> { async fn start_process(name: &str, path: &str, tx: mpsc::Sender<u8>) -> Result<(), CustomError> {
let runsh = format!("{}{}", path, "/run.sh"); let runsh = format!("{}{}", path, "/run.sh");
let mut command = Command::new("bash"); let mut command = Command::new("bash");
command.arg(runsh); command.arg(runsh);
match command.spawn() { match command.spawn() {
Ok(mut child) => { Ok(_) => {
println!("Process {} is running now!", name); println!("Process {} is running now!", name);
Ok(()) Ok(())
}, },
@ -160,19 +172,19 @@ async fn start_process(name: &str, path: &str, tx: mpsc::Sender<usize>) -> Resul
} }
} }
// check process status daemon // check process status daemon
async fn running_handler(name: &str, path: &str, tx: mpsc::Sender<usize>){ async fn running_handler(name: &str, path: &str, tx: mpsc::Sender<u8>){
loop { loop {
let output = Command::new("pidof") // println!("running daemon on {}", name);
let _ = Command::new("pidof")
.arg(name) .arg(name)
.output() .output()
.expect("Failed to execute command 'pidof'"); .expect("Failed to execute command 'pidof'");
// is down
if !is_active(name) { if !is_active(name) {
match start_process(name, path, tx.clone()).await { match start_process(name, path, tx.clone()).await {
Ok(_) => {}, Ok(_) => {},
Err(_) => { Err(_) => {
tx.send(1).await.unwrap(); tx.send(2).await.unwrap();
}, },
} }
} }
@ -180,19 +192,17 @@ async fn running_handler(name: &str, path: &str, tx: mpsc::Sender<usize>){
} }
} }
async fn file_handler(name: &str, files: &Vec<Files>, tx: mpsc::Sender<usize>) { async fn file_handler(name: &str, files: &Vec<Files>, tx: mpsc::Sender<u8>) {
loop { loop {
for file in files { // println!("file daemon on {}", name);
if !is_active(name) { if is_active(name) {
break; for file in files {
} match check_file(&file.filename, &file.src) {
match check_file(&file.filename, &file.src) { Ok(_) => {},
Ok(_) => { Err(_) => {
// println!("{} is still in directory!", &file.filename); tx.send(1).await.unwrap();
}, },
Err(_) => { }
tx.send(1).await.unwrap();
},
} }
} }
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
@ -208,24 +218,70 @@ fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
} }
} }
async fn service_handler(name: &str, services: &Vec<Services>, tx: mpsc::Sender<usize>) { // ??
loop { async fn service_handler(name: &str, services: &Vec<Services>, tx: mpsc::Sender<u8>) {
for serv in services { 'main_loop: loop {
if !is_active(name) { // println!("service daemon on {}", name);
break; if is_active(name) {
} for serv in services {
match check_service(&serv.hostname, &serv.port) { match check_service(&serv.hostname, &serv.port) {
Ok(_) => { Ok(_) => {
// println!("{} is up!", &serv.hostname); // println!("{} is up!", &serv.hostname);
}, },
Err(_) => { Err(_) => {
tx.send(1).await.unwrap(); match looped_service_connecting(&name, serv).await {
}, Ok(_) => {
continue;
},
Err(_) => {
tx.send(3).await.unwrap();
},
}
// tx.send(3).await.unwrap();
},
}
} }
} }
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
} }
} }
async fn looped_service_connecting(
name: &str,
serv: &Services
) -> Result<(), CustomError>
{
if serv.triggers.wait == 0 {
loop {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port );
match check_service(&serv.hostname, &serv.port) {
Ok(_) => {
break;
},
Err(_) => {
continue;
},
}
}
return Ok(());
} else {
let start = Instant::now();
while start.elapsed().as_secs() < serv.triggers.wait.into() {
tokio::time::sleep(Duration::from_secs(serv.triggers.delay.into())).await;
println!("Attempting to connect from {} process to {}:{}",&name, &serv.hostname, &serv.port );
match check_service(&serv.hostname, &serv.port) {
Ok(_) => {
return Ok(());
},
Err(_) => {
continue;
},
}
}
return Err(CustomError::Fatal);
}
}
fn check_service(host: &str, port: &u32) -> Result<(), CustomError> { fn check_service(host: &str, port: &u32) -> Result<(), CustomError> {
let mut command = Command::new("bash"); let mut command = Command::new("bash");
command.args(["service-checker.sh", host, &port.to_string()]); command.args(["service-checker.sh", host, &port.to_string()]);