monitor/src/utils.rs

176 lines
7.5 KiB
Rust

use std::sync::Arc;
use crate::structs::TrackingProcess;
use tokio::sync::mpsc;
use inotify::Inotify;
use std::process::Command;
use crate::files::create_watcher;
use log::{error, warn};
use crate::prcs::{
is_active,
is_frozen,
terminate_process,
restart_process,
freeze_process,
unfreeze_process,
start_process
};
use tokio::time::Duration;
use tokio::join;
use crate::files::file_handler;
use crate::services::service_handler;
/// Shell pipeline that prints the sixth `/`-separated field of the first
/// `/proc/self/mountinfo` line mentioning `/docker/containers/`.
///
/// For mount sources shaped like `/var/lib/docker/containers/<id>/...` that field is the
/// container ID (NOTE(review): confirm this layout on the target Docker setup).
///
/// The awk program is wrapped in single quotes for `sh`, so `$6` reaches awk untouched and
/// must not be backslash-escaped: a literal backslash would be passed through to awk.
static GET_ID_CMD: &str = r"cat /proc/self/mountinfo | grep '/docker/containers/' | head -1 | awk -F '/' '{print $6}'";
/// # async func to run 3 main daemons (now it's more like tree-form than classical 0.1.0 form )
/// > hint : give mpsc with capacity 1 to jump over potential errors during running process
/// ** in [developing](https://github.com/prplV/runner-rs "REPOSITORY") **
pub async fn run_daemons(
proc: Arc<TrackingProcess>,
tx: Arc<mpsc::Sender<u8>>,
rx: &mut mpsc::Receiver<u8>
)
{
// creating watchers + ---buffers---
let mut watchers: Vec<Inotify> = vec![];
for file in proc.dependencies.files.clone().into_iter() {
watchers.push(create_watcher(&file.filename, &file.src).await.unwrap());
}
let watchers_clone: Arc<tokio::sync::Mutex<Vec<Inotify>>> = Arc::new(tokio::sync::Mutex::new(watchers));
loop {
let run_hand = running_handler(proc.clone(), tx.clone(), watchers_clone.clone());
tokio::select! {
_ = run_hand => {},
_val = rx.recv() => {
match _val.unwrap() {
// 1 - File-dependency handling error -> terminating (after waiting)
1 => {
if is_active(&proc.name).await {
error!("Dependency handling error: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
// break;
},
// 2 - File-dependency handling error -> holding (after waiting)
2 => {
if !is_frozen(&proc.name).await {
error!("Dependency handling error: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
},
// 3 - Running process error
3 => {
error!("Error due to starting {} process", &proc.name);
},
// 4 - Timeout of waiting service-dependency -> staying (after waiting)
4 => {
warn!("Timeout of waiting service-dependency: Ignoring on {} process ..." , &proc.name);
},
// 5 - Timeout of waiting service-dependency -> terminating (after waiting)
5 => {
if is_active(&proc.name).await {
error!("Timeout of waiting service-dependency: Terminating {} process ..." , &proc.name);
terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
// break;
},
// 6 - Timeout of waiting service-dependency -> holding (after waiting)
6 => {
if !is_frozen(&proc.name).await {
error!("Timeout of waiting service-dependency: Freezing {} process ..." , &proc.name);
freeze_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
},
// // 7 - File-dependency change -> terminating (after check)
7 => {
error!("File-dependency warning (file changed). Terminating {} process...", &proc.name);
terminate_process(&proc.name).await;
tokio::time::sleep(Duration::from_millis(100)).await;
},
// // 8 - File-dependency change -> restarting (after check)
8 => {
warn!("File-dependency warning (file changed). Restarting {} process...", &proc.name);
let _ = restart_process(&proc.name, &proc.path).await;
tokio::time::sleep(Duration::from_millis(100)).await;
},
// // 9 - File-dependency change -> staying (after check)
9 => {
warn!("File-dependency warning (file changed). Ignoring on {} process...", &proc.name);
},
// 10 - Process unfreaze call via file handler
10 => {
if is_frozen(&proc.name).await {
warn!("Unfreezing process {} call...", &proc.name);
unfreeze_process(&proc.name).await;
}
},
// 11 - Process unfreaze call via service handler
11 => {
if is_frozen(&proc.name).await {
warn!("Unfreezing process {} call...", &proc.name);
unfreeze_process(&proc.name).await;
}
},
// 101 - Impermissible trigger values in JSON
101 => {
error!("Impermissible trigger values in JSON");
if is_active(&proc.name).await {
terminate_process(&proc.name).await;
}
break;
},
_ => {},
}
},
}
// tokio::task::yield_now().await;
}
}
/// Performs one supervision pass for the tracked process.
///
/// The file- and service-dependency checks are run concurrently. Afterwards:
///
/// * if the process is not active and both checks succeeded, it is started; a failed
///   start is reported by sending code `3` on `tx`, after which the pass returns;
/// * otherwise, if the process is frozen and both checks succeeded, code `10` is sent
///   on `tx` and the pass returns.
///
/// In every other case the pass sleeps for 100 ms and yields before returning.
///
/// # Panics
/// Panics if sending on `tx` fails.
pub async fn running_handler(
    prc: Arc<TrackingProcess>,
    tx: Arc<mpsc::Sender<u8>>,
    watchers: Arc<tokio::sync::Mutex<Vec<Inotify>>>,
) {
    // Dependency checks (one round each), awaited together.
    let (files_res, services_res) = join!(
        file_handler(&prc.name, &prc.dependencies.files, tx.clone(), watchers.clone()),
        service_handler(&prc.name, &prc.dependencies.services, tx.clone())
    );
    let deps_ok = files_res.is_ok() && services_res.is_ok();

    let inactive = !is_active(&prc.name).await;
    if inactive && deps_ok {
        // Not running and nothing blocks it: try to start, report a failed start.
        let started = start_process(&prc.name, &prc.path).await;
        if started.is_err() {
            tx.send(3).await.unwrap();
            return;
        }
    } else if is_frozen(&prc.name).await && deps_ok {
        // Frozen but dependencies are fine again: send the unfreeze code.
        tx.send(10).await.unwrap();
        return;
    }

    tokio::time::sleep(Duration::from_millis(100)).await;
    tokio::task::yield_now().await;
}
/// Returns the output of the `GET_ID_CMD` shell pipeline, intended to be the ID of the
/// Docker container this process runs in.
///
/// The pipeline is executed through `sh -c` and its standard output is returned with
/// surrounding whitespace (including the trailing newline) removed.
///
/// Returns `None` when the shell cannot be spawned or when the pipeline prints nothing,
/// e.g. when `/proc/self/mountinfo` has no `/docker/containers/` entry.
pub fn get_container_id() -> Option<String> {
    // `Command::new` takes the program name only: "-c" has to be its own argument,
    // otherwise the OS looks for an executable literally named "sh -c" and spawning fails.
    let output = Command::new("sh").arg("-c").arg(GET_ID_CMD).output().ok()?;
    let id = String::from_utf8_lossy(&output.stdout).trim().to_owned();
    if id.is_empty() {
        None
    } else {
        Some(id)
    }
}