bug: freezing and unfreezing (f.e. freezing in file hand => 100% unfreezing in service hand)

pull/9/head
prplV 2024-07-04 09:28:30 -04:00
parent 38b9d77391
commit 70443881a1
2 changed files with 188 additions and 26 deletions

View File

@ -6,11 +6,19 @@
"files" : [
{
"filename" : "control-file",
"src" : "/home/vladislav/web/"
"src" : "/home/vladislav/web/",
"triggers" : {
"onDelete" : "hold",
"onChange" : "hold"
}
},
{
"filename" : "config-file",
"src" : "/home/vladislav/web/"
"src" : "/home/vladislav/web/",
"triggers" : {
"onDelete" : "hold",
"onChange" : "hold"
}
}
],
"services" : [{
@ -18,7 +26,8 @@
"port" : 443,
"triggers" : {
"wait" : 6,
"delay" : 1
"delay" : 1,
"onLost" : "stop"
}
}]
}
@ -30,14 +39,27 @@
"files" : [
{
"filename" : "control-file",
"src" : "/home/vladislav/web/"
"src" : "/home/vladislav/web/",
"triggers" : {
"onDelete" : "hold",
"onChange" : "hold"
}
}],
"services" : [{
"hostname" : "ya.ru",
"hostname" : "google.com",
"port" : 443,
"triggers" : {
"wait" : 14,
"delay" : 1
"delay" : 1,
"onLost" : "hold"
}
}, {
"hostname" : "localhost",
"port" : 8080,
"triggers" : {
"wait" : 20,
"delay" : 5,
"onLost" : "stop"
}
}]
}

View File

