// monitor/src/main.rs
//
// 435 lines
// 15 KiB
// Rust

use serde::{Deserialize, Serialize};
use serde_json;
use tokio::join;
use std::fmt::Debug;
use std::fs;
use std::path::Path;
use std::process::{Command, Output};
use tokio::time::{Duration, Instant};
use tokio::sync::mpsc;
/// # Error type shared by the runner's fallible helpers
/// > Placeholder (planned to be replaced by a richer error type): every failure is currently
/// > reported as [`CustomError::Fatal`]. Deriving `Debug` and implementing `std::error::Error`
/// > lets `Result<_, CustomError>` values be printed, `expect`ed, or boxed as `dyn Error`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CustomError {
    /// A file/service dependency check or a process launch failed.
    Fatal,
}

impl std::fmt::Display for CustomError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CustomError::Fatal => f.write_str("fatal runner error"),
        }
    }
}

impl std::error::Error for CustomError {}
/// # Root object of the JSON configuration file
/// > Used for (de)serialization; when the `processes` key is absent it defaults to an empty list.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Processes {
    /// Every process the runner should supervise.
    #[serde(default)]
    processes: Vec<TrackingProcess>,
}
/// # One supervised process entry
/// > `name` is the process name looked up with `pidof`/`pkill`, `path` is the directory that holds
/// > its `run.sh` launcher, and `dependencies` lists the files and services it relies on.
/// > The `dependencies` key is mandatory in the JSON (it has no `#[serde(default)]`).
#[derive(Debug, Serialize, Deserialize, Clone)]
struct TrackingProcess {
    name: String,
    path: String,
    dependencies: Dependencies,
}
/// # Dependencies of a process: files it needs and services it must reach
/// > Used for (de)serialization; either list defaults to empty when its key is missing.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Dependencies {
    #[serde(default)]
    files: Vec<Files>,
    #[serde(default)]
    services: Vec<Services>,
}
/// # A file dependency together with the triggers the file daemon applies to it
/// > `src` is the directory prefix combined with `filename` when checking that the file exists.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Files {
    filename: String,
    src: String,
    triggers: FIleTriggers,
}
/// # A network service dependency (`hostname:port`) and its reachability triggers
/// > `hostname` and `port` are handed to the external service-checker script.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Services {
    hostname: String,
    port: u32,
    triggers: ServiceTriggers,
}
/// # Policy applied when a service dependency becomes unreachable
/// > `wait`: seconds to keep retrying (`0` = retry forever); `delay`: seconds between attempts;
/// > `on_lost` (JSON key `onLost`): `"stay"`, `"stop"` or `"hold"`. Any other value is reported
/// > as an invalid trigger.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ServiceTriggers {
    wait: u32,
    delay: u32,
    #[serde(rename = "onLost")]
    on_lost: String,
}
/// # Policy applied to file dependency events
/// > `on_delete` (JSON key `onDelete`): `"stay"`, `"stop"` or `"hold"` when the file is missing;
/// > any other value is reported as an invalid trigger.
/// > `on_change` (JSON key `onChange`) is parsed but not read anywhere in this file yet.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct FIleTriggers {
    #[serde(rename = "onDelete")]
    on_delete: String,
    #[serde(rename = "onChange")]
    on_change: String,
}
/// # Entry point
/// > Loads `settings.json`, spawns one supervising task (`run_daemons`) per configured process,
/// > then waits for every task to finish. A task that panics is reported on stderr and the
/// > remaining tasks keep running instead of the whole supervisor aborting.
#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let processes = load_processes("settings.json");
    if processes.processes.is_empty() {
        eprintln!("Error: Processes list is null, runner-rs initialization is stopped");
        return;
    }
    let mut handles: Vec<tokio::task::JoinHandle<()>> =
        Vec::with_capacity(processes.processes.len());
    // The configuration is not used after this loop, so each task takes ownership of its entry
    // instead of receiving a clone.
    for proc in processes.processes {
        println!(
            "Process '{}' on stage:\n{}\nDepends on {} file(s), {} service(s)\n",
            proc.name,
            proc.path,
            proc.dependencies.files.len(),
            proc.dependencies.services.len()
        );
        // Private channel per process: its daemon both emits and consumes the trigger codes.
        let (tx, mut rx) = mpsc::channel::<u8>(1);
        handles.push(tokio::spawn(async move {
            run_daemons(&proc, tx, &mut rx).await;
        }));
    }
    // Unwrapping here would let one panicking daemon tear down supervision of every other
    // process; report the failure and keep waiting on the rest instead.
    for handle in handles {
        if let Err(err) = handle.await {
            eprintln!("Error: process daemon stopped unexpectedly: {}", err);
        }
    }
}
/// # Supervises one process: runs its checks and reacts to the trigger codes they emit
/// > Each loop iteration races one pass of `running_handler` against the next code arriving on
/// > `rx`. Whichever completes first wins, and the other future is dropped before the next
/// > iteration starts.
/// > hint: create the channel with capacity 1, so a handler's `send` waits until the previous
/// > code has been consumed here.
/// > Returns when code `101` (invalid trigger value in the JSON) arrives, or if the channel closes.
/// > Codes 7-9 (file-dependency change -> terminate / restart / stay) are reserved but not
/// > handled yet.
/// ** in [developing](https://github.com/prplV/runner-rs "REPOSITORY") **
async fn run_daemons(
    proc: &TrackingProcess,
    tx: mpsc::Sender<u8>,
    rx: &mut mpsc::Receiver<u8>
)
{
    loop {
        let run_hand = running_handler(proc, tx.clone());
        tokio::select! {
            _ = run_hand => {},
            msg = rx.recv() => {
                // `recv` yields `None` only after every sender is dropped. This function keeps `tx`
                // alive, so that should not happen, but stop cleanly instead of panicking.
                let code = match msg {
                    Some(code) => code,
                    None => break,
                };
                match code {
                    // 1 - missing file dependency with "stop" trigger -> terminate the process
                    1 => {
                        if is_active(&proc.name) {
                            println!("Dependency handling error: Terminating {} process ...", &proc.name);
                            terminate_process(&proc.name).await;
                        }
                    },
                    // 2 - missing file dependency with "hold" trigger -> freeze the process
                    2 => {
                        if !is_frozen(&proc.name) {
                            println!("Dependency handling error: Freezing {} process ...", &proc.name);
                            freeze_process(&proc.name).await;
                        }
                    },
                    // 3 - launching the process via run.sh failed
                    3 => {
                        println!("Error due to starting {} process", &proc.name);
                    },
                    // 4 - service-dependency wait timed out -> keep the process as is
                    4 => {
                        println!("Timeout of waiting service-dependency: Ignoring for {} process ...", &proc.name);
                    },
                    // 5 - service-dependency wait timed out with "stop" trigger -> terminate
                    5 => {
                        if is_active(&proc.name) {
                            println!("Timeout of waiting service-dependency: Terminating {} process ...", &proc.name);
                            terminate_process(&proc.name).await;
                        }
                    },
                    // 6 - service-dependency wait timed out with "hold" trigger -> freeze
                    6 => {
                        if !is_frozen(&proc.name) {
                            println!("Timeout of waiting service-dependency: Freezing {} process ...", &proc.name);
                            freeze_process(&proc.name).await;
                        }
                    },
                    // 10 / 11 - unfreeze request (10 from the file path, 11 from the service path)
                    10 | 11 => {
                        if is_frozen(&proc.name) {
                            println!("Unfreezing process {} call...", &proc.name);
                            unfreeze_process(&proc.name).await;
                        }
                    },
                    // 101 - impermissible trigger value in the JSON: stop supervising this process
                    101 => {
                        println!("Impermissible trigger values in JSON");
                        break;
                    },
                    _ => {},
                }
            },
        }
    }
}
/// # Reads and parses the runner configuration
/// > Panics (the runner cannot start without it) if `json_filename` cannot be read or is not a
/// > valid `Processes` document. The panic message includes the underlying I/O or JSON error.
/// > The messages are only formatted on the failure path, unlike `expect(format!(..))`.
fn load_processes(json_filename: &str) -> Processes {
    let raw = fs::read_to_string(json_filename).unwrap_or_else(|err| {
        panic!("Missing '{}' file. Cannot start runner: {}", json_filename, err)
    });
    serde_json::from_str::<Processes>(&raw).unwrap_or_else(|err| {
        panic!("Parsing error in '{}' file. Cannot start runner: {}", json_filename, err)
    })
}
/// Runs `pidof <name>` and returns its raw output; stdout holds the matching PIDs, if any.
/// Panics if `pidof` cannot be executed.
fn get_pid(name: &str) -> Output {
    let mut pidof = Command::new("pidof");
    pidof.arg(name);
    pidof.output().expect("Failed to execute command 'pidof'")
}
/// Returns `true` when `pidof <name>` reports at least one PID.
/// Panics if `pidof` cannot be executed.
fn is_active(name: &str) -> bool {
    let lookup = Command::new("pidof")
        .arg(name)
        .output()
        .expect("Failed to execute command 'pidof'");
    let listed = String::from_utf8_lossy(&lookup.stdout);
    // Any non-whitespace token means at least one PID was printed.
    listed.split_whitespace().next().is_some()
}
/// # Reports whether the process called `name` is frozen (stopped)
/// > Looks the process up with `pidof`, then reads each PID's state with `ps -o stat=`.
/// > The process counts as frozen only when at least one PID is found and every reported state
/// > is a stopped one: `T` (stopped by a job-control signal such as the `SIGSTOP` that
/// > `pkill -STOP` sends) or `t` (stopped by a tracer). A process that does not exist is never frozen.
/// > Panics if `pidof` or `ps` cannot be executed.
fn is_frozen(name: &str) -> bool {
    let pidof = Command::new("pidof")
        .arg(name)
        .output()
        .expect("Failed to execute command 'pidof'");
    // `pidof` prints every matching PID on one space-separated line.
    let pids: Vec<String> = String::from_utf8_lossy(&pidof.stdout)
        .split_whitespace()
        .map(String::from)
        .collect();
    if pids.is_empty() {
        return false;
    }
    // `ps -p` accepts a comma-separated PID list and prints one state per line.
    let pid_list = pids.join(",");
    let ps = Command::new("ps")
        .args(["-o", "stat=", "-p", pid_list.as_str()])
        .output()
        .expect("Failed to execute ps command");
    ps_states_all_stopped(&String::from_utf8_lossy(&ps.stdout))
}

