monitor/src/main.rs

245 lines
6.9 KiB
Rust

use serde::{Deserialize, Serialize};
use serde_json;
use std::fmt::Debug;
use std::fs;
use std::path::Path;
use std::process::Command;
// Used for the fixed polling delay between consecutive checks
use tokio::time::{sleep, Duration};
// Channel used to signal a failure between asynchronous tasks
use tokio::sync::mpsc;
/// Error returned by the dependency checks and by the process launcher.
///
/// Callers in this file only distinguish success from failure, so a single
/// variant is enough. The standard traits are implemented so the value can be
/// printed, compared, and propagated through `?` / `Box<dyn Error>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CustomError {
    /// A dependency check or a launch attempt failed.
    Fatal,
}

impl std::fmt::Display for CustomError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CustomError::Fatal => f.write_str("fatal dependency or launch failure"),
        }
    }
}

impl std::error::Error for CustomError {}
/// Root of the settings document parsed by `load_processes`.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Processes {
    /// Every process to supervise; falls back to an empty list when the key
    /// is missing from the settings file.
    #[serde(default)]
    processes: Vec<TrackingProcess>,
}
/// One supervised process together with the dependencies it needs.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct TrackingProcess {
    /// Process name; this is the value handed to `pidof` and `pkill`.
    name: String,
    /// Directory prefix of the launcher: `{path}/run.sh` is executed with bash.
    path: String,
    /// Files and services that must be available for the process to run.
    dependencies: Dependencies,
}
/// Dependency lists of a single process; both lists default to empty when
/// their key is absent.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Dependencies {
    /// Files whose presence is checked with `check_file`.
    #[serde(default)]
    files: Vec<Files>,
    /// Services whose availability is checked with `check_service`.
    #[serde(default)]
    services: Vec<Services>,
}
/// A file dependency.
///
/// The checked location is `src` immediately followed by `filename`; no
/// separator is inserted between the two.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Files {
    /// Second part of the checked location.
    filename: String,
    /// First part of the checked location (presumably a directory ending in
    /// a path separator; confirm against the settings file).
    src: String,
}
/// A service dependency, probed by running `service-checker.sh` with the
/// host name and port as arguments.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Services {
    /// Host argument handed to the checker script.
    hostname: String,
    /// Port argument handed to the checker script.
    port: u32,
    /// Trigger settings; not read by any code visible in this file.
    triggers: ServiceTriggers,
}
/// Timing settings attached to a service dependency.
///
/// NOTE(review): neither field is read anywhere in the visible code, so
/// their units and exact meaning cannot be confirmed from here.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ServiceTriggers {
    /// Presumably how long to wait for the service; TODO confirm.
    wait: u32,
    /// Presumably a delay applied around the check; TODO confirm.
    delay: u32,
}
/// Entry point of the runner.
///
/// Loads `settings.json`, verifies the file and service dependencies of
/// every configured process, and only then spawns one supervising task per
/// process and waits for all of them.
///
/// Validation happens for the whole configuration before anything is
/// spawned: a missing dependency of a later process must not abort the
/// runner after earlier processes have already been launched.
#[tokio::main]
async fn main() {
    let processes = load_processes("settings.json");
    if processes.processes.is_empty() {
        eprintln!("Error: Processes list is null, runner-rs initialization is stopped");
        return;
    }

    // Phase 1: check every dependency of every process and report all
    // problems, without starting anything yet.
    let mut error_counter = 0;
    for proc in &processes.processes {
        println!(
            "\nProcess '{}' on stage:\n{}\nDepends on {} file(s), {} service(s)\n",
            proc.name,
            proc.path,
            proc.dependencies.files.len(),
            proc.dependencies.services.len()
        );
        for file in &proc.dependencies.files {
            if check_file(&file.filename, &file.src).is_err() {
                eprintln!(
                    "Error: Process {} cannot run without file {}{}",
                    proc.name, file.src, file.filename
                );
                error_counter += 1;
            }
        }
        for ser in &proc.dependencies.services {
            if check_service(&ser.hostname, &ser.port).is_err() {
                eprintln!(
                    "Error: Process {} cannot run while service {}:{} is down",
                    proc.name, ser.hostname, ser.port
                );
                error_counter += 1;
            }
        }
    }
    if error_counter > 0 {
        return;
    }

    // Phase 2: one supervising task per process, each with its own
    // single-slot channel used to signal a failed dependency.
    let mut handler: Vec<tokio::task::JoinHandle<()>> =
        Vec::with_capacity(processes.processes.len());
    for proc in &processes.processes {
        let (tx, mut rx) = mpsc::channel::<usize>(1);
        let proc_clone = proc.clone();
        handler.push(tokio::spawn(async move {
            run_daemons(&proc_clone, tx, &mut rx).await;
        }));
    }

    // A supervisor that panicked is reported, but must not take down the
    // supervisors of the remaining processes.
    for task in handler {
        if let Err(err) = task.await {
            eprintln!("Error: supervising task stopped unexpectedly: {}", err);
        }
    }
}
/// Supervises one process until one of its watchers finishes or a failure
/// signal arrives on `rx`.
///
/// Three watchers are raced against the receiver: the restart loop, the
/// file watcher, and the service watcher. Each gets its own clone of `tx`.
/// When a value is received, the process is terminated and a message is
/// printed; after that this function returns.
async fn run_daemons(
    proc: &TrackingProcess,
    tx: mpsc::Sender<usize>,
    rx: &mut mpsc::Receiver<usize>,
) {
    let name = proc.name.as_str();
    let deps = &proc.dependencies;

    let restart_watch = running_handler(name, &proc.path, tx.clone());
    let files_watch = file_handler(name, &deps.files, tx.clone());
    let services_watch = service_handler(name, &deps.services, tx.clone());

    tokio::select! {
        _ = restart_watch => {},
        _ = files_watch => {},
        _ = services_watch => {},
        _ = rx.recv() => {
            terminate_process(name).await;
            println!("Dependency handling error: Terminating {} process ...", name);
        },
    }
}
/// Reads `json_filename` and deserializes its contents into [`Processes`].
///
/// # Panics
///
/// Panics when the file cannot be read or when its contents cannot be
/// parsed. The panic message names the file and includes the underlying
/// error. The message is built inside the failure closure, so nothing is
/// formatted or allocated on the success path.
fn load_processes(json_filename: &str) -> Processes {
    let read = fs::read_to_string(json_filename).unwrap_or_else(|err| {
        panic!(
            "Missing '{}' file. Cannot start runner: {:?}",
            json_filename, err
        )
    });
    serde_json::from_str::<Processes>(&read).unwrap_or_else(|err| {
        panic!(
            "Parsing error in '{}' file. Cannot start runner: {:?}",
            json_filename, err
        )
    })
}
/// Reports whether `pidof name` printed anything other than whitespace.
///
/// # Panics
///
/// Panics when the `pidof` command cannot be executed.
fn is_active(name: &str) -> bool {
    let pidof = Command::new("pidof").arg(name).output();
    let stdout = pidof.expect("Failed to execute command 'pidof'").stdout;
    let pids = String::from_utf8_lossy(&stdout);
    pids.chars().any(|c| !c.is_whitespace())
}
/// Runs `pkill name` and waits for it to finish.
///
/// A non-success exit status of `pkill` is reported on stderr instead of
/// being silently discarded.
///
/// # Panics
///
/// Panics when the `pkill` command cannot be executed.
async fn terminate_process(name: &str) {
    let output = Command::new("pkill")
        .arg(name)
        .output()
        .expect("Failed to execute command 'pkill'");
    if !output.status.success() {
        eprintln!("Warning: 'pkill {}' finished with {}", name, output.status);
    }
}
/// Launches `{path}/run.sh` with bash.
///
/// On success a message is printed and `Ok(())` is returned as soon as bash
/// has been spawned; the function does not wait for the script to finish.
/// The child is handed to a background thread that waits on it, so the
/// finished bash process is reaped instead of lingering as a zombie after
/// every launch.
///
/// # Errors
///
/// Returns [`CustomError::Fatal`] when bash cannot be spawned. In that case
/// the spawn error is logged and a value is also sent on `tx` (a failed send
/// is ignored).
async fn start_process(name: &str, path: &str, tx: mpsc::Sender<usize>) -> Result<(), CustomError> {
    let runsh = format!("{}{}", path, "/run.sh");
    match Command::new("bash").arg(&runsh).spawn() {
        Ok(mut child) => {
            println!("Process {} is running now!", name);
            // `Child` has no `Drop` that waits; without an explicit `wait`
            // the exited bash process would stay in the process table.
            std::thread::spawn(move || {
                let _ = child.wait();
            });
            Ok(())
        }
        Err(err) => {
            eprintln!(
                "Error: Process {} could not be started via {}: {}",
                name, runsh, err
            );
            let _ = tx.send(1).await;
            Err(CustomError::Fatal)
        }
    }
}
/// Restart loop: every 100 ms, checks whether the process is active and
/// launches it again when it is not.
///
/// `is_active` is the only liveness probe per tick; no extra `pidof` is
/// spawned. When a launch fails, a failure value is sent on `tx`. The loop
/// ends only if that send fails because the receiving side is gone.
///
/// NOTE(review): a process that needs longer than one tick to show up in
/// `pidof` after `run.sh` was started would be launched again; confirm this
/// is acceptable for the supervised programs.
async fn running_handler(name: &str, path: &str, tx: mpsc::Sender<usize>) {
    loop {
        if !is_active(name) && start_process(name, path, tx.clone()).await.is_err() {
            // `start_process` already sent a value on failure. On the
            // single-slot channel created in `main` this second send waits
            // until the receiver has taken the first one, which keeps this
            // loop from retrying the launch in the meantime.
            if tx.send(1).await.is_err() {
                return;
            }
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
/// File watcher: every 100 ms, walks the file dependencies and sends a
/// failure value on `tx` for each one that is missing.
///
/// Before each file is checked the process is probed with `is_active`; the
/// current pass stops at the first probe that reports the process as down.
///
/// # Panics
///
/// Panics when sending on `tx` fails.
async fn file_handler(name: &str, files: &Vec<Files>, tx: mpsc::Sender<usize>) {
    loop {
        // `take_while` runs the liveness probe right before each element is
        // yielded and ends the pass once the probe fails.
        for dep in files.iter().take_while(|_| is_active(name)) {
            if check_file(&dep.filename, &dep.src).is_err() {
                tx.send(1).await.unwrap();
            }
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
/// Checks that the location formed by `path` immediately followed by
/// `filename` exists (no separator is inserted between the two).
///
/// # Errors
///
/// Returns [`CustomError::Fatal`] when the location does not exist.
fn check_file(filename: &str, path: &str) -> Result<(), CustomError> {
    let location = [path, filename].concat();
    if !Path::new(&location).exists() {
        return Err(CustomError::Fatal);
    }
    Ok(())
}
/// Service watcher: every 100 ms, walks the service dependencies and sends a
/// failure value on `tx` for each one whose check fails.
///
/// Before each service is checked the process is probed with `is_active`;
/// the current pass stops at the first probe that reports the process as
/// down.
///
/// # Panics
///
/// Panics when sending on `tx` fails.
async fn service_handler(name: &str, services: &Vec<Services>, tx: mpsc::Sender<usize>) {
    loop {
        // `take_while` runs the liveness probe right before each element is
        // yielded and ends the pass once the probe fails.
        for dep in services.iter().take_while(|_| is_active(name)) {
            if check_service(&dep.hostname, &dep.port).is_err() {
                tx.send(1).await.unwrap();
            }
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
/// Runs `bash service-checker.sh <host> <port>` and maps its outcome to a
/// result.
///
/// # Errors
///
/// Returns [`CustomError::Fatal`] when the command cannot be executed or
/// when it finishes with a non-success exit status.
fn check_service(host: &str, port: &u32) -> Result<(), CustomError> {
    let port_arg = port.to_string();
    let outcome = Command::new("bash")
        .arg("service-checker.sh")
        .arg(host)
        .arg(&port_arg)
        .output();
    match outcome {
        Ok(finished) if finished.status.success() => Ok(()),
        _ => Err(CustomError::Fatal),
    }
}