@ -3,7 +3,7 @@ use serde_json;
use std::fmt::Debug;
use std::fs;
use std::path::Path;
use std::process::Command;
use std::process::{Command, Output};
// to use in time-trigger
use tokio::time::{Duration, Instant};
// to store condition between asynchronous tasks
@ -38,6 +38,7 @@ struct Dependencies {
struct Files {
filename : String,
src : String,
triggers : FIleTriggers,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -47,10 +48,21 @@ struct Services {
triggers : ServiceTriggers,
}
// policy
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ServiceTriggers {
wait : u32,
delay: u32,
#[serde(rename="onLost")]
on_lost : String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct FIleTriggers {
#[serde(rename="onDelete")]
on_delete : String,
#[serde(rename="onChange")]
on_change : String,
}
#[tokio::main]
@ -122,16 +134,56 @@ async fn run_daemons(
_ = serv_hand => {},
_val = rx.recv() => {
match _val.unwrap() {
// 1 - File-dependency handling error -> terminating (after waiting)
1 => {
println!("Dependency handling error: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await;
break;
},
// 2 - File-dependency handling error -> holding (after waiting)
2 => {
println!("Error due to starting {} process", &proc.name);
},
// 3 - Running process error
3 => {
println!("Error due to starting {} process", &proc.name);
},
// 4 - Timeout of waiting service-dependency -> staying (after waiting)
4 => {
println!("Timeout of waiting service-dependency: Ignoring for {} process ..." , &proc.name);
},
// 5 - Timeout of waiting service-dependency -> terminating (after waiting)
5 => {
println!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await;
break;
},
// 6 - Timeout of waiting service-dependency -> holding (after waiting)
6 => {
println!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await;
},
// // 7 - File-dependency change -> terminating (after check)
// 7 => {},
// // 8 - File-dependency change -> restarting (after check)
// 8 => {},
// // 9 - File-dependency change -> staying (after check)
// 9 => {},
// 10 - Process unfreaze call via file handler
10 => {
println!("Unfreezing process {} call via file handler...", &proc.name);
unfreeze_process(&proc.name).await;
},
// 11 - Process unfreaze call via service handler
11 => {
println!("Unfreezing process {} call via service handler...", &proc.name);
unfreeze_process(&proc.name).await;
},
// 101 - Impermissible trigger values in JSON
101 => {
println!("Impermissible trigger values in JSON");
break;
},
_ => {},
}
@ -145,6 +197,13 @@ fn load_processes(json_filename: &str) -> Processes{
let read = fs::read_to_string(json_filename).expect(format!("Missing '{}' file. Cannot start runner", json_filename).as_str());
serde_json::from_str::<Processes>(&read).expect(format!("Parsing error in '{}' file. Cannot start runner", json_filename).as_str())
}
fn get_pid(name: &str) -> Output {
Command::new("pidof")
.arg(name)
.output()
.expect("Failed to execute command 'pidof'")
}
fn is_active(name: &str)-> bool {
let output = Command::new("pidof")
.arg(name)
@ -152,13 +211,41 @@ fn is_active(name: &str)-> bool {
.expect("Failed to execute command 'pidof'");
!String::from_utf8_lossy(&output.stdout).trim().is_empty()
}
fn is_frozen(name: &str) -> bool {
let temp = get_pid(name);
let pid = String::from_utf8_lossy(&temp.stdout);
if pid.trim().is_empty(){
return false;
} else {
let cmd = Command::new("ps")
.args(["-o", "stat=", "-p", pid.trim()])
.output()
.expect("Failed to execute ps command");
return !(String::from_utf8_lossy(&cmd.stdout) == "Sl+\n");
}
}
async fn terminate_process (name: &str) {
let _ = Command::new("pkill")
.arg(name)
.output()
.expect("Failed to execute command 'pkill'");
}
async fn start_process(name: &str, path: &str, tx: mpsc::Sender<u8>) -> Result<(), CustomError> {
async fn freeze_process(name: &str) {
// let pid = get_pid(name);
let _ = Command::new("pkill")
.args(["-STOP", name])
.output()
.expect("Failed to freeze process");
}
async fn unfreeze_process(name: &str) {
// let pid = get_pid(name);
let _ = Command::new("pkill")
.args(["-CONT", name])
.output()
.expect("Failed to unfreeze process");
}
async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
let runsh = format!("{}{}", path, "/run.sh");
let mut command = Command::new("bash");
command.arg(runsh);
@ -169,7 +256,6 @@ async fn start_process(name: &str, path: &str, tx: mpsc::Sender<u8>) -> Result<(
Ok(())
},
Err(_) => {
let _ = tx.send(1).await;
return Err(CustomError::Fatal)
},
}
@ -177,36 +263,61 @@ async fn start_process(name: &str, path: &str, tx: mpsc::Sender<u8>) -> Result<(
// check process status daemon
async fn running_handler(name: &str, path: &str, tx: mpsc::Sender<u8>){
loop {
println!("running daemon on {}", name);
// println!("running daemon on {}", name);
let _ = Command::new("pidof")
.arg(name)
.output()
.expect("Failed to execute command 'pidof'");
if !is_active(name) {
match start_process(name, path, tx.clone()).await {
if !is_active(name) && !is_frozen(name){
match start_process(name, path).await {
Ok(_) => {},
Err(_) => {
tx.send(2).await.unwrap();
tx.send(3).await.unwrap();
},
}
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
async fn file_handler(name: &str, files: &Vec<Files>, tx: mpsc::Sender<u8>) {
loop {
println!("file daemon on {}", name);
// println!("file daemon on {}", name);
if is_active(name) {
let mut er = 0;
for file in files {
match check_file(&file.filename, &file.src) {
Ok(_) => {},
Ok(_) => {
continue;
},
Err(_) => {
tx.send(1).await.unwrap();
println!("{}", file.triggers.on_delete.as_str());
match file.triggers.on_delete.as_str() {
"stay" => {
continue;
},
"stop" => {
er += 1;
tx.send(1).await.unwrap();
break;
},
"hold" => {
er += 1;
tx.send(2).await.unwrap();
break;
},
_ => {
tx.send(101).await.unwrap();
return;
},
}
},
}
}
if is_frozen(name) && er == 0 {
tx.send(10).await.unwrap();
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
@ -224,26 +335,55 @@ fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
// ??
async fn service_handler(name: &str, services: &Vec<Services>, tx: mpsc::Sender<u8>) {
loop {
println!("service daemon on {}", name);
// println!("service daemon on {}", name);
if is_active(name) {
for serv in services {
let mut er = 0;
'for_loop: for serv in services {
match check_service(&serv.hostname, &serv.port) {
Ok(_) => {
// println!("{} is up!", &serv.hostname);
continue;
},
Err(_) => {
match looped_service_connecting(&name, serv).await {
Ok(_) => {
println!("Service {}:{} is inreachable for process {}", &serv.hostname, &serv.port, &name);
match serv.triggers.on_lost.as_str() {
"stay" => {
continue;
},
Err(_) => {
tx.send(3).await.unwrap();
},
"stop" => {
match looped_service_connecting(&name, serv).await {
Ok(_) => {
continue 'for_loop;
},
Err(_) => {
tx.send(5).await.unwrap();
er += 1;
return;
},
}
},
"hold" => {
match looped_service_connecting(&name, serv).await {
Ok(_) => {
continue 'for_loop;
},
Err(_) => {
tx.send(6).await.unwrap();
er += 1;
break 'for_loop;
},
}
},
_ => {
tx.send(101).await.unwrap();
return;
},
}
// tx.send(3).await.unwrap();
},
}
}
if is_frozen(name) && er == 0 {
tx.send(11).await.unwrap();
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}