/// Returns `true` when `ps -o stat=` output lists at least one state and every listed state is
/// a stopped one (see `state_is_stopped`).
fn ps_states_all_stopped(ps_stdout: &str) -> bool {
    let mut states = ps_stdout
        .lines()
        .map(str::trim)
        .filter(|state| !state.is_empty())
        .peekable();
    states.peek().is_some() && states.all(state_is_stopped)
}

/// Returns `true` for a `ps` STAT code whose leading state letter is `T` (stopped by a
/// job-control signal) or `t` (stopped by a debugger during tracing). Modifier characters such as
/// `l` or `+` that follow the letter are ignored.
fn state_is_stopped(stat: &str) -> bool {
    matches!(stat.trim_start().chars().next(), Some('T') | Some('t'))
}
/// # Sends `pkill`'s default signal (SIGTERM) to processes named exactly `name`
/// > `-x` restricts `pkill` to exact name matches, mirroring the exact `pidof` lookups used to
/// > decide whether to call this. Without it, `pkill` matches any process whose name merely
/// > contains the pattern (e.g. `app` would also hit `app-helper`). The pattern is still an
/// > anchored regular expression.
/// > `--` keeps a name beginning with `-` from being parsed as an option.
/// > Panics if `pkill` cannot be executed.
async fn terminate_process (name: &str) {
    let _ = Command::new("pkill")
        .args(["-x", "--", name])
        .output()
        .expect("Failed to execute command 'pkill'");
}
/// # Freezes processes named exactly `name` by sending them SIGSTOP
/// > `-x` restricts `pkill` to exact name matches, mirroring the exact `pidof` lookups used
/// > elsewhere. Without it, any process whose name contains `name` would be stopped too.
/// > `--` keeps a name beginning with `-` from being parsed as an option.
/// > Panics if `pkill` cannot be executed.
async fn freeze_process(name: &str) {
    let _ = Command::new("pkill")
        .args(["-STOP", "-x", "--", name])
        .output()
        .expect("Failed to freeze process");
}
/// # Resumes processes named exactly `name` by sending them SIGCONT
/// > `-x` restricts `pkill` to exact name matches, mirroring the exact `pidof` lookups used
/// > elsewhere. Without it, any process whose name contains `name` would be signalled too.
/// > `--` keeps a name beginning with `-` from being parsed as an option.
/// > Panics if `pkill` cannot be executed.
async fn unfreeze_process(name: &str) {
    let _ = Command::new("pkill")
        .args(["-CONT", "-x", "--", name])
        .output()
        .expect("Failed to unfreeze process");
}
async fn start_process(name: &str, path: &str) -> Result<(), CustomError> {
let runsh = format!("{}{}", path, "/run.sh");
let mut command = Command::new("bash");
command.arg(runsh);
match command.spawn() {
Ok(_) => {
println!("Process {} is running now!", name);
Ok(())
},
Err(_) => {
return Err(CustomError::Fatal)
},
}
}
/// # One supervision pass over a process
/// > Runs the file and service dependency checks concurrently. Then:
/// > - if the process is not running and both checks passed, it is launched via `start_process`;
/// >   a failed launch sends code `3` and ends the pass;
/// > - otherwise, if the process is frozen and both checks passed, code `10` (unfreeze) is sent
/// >   and the pass ends.
/// > Every other outcome ends with a short pause before returning.
async fn running_handler(prc: &TrackingProcess, tx: mpsc::Sender<u8>){
    let (files_res, services_res) = join!(
        file_handler(&prc.name, &prc.dependencies.files, tx.clone()),
        service_handler(&prc.name, &prc.dependencies.services, tx.clone())
    );
    let checks_passed = files_res.is_ok() && services_res.is_ok();

    let should_launch = !is_active(&prc.name) && checks_passed;
    if should_launch {
        let launch = start_process(&prc.name, &prc.path).await;
        if launch.is_err() {
            tx.send(3).await.unwrap();
            return;
        }
    } else if is_frozen(&prc.name) && checks_passed {
        tx.send(10).await.unwrap();
        return;
    }

    tokio::time::sleep(Duration::from_millis(100)).await;
    tokio::task::yield_now().await;
}
/// # Checks a process's file dependencies once
/// > For every file that `check_file` reports missing:
/// > - if the process is not running or is frozen, return `Err` right away without sending a code;
/// > - otherwise apply the file's `onDelete` trigger:
/// >   - `"stay"` ignores the missing file;
/// >   - `"stop"` sends code `1`;
/// >   - `"hold"` sends code `2`;
/// >   - anything else sends code `101` after a short pause.
/// >
/// > `"stop"`, `"hold"` (when it sends) and unknown triggers return `Err(CustomError::Fatal)`.
/// > If no file blocks the process, returns `Ok(())` after a short pause.
/// > Takes a slice, so callers can still pass `&Vec<Files>`.
async fn file_handler(name: &str, files: &[Files], tx: mpsc::Sender<u8>) -> Result<(), CustomError>{
    for file in files {
        if check_file(&file.filename, &file.src).is_ok() {
            continue;
        }
        // A stopped or frozen process is already "held"; just keep it from being (re)started.
        if !is_active(name) || is_frozen(name) {
            return Err(CustomError::Fatal);
        }
        match file.triggers.on_delete.as_str() {
            "stay" => {
                continue;
            },
            "stop" => {
                if is_active(name) {
                    tx.send(1).await.unwrap();
                }
                return Err(CustomError::Fatal);
            },
            "hold" => {
                if is_active(name) {
                    tx.send(2).await.unwrap();
                    return Err(CustomError::Fatal);
                }
            },
            _ => {
                tokio::time::sleep(Duration::from_millis(50)).await;
                tx.send(101).await.unwrap();
                return Err(CustomError::Fatal);
            },
        }
    }
    tokio::time::sleep(Duration::from_millis(100)).await;
    tokio::task::yield_now().await;
    Ok(())
}
/// # Checks that a file dependency exists
/// > `path` is the directory prefix from the config's `src`, and `filename` is the file inside it.
/// > A `/` separator is inserted only when neither side already provides one, so `"/etc"` and
/// > `"/etc/"` both work as `src`. Plain concatenation turned `"/etc"` + `"x"` into `"/etcx"`.
/// > An empty `path` leaves `filename` untouched.
/// > Returns `Err(CustomError::Fatal)` when nothing exists at the resulting path.
fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
    let needs_separator = !path.is_empty() && !path.ends_with('/') && !filename.starts_with('/');
    let full_path = if needs_separator {
        format!("{}/{}", path, filename)
    } else {
        format!("{}{}", path, filename)
    };
    if Path::new(&full_path).exists() {
        Ok(())
    } else {
        Err(CustomError::Fatal)
    }
}
/// # Checks a process's service dependencies once
/// > For every service that `check_service` reports unreachable:
/// > - if the process is not running or is frozen, return `Err` right away without sending a code;
/// > - otherwise log it and apply the service's `onLost` trigger:
/// >   - `"stay"` ignores it;
/// >   - `"stop"` / `"hold"` retry via `looped_service_connecting` and, if that gives up, send code
/// >     `5` / `6` and return `Err`;
/// >   - anything else sends code `101` and returns `Err`.
/// >
/// > If no service blocks the process, returns `Ok(())` after a short pause.
/// > Takes a slice, so callers can still pass `&Vec<Services>`.
async fn service_handler(name: &str, services: &[Services], tx: mpsc::Sender<u8>) -> Result<(), CustomError> {
    for serv in services {
        if check_service(&serv.hostname, &serv.port).await.is_ok() {
            continue;
        }
        // A stopped or frozen process is already "held"; just keep it from being (re)started.
        if !is_active(name) || is_frozen(name) {
            return Err(CustomError::Fatal);
        }
        println!("Service {}:{} is unreachable for process {}", &serv.hostname, &serv.port, &name);
        match serv.triggers.on_lost.as_str() {
            "stay" => {},
            "stop" => {
                if looped_service_connecting(name, serv).await.is_err() {
                    tx.send(5).await.unwrap();
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    return Err(CustomError::Fatal);
                }
            },
            "hold" => {
                // Re-check: the process may have been frozen while the probe above ran.
                if is_frozen(name) {
                    return Err(CustomError::Fatal);
                }
                if looped_service_connecting(name, serv).await.is_err() {
                    tx.send(6).await.unwrap();
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    return Err(CustomError::Fatal);
                }
            },
            _ => {
                tx.send(101).await.unwrap();
                return Err(CustomError::Fatal);
            },
        }
    }
    tokio::time::sleep(Duration::from_millis(100)).await;
    tokio::task::yield_now().await;
    Ok(())
}
/// # Retries an unreachable service until it answers or the wait budget is used up
/// > Every attempt first sleeps `delay` seconds, logs the attempt, then probes with `check_service`.
/// > With `wait == 0` it retries forever and can only return `Ok(())`.
/// > Otherwise no new attempt starts once `wait` whole seconds have elapsed, and
/// > `Err(CustomError::Fatal)` is returned.
async fn looped_service_connecting(
    name: &str,
    serv: &Services
) -> Result<(), CustomError>
{
    let budget_secs: u64 = serv.triggers.wait.into();
    let pause = Duration::from_secs(serv.triggers.delay.into());
    let started = Instant::now();
    loop {
        let unlimited = budget_secs == 0;
        if !unlimited && started.elapsed().as_secs() >= budget_secs {
            return Err(CustomError::Fatal);
        }
        tokio::time::sleep(pause).await;
        println!("Attempting to connect from {} process to {}:{}", &name, &serv.hostname, &serv.port);
        if check_service(&serv.hostname, &serv.port).await.is_ok() {
            println!("SUCCESS!");
            return Ok(());
        }
    }
}
/// # Probes `host:port` with the external `service-checker.sh` script
/// > Runs `bash service-checker.sh <host> <port>`, resolved against the current working directory.
/// > A zero exit status means the service is reachable. If `bash` cannot be launched, the service
/// > is treated as unreachable.
async fn check_service(host: &str, port: &u32) -> Result<(), CustomError> {
    let port_arg = port.to_string();
    let reachable = Command::new("bash")
        .args(["service-checker.sh", host, port_arg.as_str()])
        .output()
        .map_or(false, |out| out.status.success());
    if reachable {
        Ok(())
    } else {
        Err(CustomError::Fatal)
    }
}